# End To End API Example

In [39]:
import boto3
import sagemaker

original_boto3_version = boto3.__version__

Check boto3 version

In [40]:
print(original_boto3_version)

1.19.3


In [41]:
from sagemaker.session import Session

# Region is taken from the default boto3 configuration and reused for every client below.
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

# `sagemaker_client` is used later to list feature groups; `featurestore_runtime`
# is used later to read individual records (get_record / batch_get_record).
sagemaker_client = boto_session.client(region_name=region, service_name="sagemaker")
featurestore_runtime = boto_session.client(
    region_name=region, service_name="sagemaker-featurestore-runtime"
)

# SageMaker SDK session wired to the explicit clients created above.
feature_store_session = Session(
    sagemaker_featurestore_runtime_client=featurestore_runtime,
    sagemaker_client=sagemaker_client,
    boto_session=boto_session,
)

Set up a default bucket

In [42]:
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "sofi-sagemaker-featurestore-demo"

print(default_s3_bucket_name)

sagemaker-us-east-1-967669495843


Get an execution role

In [43]:
from sagemaker import get_execution_role
role = get_execution_role()
print(role)

arn:aws:iam::967669495843:role/service-role/AmazonSageMaker-ExecutionRole-20190812T143756


#### Inspect Data
In this example we have two tables: identity and transactions. They can be joined on the `TransactionID` column.

In [44]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io

s3_client = boto3.client("s3", region_name=region)

# Location of the sample CSV files.
fraud_detection_bucket_name = "sagemaker-sample-files"
identity_file_key = (
    "datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_identity.csv"
)
transaction_file_key = (
    "datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_transactions.csv"
)

# Download both objects from S3.
identity_data_object = s3_client.get_object(
    Key=identity_file_key, Bucket=fraud_detection_bucket_name
)
transaction_data_object = s3_client.get_object(
    Key=transaction_file_key, Bucket=fraud_detection_bucket_name
)

# Parse each CSV, round numeric values to 5 decimals, and replace missing values with 0.
identity_data = (
    pd.read_csv(io.BytesIO(identity_data_object["Body"].read())).round(5).fillna(0)
)
transaction_data = (
    pd.read_csv(io.BytesIO(transaction_data_object["Body"].read())).round(5).fillna(0)
)

# Feature transformations for this dataset are applied before ingestion into FeatureStore.
# One-hot encode card6 (card type) and card4 (card bank).
encoded_card_type = pd.get_dummies(transaction_data["card6"], prefix="card_type")
encoded_card_bank = pd.get_dummies(transaction_data["card4"], prefix="card_bank")

# Append the encoded columns, then rename the one column whose name contains a
# blank space, because blank space is not allowed in a feature name.
transformed_transaction_data = pd.concat(
    [transaction_data, encoded_card_type, encoded_card_bank], axis=1
).rename(columns={"card_bank_american express": "card_bank_american_express"})

In [45]:
identity_data.head()

Unnamed: 0,TransactionID,id_01,id_02,id_03,id_04,id_05,id_06,id_07,id_08,id_09,...,id_11,id_12,id_13,id_14,id_15,id_16,id_17,id_18,id_19,id_20
0,2990130,-5,38780.0,0.0,0.0,0.0,-70,0,1,100.0,...,32,80,253,241,260,125,T,F,F,T
1,2990266,-10,69246.0,0.0,0.0,0.0,-67,0,2,100.0,...,47,47,122,33,38,60,T,F,T,F
2,2992553,-45,348819.0,0.0,0.0,0.0,-73,0,0,100.0,...,21,143,268,111,2,135,F,F,T,F
3,2994568,-15,337170.0,0.0,0.0,0.0,-10,1,2,100.0,...,55,127,253,202,135,49,F,F,T,T
4,2994749,-5,680670.0,0.0,0.0,8.0,-1,2,2,100.0,...,52,43,257,7,19,254,F,F,T,T


In [46]:
transaction_data.head()

Unnamed: 0,TransactionID,isFraud,TransactionDT,TransactionAmt,card1,card2,card3,card4,card5,card6,...,F17,N1,N2,N3,N4,N5,N6,N7,N8,N9
0,3343087,0,8810855,29.0,12469,360.0,150.0,mastercard,126.0,debit,...,519,F,F,T,T,T,T,T,F,T
1,3307318,0,7955295,107.95,16188,178.0,150.0,mastercard,224.0,debit,...,773,F,T,T,T,F,F,F,F,T
2,3555327,0,15084339,159.95,1825,555.0,150.0,visa,226.0,debit,...,771,F,T,F,F,T,T,T,T,F
3,3310736,0,8017157,159.95,10057,225.0,150.0,mastercard,224.0,debit,...,903,T,T,F,T,T,F,T,F,F
4,3034711,0,1127470,117.0,11444,555.0,150.0,visa,226.0,debit,...,579,T,T,T,F,T,F,T,F,F


In [47]:
transformed_transaction_data.head()

Unnamed: 0,TransactionID,isFraud,TransactionDT,TransactionAmt,card1,card2,card3,card4,card5,card6,...,N8,N9,card_type_0,card_type_credit,card_type_debit,card_bank_0,card_bank_american_express,card_bank_discover,card_bank_mastercard,card_bank_visa
0,3343087,0,8810855,29.0,12469,360.0,150.0,mastercard,126.0,debit,...,F,T,0,0,1,0,0,0,1,0
1,3307318,0,7955295,107.95,16188,178.0,150.0,mastercard,224.0,debit,...,F,T,0,0,1,0,0,0,1,0
2,3555327,0,15084339,159.95,1825,555.0,150.0,visa,226.0,debit,...,T,F,0,0,1,0,0,0,0,1
3,3310736,0,8017157,159.95,10057,225.0,150.0,mastercard,224.0,debit,...,F,F,0,0,1,0,0,0,1,0
4,3034711,0,1127470,117.0,11444,555.0,150.0,visa,226.0,debit,...,F,F,0,0,1,0,0,0,0,1


#### Create Feature Group and ingest data

In [48]:
from time import gmtime, strftime, sleep

# Each name gets a day-hour-minute-second suffix taken from the current GMT time.
identity_feature_group_name = f"identity-feature-group-{strftime('%d-%H-%M-%S', gmtime())}"
transaction_feature_group_name = f"transaction-feature-group-{strftime('%d-%H-%M-%S', gmtime())}"

In [49]:
from sagemaker.feature_store.feature_group import FeatureGroup

# One FeatureGroup handle per table, both bound to the shared Feature Store session.
identity_feature_group = FeatureGroup(
    sagemaker_session=feature_store_session, name=identity_feature_group_name
)
transaction_feature_group = FeatureGroup(
    sagemaker_session=feature_store_session, name=transaction_feature_group_name
)

In [50]:
import time

current_time_sec = int(round(time.time()))


def cast_object_to_string(data_frame):
    """Convert every object-dtype column of ``data_frame`` to the ``string`` dtype, in place.

    Each object column is cast to ``str`` first and then to ``string``; columns
    of any other dtype are left untouched. Nothing is returned.
    """
    column_dtypes = data_frame.dtypes
    object_columns = [name for name in data_frame.columns if column_dtypes[name] == "object"]
    for name in object_columns:
        data_frame[name] = data_frame[name].astype("str").astype("string")


# Cast object-dtype columns to the string dtype (in place). The SageMaker FeatureStore
# Python SDK will then map the string dtype to the String feature type.
cast_object_to_string(identity_data)
cast_object_to_string(transformed_transaction_data)

# record identifier and event time feature names
record_identifier_feature_name = "TransactionID"
event_time_feature_name = "EventTime"

# Append the EventTime feature. A scalar float is broadcast to every row, giving a
# float64 column whatever the frame's length or index. Assigning a separately built
# Series instead would align on index labels and could silently introduce NaN values.
identity_data[event_time_feature_name] = float(current_time_sec)
transformed_transaction_data[event_time_feature_name] = float(current_time_sec)

# Load feature definitions into each feature group. The SageMaker FeatureStore Python
# SDK auto-detects the data schema from the input data frame.
identity_feature_group.load_feature_definitions(data_frame=identity_data)
# The return value of this last call (the detected feature definitions of the
# transaction feature group) is displayed as the cell output.
transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data)


[FeatureDefinition(feature_name='TransactionID', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='isFraud', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='TransactionDT', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='TransactionAmt', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card1', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='card2', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card3', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card4', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='card5', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='card6', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinit

In [51]:
def wait_for_feature_group_creation_complete(feature_group, poll_interval_sec=5, timeout_sec=None):
    """Block until ``feature_group`` leaves the "Creating" status.

    Args:
        feature_group: object exposing ``describe()`` (returning a mapping with a
            "FeatureGroupStatus" entry) and a ``name`` attribute.
        poll_interval_sec: seconds to sleep between two ``describe()`` calls.
        timeout_sec: optional upper bound, in seconds, on the total wait.
            ``None`` (the default) waits indefinitely.

    Raises:
        TimeoutError: ``timeout_sec`` was given and the feature group is still
            "Creating" once it has elapsed.
        RuntimeError: the feature group ended in any status other than "Created".
    """
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec

    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Timed out after {timeout_sec} seconds waiting for feature group "
                f"{feature_group.name} to be created"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval_sec)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")

    if status != "Created":
        # Surface the terminal status and, when the description carries one, the
        # failure reason so the error is actionable without another describe() call.
        failure_reason = description.get("FailureReason")
        details = f"status: {status}"
        if failure_reason:
            details += f", reason: {failure_reason}"
        raise RuntimeError(f"Failed to create feature group {feature_group.name} ({details})")
    print(f"FeatureGroup {feature_group.name} successfully created.")


# Request creation of both feature groups (identity first, then transaction).
# Both share the same S3 location, record identifier, event time feature and role,
# and both enable the online store.
for feature_group in (identity_feature_group, transaction_feature_group):
    feature_group.create(
        s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True,
    )

# Only after both creations have been requested, block until each one finishes.
for feature_group in (identity_feature_group, transaction_feature_group):
    wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup identity-feature-group-15-18-16-15 successfully created.
Waiting for Feature Group Creation
FeatureGroup transaction-feature-group-15-18-16-15 successfully created.


In [52]:
identity_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:967669495843:feature-group/identity-feature-group-15-18-16-15',
 'FeatureGroupName': 'identity-feature-group-15-18-16-15',
 'RecordIdentifierFeatureName': 'TransactionID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'TransactionID',
   'FeatureType': 'Integral'},
  {'FeatureName': 'id_01', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_02', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_03', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_04', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_05', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_06', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_07', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_08', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_09', 'FeatureType': 'Fractional'},
  {'FeatureName': 'id_10', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_11', 'FeatureType': 'Integral'},
  {'FeatureName': 'id_12', 'FeatureTyp

In [53]:
transaction_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:967669495843:feature-group/transaction-feature-group-15-18-16-15',
 'FeatureGroupName': 'transaction-feature-group-15-18-16-15',
 'RecordIdentifierFeatureName': 'TransactionID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'TransactionID',
   'FeatureType': 'Integral'},
  {'FeatureName': 'isFraud', 'FeatureType': 'Integral'},
  {'FeatureName': 'TransactionDT', 'FeatureType': 'Integral'},
  {'FeatureName': 'TransactionAmt', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card1', 'FeatureType': 'Integral'},
  {'FeatureName': 'card2', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card3', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card4', 'FeatureType': 'String'},
  {'FeatureName': 'card5', 'FeatureType': 'Fractional'},
  {'FeatureName': 'card6', 'FeatureType': 'String'},
  {'FeatureName': 'B1', 'FeatureType': 'Integral'},
  {'FeatureName': 'B2', 'FeatureType': 'Integral'},
  {'FeatureName': 'B3', '

#### List Feature Groups 

In [54]:
sagemaker_client.list_feature_groups()  # use boto client to list FeatureGroups

{'FeatureGroupSummaries': [{'FeatureGroupName': 'transaction-feature-group-15-18-16-15',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:967669495843:feature-group/transaction-feature-group-15-18-16-15',
   'CreationTime': datetime.datetime(2021, 12, 15, 18, 16, 48, 230000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'identity-feature-group-15-18-16-15',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:967669495843:feature-group/identity-feature-group-15-18-16-15',
   'CreationTime': datetime.datetime(2021, 12, 15, 18, 16, 46, 617000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'FG-titanic-fb284513',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:967669495843:feature-group/fg-titanic-fb284513',
   'CreationTime': datetime.datetime(2021, 11, 4, 1, 9, 25, 696000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'FG-titanic-eb2d6666',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-eas

#### Ingest Data into FG
After the FeatureGroups have been created, we can put data into the FeatureGroups by using the PutRecord API.

In [55]:
identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='identity-feature-group-15-18-16-15', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7fd9f42d2b50>, max_workers=3, max_processes=1, _async_result=<multiprocess.pool.MapResult object at 0x7fd9e7c221d0>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

In [56]:
transaction_feature_group.ingest(data_frame=transformed_transaction_data, max_workers=5, wait=True)

IngestionManagerPandas(feature_group_name='transaction-feature-group-15-18-16-15', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7fd9f42d2b50>, max_workers=5, max_processes=1, _async_result=<multiprocess.pool.MapResult object at 0x7fd9e7c28210>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

#### Validate Data Ingestion
From the online store

In [57]:
record_identifier_value = str(2990130)

featurestore_runtime.get_record(
    FeatureGroupName=transaction_feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

{'ResponseMetadata': {'RequestId': '977e1874-58d7-4f83-b03e-61906df19aef',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '977e1874-58d7-4f83-b03e-61906df19aef',
   'content-type': 'application/json',
   'content-length': '2636',
   'date': 'Wed, 15 Dec 2021 18:20:29 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'},
  {'FeatureName': 'isFraud', 'ValueAsString': '0'},
  {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'},
  {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'},
  {'FeatureName': 'card1', 'ValueAsString': '4577'},
  {'FeatureName': 'card2', 'ValueAsString': '583.0'},
  {'FeatureName': 'card3', 'ValueAsString': '150.0'},
  {'FeatureName': 'card4', 'ValueAsString': 'mastercard'},
  {'FeatureName': 'card5', 'ValueAsString': '219.0'},
  {'FeatureName': 'card6', 'ValueAsString': 'credit'},
  {'FeatureName': 'B1', 'ValueAsString': '69'},
  {'FeatureName': 'B2', 'ValueAsString': '80'},
  {'Featur

In [58]:
featurestore_runtime.batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": identity_feature_group_name,
            "RecordIdentifiersValueAsString": ["2990130"],
        },
        {
            "FeatureGroupName": transaction_feature_group_name,
            "RecordIdentifiersValueAsString": ["2990130"],
        },
    ]
)

{'ResponseMetadata': {'RequestId': '532daea9-cafa-4ae0-b0aa-7df3f0e0f33b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '532daea9-cafa-4ae0-b0aa-7df3f0e0f33b',
   'content-type': 'application/json',
   'content-length': '3929',
   'date': 'Wed, 15 Dec 2021 18:20:37 GMT'},
  'RetryAttempts': 0},
 'Records': [{'FeatureGroupName': 'transaction-feature-group-15-18-16-15',
   'RecordIdentifierValueAsString': '2990130',
   'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'},
    {'FeatureName': 'isFraud', 'ValueAsString': '0'},
    {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'},
    {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'},
    {'FeatureName': 'card1', 'ValueAsString': '4577'},
    {'FeatureName': 'card2', 'ValueAsString': '583.0'},
    {'FeatureName': 'card3', 'ValueAsString': '150.0'},
    {'FeatureName': 'card4', 'ValueAsString': 'mastercard'},
    {'FeatureName': 'card5', 'ValueAsString': '219.0'},
    {'FeatureName': '

#### Building The Dataset
Once the offline store is available, we can use it to build the dataset

In [59]:
# Account ID of the current caller, printed for reference.
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

# Look up the resolved S3 output URI of each feature group's offline store.
identity_s3_storage_config = (
    identity_feature_group.describe().get("OfflineStoreConfig").get("S3StorageConfig")
)
identity_feature_group_resolved_output_s3_uri = identity_s3_storage_config.get(
    "ResolvedOutputS3Uri"
)
transaction_s3_storage_config = (
    transaction_feature_group.describe().get("OfflineStoreConfig").get("S3StorageConfig")
)
transaction_feature_group_resolved_output_s3_uri = transaction_s3_storage_config.get(
    "ResolvedOutputS3Uri"
)

# Strip the "s3://<bucket>/" part so only the key prefix inside the bucket remains.
bucket_uri_root = f"s3://{default_s3_bucket_name}/"
identity_feature_group_s3_prefix = identity_feature_group_resolved_output_s3_uri.replace(
    bucket_uri_root, ""
)
transaction_feature_group_s3_prefix = transaction_feature_group_resolved_output_s3_uri.replace(
    bucket_uri_root, ""
)

# Poll once a minute until more than one object is listed under the transaction
# feature group's offline-store prefix.
offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Prefix=transaction_feature_group_s3_prefix, Bucket=default_s3_bucket_name
    )
    if len(objects_in_bucket.get("Contents", ())) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
        continue
    print("Waiting for data in offline store...\n")
    sleep(60)

print("Data available.")

967669495843
Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Data available.


In [60]:
# Build athena query

# One Athena query helper per feature group; the table names come from them.
identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()

identity_table = identity_query.table_name
transaction_table = transaction_query.table_name

# Left-join the transaction table to the identity table on transactionid.
query_string = (
    f'SELECT * FROM "{transaction_table}" LEFT JOIN "{identity_table}" '
    f'ON "{transaction_table}".transactionid = "{identity_table}".transactionid'
)
print(f"Running {query_string}")

# Run the Athena query through the identity query helper, wait for it to finish,
# and load the output into a Pandas dataframe.
identity_query.run(
    output_location=f"s3://{default_s3_bucket_name}/{prefix}/query_results/",
    query_string=query_string,
)
identity_query.wait()
dataset = identity_query.as_dataframe()

dataset

Running SELECT * FROM "transaction-feature-group-15-18-16-15-1639592208" LEFT JOIN "identity-feature-group-15-18-16-15-1639592206" ON "transaction-feature-group-15-18-16-15-1639592208".transactionid = "identity-feature-group-15-18-16-15-1639592206".transactionid


Unnamed: 0,transactionid,isfraud,transactiondt,transactionamt,card1,card2,card3,card4,card5,card6,...,id_15,id_16,id_17,id_18,id_19,id_20,eventtime.1,write_time.1,api_invocation_time.1,is_deleted.1
0,3354281,0,9134272,58.950,17399,111.0,150.0,mastercard,224.0,debit,...,,,,,,,,,,
1,3174596,0,4198271,57.950,7411,383.0,150.0,visa,226.0,debit,...,,,,,,,,,,
2,3479324,0,12850843,117.000,12839,321.0,150.0,visa,226.0,debit,...,,,,,,,,,,
3,3437243,0,11480906,13.000,9112,250.0,150.0,visa,226.0,debit,...,,,,,,,,,,
4,3486003,0,13093208,40.950,1556,314.0,150.0,mastercard,224.0,debit,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1962,3296781,0,7695526,107.950,9500,321.0,150.0,visa,226.0,debit,...,,,,,,,,,,
1963,3049616,0,1435010,25.000,17270,310.0,150.0,mastercard,224.0,debit,...,56.0,149.0,T,F,T,T,1.639592e+09,2021-12-15 18:23:52.407,2021-12-15 18:18:54.000,False
1964,3116986,0,2580358,53.970,7919,194.0,150.0,mastercard,202.0,debit,...,,,,,,,,,,
1965,3498226,0,13380265,23.547,5812,408.0,185.0,mastercard,224.0,debit,...,294.0,275.0,F,F,T,F,1.639592e+09,2021-12-15 18:23:52.407,2021-12-15 18:18:54.000,False


In [61]:
# Prepare query results for training.
# Build and print the S3 URI of the CSV that holds the Athena query results.
query_execution = identity_query.get_query_execution()
query_result = (
    "s3://"
    + default_s3_bucket_name
    + "/"
    + prefix
    + "/query_results/"
    + query_execution["QueryExecution"]["QueryExecutionId"]
    + ".csv"
)
print(query_result)

# Select useful columns for training with target column as the first.
dataset = dataset[
    [
        "isfraud",
        "transactiondt",
        "transactionamt",
        "card1",
        "card2",
        "card3",
        "card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01",
        "id_02",
        "id_03",
        "id_04",
        "id_05",
    ]
]

# Write to csv in S3 without headers and index column.
dataset.to_csv("dataset.csv", header=False, index=False)
s3_client.upload_file("dataset.csv", default_s3_bucket_name, prefix + "/training_input/dataset.csv")
# The training job later reads everything under this prefix.
dataset_uri_prefix = "s3://" + default_s3_bucket_name + "/" + prefix + "/training_input/"

dataset

s3://sagemaker-us-east-1-967669495843/sofi-sagemaker-featurestore-demo/query_results/e9aed394-21ae-430f-82a3-8d45c38fab1a.csv


Unnamed: 0,isfraud,transactiondt,transactionamt,card1,card2,card3,card5,card_type_credit,card_type_debit,card_bank_american_express,card_bank_discover,card_bank_mastercard,card_bank_visa,id_01,id_02,id_03,id_04,id_05
0,0,9134272,58.950,17399,111.0,150.0,224.0,0,1,0,0,1,0,,,,,
1,0,4198271,57.950,7411,383.0,150.0,226.0,0,1,0,0,0,1,,,,,
2,0,12850843,117.000,12839,321.0,150.0,226.0,0,1,0,0,0,1,,,,,
3,0,11480906,13.000,9112,250.0,150.0,226.0,0,1,0,0,0,1,,,,,
4,0,13093208,40.950,1556,314.0,150.0,224.0,0,1,0,0,1,0,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1962,0,7695526,107.950,9500,321.0,150.0,226.0,0,1,0,0,0,1,,,,,
1963,0,1435010,25.000,17270,310.0,150.0,224.0,0,1,0,0,1,0,0.0,49038.0,0.0,0.0,0.0
1964,0,2580358,53.970,7919,194.0,150.0,202.0,0,1,0,0,1,0,,,,,
1965,0,13380265,23.547,5812,408.0,185.0,224.0,0,1,0,0,1,0,-5.0,493983.0,0.0,0.0,13.0


#### Train a model

In [62]:
# Pick a training image
training_image = sagemaker.image_uris.retrieve("xgboost", region, "1.0-1")

In [63]:
# Create an estimator
# Training output is written under the demo prefix of the default bucket.
training_output_path = f"s3://{default_s3_bucket_name}/{prefix}/training_output"

from sagemaker.estimator import Estimator

# Single-instance training job that uses the image picked in the previous cell
# and runs through the same session as the Feature Store calls.
training_model = Estimator(
    training_image,
    role,
    sagemaker_session=feature_store_session,
    output_path=training_output_path,
    input_mode="File",
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    volume_size=5,
    max_run=3600,
)

In [64]:
# Set Hyperparameters
training_model.set_hyperparameters(objective="binary:logistic", num_round=50)

In [65]:
# Specify training inputs
# The CSV data uploaded under `dataset_uri_prefix` is exposed to the job as the "train" channel.
train_data = sagemaker.inputs.TrainingInput(
    dataset_uri_prefix,
    content_type="text/csv",
    s3_data_type="S3Prefix",
    distribution="FullyReplicated",
)
data_channels = dict(train=train_data)


In [66]:
training_model.fit(inputs=data_channels, logs=True)

2021-12-15 18:24:21 Starting - Starting the training job...ProfilerReport-1639592661: InProgress
...
2021-12-15 18:25:09 Starting - Launching requested ML instances......
2021-12-15 18:26:20 Starting - Preparing the instances for training.........
2021-12-15 18:27:50 Downloading - Downloading input data...
2021-12-15 18:28:05 Training - Downloading the training image...
2021-12-15 18:28:52 Uploading - Uploading generated training model.[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34m[18:28:48] 1967x17 matri

#### Deploy - Create Endpoint

In [32]:
predictor = training_model.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")

-----!

#### Inference Using Feature Store

In [33]:
# Incoming inference request.
transaction_id = str(3450774)

# Helper to parse the feature value from the record.
def get_feature_value(record, feature_name):
    """Return, as a string, the value of ``feature_name`` within ``record``.

    ``record`` is an iterable of mappings that each carry a "FeatureName" and a
    "ValueAsString" entry. The first entry whose "FeatureName" equals
    ``feature_name`` is used; an ``IndexError`` is raised when none matches.
    """
    matching_entries = [entry for entry in record if entry["FeatureName"] == feature_name]
    return str(matching_entries[0]["ValueAsString"])


# Features read from each feature group, in the order the inference request lists them.
transaction_feature_names = [
    "TransactionDT",
    "TransactionAmt",
    "card1",
    "card2",
    "card3",
    "card5",
    "card_type_credit",
    "card_type_debit",
    "card_bank_american_express",
    "card_bank_discover",
    "card_bank_mastercard",
    "card_bank_visa",
]
identity_feature_names = ["id_01", "id_02", "id_03", "id_04", "id_05"]

# Fetch the transaction record and pull out its feature values.
transaction_response = featurestore_runtime.get_record(
    RecordIdentifierValueAsString=transaction_id, FeatureGroupName=transaction_feature_group_name
)
transaction_record = transaction_response["Record"]
transaction_test_data = [
    get_feature_value(transaction_record, feature_name)
    for feature_name in transaction_feature_names
]

# Fetch the identity record for the same transaction ID and pull out its feature values.
identity_response = featurestore_runtime.get_record(
    RecordIdentifierValueAsString=transaction_id, FeatureGroupName=identity_feature_group_name
)
identity_record = identity_response["Record"]
id_test_data = [
    get_feature_value(identity_record, feature_name) for feature_name in identity_feature_names
]

# Join all pieces for inference request.
inference_request = transaction_test_data + id_test_data

inference_request

['11923451',
 '50.0',
 '12501',
 '490.0',
 '150.0',
 '226.0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '1',
 '-40',
 '20130.0',
 '0.0',
 '0.0',
 '16.0']

In [34]:
import json

results = predictor.predict(",".join(inference_request), initial_args={"ContentType": "text/csv"})
prediction = json.loads(results)
print(prediction)

0.8026058077812195


#### Clean up

In [35]:
# delete endpoint
predictor.delete_endpoint()

In [36]:
# delete feature groups
identity_feature_group.delete()
transaction_feature_group.delete()