# Feature Store 

In this Notebook we will take our dataset and place it into the feature store.

Note: In order to execute this Notebook your Sagemaker Execution Role must be granted the **AmazonS3FullAccess** policy, or a more restricted version for access to the S3 bucket you will use for your feature store.

In [1]:
import utils.display as disp
import utils.config as cfg
import pandas as pd


In [2]:
path_to_partitioned = cfg.get_path_to_partitioned_data()
train_data = path_to_partitioned + "/train.csv"
train_df = pd.read_csv(train_data)
train_df.head()

Unnamed: 0,encounter_id,patient_nbr,race,gender,age,weight,admission_type_id,discharge_disposition_id,admission_source_id,time_in_hospital,...,citoglipton,insulin,glyburide-metformin,glipizide-metformin,glimepiride-pioglitazone,metformin-rosiglitazone,metformin-pioglitazone,change,diabetesMed,readmitted
0,235619604,66979476,Caucasian,Female,[60-70),?,Physician Referral,3,7,5,...,No,Down,No,No,No,No,No,Ch,Yes,1
1,224890200,59228019,Caucasian,Female,[60-70),?,Transfer from a Skilled Nursing Facility (SNF),3,7,4,...,No,Up,No,No,No,No,No,Ch,Yes,1
2,209008848,37592973,Caucasian,Male,[40-50),?,HMO Referral,1,1,6,...,No,No,No,No,No,No,No,No,No,0
3,201556332,102918168,Caucasian,Male,[40-50),?,Physician Referral,1,7,3,...,No,Down,No,No,No,No,No,Ch,Yes,1
4,62886192,4232214,Caucasian,Male,[70-80),?,Physician Referral,18,7,2,...,No,No,No,No,No,No,No,No,Yes,1


In [3]:
import boto3
import sagemaker
from sagemaker.session import Session

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

# S3 Bucket for Offline Storage

The Feature Store requires an S3 bucket for offline store of your features.

They will be structured with meta-data, and made available to query using Athena SQL. 

In [4]:
# You can modify the following to use a bucket of your choosing
s3_client = boto3.client('s3', region_name=region)
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-newdemo'

print(default_s3_bucket_name)

sagemaker-eu-west-2-320389841409


In [5]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print (role)


arn:aws:iam::320389841409:role/service-role/AmazonSageMaker-ExecutionRole-20201022T141998


In [6]:
from time import gmtime, strftime, sleep

feature_group_name = 'patient-feature-group-' + strftime('%d-%H-%M-%S', gmtime())

In [7]:
from sagemaker.feature_store.feature_group import FeatureGroup

feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

In [8]:

def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")

# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.
cast_object_to_string(train_df)


In [10]:
import time
current_time_sec = int(round(time.time()))

# record identifier and event time feature names
record_identifier_feature_name = "encounter_id"
event_time_feature_name = "RecordTime"

# append EventTime feature
train_df[event_time_feature_name] = pd.Series([current_time_sec]*len(train_df), dtype="float64")


In [11]:
train_df.head()

Unnamed: 0,encounter_id,patient_nbr,race,gender,age,weight,admission_type_id,discharge_disposition_id,admission_source_id,time_in_hospital,...,insulin,glyburide-metformin,glipizide-metformin,glimepiride-pioglitazone,metformin-rosiglitazone,metformin-pioglitazone,change,diabetesMed,readmitted,RecordTime
0,235619604,66979476,Caucasian,Female,[60-70),?,Physician Referral,3,7,5,...,Down,No,No,No,No,No,Ch,Yes,1,1614641000.0
1,224890200,59228019,Caucasian,Female,[60-70),?,Transfer from a Skilled Nursing Facility (SNF),3,7,4,...,Up,No,No,No,No,No,Ch,Yes,1,1614641000.0
2,209008848,37592973,Caucasian,Male,[40-50),?,HMO Referral,1,1,6,...,No,No,No,No,No,No,No,No,0,1614641000.0
3,201556332,102918168,Caucasian,Male,[40-50),?,Physician Referral,1,7,3,...,Down,No,No,No,No,No,Ch,Yes,1,1614641000.0
4,62886192,4232214,Caucasian,Male,[70-80),?,Physician Referral,18,7,2,...,No,No,No,No,No,No,No,Yes,1,1614641000.0


In [12]:
# load feature definitions to the feature group. 
# SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.

feature_group.load_feature_definitions(data_frame=train_df); # output is suppressed


In [13]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")


feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup patient-feature-group-01-23-19-38 successfully created.


# Inspect the feature group

In [14]:
feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-2:320389841409:feature-group/patient-feature-group-01-23-19-38',
 'FeatureGroupName': 'patient-feature-group-01-23-19-38',
 'RecordIdentifierFeatureName': 'encounter_id',
 'EventTimeFeatureName': 'RecordTime',
 'FeatureDefinitions': [{'FeatureName': 'encounter_id',
   'FeatureType': 'Integral'},
  {'FeatureName': 'patient_nbr', 'FeatureType': 'Integral'},
  {'FeatureName': 'race', 'FeatureType': 'String'},
  {'FeatureName': 'gender', 'FeatureType': 'String'},
  {'FeatureName': 'age', 'FeatureType': 'String'},
  {'FeatureName': 'weight', 'FeatureType': 'String'},
  {'FeatureName': 'admission_type_id', 'FeatureType': 'String'},
  {'FeatureName': 'discharge_disposition_id', 'FeatureType': 'Integral'},
  {'FeatureName': 'admission_source_id', 'FeatureType': 'Integral'},
  {'FeatureName': 'time_in_hospital', 'FeatureType': 'Integral'},
  {'FeatureName': 'payer_code', 'FeatureType': 'String'},
  {'FeatureName': 'medical_specialty', 'FeatureType'

In [15]:
 # use boto client to list FeatureGroups
sagemaker_client.list_feature_groups()


{'FeatureGroupSummaries': [{'FeatureGroupName': 'patient-feature-group-23-23-30-50',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-2:320389841409:feature-group/patient-feature-group-23-23-30-50',
   'CreationTime': datetime.datetime(2021, 2, 23, 23, 44, 11, 40000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created',
   'OfflineStoreStatus': {'Status': 'Active'}},
  {'FeatureGroupName': 'patient-feature-group-01-23-19-38',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-2:320389841409:feature-group/patient-feature-group-01-23-19-38',
   'CreationTime': datetime.datetime(2021, 3, 1, 23, 22, 41, 948000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'FG-flow-26-01-28-08-1dade39c',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-2:320389841409:feature-group/fg-flow-26-01-28-08-1dade39c',
   'CreationTime': datetime.datetime(2021, 2, 26, 1, 28, 9, 585000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'FG-flow-26-01-20-3

In [16]:
feature_group.ingest(
    data_frame=train_df, max_workers=3, wait=True
)

IngestionManagerPandas(feature_group_name='patient-feature-group-01-23-19-38', sagemaker_session=<sagemaker.session.Session object at 0x7f6c84ef0690>, data_frame=       encounter_id  patient_nbr       race  gender      age weight  \
0         235619604     66979476  Caucasian  Female  [60-70)      ?   
1         224890200     59228019  Caucasian  Female  [60-70)      ?   
2         209008848     37592973  Caucasian    Male  [40-50)      ?   
3         201556332    102918168  Caucasian    Male  [40-50)      ?   
4          62886192      4232214  Caucasian    Male  [70-80)      ?   
...             ...          ...        ...     ...      ...    ...   
61054     159702168     65998809  Caucasian    Male  [40-50)      ?   
61055     185459760     40948776  Caucasian    Male  [40-50)      ?   
61056     184227156     25467516  Caucasian  Female  [50-60)      ?   
61057     147755358     98568972   Hispanic    Male  [70-80)      ?   
61058      29365914     60704199          ?  Female  [80-

In [17]:
record_identifier_value = str(112487478)

record = featurestore_runtime.get_record(FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=record_identifier_value)

In [18]:
print(record)

{'ResponseMetadata': {'RequestId': 'a1964a51-c3c7-4545-96f2-b29a6d110e94', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'a1964a51-c3c7-4545-96f2-b29a6d110e94', 'content-type': 'application/json', 'content-length': '2751', 'date': 'Mon, 01 Mar 2021 23:32:50 GMT'}, 'RetryAttempts': 0}, 'Record': [{'FeatureName': 'encounter_id', 'ValueAsString': '112487478'}, {'FeatureName': 'patient_nbr', 'ValueAsString': '62389053'}, {'FeatureName': 'race', 'ValueAsString': 'Caucasian'}, {'FeatureName': 'gender', 'ValueAsString': 'Female'}, {'FeatureName': 'age', 'ValueAsString': '[80-90)'}, {'FeatureName': 'weight', 'ValueAsString': '?'}, {'FeatureName': 'admission_type_id', 'ValueAsString': ' Physician Referral'}, {'FeatureName': 'discharge_disposition_id', 'ValueAsString': '6'}, {'FeatureName': 'admission_source_id', 'ValueAsString': '7'}, {'FeatureName': 'time_in_hospital', 'ValueAsString': '4'}, {'FeatureName': 'payer_code', 'ValueAsString': 'MC'}, {'FeatureName': 'medical_specialty',

In [19]:
print(feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.patient-feature-group-01-23-19-38 (
  encounter_id INT
  patient_nbr INT
  race STRING
  gender STRING
  age STRING
  weight STRING
  admission_type_id STRING
  discharge_disposition_id INT
  admission_source_id INT
  time_in_hospital INT
  payer_code STRING
  medical_specialty STRING
  num_lab_procedures INT
  num_procedures INT
  num_medications INT
  number_outpatient INT
  number_emergency INT
  number_inpatient INT
  diag_1 STRING
  diag_2 STRING
  diag_3 STRING
  number_diagnoses INT
  max_glu_serum STRING
  A1Cresult STRING
  metformin STRING
  repaglinide STRING
  nateglinide STRING
  chlorpropamide STRING
  glimepiride STRING
  acetohexamide STRING
  glipizide STRING
  glyburide STRING
  tolbutamide STRING
  pioglitazone STRING
  rosiglitazone STRING
  acarbose STRING
  miglitol STRING
  troglitazone STRING
  tolazamide STRING
  examide STRING
  citoglipton STRING
  insulin STRING
  glyburide-metformin STRING
  glipizi

In [None]:
account_id = boto3.client('sts').get_caller_identity()["Account"]
print(account_id)

feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + feature_group_name + '/data'

offline_store_contents = None
while (offline_store_contents is None):
    objects_in_bucket = s3_client.list_objects(Bucket=default_s3_bucket_name,Prefix=feature_group_s3_prefix)
    if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) > 1):
        offline_store_contents = objects_in_bucket['Contents']
    else:
        print('Waiting for data in offline store...\n')
        sleep(60)

print('Data available.')



# Retrieve the data

Feature Store makes the data available in Glue Data Catalog that can be queried using Athena.

In this example we are just selecting a subset of the columns. 
However we could just as easily join multiple feature groups together to form a more comprehensive data set.

In [20]:
_query = feature_group.athena_query()

_table = _query.table_name

query_string = 'SELECT readmitted, admission_source_id, number_diagnoses, num_medications, number_inpatient, number_outpatient  FROM "'+_table+'" '
print('Running ' + query_string)

# run Athena query. The output is loaded to a Pandas dataframe.
#dataset = pd.DataFrame()
_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/'+prefix+'/query_results/')
_query.wait()
dataset = _query.as_dataframe()

dataset

Running SELECT readmitted, admission_source_id, number_diagnoses, num_medications, number_inpatient, number_outpatient  FROM "patient-feature-group-01-23-19-38-1614640961" 


Unnamed: 0,readmitted,admission_source_id,number_diagnoses,num_medications,number_inpatient,number_outpatient
0,0,7,9,9,0,0
1,1,7,9,16,1,0
2,0,7,8,3,0,0
3,1,1,9,47,1,0
4,1,7,9,15,1,0
...,...,...,...,...,...,...
53491,0,7,9,8,0,3
53492,0,7,9,13,1,0
53493,0,1,7,8,0,0
53494,1,2,6,28,1,0


### Get S3 path for ML jobs

Sagemaker ML Jobs expect to retrive data from S3.

Luckily, the query results were put directly into S3 for us.
We just need to build the path to our query result.

In [None]:
# Prepare query results for training.
query_execution = _query.get_query_execution()
query_result = 's3://'+default_s3_bucket_name+'/'+prefix+'/query_results/'+query_execution['QueryExecution']['QueryExecutionId']+'.csv'
print(query_result)
