# Amazon SageMaker FeatureStore - IS PoC

Kernel `Python 3 (Data Science)` works well with this notebook.

In this notebook, we load device embedding data into Amazon SageMaker Feature Store.
A preliminary <A HREF="https://github.com/SupersonicAds/sonic-ftrl-api/blob/27d8815cf22bcc28c1b4de1bde5ef87bcae459a1/sequencing/sequence_modeling.ipynb">ETL</A> step produces sequences of devices.
These sequences are later converted to vector embeddings (representations) by the <A HREF="https://github.com/SupersonicAds/sonic-ftrl-api/blob/27d8815cf22bcc28c1b4de1bde5ef87bcae459a1/sequencing/representation_creator.ipynb">representation creator</A> notebook. The result maps each entity (device model) to a vector of values (usually 70, 300, or 512 in length). The results are stored in pickle format, where the first element (`data[0]`) holds the embeddings and the second element (`data[1]`) holds the corresponding device models in the same order.

This notebook loads that data into the Feature Store in a "date-aware" fashion: records are timestamped during ingestion, and all records ingested during the same run share the same timestamp.
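
The timestamping rule can be sketched in isolation. This is a minimal illustration, not the notebook's ingestion code: the helper `stamp_records` is hypothetical, and the `DeviceID` and `EventTime` names are assumed to match the schema file used later.

```python
import time

def stamp_records(device_models, run_time_sec=None):
    """Attach one shared EventTime to every record of an ingestion run."""
    if run_time_sec is None:
        run_time_sec = int(round(time.time()))  # captured once per run
    return [
        {"DeviceID": model, "EventTime": float(run_time_sec)}
        for model in device_models
    ]

# A fixed timestamp is passed here only to keep the example reproducible.
stamped = stamp_records(["iphone13_4", "iphone12_1"], run_time_sec=1640995200)
print(stamped)
```

Because the timestamp is captured once and reused, records from one run can later be told apart from records written by another run.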

## Initial setup

### Fix <A HREF="https://github.com/hdmf-dev/hdmf/issues/617">pandas/NumPy incompatibility issues</A>

In [None]:
import sys
#!{sys.executable} -m pip install --upgrade pip
#!{sys.executable} -m pip install wheel
#!{sys.executable} -m pip install sagemaker pandas numpy numba s3fs --upgrade
!{sys.executable} -m pip install sagemaker pandas numpy numba --upgrade

### Set up boto client and the SageMaker Python SDK.

In [None]:
import boto3
import json
import sagemaker
from sagemaker.session import Session

region = boto3.Session().region_name
boto_client_s3 = boto3.client('s3', region_name=region)

boto_session = boto3.Session(region_name=region)

boto_client_sagemaker = boto_session.client(service_name='sagemaker', region_name=region)
boto_client_featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=boto_client_sagemaker,
    sagemaker_featurestore_runtime_client=boto_client_featurestore_runtime
)

## Load and prepare the data

### Load representation data first

In [None]:
import numpy as np 
import pandas as pd
import pickle

my_bucket = 'sagemaker-studio-ilya-test-20211221'
my_file = 'input_data/xvocab.pkl'

#boto_client_s3 = boto3.client('s3')
response = boto_client_s3.get_object(Bucket=my_bucket, Key=my_file)
body = response['Body']
data = pickle.loads(body.read())
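
Before building a DataFrame, it may help to confirm that the unpickled payload has the expected two-part layout. The sketch below uses a made-up stand-in payload (plain lists, invented device names, 3-element vectors) rather than the real file, and `check_layout` is a hypothetical helper.

```python
import pickle

# Hypothetical stand-in for the real file: element 0 holds the embeddings,
# element 1 holds the device models in the same order.
sample_payload = pickle.dumps(
    ([[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]], ["iphone13_4", "iphone12_1"])
)

def check_layout(payload):
    """Return the embedding length after verifying the two parts line up."""
    vectors, models = payload[0], payload[1]
    if len(vectors) != len(models):
        raise ValueError("embeddings and device models differ in length")
    lengths = {len(v) for v in vectors}
    if len(lengths) != 1:
        raise ValueError(f"inconsistent embedding lengths: {sorted(lengths)}")
    return lengths.pop()

sample_data = pickle.loads(sample_payload)
embedding_length = check_layout(sample_data)
print(len(sample_data[1]), "devices, embedding length", embedding_length)
```

The same helper should also work on the real `data` object, since `len()` and iteration behave the same way for a 2-D NumPy array.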

In [None]:
# Some debugging info

d = {
    'DeviceID': data[1],
    'embeddings': data[0].tolist()
}
my_sample_data = pd.DataFrame(data=d)
print ("head():\n", my_sample_data.head())
print ("dtypes:\n",my_sample_data.dtypes)
print ("columns():\n")
my_sample_data.columns

In [None]:
my_sample_data.info()

### Load the data schema

In [None]:
import time

my_bucket = 'sagemaker-studio-ilya-test-20211221'
my_file = 'input_data/xvocab.schema.json'

#boto_client_s3 = boto3.client('s3')
response = boto_client_s3.get_object(Bucket=my_bucket, Key=my_file)
body = response['Body']
schema = json.loads(body.read())
print ("schema:", schema)

### Create dataframe initialization record.
First, we create a dictionary that will be used to initialize a pandas DataFrame object.
Then we create the DataFrame and cast each of its 'object' dtype columns to string so that it is ready for the SageMaker FeatureStore SDK.
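
The schema file itself is not shown in this notebook. The sketch below is a guess at its shape, based only on the keys that the cells read (`features`, `name`, `index`, `transformation`, `column_record_id`, `column_event_time`, `tags`). Every value, including the `"none"` transformation and the tag, is illustrative, and `validate_schema` is a hypothetical helper.

```python
import json

# Hypothetical schema: only the key names come from how this notebook
# reads the file; all values are illustrative guesses.
schema_text = """
{
  "column_record_id": "DeviceID",
  "column_event_time": "EventTime",
  "tags": [{"Key": "project", "Value": "is-poc"}],
  "features": [
    {"name": "DeviceID", "index": 1, "transformation": "none"},
    {"name": "embeddings", "index": 0, "transformation": "tolist"},
    {"name": "EventTime", "transformation": "time_now"}
  ]
}
"""
example_schema = json.loads(schema_text)

def validate_schema(schema):
    """Check the fields that the cells in this notebook rely on."""
    names = [col["name"] for col in schema["features"]]
    for key in ("column_record_id", "column_event_time"):
        if schema[key] not in names:
            raise ValueError(f"{key} refers to unknown feature {schema[key]!r}")
    for col in schema["features"]:
        # Only "time_now" columns are generated; all others index into the data.
        if col["transformation"] != "time_now" and "index" not in col:
            raise ValueError(f"feature {col['name']!r} needs an 'index'")
    return names

feature_names = validate_schema(example_schema)
print(feature_names)
```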

In [None]:
def create_df_initialization_record(schema,data,current_time_sec=None):
    answer = {}
    if current_time_sec is None:
        current_time_sec = int(round(time.time()))
    for col in schema['features']:
        if col["transformation"] == "tolist":
            index = col["index"] # this column's index within `data` array
            name = col['name']
            value = data[index].tolist()
            answer[name] = value
        elif col["transformation"] == "time_now":
            # Stamp every row with the same ingestion time
            name = col['name']
            value = pd.Series([current_time_sec]*len(data[0]), dtype="float64")
            answer[name] = value
        else:
            index = col["index"]
            name = col['name']
            value = data[index]
            answer[name] = value
    return answer

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")

# Create a dictionary similar to this but using dynamically defined (via schema) columns
# d2 = {
#     'DeviceID': data[1],
#     'embeddings': data[0].tolist()
# }
current_time_sec = int(round(time.time()))
d2 = create_df_initialization_record(schema,data,current_time_sec)

# Create a pandas DataFrame; it is cast later to satisfy SageMaker SDK requirements
my_sample_data2 = pd.DataFrame(data=d2)

In [None]:
print ("head():\n", my_sample_data2.head())
print ("dtypes:\n",my_sample_data2.dtypes)
print ("columns():\n")
my_sample_data2.columns

In [None]:
my_sample_data2.info()

In [None]:
cast_object_to_string(my_sample_data2)
my_sample_data2.head()

## Set up SageMaker FeatureStore

### S3 Bucket Setup For The OfflineStore

SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to an S3 bucket owned by you. To be able to write to your S3 bucket, SageMaker FeatureStore assumes an IAM role which has access to it. The role is also owned by you.
Note that the same bucket can be re-used across FeatureGroups. Data in the bucket is partitioned by FeatureGroup.

Set the S3 bucket name here; it is referenced throughout the rest of the notebook.

In [None]:
# You can modify the following to use a bucket of your choosing
default_s3_bucket_name = feature_store_session.default_bucket() # default S3 bucket defined during SageMaker domain creation.
default_s3_bucket_name = "sagemaker-studio-ilya-test-20211221" # we do not use a default S3 bucket defined during SageMaker domain creation.
prefix = 'sagemaker-basic-featurestore-vecors-demo'

print(default_s3_bucket_name)

### Set up the IAM role. 
This role gives SageMaker FeatureStore access to your S3 bucket. 

<div class="alert alert-block alert-warning">
<b>Note:</b> In this example we use the default SageMaker role, assuming it has both <b>AmazonSageMakerFullAccess</b> and <b>AmazonSageMakerFeatureStoreAccess</b> managed policies. If not, please make sure to attach them to the role before proceeding.
</div>
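
One way to check the note above is to list the managed policies attached to the role. This is a minimal sketch under assumptions: `missing_managed_policies` is a hypothetical helper, the caller needs permission to call IAM `list_attached_role_policies`, and pagination of that call is ignored. A fake client and a made-up ARN are used so the example runs without AWS credentials.

```python
REQUIRED_POLICIES = {
    "AmazonSageMakerFullAccess",
    "AmazonSageMakerFeatureStoreAccess",
}

def missing_managed_policies(iam_client, role_arn):
    """Return required managed policies that are not attached to the role."""
    role_name = role_arn.split("/")[-1]  # last path segment of the ARN
    response = iam_client.list_attached_role_policies(RoleName=role_name)
    attached = {p["PolicyName"] for p in response["AttachedPolicies"]}
    return sorted(REQUIRED_POLICIES - attached)

class FakeIamClient:
    """Offline stand-in that mimics the shape of the IAM response."""
    def list_attached_role_policies(self, RoleName):
        return {"AttachedPolicies": [{"PolicyName": "AmazonSageMakerFullAccess"}]}

missing = missing_managed_policies(
    FakeIamClient(), "arn:aws:iam::123456789012:role/service-role/ExampleRole"
)
print(missing)
```

With real credentials, `boto3.client("iam")` could be passed in place of the fake client.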

In [None]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
# sagemaker_session = sagemaker.Session()
# role = sagemaker.get_execution_role()
role = get_execution_role()
expected_role = 'arn:aws:iam::032106861074:role/service-role/AmazonSageMaker-ExecutionRole-20181031T162966'
if role != expected_role:
    print(f"Warning: you are running using '{role}' role. Switching to {expected_role}")
    role = expected_role
print(role)

### Define the FeatureGroup and create it if necessary

#### Define FeatureGroups
The FeatureGroup uses a fixed name, so features from different runs are stored in a single group and are distinguished by their timestamps. All other information is pulled from the schema file.

In [None]:
from time import gmtime, strftime, sleep
from sagemaker.feature_store.feature_group import FeatureGroup

my_sample_feature_group_name = 'deviceid-feature-group-' + strftime('%d-%H-%M-%S', gmtime()) # not sure yet how to deal with it best
my_sample_feature_group_name = 'deviceid-feature-group' # we are going to store features from different runs in a single group and timestamp the features instead
my_sample_feature_group = FeatureGroup(name=my_sample_feature_group_name, sagemaker_session=feature_store_session)

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
my_sample_feature_group.load_feature_definitions(data_frame=my_sample_data2); # output is suppressed

# record identifier and event time feature names
record_identifier_feature_name = schema['column_record_id'] # "DeviceID"
event_time_feature_name = schema['column_event_time'] # "EventTime"
Tags = schema['tags']
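
The dtype-based detection mentioned in the comment above can be approximated as follows. This is a simplified sketch, not the SDK's actual implementation: `infer_feature_type` is hypothetical, and the exact set of dtypes the SDK accepts may differ between versions. The feature type names `Integral`, `Fractional`, and `String` are the ones Feature Store uses.

```python
def infer_feature_type(dtype_name):
    """Map a pandas dtype name to a Feature Store type name (simplified)."""
    if dtype_name.startswith(("int", "uint")):
        return "Integral"
    if dtype_name.startswith("float"):
        return "Fractional"
    if dtype_name == "string":
        return "String"
    raise ValueError(f"no feature type for dtype {dtype_name!r}")

# Assumed dtypes of the DataFrame after the object-to-string cast.
example_dtypes = {"DeviceID": "string", "embeddings": "string", "EventTime": "float64"}
example_definitions = [
    {"FeatureName": name, "FeatureType": infer_feature_type(dtype)}
    for name, dtype in example_dtypes.items()
]
print(example_definitions)
```

In this simplified mapping an `object` column has no feature type, which is one reason to cast such columns to string before loading the definitions.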


#### Create FeatureGroups in SageMaker FeatureStore

In [None]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

def featuregroup_already_exists(feature_group, boto_client_sagemaker):
    # NameContains narrows the listing, so a match is less likely to fall outside the first page of results
    response = boto_client_sagemaker.list_feature_groups(NameContains=feature_group.name)
    existing_names = [item['FeatureGroupName'] for item in response["FeatureGroupSummaries"]]
    print(f"Looking for feature group '{feature_group.name}' in list {existing_names}")
    return feature_group.name in existing_names

if featuregroup_already_exists(my_sample_feature_group, boto_client_sagemaker):
    print(f"Skipping creation of Feature Group '{my_sample_feature_group.name}' as it already exists.")
else:
    my_sample_feature_group.create(
        s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        tags=Tags,
        role_arn=role,
        enable_online_store=True
    )
    wait_for_feature_group_creation_complete(feature_group=my_sample_feature_group)
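
The wait loop in the cell above polls until the status changes, with no upper bound. A bounded variant might look like the sketch below. The function name, the 300-second default, and the injectable `sleep` argument are all assumptions made for illustration, and a fake `describe` callable replaces the real feature group so the example runs offline.

```python
import time

def wait_until_created(describe, timeout_sec=300, poll_sec=5, sleep=time.sleep):
    """Poll describe() until the status leaves 'Creating' or the timeout hits."""
    deadline = time.monotonic() + timeout_sec
    status = describe().get("FeatureGroupStatus")
    while status == "Creating":
        if time.monotonic() >= deadline:
            raise TimeoutError("feature group is still in 'Creating' state")
        sleep(poll_sec)
        status = describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"feature group ended in status {status!r}")
    return status

# Offline stand-in for feature_group.describe: two polls, then success.
fake_statuses = iter(["Creating", "Creating", "Created"])
final_status = wait_until_created(
    lambda: {"FeatureGroupStatus": next(fake_statuses)},
    sleep=lambda seconds: None,  # skip real sleeping in the demo
)
print(final_status)
```

With a real feature group, `my_sample_feature_group.describe` could be passed as the `describe` argument.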

Confirm that the FeatureGroup has been created by calling the DescribeFeatureGroup API.

In [None]:
feature_group_describe_response = my_sample_feature_group.describe()
feature_group_describe_response

## Data ingestion and manipulation

#### PutRecords into FeatureGroup

After the FeatureGroup has been created, we can put data into it by using the PutRecord API, which the SDK's `ingest` method in the next cell calls for each row of the DataFrame. This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to S3 in chunks. The files are written to the offline store within a few minutes of ingestion.
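
For reference, a single PutRecord call takes a list of feature name and string value pairs. The sketch below shows how one row could be turned into that shape. The helper `to_put_record_payload`, the sample row, and the fake client are illustrative assumptions; only the `put_record` parameter names (`FeatureGroupName`, `Record`) come from the runtime API.

```python
def to_put_record_payload(row):
    """Convert a {feature name: value} mapping into a PutRecord 'Record' list."""
    return [
        {"FeatureName": name, "ValueAsString": str(value)}
        for name, value in row.items()
    ]

class FakeRuntimeClient:
    """Offline stand-in that records calls instead of contacting AWS."""
    def __init__(self):
        self.calls = []

    def put_record(self, FeatureGroupName, Record):
        self.calls.append((FeatureGroupName, Record))

example_row = {
    "DeviceID": "iphone13_4",
    "embeddings": [0.1, 0.2],
    "EventTime": 1640995200.0,
}
fake_runtime = FakeRuntimeClient()
fake_runtime.put_record(
    FeatureGroupName="deviceid-feature-group",
    Record=to_put_record_payload(example_row),
)
print(fake_runtime.calls[0][1])
```

Note that the embedding vector is sent as its string form, so it has to be parsed again after retrieval.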

In [None]:
my_sample_feature_group.ingest(
    data_frame=my_sample_data2, max_workers=8, wait=True
)

To confirm that data has been ingested, we can quickly retrieve a record from the online store:

In [None]:
# record_identifier_value = str(1)
record_identifier_value = "iphone13_4"

record = boto_client_featurestore_runtime.get_record(FeatureGroupName=my_sample_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)
record

#### Create Pandas DataFrame from FeatureStore response - single record

In [None]:
#record_identifier_value = str(3)
record_identifier_value = "iphone12_1"

record = boto_client_featurestore_runtime.get_record(FeatureGroupName=my_sample_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)["Record"]
record

In [None]:
def map_feature_name_value(record):
    result_dict = {}
    for feature in record:
        result_dict[feature["FeatureName"]] = [feature["ValueAsString"]]
    return result_dict

In [None]:
record_as_dict = map_feature_name_value(record)
record_as_dict

In [None]:
df = pd.DataFrame(data=record_as_dict)
df.head()
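
Every value comes back from the online store as a string, including the embedding vector. A minimal sketch of turning it back into floats is shown below. It assumes the vector was stored as the string form of a Python list of finite floats (as the object-to-string cast in this notebook produces); values such as `nan` would need separate handling. `parse_embedding` and the sample record are hypothetical.

```python
import json

def parse_embedding(value_as_string):
    """Turn a stringified list such as '[0.1, 0.2]' back into floats."""
    values = json.loads(value_as_string)
    return [float(v) for v in values]

# Made-up record in the shape returned under the "Record" key of get_record.
example_record = [
    {"FeatureName": "DeviceID", "ValueAsString": "iphone12_1"},
    {"FeatureName": "embeddings", "ValueAsString": "[0.25, -1.5, 3.0]"},
]
values_by_name = {f["FeatureName"]: f["ValueAsString"] for f in example_record}
vector = parse_embedding(values_by_name["embeddings"])
print(vector)
```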

## Batch fetch multiple device records from the Online Feature Store

Fetch a list of selected items from the feature group.
Up to 100 records can be fetched from an online Feature Store in a single batch operation.
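
When more identifiers are needed than fit in one call, they can be split into chunks. The sketch below is one possible approach, not part of the original notebook: `chunked`, `batch_get_all`, and the fake client are hypothetical, and the `Errors` and `UnprocessedIdentifiers` parts of the real response are ignored for brevity.

```python
def chunked(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def batch_get_all(runtime_client, feature_group_name, record_ids, size=100):
    """Fetch any number of records by issuing one batch call per chunk."""
    found = []
    for chunk in chunked(record_ids, size):
        response = runtime_client.batch_get_record(
            Identifiers=[{
                "FeatureGroupName": feature_group_name,
                "RecordIdentifiersValueAsString": chunk,
            }]
        )
        found.extend(response["Records"])
    return found

class FakeBatchClient:
    """Offline stand-in that echoes one empty record per requested identifier."""
    def __init__(self):
        self.batch_sizes = []

    def batch_get_record(self, Identifiers):
        ids = Identifiers[0]["RecordIdentifiersValueAsString"]
        self.batch_sizes.append(len(ids))
        return {"Records": [
            {"RecordIdentifierValueAsString": i, "Record": []} for i in ids
        ]}

fake_batch_client = FakeBatchClient()
all_records = batch_get_all(
    fake_batch_client, "deviceid-feature-group",
    [f"device_{n}" for n in range(250)],
)
print(fake_batch_client.batch_sizes, len(all_records))
```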

In [None]:
identifiers = [
    {
        'FeatureGroupName': my_sample_feature_group_name,
        'RecordIdentifiersValueAsString': ["iphone11_8", "iphone13_4","m1031g2", "hisense f15"]
    }
]
        
batch_get_record_response = boto_client_featurestore_runtime.batch_get_record(Identifiers=identifiers)
records = batch_get_record_response['Records']
records

#### Create Pandas DataFrame from FeatureStore response - multiple records

In [None]:
feature_definitions = my_sample_feature_group.describe()["FeatureDefinitions"]
feature_definitions

In [None]:
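# Note: this redefines map_feature_name_value from the single-record section so that it handles a list of records
def map_feature_name_value(records):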
    result_dict = {}
    for feature in feature_definitions:
        result_dict[feature["FeatureName"]] = []

    for record in records:
        for feature in record["Record"]:
            result_dict[feature["FeatureName"]].append(feature["ValueAsString"])
    return result_dict

In [None]:
records_as_dict = map_feature_name_value(records)

In [None]:
batch = pd.DataFrame(data=records_as_dict)
batch

In [None]:
my_sample_data2

## Load outcomes and run a join


In [None]:
my_bucket = 'sagemaker-studio-ilya-test-20211221'
my_file = 'input_data/csv_outcomes_is_ilya2.csv'

outcomes = pd.read_csv(f"s3://{my_bucket}/{my_file}")
#outcomes = pd.read_csv('s3://sagemaker-studio-ilya-test-20211221/input_data/csv_outcomes_is_ilya2.csv')
outcomes.head()

In [None]:
selected_columns = outcomes[["supply_app_bundle_id","device_id", "device_model"]]
selected_columns


In [None]:
# Rename the record identifier column so that it matches the join key used in `outcomes`
a = my_sample_data2.rename(columns={'DeviceID': 'device_model'})
a.loc[a['device_model'].isin(['sm-g973f','dammar','kyv48','b50pro','ptb10r'])]

In [None]:
# Inner join (the pd.merge default): outcomes without a matching embedding are dropped
result = pd.merge(selected_columns, a, on="device_model")
result
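
Because the default join is an inner join, outcome rows whose device model has no embedding disappear from `result` without any warning. A left join with `indicator=True` is one way to see which models went unmatched. The two small DataFrames below are made-up stand-ins, not the real outcomes or embeddings.

```python
import pandas as pd

# Made-up stand-ins for the outcomes and the renamed embeddings frame.
example_outcomes = pd.DataFrame({
    "device_id": ["a1", "b2", "c3"],
    "device_model": ["iphone13_4", "unknown-model", "iphone12_1"],
})
example_features = pd.DataFrame({
    "device_model": ["iphone13_4", "iphone12_1"],
    "embeddings": ["[0.1, 0.2]", "[0.3, 0.4]"],
})

# how="left" keeps every outcome row; the _merge column marks unmatched ones.
joined = pd.merge(
    example_outcomes, example_features,
    on="device_model", how="left", indicator=True,
)
unmatched = joined.loc[joined["_merge"] == "left_only", "device_model"].tolist()
print(unmatched)
```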

## Cleanup Resources

In [None]:
my_sample_feature_group.delete()