First, we need to install the relevant libaries required for this local training usecase. You will need to pip install boto3, pandas and Sagemaker into your Python environment. For this lab, running Python 3.11 as your kernel is recommended. Please see the requirements.txt for what's needed.


In [1]:
import tarfile
import boto3
import pandas as pd
import os
from sagemaker.estimator import Estimator
from sagemaker.local import LocalSession
from sagemaker.predictor import csv_serializer

In [2]:
from __future__ import print_function

import json
import os
import pickle
import sys
import traceback

import pandas as pd
from causalnex.discretiser import Discretiser
import warnings
from causalnex.structure import StructureModel
from sklearn.model_selection import train_test_split
from causalnex.network import BayesianNetwork
from causalnex.evaluation import classification_report
from causalnex.evaluation import roc_auc

  from .autonotebook import tqdm as notebook_tqdm


The next step is to create a SageMaker Local session, define the ARN for Sagemeker execution and create a session using the 'mlops-prod' profile. 


In [3]:
sagemaker_session = LocalSession()
sagemaker_session.config = {'local': {'local_code': True}}

role = 'arn:aws:iam::403775705461:role/SageMaker-IAM-Role-AB3'

Define the data location, which is apart of this repository.


In [4]:
data_location = "./data/heart_failure_clinical_records_dataset.csv"

In the cell, we are doing data pre-processing to make our dataset ML friendly. This code is using a Discretiser class from the causalnex library to transform a continuous feature into a discrete one. 

Taking age as an example, we are using an numeric_split_points=[60], which means it will split the data into two bins: below 60 and above 60. Similar approaches are used on other columns.

In [5]:
from causalnex.discretiser import Discretiser
import pandas as pd

initial_df = pd.read_csv(data_location)

initial_df["age"] = Discretiser(method="fixed", numeric_split_points=[60]).transform(
    initial_df["age"].values
)
initial_df["serum_sodium"] = Discretiser(method="fixed", numeric_split_points=[136]).transform(
    initial_df["serum_sodium"].values
)
initial_df["serum_creatinine"] = Discretiser(
    method="fixed", numeric_split_points=[1.1, 1.4]
).transform(initial_df["serum_sodium"].values)

initial_df["ejection_fraction"] = Discretiser(
    method="fixed", numeric_split_points=[30, 38, 42]
).transform(initial_df["ejection_fraction"].values)

initial_df["creatinine_phosphokinase"] = Discretiser(
    method="fixed", numeric_split_points=[120, 540, 670]
).transform(initial_df["creatinine_phosphokinase"].values)

initial_df["platelets"] = Discretiser(method="fixed", numeric_split_points=[263358]).transform(
    initial_df["platelets"].values
)

print ("Dataset after pre-processing")
initial_df.head()

Dataset after pre-processing


Unnamed: 0,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,DEATH_EVENT
0,1,0,2,0,0,1,1,0,0,1,0,4,1
1,0,0,3,0,2,0,1,0,1,1,0,6,1
2,1,0,1,0,0,0,0,0,0,1,1,7,1
3,0,1,0,0,0,0,0,0,1,1,0,7,1
4,1,1,1,1,0,0,1,0,0,0,0,8,1


In [6]:
sm = StructureModel()
sm.add_edges_from([
    ('ejection_fraction', 'DEATH_EVENT'),
    ('creatinine_phosphokinase', 'DEATH_EVENT'),
    ('age','DEATH_EVENT'),
    ('smoking','high_blood_pressure'),
    ('age','high_blood_pressure'),            
    ('serum_sodium','DEATH_EVENT'),
    ('high_blood_pressure','DEATH_EVENT'),
    ('anaemia','DEATH_EVENT'),
    ('creatinine_phosphokinase','DEATH_EVENT'),
    ('smoking','DEATH_EVENT')
])

train, test = train_test_split(initial_df, train_size=0.8, test_size=0.2, random_state=42)
        
bn = BayesianNetwork(sm)
bn = bn.fit_node_states(initial_df)
bn = bn.fit_cpds(train, method="BayesianEstimator", bayes_prior="K2")

roc, auc = roc_auc(bn, test, "DEATH_EVENT")
print("Model AUC: " + str(auc))

print(classification_report(bn, test, "DEATH_EVENT"))

# save the model
model_path = "models"
isExist = os.path.exists(model_path)
if not isExist:
   os.makedirs(model_path)
with open(os.path.join(model_path, 'causal_model.pkl'), 'wb') as out:
    pickle.dump(bn, out)

Model AUC: 0.7368055555555555
{'DEATH_EVENT_0': {'precision': 0.6122448979591837, 'recall': 0.8571428571428571, 'f1-score': 0.7142857142857143, 'support': 35.0}, 'DEATH_EVENT_1': {'precision': 0.5454545454545454, 'recall': 0.24, 'f1-score': 0.3333333333333333, 'support': 25.0}, 'accuracy': 0.6, 'macro avg': {'precision': 0.5788497217068646, 'recall': 0.5485714285714285, 'f1-score': 0.5238095238095238, 'support': 60.0}, 'weighted avg': {'precision': 0.5844155844155844, 'recall': 0.6, 'f1-score': 0.5555555555555555, 'support': 60.0}}


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].map(self._node_states[col])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].map(self._node_states[col])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].map(self._node_states[col])
A value is trying to be set on a copy of a slice from a DataFrame.


Now we want to use this code to create a docker image that can be pushed to ECR and then use that image with Sagemaker

For this step, we need to create a docker image in our local environment. Please ensure you have 'sagemaker-causalnex-local' in your Docker Images.

1. Please ensure Docker is running locally
2. Please run 'docker build -t sagemaker-causal-nex:latest .' in the 'sagemaker-local-to-cloud/local/container' path
3. Please create a role in your AWS account with permissions to access SageMaker and insert the ARN in the 'role_value' section

In [8]:
from sagemaker.local import LocalSession

# DUMMY_IAM_ROLE = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'
# LOCAL_SESSION = LocalSession()
# LOCAL_SESSION.config={'local': {'local_code': True}} # Ensure full code locality, see: https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode

image = 'sagemaker-causal-nex'

env={
    "MODEL_SERVER_WORKERS":"2"
    }

local_regressor = Estimator(
    image,
    role = 'arn:aws:iam::403775705461:role/SageMaker-IAM-Role-AB3',
    train_instance_count=1,
    train_instance_type="local")

train_location = 'file://'+data_location

local_regressor.fit(train_location, logs=True)



 Container tmp3hpy2po8-algo-1-mhav3-1  Creating
 Container tmp3hpy2po8-algo-1-mhav3-1  Created
Attaching to tmp3hpy2po8-algo-1-mhav3-1
tmp3hpy2po8-algo-1-mhav3-1  | Starting the training.
tmp3hpy2po8-algo-1-mhav3-1  | Model AUC: 0.7368055555555555
tmp3hpy2po8-algo-1-mhav3-1  | {'DEATH_EVENT_0': {'precision': 0.6122448979591837, 'recall': 0.8571428571428571, 'f1-score': 0.7142857142857143, 'support': 35.0}, 'DEATH_EVENT_1': {'precision': 0.5454545454545454, 'recall': 0.24, 'f1-score': 0.3333333333333333, 'support': 25.0}, 'accuracy': 0.6, 'macro avg': {'precision': 0.5788497217068646, 'recall': 0.5485714285714285, 'f1-score': 0.5238095238095238, 'support': 60.0}, 'weighted avg': {'precision': 0.5844155844155844, 'recall': 0.6, 'f1-score': 0.5555555555555555, 'support': 60.0}}
tmp3hpy2po8-algo-1-mhav3-1  | Training complete.
tmp3hpy2po8-algo-1-mhav3-1 exited with code 0
Aborting on container exit...
 Container tmp3hpy2po8-algo-1-mhav3-1  Stopping
 Container tmp3hpy2po8-algo-1-mhav3-1  St

In [9]:
predictor = local_regressor.deploy(1, 'local', env=env)



Attaching to tmpu3kh0bxq-algo-1-vunsu-1
tmpu3kh0bxq-algo-1-vunsu-1  | Starting the inference server with 2 workers.
tmpu3kh0bxq-algo-1-vunsu-1  | [2023-07-27 12:09:18 +0000] [10] [INFO] Starting gunicorn 21.2.0
tmpu3kh0bxq-algo-1-vunsu-1  | [2023-07-27 12:09:18 +0000] [10] [INFO] Listening at: unix:/tmp/gunicorn.sock (10)
tmpu3kh0bxq-algo-1-vunsu-1  | [2023-07-27 12:09:18 +0000] [10] [INFO] Using worker: sync
tmpu3kh0bxq-algo-1-vunsu-1  | [2023-07-27 12:09:18 +0000] [12] [INFO] Booting worker with pid: 12
tmpu3kh0bxq-algo-1-vunsu-1  | [2023-07-27 12:09:18 +0000] [13] [INFO] Booting worker with pid: 13
!tmpu3kh0bxq-algo-1-vunsu-1  | 172.18.0.1 - - [27/Jul/2023:12:09:22 +0000] "GET /ping HTTP/1.1" 200 1 "-" "python-urllib3/1.26.10"


In [15]:
test_data = open('payload.json')
test_data1 = '{"age": 1, "anaemia": 0, "creatinine_phosphokinase": 2, "diabetes": 0, "ejection_fraction": 0, "high_blood_pressure": 1, "platelets": 1, "serum_creatinine": 0, "serum_sodium": 0, "sex": 1, "smoking": 0, "time": 4}'

#with open('payload.json') as f:
#    d = json.load(f)
#    s = json.dumps(d)
#    print(d)

with open('payload.json') as f:
    test = json.load(f)


# s = json.dumps(test_data1)
print (test)
print(type(test)) 
# print (s)

{'data': [{'age': 1, 'anaemia': 0, 'creatinine_phosphokinase': 2, 'diabetes': 0, 'ejection_fraction': 0, 'high_blood_pressure': 1, 'platelets': 1, 'serum_creatinine': 0, 'serum_sodium': 0, 'sex': 1, 'smoking': 0, 'time': 4}, {'age': 0, 'anaemia': 0, 'creatinine_phosphokinase': 2, 'diabetes': 0, 'ejection_fraction': 0, 'high_blood_pressure': 1, 'platelets': 1, 'serum_creatinine': 0, 'serum_sodium': 0, 'sex': 1, 'smoking': 1, 'time': 4}], 'pred_type': 'prediction', 'target_node': 'DEATH_EVENT'}
<class 'dict'>


In [17]:
predicted = predictor.predict(test["data"]).decode('utf-8')
# predicted = predictor.predict(test["data"]).decode('utf-8')

# predicted = predictor.predict(s)

tmpu3kh0bxq-algo-1-vunsu-1  | 172.18.0.1 - - [27/Jul/2023:12:15:09 +0000] "POST /invocations HTTP/1.1" 400 0 "-" "python-urllib3/1.26.10"


TypeError: can't concat dict to bytes

In [12]:
print(predicted)

This predictor only supports JSON data


In [None]:
predictor.delete_endpoint()