In [2]:

!pip install s3fs 
import boto3

import sagemaker

from sagemaker.session import Session

region = boto3.Session().region_name
 
boto_session = boto3.Session(region_name=region)
 
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

# Explain feature store
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

Keyring is skipped due to an exception: 'keyring.backends'
[0m

# Set up S3 Bucket for the OfflineStore

SageMaker Feature Store writes the ata in the OfflineStore of a FeatureGroup to S3 bucket owned by you. To be able to write to your S3 bucket, SageMaker Feature Store assumes an IAM role which has access to it. The role is also owned by you. Note that the same bucket can be re-used across FeatureGroups. Data in the bucket is partitioned by FeatureGroup.

In [3]:
# Set up S3 Bucket for the offlinestore
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'feature-store'

print(default_s3_bucket_name)

sagemaker-us-east-1-555918697305


In [4]:
region

'us-east-1'

 # Set up IAM Role

In [5]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)

arn:aws:iam::555918697305:role/service-role/AmazonSageMaker-ExecutionRole-20221121T170842


# Inspect Dataset
The Heart Failure Clinical Dataset contains electronic medical records of patients quantify symptoms, body features, and clinical laboratory test values of 299 patients with heart failure in 2015.

The dataset contains one table with thirteen (13) columns:

age: age of the patient

In [6]:
import pandas as pd
from IPython.display import display

In [7]:
!aws s3 cp  ./clinical_records_dataset.csv s3://$default_s3_bucket_name/$prefix/data/

upload: ./clinical_records_dataset.csv to s3://sagemaker-us-east-1-555918697305/feature-store/data/clinical_records_dataset.csv


In [8]:
clinical_data_file_name = 'clinical_records_dataset.csv'
clinical_data_path = "s3://{}/{}/data/{}".format(default_s3_bucket_name, prefix, clinical_data_file_name)

In [9]:
clinical_data_path

's3://sagemaker-us-east-1-555918697305/feature-store/data/clinical_records_dataset.csv'

In [10]:
clinical = pd.read_csv(clinical_data_path)
pd.set_option('display.max_columns', 500)
clinical.head()

Unnamed: 0,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,DEATH_EVENT
0,75.0,0,582,0,20,1,265000.0,1.9,130,1,0,4,1
1,55.0,0,7861,0,38,0,263358.03,1.1,136,1,0,6,1
2,65.0,0,146,0,20,0,162000.0,1.3,129,1,1,7,1
3,50.0,1,111,0,20,0,210000.0,1.9,137,1,0,7,1
4,65.0,1,160,1,20,0,327000.0,2.7,116,0,0,8,1


In [11]:
print ('percentage of the value missing in each column is: ')
clinical.isnull().sum() / len(clinical)

percentage of the value missing in each column is: 


age                         0.0
anaemia                     0.0
creatinine_phosphokinase    0.0
diabetes                    0.0
ejection_fraction           0.0
high_blood_pressure         0.0
platelets                   0.0
serum_creatinine            0.0
serum_sodium                0.0
sex                         0.0
smoking                     0.0
time                        0.0
DEATH_EVENT                 0.0
dtype: float64

In [12]:
#### Add an id for each patient
clinical.reset_index(inplace = True)
clinical.rename(columns = {'index': 'patient_id'}, inplace = True)

In [13]:
import time
 
current_time_sec = int(round(time.time()))
# append EventTime feature
clinical['EventTime'] = pd.Series([current_time_sec]*len(clinical), dtype="float64")

In [14]:
def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")
 
# cast object dtype to string. The SageMaker Feature Store Python SDK will then map the string dtype to
# String feature type.
cast_object_to_string(clinical)

# Create a TimeStamp for each Record

In [15]:
from time import gmtime, strftime, sleep

clinical_feature_group_name = 'clinical-feature-group-' + strftime('%d-%H-%M-%S', gmtime())

# Create a feature group

In [16]:
from sagemaker.feature_store.feature_group import FeatureGroup

clinical_feature_group = FeatureGroup(name=clinical_feature_group_name, sagemaker_session=feature_store_session)

# Define Identifier

In this step, we will specify a record identifier name and an event time feature name.

In [17]:
# record identifier and event time feature names
record_identifier_feature_name = "patient_id"
event_time_feature_name = "EventTime"

# Load feature definitions to the feature group

In [18]:
clinical_feature_group.load_feature_definitions(data_frame=clinical); # output is suppressed

In [19]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")
 
clinical_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}", #offline feature store bucket
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)
wait_for_feature_group_creation_complete(feature_group=clinical_feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup clinical-feature-group-02-08-42-31 successfully created.


# Work with you FeatureGroup
Check Feature Group info

When you create a feature group 

In [20]:
clinical_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:555918697305:feature-group/clinical-feature-group-02-08-42-31',
 'FeatureGroupName': 'clinical-feature-group-02-08-42-31',
 'RecordIdentifierFeatureName': 'patient_id',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'patient_id',
   'FeatureType': 'Integral'},
  {'FeatureName': 'age', 'FeatureType': 'Fractional'},
  {'FeatureName': 'anaemia', 'FeatureType': 'Integral'},
  {'FeatureName': 'creatinine_phosphokinase', 'FeatureType': 'Integral'},
  {'FeatureName': 'diabetes', 'FeatureType': 'Integral'},
  {'FeatureName': 'ejection_fraction', 'FeatureType': 'Integral'},
  {'FeatureName': 'high_blood_pressure', 'FeatureType': 'Integral'},
  {'FeatureName': 'platelets', 'FeatureType': 'Fractional'},
  {'FeatureName': 'serum_creatinine', 'FeatureType': 'Fractional'},
  {'FeatureName': 'serum_sodium', 'FeatureType': 'Integral'},
  {'FeatureName': 'sex', 'FeatureType': 'Integral'},
  {'FeatureName': 'smoking', 'Featu

In [21]:
sagemaker_client.list_feature_groups() #use boto client to list FeatureGroups

{'FeatureGroupSummaries': [{'FeatureGroupName': 'clinical-feature-group-02-08-42-31',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:555918697305:feature-group/clinical-feature-group-02-08-42-31',
   'CreationTime': datetime.datetime(2022, 12, 2, 8, 42, 31, 695000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'clinical-feature-group-02-08-41-48',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:555918697305:feature-group/clinical-feature-group-02-08-41-48',
   'CreationTime': datetime.datetime(2022, 12, 2, 8, 41, 49, 155000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'clinical-feature-group-02-08-37-42',
   'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:555918697305:feature-group/clinical-feature-group-02-08-37-42',
   'CreationTime': datetime.datetime(2022, 12, 2, 8, 37, 42, 676000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created'},
  {'FeatureGroupName': 'clinical-feature-group-02-08-37-07',
   'Feature

In [22]:
# Put records into the Featrue Store


In [23]:
clinical_feature_group.ingest(
    data_frame = clinical, max_workers = 3, wait = True)

IngestionManagerPandas(feature_group_name='clinical-feature-group-02-08-42-31', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7faafc739bd0>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7faafab5b990>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

# Get Records from a Feature Group
We can use the get_record function to retrieve the data for a specific feature by its record identifier from the online store.

In [24]:
record_identifier_value = str(200)
 
featurestore_runtime.get_record(FeatureGroupName=clinical_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)

{'ResponseMetadata': {'RequestId': 'a4ff262e-a685-4c48-b159-8213131becb9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a4ff262e-a685-4c48-b159-8213131becb9',
   'content-type': 'application/json',
   'content-length': '788',
   'date': 'Fri, 02 Dec 2022 08:43:00 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'patient_id', 'ValueAsString': '200'},
  {'FeatureName': 'age', 'ValueAsString': '63.0'},
  {'FeatureName': 'anaemia', 'ValueAsString': '1'},
  {'FeatureName': 'creatinine_phosphokinase', 'ValueAsString': '1767'},
  {'FeatureName': 'diabetes', 'ValueAsString': '0'},
  {'FeatureName': 'ejection_fraction', 'ValueAsString': '45'},
  {'FeatureName': 'high_blood_pressure', 'ValueAsString': '0'},
  {'FeatureName': 'platelets', 'ValueAsString': '73000.0'},
  {'FeatureName': 'serum_creatinine', 'ValueAsString': '0.7'},
  {'FeatureName': 'serum_sodium', 'ValueAsString': '137'},
  {'FeatureName': 'sex', 'ValueAsString': '1'},
  {'FeatureName': 'smoking', 'Value

# Generate Hive DDL Commands
The sagemaker Pyhton SDK's Feature Store class also provides the functionality to generate Hive DDL commands

In [25]:
print(clinical_feature_group.as_hive_ddl())

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.clinical-feature-group-02-08-42-31 (
  patient_id INT
  age FLOAT
  anaemia INT
  creatinine_phosphokinase INT
  diabetes INT
  ejection_fraction INT
  high_blood_pressure INT
  platelets FLOAT
  serum_creatinine FLOAT
  serum_sodium INT
  sex INT
  smoking INT
  time INT
  DEATH_EVENT INT
  EventTime FLOAT
  write_time TIMESTAMP
  event_time TIMESTAMP
  is_deleted BOOLEAN
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 's3://sagemaker-us-east-1-555918697305/feature-store/555918697305/sagemaker/us-east-1/offline-store/clinical-feature-group-02-08-42-31-1669970551/data'


In [26]:
%%time
s3_client = boto3.client('s3', region_name=region)
 
account_id = boto3.client('sts').get_caller_identity()["Account"]
 
clinical_feature_group_table_name = clinical_feature_group.describe().get('OfflineStoreConfig').get('DataCatalogConfig').get('TableName')
 
print(account_id)
print(clinical_feature_group_table_name)
 
clinical_feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + clinical_feature_group_table_name + '/data'
 
offline_store_contents = None
while (offline_store_contents is None):
    objects_in_bucket = s3_client.list_objects(Bucket=default_s3_bucket_name, Prefix=clinical_feature_group_s3_prefix)
    if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) >= 1):
        offline_store_contents = objects_in_bucket['Contents']
    else:
        print('Waiting for data in offline store...\n')
        sleep(60)

print('Data available.')

555918697305
clinical-feature-group-02-08-42-31-1669970551
Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Waiting for data in offline store...

Data available.
CPU times: user 179 ms, sys: 21.2 ms, total: 200 ms
Wall time: 6min 1s


In [27]:
# Build a training Dataset



In [28]:
clinical_query = clinical_feature_group.athena_query()
clinical_table = clinical_query.table_name

In [29]:
# Athena query
query_string = 'SELECT * FROM "'+clinical_table+'" LIMIT 290'
 
# run Athena query. The output is loaded to a Pandas dataframe.
dataset = pd.DataFrame()
clinical_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/query_results/')
clinical_query.wait()
dataset = clinical_query.as_dataframe()

In [30]:
id_for_test = []
for i in range(299):
    if i not in dataset['patient_id'].unique():
        id_for_test.append(i)

In [31]:
# Select useful columns for training with target column as the first.
dataset = dataset[["death_event", "age", 'anaemia', 'creatinine_phosphokinase', 'diabetes',
       'ejection_fraction', 'high_blood_pressure', 'platelets',
       'serum_creatinine', 'serum_sodium', 'sex', 'smoking', 'time']]
# Write to csv in S3 without headers and index column.
dataset.to_csv('dataset.csv', header=False, index=False)
s3_client.upload_file('dataset.csv', default_s3_bucket_name, prefix+'/training_input/dataset.csv')
dataset_uri_prefix = 's3://'+default_s3_bucket_name+'/'+prefix+'/training_input/';

In [32]:
dataset.head()

Unnamed: 0,death_event,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time
0,1,75.0,1,81,0,38,1,368000.0,4.0,131,1,1,10
1,0,73.0,0,582,0,35,1,203000.0,1.3,134,1,0,195
2,0,58.0,1,57,0,25,0,189000.0,1.3,132,1,1,205
3,1,60.0,0,582,1,38,1,451000.0,0.6,138,1,1,40
4,0,60.0,1,607,0,40,0,216000.0,0.6,138,1,1,54


In [36]:
#Train and Deploy the MOdel


training_image=sagemaker.image_uris.retrieve("xgboost", region, "1.0-1")


In [37]:
training_output_path='s3://' + default_s3_bucket_name+'/'+prefix + '/training_output'
 
from sagemaker.estimator import Estimator
training_model = Estimator(training_image,
                           role, 
                           instance_count=1, 
                           instance_type='ml.m5.2xlarge',
                           volume_size = 5,
                           max_run = 3600,
                           input_mode= 'File',
                           output_path=training_output_path,
                           sagemaker_session=feature_store_session)

In [38]:
training_model.set_hyperparameters(objective = "binary:logistic",
                                   num_round = 50)

In [39]:
# Creating data channels
train_data = sagemaker.inputs.TrainingInput(dataset_uri_prefix, distribution='FullyReplicated',
                                            content_type='text/csv', s3_data_type='S3Prefix')
data_channels = {'train': train_data}

In [40]:
training_model.fit(inputs=data_channels,logs=True)

2022-12-02 08:52:02 Starting - Starting the training job...
2022-12-02 08:52:26 Starting - Preparing the instances for trainingProfilerReport-1669971122: InProgress
......
2022-12-02 08:53:29 Downloading - Downloading input data...
2022-12-02 08:53:49 Training - Downloading the training image...
2022-12-02 08:54:30 Uploading - Uploading generated training model[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34m[08:54:23] 290x12 matrix with 3480 entries loaded from /opt/ml/input/data/train?format=csv&label_colu

In [41]:
# Setup hosting for the model
#Afyer finishing the training we have to deploy the trained model as an amazon sagemaker real time hosted endpoints. This will allow us to make predictions (or instances) from the model.

In [43]:
predictor = training_model.deploy(initial_instance_count = 1, instance_type = 'ml.m5.xlarge')

----!

In [45]:
#Sagemaker Feature Store During Inference
id_for_test

[70, 109, 130, 156, 174, 190, 191, 232, 283]

In [46]:
#Testing for leftout patient


In [47]:
patient_id = str(194)
 
# Helper to parse the feature value from the record.
def get_feature_value(record, feature_name):
    return str(list(filter(lambda r: r['FeatureName'] == feature_name, record))[0]['ValueAsString'])
 
clinical_response = featurestore_runtime.get_record(FeatureGroupName=clinical_feature_group_name, RecordIdentifierValueAsString=patient_id)
clinical_record = clinical_response['Record']
clinical_record

[{'FeatureName': 'patient_id', 'ValueAsString': '194'},
 {'FeatureName': 'age', 'ValueAsString': '45.0'},
 {'FeatureName': 'anaemia', 'ValueAsString': '0'},
 {'FeatureName': 'creatinine_phosphokinase', 'ValueAsString': '582'},
 {'FeatureName': 'diabetes', 'ValueAsString': '0'},
 {'FeatureName': 'ejection_fraction', 'ValueAsString': '20'},
 {'FeatureName': 'high_blood_pressure', 'ValueAsString': '1'},
 {'FeatureName': 'platelets', 'ValueAsString': '126000.0'},
 {'FeatureName': 'serum_creatinine', 'ValueAsString': '1.6'},
 {'FeatureName': 'serum_sodium', 'ValueAsString': '135'},
 {'FeatureName': 'sex', 'ValueAsString': '1'},
 {'FeatureName': 'smoking', 'ValueAsString': '0'},
 {'FeatureName': 'time', 'ValueAsString': '180'},
 {'FeatureName': 'DEATH_EVENT', 'ValueAsString': '1'},
 {'FeatureName': 'EventTime', 'ValueAsString': '1669970551.0'}]

In [48]:
inference_request = [
    get_feature_value(clinical_record, 'age'),
    get_feature_value(clinical_record, 'anaemia'),
    get_feature_value(clinical_record, 'creatinine_phosphokinase'),
    get_feature_value(clinical_record, 'diabetes'),
    get_feature_value(clinical_record, 'ejection_fraction'),
    get_feature_value(clinical_record, 'high_blood_pressure'),
    get_feature_value(clinical_record, 'platelets'),
    get_feature_value(clinical_record, 'serum_creatinine'),
    get_feature_value(clinical_record, 'serum_sodium'),
    get_feature_value(clinical_record, 'sex'),
    get_feature_value(clinical_record, 'smoking'),
    get_feature_value(clinical_record, 'time')
]

In [49]:
#the predictor will call pur hosted model and give a prediction result.this model predicts the probability of heart failure to patient 


In [50]:
import json
 
results = predictor.predict(','.join(inference_request), initial_args = {"ContentType": "text/csv"})
prediction = json.loads(results)
print (prediction)

0.9914258718490601


In [52]:
#CLeanup REsources
predictor.delete_endpoint()
clinical_feature_group.delete()