In [211]:
import calendar
import datetime
import random, string
import time
from time import gmtime, strftime, sleep

import boto3
import pandas as pd
from sagemaker import get_execution_role
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session

In [212]:
def str_time_prop(start, end, format):
    """Return a timestamp string drawn uniformly at random between ``start`` and ``end``.

    Both bounds and the result use ``format``. They are interpreted and rendered
    as UTC (the notebook's format ends in a literal ``Z``), so the result does not
    depend on the machine's local timezone or its DST transitions.

    Draws from the module-level ``random`` generator, so results are reproducible
    under ``random.seed``.
    """
    # timegm/gmtime are exact inverses in UTC; mktime/localtime would apply the
    # local offset and can shift or skip times that fall in a DST gap.
    stime = calendar.timegm(time.strptime(start, format))
    etime = calendar.timegm(time.strptime(end, format))
    ptime = stime + random.random() * (etime - stime)
    return time.strftime(format, time.gmtime(ptime))


def random_date(start, end):
    """Random UTC timestamp ("YYYY-MM-DDTHH:MM:SSZ") between ``start`` and ``end``."""
    return str_time_prop(start, end, "%Y-%m-%dT%H:%M:%SZ")

In [213]:
random.seed(42)

# The same sampled transactions are replayed in three time windows, so every
# TransactionID ends up with three versions that differ only in EventTime.
source_csv = 's3://sagemaker-sample-files/datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_transactions.csv'
event_windows = [
    ("2021-01-01T20:00:00Z", "2021-01-02T10:00:00Z"),
    ("2021-01-04T20:00:00Z", "2021-01-05T10:00:00Z"),
    ("2021-01-07T20:00:00Z", "2021-01-08T10:00:00Z"),
]

# Read the sample once instead of once per window.
df_base = pd.read_csv(source_csv)
n_rows = len(df_base)

frames = []
for i, (start, end) in enumerate(event_windows):
    df_tmp = df_base.copy()
    # One random timestamp per row, drawn in row order so the seed reproduces the data.
    df_tmp['EventTime'] = [random_date(start, end) for _ in range(n_rows)]
    # Give each copy its own contiguous index block (0..n-1, n..2n-1, ...),
    # derived from the actual row count rather than a hard-coded 2000.
    df_tmp.index = pd.RangeIndex(i * n_rows, (i + 1) * n_rows)
    frames.append(df_tmp)

# DataFrame.append was removed in pandas 2.0; concatenate all copies once.
df = pd.concat(frames)

In [214]:
df.loc[df['TransactionID'].eq(3343087)]  # all three versions of one transaction

Unnamed: 0,TransactionID,isFraud,TransactionDT,TransactionAmt,card1,card2,card3,card4,card5,card6,...,N1,N2,N3,N4,N5,N6,N7,N8,N9,EventTime
0,3343087,0,8810855,29.0,12469,360.0,150.0,mastercard,126.0,debit,...,F,F,T,T,T,T,T,F,T,2021-01-02T04:57:07Z
2000,3343087,0,8810855,29.0,12469,360.0,150.0,mastercard,126.0,debit,...,F,F,T,T,T,T,T,F,T,2021-01-04T22:00:20Z
4000,3343087,0,8810855,29.0,12469,360.0,150.0,mastercard,126.0,debit,...,F,F,T,T,T,T,T,F,T,2021-01-07T21:38:42Z


In [215]:
# The latest version of each TransactionID (max EventTime; these ISO-8601 strings
# sort chronologically) goes to the online store. All older versions are
# backfilled to the offline store.
# Use the 'max' string: passing the builtin max to transform is deprecated in pandas 2.x.
latest_event_time = df.groupby('TransactionID')['EventTime'].transform('max')
idx = latest_event_time == df['EventTime']
df_online = df[idx].copy()
df_offline = df[~idx].copy()

In [216]:
def cast_object_to_string(data_frame):
    """Convert each object-dtype column of ``data_frame`` to pandas ``string`` dtype, in place.

    Every value is first passed through ``str`` (so non-string objects become their
    text form) and then stored as ``StringDtype``. Other columns are untouched.
    Returns None.
    """
    object_labels = (
        label for label in data_frame.columns
        if data_frame.dtypes[label] == 'object'
    )
    for label in object_labels:
        as_text = data_frame[label].astype("str")
        data_frame[label] = as_text.astype("string")

In [217]:
cast_object_to_string(data_frame=df_offline)  # in place; nothing is returned

In [218]:
cast_object_to_string(data_frame=df_online)  # in place; nothing is returned

In [219]:
# Execution role, region, and the AWS clients shared by the rest of the notebook.
role = get_execution_role()
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
featuregroup_name = 'transactions-fg-manual-ingest-online-sync'
# Resolve the account through the same session as every other client
# (previously this went through the implicit default boto3 session).
account_id = boto_session.client('sts').get_caller_identity()["Account"]

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

In [220]:
feature_group = FeatureGroup(
    sagemaker_session=feature_store_session,
    name=featuregroup_name,
)
# Infer feature names and types from df_offline's columns and dtypes.
feature_group.load_feature_definitions(data_frame=df_offline)

[FeatureDefinition(feature_name='TransactionID', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='isFraud', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='TransactionDT', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='TransactionAmt', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card1', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='card2', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card3', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card4', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='card5', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card6', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinit

In [221]:
# Feature Store schema keys: the record identifier and the event-time feature.
record_identifier_feature_name, event_time_feature_name = "TransactionID", "EventTime"

# Offline-store data and Athena query results are written under this bucket/prefix.
s3_folder = 'feature-store-manual-ingestion-online-sync'
bucket = feature_store_session.default_bucket()

def wait_for_feature_group_creation_complete(feature_group, poll_seconds=5):
    """Block until ``feature_group`` leaves the "Creating" state.

    Polls ``feature_group.describe()`` every ``poll_seconds`` seconds.

    Raises:
        RuntimeError: if the final status is anything other than "Created". The
            message includes that status and, when ``describe()`` reports one,
            its ``FailureReason``.
    """
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        sleep(poll_seconds)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        reason = description.get("FailureReason", "no FailureReason reported")
        raise RuntimeError(
            f"Failed to create feature group {feature_group.name} "
            f"(status={status}): {reason}"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")

offline_store_uri = f"s3://{bucket}/{s3_folder}"
feature_group.create(
    s3_uri=offline_store_uri,
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
)
# Creation is asynchronous; poll until it succeeds or fails.
wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup transactions-fg-manual-ingest-online-sync successfully created.


## Backfilling historical records into S3

In [222]:
# Athena query helper for this feature group's offline store.
query = feature_group.athena_query()
# Offline-store table name; interpolated into the SQL strings below.
fg_table = query.table_name

In [223]:
# Hourly partition key "YYYY-MM-DD-HH" of each EventTime; the offline-store paths are laid out by hour.
event_time_format = "%Y-%m-%dT%H:%M:%SZ"
df_offline['key'] = df_offline['EventTime'].apply(
    lambda event_time: datetime.datetime.strptime(event_time, event_time_format).strftime("%Y-%m-%d-%H")
)

In [224]:
# Bookkeeping columns matching the offline store's write_time / api_invocation_time /
# is_deleted columns; both timestamps are the current UTC time.
fg_timestamp = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
ingest_ts = pd.to_datetime(fg_timestamp)
df_offline['write_time'] = ingest_ts
df_offline['api_invocation_time'] = ingest_ts
df_offline['is_deleted'] = False

In [225]:
def create_s3_paths(df, offline_store_prefix=None):
    """Map each hourly partition key in ``df`` to an offline-store S3 object URI.

    Args:
        df: Frame with a ``key`` column ("YYYY-MM-DD-HH") and an ``EventTime``
            column of "YYYY-MM-DDTHH:MM:SSZ" strings.
        offline_store_prefix: S3 URI of the offline store's ``data`` folder. When
            omitted, it is built from the notebook globals ``bucket``,
            ``s3_folder``, ``account_id``, ``region`` and ``fg_table``.

    Returns:
        dict mapping each key to
        ``<prefix>/year=YYYY/month=MM/day=DD/hour=HH/YYYYMMDDTHHMMSSZ_<16 random chars>.parquet``.
        The filename timestamp is the latest EventTime within that key. The suffix
        is drawn from the global ``random`` state.
    """
    if offline_store_prefix is None:
        offline_store_prefix = (
            f"s3://{bucket}/{s3_folder}/{account_id}/sagemaker/{region}"
            f"/offline-store/{fg_table}/data"
        )
    offline_store_prefix = offline_store_prefix.rstrip('/')
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits

    # Latest EventTime per key, computed in one pass instead of re-filtering the
    # frame for every key. Only this column is cast to object (not a full-frame copy).
    latest_event_time = df['EventTime'].astype(object).groupby(df['key']).max()

    s3_paths = {}
    # Keys are visited in order of first appearance (as before), so the random
    # suffixes drawn for each key are unchanged for a given seed.
    for v in df['key'].unique():
        year, month, day, hour = v.split('-')
        path = f"{offline_store_prefix}/year={year}/month={month}/day={day}/hour={hour}/"
        # "YYYY-MM-DDTHH:MM:SSZ".split(':') -> [date+hour, minute, "SSZ"]; the
        # trailing "Z" is kept in the filename.
        _, minute, second = latest_event_time[v].split(':')
        filename = f"{year}{month}{day}T{hour}{minute}{second}_"
        filename += ''.join(random.choice(alphabet) for _ in range(16))
        filename += '.parquet'
        s3_paths[v] = path + filename

    return s3_paths

In [226]:
# df_offline already carries the hourly partition 'key' column ("YYYY-MM-DD-HH"
# of EventTime), so it is not recomputed here.
# Create the corresponding S3 paths and filenames.
file_path_names = create_s3_paths(df_offline)

In [227]:
# Write one parquet file per hourly partition. The helper 'key' column is dropped.
# index=False stops the (non-range) pandas index being written as an extra
# __index_level_0__ column in the offline-store files.
for key, data in df_offline.groupby('key'):
    data.drop(columns='key').to_parquet(file_path_names[key], index=False)

In [228]:
results_uri = f's3://{bucket}/{s3_folder}/query_results/'
query_string = f'SELECT * FROM "{fg_table}"'

# Run the query against the offline store, then load its results into pandas.
query.run(query_string=query_string, output_location=results_uri)
query.wait()
dataset = query.as_dataframe()

dataset.head()

Unnamed: 0,transactionid,isfraud,transactiondt,transactionamt,card1,card2,card3,card4,card5,card6,...,n4,n5,n6,n7,n8,n9,eventtime,write_time,api_invocation_time,is_deleted
0,3166559,0,3949136,35.95,5017,432.0,150.0,visa,226.0,debit,...,T,F,T,F,T,F,2021-01-05T01:58:50Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False
1,3518889,0,13998628,49.0,3821,111.0,150.0,mastercard,219.0,credit,...,F,F,F,F,F,T,2021-01-05T01:49:42Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False
2,3340453,0,8720681,49.0,17188,321.0,150.0,visa,226.0,debit,...,F,F,T,F,T,T,2021-01-05T01:16:13Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False
3,3160102,0,3764141,335.0,3484,372.0,150.0,mastercard,117.0,debit,...,T,F,F,F,F,T,2021-01-05T01:19:45Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False
4,3433814,0,11381859,171.0,12544,321.0,150.0,visa,226.0,debit,...,F,T,F,F,F,T,2021-01-05T01:01:46Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False


## Writing the most recent records into the online store

In [229]:
# Ingest only the latest version of each transaction; wait=True blocks until ingestion finishes.
feature_group.ingest(data_frame=df_online, max_workers=10, wait=True)

IngestionManagerPandas(feature_group_name='transactions-fg-manual-ingest-online-sync', sagemaker_session=<sagemaker.session.Session object at 0x7fa5266c99e8>, data_frame=      TransactionID  isFraud  TransactionDT  TransactionAmt  card1  card2  \
4000        3343087        0        8810855          29.000  12469  360.0   
4001        3307318        0        7955295         107.950  16188  178.0   
4002        3555327        0       15084339         159.950   1825  555.0   
4003        3310736        0        8017157         159.950  10057  225.0   
4004        3034711        0        1127470         117.000  11444  555.0   
...             ...      ...            ...             ...    ...    ...   
5995        3252738        1        6443158         200.000   6019  583.0   
5996        3548960        1       14873644           6.517  10175  176.0   
5997        3319928        1        8196787          67.067  14276  177.0   
5998        3256349        1        6539367          59.000 

In [230]:
record_identifier_value = "2990130"
featurestore_runtime.get_record(
    FeatureGroupName=featuregroup_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

{'ResponseMetadata': {'RequestId': '28408a17-7e3c-43ec-8ab2-f68eb3b9c81a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '28408a17-7e3c-43ec-8ab2-f68eb3b9c81a',
   'content-type': 'application/json',
   'content-length': '2201',
   'date': 'Wed, 28 Apr 2021 14:04:41 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'},
  {'FeatureName': 'isFraud', 'ValueAsString': '0'},
  {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'},
  {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'},
  {'FeatureName': 'card1', 'ValueAsString': '4577'},
  {'FeatureName': 'card2', 'ValueAsString': '583.0'},
  {'FeatureName': 'card3', 'ValueAsString': '150.0'},
  {'FeatureName': 'card4', 'ValueAsString': 'mastercard'},
  {'FeatureName': 'card5', 'ValueAsString': '219.0'},
  {'FeatureName': 'card6', 'ValueAsString': 'credit'},
  {'FeatureName': 'B1', 'ValueAsString': '69'},
  {'FeatureName': 'B2', 'ValueAsString': '80'},
  {'Featur

Running the query below immediately returns only the 2 historical records that were backfilled into the offline store, because records written through the ingest API take roughly 5 minutes to appear in the offline store. After about 5 minutes, the query should return 3 records.

In [240]:
# Reuse the id fetched from the online store above instead of repeating the literal.
# int() keeps the SQL a numeric comparison against the integral transactionid column.
query_string = f'SELECT * FROM "{fg_table}" WHERE transactionid = {int(record_identifier_value)}'

query.run(query_string=query_string, output_location=f's3://{bucket}/{s3_folder}/query_results/')
query.wait()
dataset = query.as_dataframe()

dataset.head()

Unnamed: 0,transactionid,isfraud,transactiondt,transactionamt,card1,card2,card3,card4,card5,card6,...,n4,n5,n6,n7,n8,n9,eventtime,write_time,api_invocation_time,is_deleted
0,2990130,0,152647,75.0,4577,583.0,150.0,mastercard,219.0,credit,...,T,T,T,T,F,F,2021-01-02T02:57:15Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False
1,2990130,0,152647,75.0,4577,583.0,150.0,mastercard,219.0,credit,...,T,T,T,T,F,F,2021-01-05T05:48:28Z,2021-04-28 14:04:26.000,2021-04-28 14:04:26.000,False
2,2990130,0,152647,75.0,4577,583.0,150.0,mastercard,219.0,credit,...,T,T,T,T,F,F,2021-01-08T01:22:38Z,2021-04-28 14:09:44.886,2021-04-28 14:04:38.000,False


In [232]:
# Delete the feature group created above. The S3 objects under s3_folder are
# removed separately in the next cell.
feature_group.delete()

In [233]:
# Remove everything this notebook wrote under s3_folder (offline-store files and
# Athena query results). The trailing "/" restricts the prefix to this folder;
# without it, sibling folders whose names merely start with s3_folder would also be deleted.
s3 = boto_session.resource('s3')
s3bucket = s3.Bucket(bucket)
s3bucket.objects.filter(Prefix=f"{s3_folder}/").delete()