In [33]:
import warnings
warnings.filterwarnings('ignore')

In [34]:
reviews = context.catalog.load("reviews")

2021-03-26 16:22:29,432 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2021-03-26 16:22:29,971 - kedro.io.data_catalog - INFO - Loading data from `reviews` (CSVDataSet)...


In [35]:
reviews.head(5)

Unnamed: 0,shuttle_id,review_scores_rating,review_scores_comfort,review_scores_amenities,review_scores_trip,review_scores_crew,review_scores_location,review_scores_price,number_of_reviews,reviews_per_month
0,63561,97.0,10.0,9.0,10.0,10.0,9.0,10.0,133,1.65
1,36260,90.0,8.0,9.0,10.0,9.0,9.0,9.0,3,0.09
2,57015,95.0,9.0,10.0,9.0,10.0,9.0,9.0,14,0.14
3,14035,93.0,10.0,9.0,9.0,9.0,10.0,9.0,39,0.42
4,10036,98.0,10.0,10.0,10.0,10.0,9.0,9.0,92,0.94


## 1. Setup Feature Store Session 

In [5]:
import boto3
import sagemaker

original_boto3_version = boto3.__version__
from sagemaker.session import Session
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

2021-03-26 12:13:17,131 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials


In [6]:
# SageMaker feature store SDK comes with a default bucket built-in
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'kedro-sagemaker-featurestore-demo'

print(default_s3_bucket_name)

sagemaker-eu-west-1-674444584527


In [7]:
# You need an IAM role that has both AmazonSageMakerFullAccess and AmazonSageMakerFeatureStoreAccess managed policies.
# You can of course configure custom policies to tighten access.
iam = boto3.client('iam')
role = iam.get_role(RoleName='AmazonSageMaker-ExecutionRole')['Role']['Arn']

2021-03-26 12:13:19,987 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials


## 2. Ingesting data

In [8]:
# read DataSet from Kedro
reviews = catalog.load("reviews")
shuttles = catalog.load("shuttles")
companies = catalog.load("companies")

2021-03-26 12:13:24,075 - kedro.io.data_catalog - INFO - Loading data from `reviews` (CSVDataSet)...
2021-03-26 12:13:24,142 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelDataSet)...
2021-03-26 12:13:31,322 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVDataSet)...


In [9]:
reviews.head(5)

Unnamed: 0,shuttle_id,review_scores_rating,review_scores_comfort,review_scores_amenities,review_scores_trip,review_scores_crew,review_scores_location,review_scores_price,number_of_reviews,reviews_per_month
0,63561,97.0,10.0,9.0,10.0,10.0,9.0,10.0,133,1.65
1,36260,90.0,8.0,9.0,10.0,9.0,9.0,9.0,3,0.09
2,57015,95.0,9.0,10.0,9.0,10.0,9.0,9.0,14,0.14
3,14035,93.0,10.0,9.0,9.0,9.0,10.0,9.0,39,0.42
4,10036,98.0,10.0,10.0,10.0,10.0,9.0,9.0,92,0.94


### Creating Feature Group

A bit about terminology: In the `reviews` dataset, each column is a feature. In SageMaker's term, `reviews` is a `FeatureGroup`. A `FeatureGroup` is the main Feature Store resource that contains the metadata for all the data stored in Amazon SageMaker Feature Store. A feature group is a logical grouping of features, defined in the feature store, to describe records. A feature group’s definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store. 

In [14]:
reviews_feature_group_name = 'reviews-feature-group'

from sagemaker.feature_store.feature_group import FeatureGroup

reviews_feature_group = FeatureGroup(name=reviews_feature_group_name, sagemaker_session=feature_store_session)

### Ingesting Data

In [15]:
import pandas as pd
import time

current_time_sec = int(round(time.time()))

# record identifier and event time feature names
record_identifier_feature_name = "TransactionID"
event_time_feature_name = "EventTime"

# append EventTime feature
reviews[event_time_feature_name] = pd.Series([current_time_sec]*len(reviews), dtype="float64")
reviews[record_identifier_feature_name] = reviews.index + 1

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
reviews_feature_group.load_feature_definitions(data_frame=reviews) # output is suppressed

In [12]:
reviews.head(5)

Unnamed: 0,shuttle_id,review_scores_rating,review_scores_comfort,review_scores_amenities,review_scores_trip,review_scores_crew,review_scores_location,review_scores_price,number_of_reviews,reviews_per_month,EventTime,TransactionID
0,63561,97.0,10.0,9.0,10.0,10.0,9.0,10.0,133,1.65,1616761000.0,1
1,36260,90.0,8.0,9.0,10.0,9.0,9.0,9.0,3,0.09,1616761000.0,2
2,57015,95.0,9.0,10.0,9.0,10.0,9.0,9.0,14,0.14,1616761000.0,3
3,14035,93.0,10.0,9.0,9.0,9.0,10.0,9.0,39,0.42,1616761000.0,4
4,10036,98.0,10.0,10.0,10.0,10.0,9.0,9.0,92,0.94,1616761000.0,5


In [16]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

# reviews_feature_group.delete()
reviews_feature_group.create(
    description="Spacesflights shuttles reviews",
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

wait_for_feature_group_creation_complete(feature_group=reviews_feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup reviews-feature-group successfully created.


In [17]:
reviews_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:674444584527:feature-group/reviews-feature-group',
 'FeatureGroupName': 'reviews-feature-group',
 'RecordIdentifierFeatureName': 'TransactionID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'shuttle_id',
   'FeatureType': 'Integral'},
  {'FeatureName': 'review_scores_rating', 'FeatureType': 'Fractional'},
  {'FeatureName': 'review_scores_comfort', 'FeatureType': 'Fractional'},
  {'FeatureName': 'review_scores_amenities', 'FeatureType': 'Fractional'},
  {'FeatureName': 'review_scores_trip', 'FeatureType': 'Fractional'},
  {'FeatureName': 'review_scores_crew', 'FeatureType': 'Fractional'},
  {'FeatureName': 'review_scores_location', 'FeatureType': 'Fractional'},
  {'FeatureName': 'review_scores_price', 'FeatureType': 'Fractional'},
  {'FeatureName': 'number_of_reviews', 'FeatureType': 'Integral'},
  {'FeatureName': 'reviews_per_month', 'FeatureType': 'Fractional'},
  {'FeatureName': 'EventTime', 'FeatureTy

In [19]:
reviews_feature_group.ingest(data_frame=reviews)

2021-03-26 12:17:21,187 - sagemaker.feature_store.feature_group - INFO - Started ingesting index 0 to 77096
2021-03-26 12:57:36,505 - sagemaker.feature_store.feature_group - INFO - Successfully ingested row 0 to 77096


IngestionManagerPandas(feature_group_name='reviews-feature-group', sagemaker_session=<sagemaker.session.Session object at 0x7fa5508a5f40>, data_frame=       shuttle_id  review_scores_rating  review_scores_comfort  \
0           63561                  97.0                   10.0   
1           36260                  90.0                    8.0   
2           57015                  95.0                    9.0   
3           14035                  93.0                   10.0   
4           10036                  98.0                   10.0   
...           ...                   ...                    ...   
77091        4368                   NaN                    NaN   
77092        2983                   NaN                    NaN   
77093       69684                   NaN                    NaN   
77094       21738                   NaN                    NaN   
77095       72645                   NaN                    NaN   

       review_scores_amenities  review_scores_trip  revie

In [23]:
query = reviews_feature_group.athena_query()

In [24]:
query.table_name

'reviews-feature-group-1616760850'

In [29]:
query.run(f'SELECT * FROM "{query.table_name}" LIMIT 10', output_location=f"s3://{default_s3_bucket_name}/{prefix}/query_results")
query.wait()
dataset = query.as_dataframe()
dataset

2021-03-26 14:35:14,440 - sagemaker - INFO - Query d7e3a3ab-3988-4bd5-9545-d7ec062d5cdd is being executed.
2021-03-26 14:35:19,555 - sagemaker - INFO - Query d7e3a3ab-3988-4bd5-9545-d7ec062d5cdd successfully executed.


Unnamed: 0,shuttle_id,review_scores_rating,review_scores_comfort,review_scores_amenities,review_scores_trip,review_scores_crew,review_scores_location,review_scores_price,number_of_reviews,reviews_per_month,eventtime,transactionid,write_time,api_invocation_time,is_deleted
0,36260,90.0,8.0,9.0,10.0,9.0,9.0,9.0,3,0.09,1616761000.0,2,2021-03-26 12:21:04.102,2021-03-26 12:16:11.000,False
1,23389,76.0,8.0,8.0,8.0,8.0,9.0,9.0,5,0.05,1616761000.0,8,2021-03-26 12:21:04.102,2021-03-26 12:16:11.000,False
2,36260,90.0,8.0,9.0,10.0,9.0,9.0,9.0,3,0.09,1616761000.0,2,2021-03-26 12:21:04.102,2021-03-26 12:17:21.000,False
3,23389,76.0,8.0,8.0,8.0,8.0,9.0,9.0,5,0.05,1616761000.0,8,2021-03-26 12:21:04.102,2021-03-26 12:17:21.000,False
4,25926,93.0,10.0,9.0,10.0,10.0,10.0,9.0,168,1.64,1616761000.0,13,2021-03-26 12:21:04.102,2021-03-26 12:17:21.000,False
5,61282,97.0,10.0,10.0,10.0,10.0,9.0,10.0,52,0.51,1616761000.0,16,2021-03-26 12:21:04.102,2021-03-26 12:17:21.000,False
6,4428,95.0,9.0,10.0,10.0,10.0,9.0,9.0,137,1.37,1616761000.0,18,2021-03-26 12:21:04.102,2021-03-26 12:17:22.000,False
7,47570,89.0,9.0,7.0,9.0,10.0,9.0,8.0,7,0.13,1616761000.0,22,2021-03-26 12:21:04.102,2021-03-26 12:17:22.000,False
8,38226,,,,,,,,0,,1616761000.0,33,2021-03-26 12:21:04.102,2021-03-26 12:17:22.000,False
9,74362,96.0,9.0,9.0,10.0,10.0,10.0,9.0,30,0.46,1616761000.0,36,2021-03-26 12:21:04.102,2021-03-26 12:17:22.000,False


### 3. Online serving for inference

In [30]:
transaction_id = str(2)

# Helper to parse the feature value from the record.
def get_feature_value(record, feature_name):
    return str(list(filter(lambda r: r['FeatureName'] == feature_name, record))[0]['ValueAsString'])

transaction_response = featurestore_runtime.get_record(FeatureGroupName=reviews_feature_group.name, RecordIdentifierValueAsString=transaction_id)

In [32]:
transaction_record = transaction_response['Record']
transaction_record

[{'FeatureName': 'shuttle_id', 'ValueAsString': '36260'},
 {'FeatureName': 'review_scores_rating', 'ValueAsString': '90.0'},
 {'FeatureName': 'review_scores_comfort', 'ValueAsString': '8.0'},
 {'FeatureName': 'review_scores_amenities', 'ValueAsString': '9.0'},
 {'FeatureName': 'review_scores_trip', 'ValueAsString': '10.0'},
 {'FeatureName': 'review_scores_crew', 'ValueAsString': '9.0'},
 {'FeatureName': 'review_scores_location', 'ValueAsString': '9.0'},
 {'FeatureName': 'review_scores_price', 'ValueAsString': '9.0'},
 {'FeatureName': 'number_of_reviews', 'ValueAsString': '3'},
 {'FeatureName': 'reviews_per_month', 'ValueAsString': '0.09'},
 {'FeatureName': 'EventTime', 'ValueAsString': '1616760839.0'},
 {'FeatureName': 'TransactionID', 'ValueAsString': '2'}]