In [38]:
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker


from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

In [39]:
# Instantiate the SageMaker session and the AWS clients used throughout the notebook
sess = sagemaker.Session()
region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
# Pin the runtime client to the session region too, consistent with the other clients
sm_runtime_client = boto3.client("sagemaker-runtime", region_name=region)

# Fetch SageMaker execution role
sagemaker_role = sagemaker.get_execution_role()

# Bucket and key prefix that pipeline artifacts are written under
write_bucket = sess.default_bucket()
write_prefix = "stroke-prediction"

# Location of the raw data.
# NOTE: despite its name, `read_bucket` is used as a key prefix inside `write_bucket`:
# the raw data path below is "<write_bucket>/<read_bucket>/<read_prefix>".
read_bucket = "mle-group7-project"
read_prefix = "data"

# Bucket-qualified path (no "s3://" scheme) where the raw data is fetched from
raw_data_key = f"{write_bucket}/{read_bucket}/{read_prefix}"

# Key prefixes for processed data and for the train / validation / test splits
processed_data_key = f"{write_prefix}/processed"
train_data_key = f"{write_prefix}/train"
validation_data_key = f"{write_prefix}/validation"
test_data_key = f"{write_prefix}/test"

# Full S3 URIs
data_uri = f"s3://{raw_data_key}/healthcare-dataset-stroke-data.csv"
output_data_uri = f"s3://{write_bucket}/{write_prefix}/"
scripts_uri = f"s3://{write_bucket}/{write_prefix}/code"
estimator_output_uri = f"s3://{write_bucket}/{write_prefix}/training_jobs"
processing_output_uri = f"s3://{write_bucket}/{write_prefix}/processing_jobs"
model_eval_output_uri = f"s3://{write_bucket}/{write_prefix}/model_eval"

# Retrieve the XGBoost container image (used for evaluation and model creation)
training_image = retrieve(framework="xgboost", region=region, version="1.7-1")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [40]:
# Names of the pipeline and of the resources it creates
pipeline_name = "StrokeXGBPipeline"
model_package_group_name = "stroke-prediction-xgb-model-group"
base_job_name_prefix = "stroke-prediction"

# The endpoint and its configuration are named after the pipeline model
pipeline_model_name = "stroke-prediction-xgb-pipeline"
endpoint_name = f"{pipeline_model_name}-endpoint"
endpoint_config_name = f"{pipeline_model_name}-endpoint-config"

# Name of the label column in the dataset
target_col = "stroke"

# Compute resources for processing, training and hosting
process_instance_type = "ml.c5.xlarge"
train_instance_type = "ml.m4.xlarge"
train_instance_count = 1
predictor_instance_type = "ml.m4.xlarge"
predictor_instance_count = 1

In [41]:
# Pipeline input parameters; defaults come from the notebook-level settings.
# Clarify bias / explainability check parameters are not defined for this pipeline.

# Instance type for the processing job
process_instance_type_param = ParameterString(name="ProcessingInstanceType", default_value=process_instance_type)

# Instance type and count for the training job
train_instance_type_param = ParameterString(name="TrainingInstanceType", default_value=train_instance_type)
train_instance_count_param = ParameterInteger(name="TrainingInstanceCount", default_value=train_instance_count)

# Instance type and count for the deployed endpoint
deploy_instance_type_param = ParameterString(name="DeployInstanceType", default_value=predictor_instance_type)
deploy_instance_count_param = ParameterInteger(name="DeployInstanceCount", default_value=predictor_instance_count)

# Approval status assigned to the registered model package
model_approval_status_param = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# Threshold the evaluation accuracy is compared against
accuracy_threshold_param = ParameterFloat(name="AccuracyThreshold", default_value=0.75)

In [42]:
# Upload the local ./data directory through the SageMaker session and print the resulting S3 URI
raw_data_key_prefix = f'{read_bucket}/{read_prefix}'
data_s3 = sess.upload_data(path="./data", key_prefix=raw_data_key_prefix)
print(data_s3)

s3://sagemaker-us-east-1-600187469140/mle-group7-project/data


In [43]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split

# Local paths inside the processing container
input_data_path = os.path.join(r"/opt/ml/processing/input", "healthcare-dataset-stroke-data.csv")
output_root = r"/opt/ml/processing/output"

# Fixed seed so that re-running the job reproduces the same train/validation/test split
RANDOM_STATE = 42

# Categorical columns that are one-hot encoded and then dropped
categorical_cols = ['gender', 'work_type', 'Residence_type', 'smoking_status']

# Load the raw data, drop rows with missing values and the 'Other' gender category
df = pd.read_csv(input_data_path)
df = df.dropna()
df = df[df['gender'] != 'Other']

# One-hot encode the categorical columns (first level of each dropped) and binarize ever_married
df = pd.concat([df, pd.get_dummies(df[categorical_cols], drop_first=True)], axis=1)
df['ever_married'] = np.where(df['ever_married'] == 'Yes', 1, 0)
# Fold the 'children' work type into 'Never_worked'
df['work_type_Never_worked'] = df['work_type_Never_worked'] + df['work_type_children']

df = df.drop(['id'] + categorical_cols + ['work_type_children'], axis=1)

# Move the target to the first column; the evaluation script reads the label by position
df = pd.concat([df['stroke'], df.drop(['stroke'], axis=1)], axis=1)

print("Shape of data is:", df.shape)

# 80% train, 10% test, 10% validation. Stratify on the target so that every split
# keeps the same share of stroke cases as the full dataset.
train, holdout = train_test_split(
    df, test_size=0.2, random_state=RANDOM_STATE, stratify=df['stroke']
)
test, validation = train_test_split(
    holdout, test_size=0.5, random_state=RANDOM_STATE, stratify=holdout['stroke']
)

# Write each dataset to its own output directory, creating the directory if it is missing
for subdir, frame, filename in [
    ("train", train, "train.csv"),
    ("validation", validation, "validation.csv"),
    ("test", test, "test.csv"),
    ("full", df, "df.csv"),
]:
    out_dir = os.path.join(output_root, subdir)
    os.makedirs(out_dir, exist_ok=True)
    frame.to_csv(os.path.join(out_dir, filename), index=False)


Overwriting preprocessing.py


In [44]:
from sagemaker.workflow.pipeline_context import PipelineSession
# Stage the preprocessing script in S3; the same object key is used as the step's code location
preprocessing_script_key = f"{write_prefix}/scripts/preprocessing.py"
s3_client.upload_file(
    Filename="preprocessing.py",
    Bucket=write_bucket,
    Key=preprocessing_script_key,
)

# scikit-learn processor that runs the preprocessing script
sklearn_processor = SKLearnProcessor(
    role=sagemaker_role,
    framework_version="0.23-1",
    instance_type=process_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_name_prefix}-processing",
)

# One output per dataset: (output name, sub-directory under /opt/ml/processing/output)
processing_outputs = [
    ProcessingOutput(
        output_name=output_name,
        source=f"/opt/ml/processing/output/{local_dir}",
        destination=f"{processing_output_uri}/{output_name}",
    )
    for output_name, local_dir in (
        ("train_data", "train"),
        ("validation_data", "validation"),
        ("test_data", "test"),
        ("processed_data", "full"),
    )
]

# Pipeline step that runs the processing job on the raw CSV
process_step = ProcessingStep(
    name="DataProcessing",
    processor=sklearn_processor,
    code=f"s3://{write_bucket}/{preprocessing_script_key}",
    inputs=[
        ProcessingInput(source=data_uri, destination="/opt/ml/processing/input")
    ],
    outputs=processing_outputs,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [45]:
%%writefile xgboost_train.py

import argparse
import os
import joblib
import json
import pandas as pd
import xgboost as xgb
from sklearn.metrics import roc_auc_score, accuracy_score
import numpy as np

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Hyperparameters and algorithm parameters
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=3)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--subsample", type=float, default=0.9)
    parser.add_argument("--colsample_bytree", type=float, default=0.8)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--eval_metric", type=str, default="error")
    parser.add_argument("--nfold", type=int, default=3)
    parser.add_argument("--early_stopping_rounds", type=int, default=3)

    # SageMaker specific arguments. Defaults are read from environment variables.
    # Location of the input training data
    parser.add_argument("--train_data_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    # Location of the input validation data
    parser.add_argument("--validation_data_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    # Location where the trained model is stored
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    # Location where additional output artifacts (the metrics file) are stored
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))

    args = parser.parse_args()

    # Load the training split; "stroke" is the label, every other column is a feature
    data_train = pd.read_csv(os.path.join(args.train_data_dir, "train.csv"))
    train = data_train.drop("stroke", axis=1)
    label_train = pd.DataFrame(data_train["stroke"])
    dtrain = xgb.DMatrix(train, label=label_train)

    # Load the validation split the same way
    data_validation = pd.read_csv(os.path.join(args.validation_data_dir, "validation.csv"))
    validation = data_validation.drop("stroke", axis=1)
    label_validation = pd.DataFrame(data_validation["stroke"])
    dvalidation = xgb.DMatrix(validation, label=label_validation)

    # XGBoost model hyperparameters
    params = {
        "max_depth": args.max_depth,
        "eta": args.eta,
        "objective": args.objective,
        "subsample": args.subsample,
        "colsample_bytree": args.colsample_bytree,
    }

    # Cross-validate on the training data to pick the number of boosting rounds
    cv_results = xgb.cv(
        params=params,
        dtrain=dtrain,
        num_boost_round=args.num_round,
        nfold=args.nfold,
        early_stopping_rounds=args.early_stopping_rounds,
        # Honor the --eval_metric hyperparameter instead of a hard-coded metric
        metrics=[args.eval_metric],
        seed=42,
    )

    # Train the final model for as many rounds as the cross-validation history contains
    model = xgb.train(params=params, dtrain=dtrain, num_boost_round=len(cv_results))

    # Threshold predicted probabilities at 0.5 to obtain class labels
    train_pred = np.where(model.predict(dtrain) > 0.5, 1, 0)
    validation_pred = np.where(model.predict(dvalidation) > 0.5, 1, 0)

    train_accuracy = accuracy_score(label_train, train_pred)
    validation_accuracy = accuracy_score(label_validation, validation_pred)

    print(f"[0]#011train-accuracy:{train_accuracy:.2f}")
    print(f"[0]#011validation-accuracy:{validation_accuracy:.2f}")

    metrics_data = {
        "hyperparameters": params,
        "binary_classification_metrics": {
            "validation:accuracy": {"value": validation_accuracy},
            "train:accuracy": {"value": train_accuracy},
        },
    }

    # Save the evaluation metrics to the location specified by output_data_dir
    metrics_location = os.path.join(args.output_data_dir, "metrics.json")
    with open(metrics_location, "w") as f:
        json.dump(metrics_data, f)

    # Save the trained model to the location specified by model_dir.
    # The evaluation script loads this file under the same name, "xgboost-model".
    model_location = os.path.join(args.model_dir, "xgboost-model")
    with open(model_location, "wb") as f:
        joblib.dump(model, f)

Overwriting xgboost_train.py


In [46]:
# Set XGBoost model hyperparameters (passed to xgboost_train.py as command-line arguments)
hyperparams = {
    "eval_metric": "error",
    "objective": "binary:logistic",
    "num_round": "5",
    "max_depth": "5",
    "subsample": "0.75",
    "colsample_bytree": "0.75",
    "eta": "0.5"
}

# Set XGBoost estimator
xgb_estimator = XGBoost(
    entry_point="xgboost_train.py",
    output_path=estimator_output_uri,
    code_location=estimator_output_uri,
    hyperparameters=hyperparams,
    role=sagemaker_role,
    # Static instance settings from the notebook configuration; the TrainingInstance*
    # pipeline parameters are not wired into the estimator.
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    # Train with the same XGBoost container version as `training_image` (1.7-1).
    # The training script pickles the booster, and the evaluation step and the
    # SageMaker model both load it with `training_image`; a pickled booster is not
    # guaranteed to load under a different XGBoost version.
    framework_version="1.7-1"
)

# Access the location where the preceding processing step saved train and validation datasets
# Pipeline step properties give access to outputs which can be used in succeeding steps
s3_input_train = TrainingInput(
    s3_data=process_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
    content_type="csv",
    s3_data_type="S3Prefix"
)

s3_input_validation = TrainingInput(
    s3_data=process_step.properties.ProcessingOutputConfig.Outputs["validation_data"].S3Output.S3Uri,
    content_type="csv",
    s3_data_type="S3Prefix"
)

# Set pipeline training step
train_step = TrainingStep(
    name="XGBModelTraining",
    estimator=xgb_estimator,
    inputs={
        "train": s3_input_train,  # Train channel
        "validation": s3_input_validation  # Validation channel
    }
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m4.xlarge.


In [34]:
# xgb_estimator.fit(inputs={
#     "train": f'{processing_output_uri}/train_data/train.csv', # Train channel 
#     "validation": f'{processing_output_uri}/validation_data/validation.csv'# Validation channel
#     })

INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2023-06-11-10-28-14-037


2023-06-11 10:28:14 Starting - Starting the training job...
2023-06-11 10:28:49 Starting - Preparing the instances for training......
2023-06-11 10:29:54 Downloading - Downloading input data...
2023-06-11 10:30:25 Training - Downloading the training image......
2023-06-11 10:31:10 Training - Training image download completed. Training in progress..[34m[2023-06-11 10:31:19.410 ip-10-0-218-36.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2023-06-11 10:31:19.440 ip-10-0-218-36.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2023-06-11:10:31:19:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2023-06-11:10:31:19:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2023-06-11:10:31:19:INFO] Invoking user training script.[0m
[34m[2023-06-11:10:31:19:INFO] Module xgboost_train does not provide a setup.py. [0m
[34mGenerating setup.py[0m
[34m[2023-06-11:10:31:19:INFO] Generating set

In [47]:
# SageMaker model built from the training step's artifacts and the retrieved XGBoost image
model = sagemaker.model.Model(
    role=sagemaker_role,
    sagemaker_session=sess,
    image_uri=training_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
)

# Deployment instance type, taken from the pipeline parameter
inputs = sagemaker.inputs.CreateModelInput(instance_type=deploy_instance_type_param)

# Pipeline step that creates the model
create_model_step = CreateModelStep(
    name="StrokePredictionModel",
    model=model,
    inputs=inputs,
)

In [155]:
%%writefile evaluate.py

import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost as xgb

from sklearn.metrics import roc_auc_score, accuracy_score

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Feature columns selected from the test set for prediction
feature_columns = [
    "age",
    "hypertension",
    "heart_disease",
    "ever_married",
    "avg_glucose_level",
    "bmi",
    "gender_Male",
    "work_type_Never_worked",
    "work_type_Private",
    "work_type_Self-employed",
    "Residence_type_Urban",
    "smoking_status_formerly smoked",
    "smoking_status_never smoked",
    "smoking_status_smokes"
]

if __name__ == "__main__":
    # Unpack the model archive into the working directory
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    # The name of the file should match how the model was saved in the training script.
    # NOTE(security): unpickling executes code embedded in the file; only load model
    # artifacts that come from a trusted training job.
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    logger.debug("Reading test data.")
    test_local_path = "/opt/ml/processing/test/test.csv"
    df_test = pd.read_csv(test_local_path)

    # Extract test set target column (the first column of the file)
    y_test = df_test.iloc[:, 0].values

    # Extract test set feature columns
    cols_when_train = feature_columns
    X = df_test[cols_when_train].copy()
    X_test = xgb.DMatrix(X)

    logger.info("Generating predictions for test data.")
    # Threshold predicted probabilities at 0.5 to obtain class labels
    pred = np.where(model.predict(X_test) > 0.5, 1, 0)

    # Calculate model evaluation score
    logger.debug("Calculating Accuracy score.")
    score = accuracy_score(y_test, pred)
    metric_dict = {
        "classification_metrics": {"accuracy": {"value": score}}
    }

    # Save model evaluation metrics
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing evaluation report with Accuracy: %f", score)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(metric_dict))

Overwriting evaluate.py


In [158]:
# Stage the evaluation script in S3; the same object key is used as the step's code location
evaluation_script_key = f"{write_prefix}/scripts/evaluate.py"
s3_client.upload_file(
    Filename="evaluate.py",
    Bucket=write_bucket,
    Key=evaluation_script_key,
)

# Script processor that runs evaluate.py with python3 in the retrieved XGBoost image
eval_processor = ScriptProcessor(
    role=sagemaker_role,
    sagemaker_session=sess,
    image_uri=training_image,
    command=["python3"],
    instance_count=predictor_instance_count,
    instance_type=predictor_instance_type,
    base_job_name=f"{base_job_name_prefix}-model-eval",
)

# Property file exposing evaluation.json from the "evaluation" output to later steps
evaluation_report = PropertyFile(
    name="StrokePredictionEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Model artifacts saved by the training step
model_artifacts_input = ProcessingInput(
    source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)

# Test data saved by the processing step
test_data_input = ProcessingInput(
    source=process_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)

# Set model evaluation step
evaluation_step = ProcessingStep(
    name="XGBModelEvaluate",
    processor=eval_processor,
    code=f"s3://{write_bucket}/{evaluation_script_key}",
    inputs=[model_artifacts_input, test_data_input],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"{model_eval_output_uri}",
        ),
    ],
    property_files=[evaluation_report],
)

In [159]:
# Register the trained model as a package in the model package group
register_step = RegisterModel(
    name="XGBRegisterModel",
    estimator=xgb_estimator,
    # S3 location where the training step saved the model artifacts
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=model_package_group_name,
    # Approval status comes from the pipeline parameter
    approval_status=model_approval_status_param,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[predictor_instance_type],
    transform_instances=[predictor_instance_type],
)

In [160]:
%%writefile lambda_deployer.py

"""
Lambda function creates an endpoint configuration and deploys a model to real-time endpoint. 
Required parameters for deployment are retrieved from the event object
"""

import json
import boto3


def lambda_handler(event, context):
    """Create a model, an endpoint configuration and an endpoint from a model package.

    Args:
        event: dict with the keys "model_name", "model_package_arn",
            "endpoint_config_name", "endpoint_name", "role", "instance_type"
            and "instance_count".
        context: Lambda context object (unused).

    Returns:
        dict with "statusCode" 200 and a JSON "body" holding the ARNs reported
        by the three create calls (None for any ARN missing from a response).

    Raises:
        KeyError: if a required key is missing from ``event``.
    """
    sm_client = boto3.client("sagemaker")

    # Deployment details supplied by the pipeline's Lambda step
    model_name = event["model_name"]
    model_package_arn = event["model_package_arn"]
    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    role = event["role"]
    instance_type = event["instance_type"]
    instance_count = event["instance_count"]

    # Create the model from the registered model package
    model_response = sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer={"ModelPackageName": model_package_arn},
        ExecutionRoleArn=role,
    )

    # Create an endpoint configuration with a single variant receiving all traffic
    config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "VariantName": "Alltraffic",
                "ModelName": model_name,
                "InitialInstanceCount": instance_count,
                "InstanceType": instance_type,
                "InitialVariantWeight": 1,
            }
        ],
    )

    # Create the endpoint from that configuration
    endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name,
    )

    # Report what was created instead of discarding the API responses
    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "model_arn": model_response.get("ModelArn"),
                "endpoint_config_arn": config_response.get("EndpointConfigArn"),
                "endpoint_arn": endpoint_response.get("EndpointArn"),
            }
        ),
    }

Overwriting lambda_deployer.py


In [161]:
# The function name must contain sagemaker
function_name = "sagemaker-stroke-prediction-demo-lambda-step"

# Lambda helper describing the function that the Lambda step invokes
func = Lambda(
    function_name=function_name,
    execution_role_arn=sagemaker_role,
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    memory_size=10240,
    timeout=600,
)

# Values listed here reach `lambda_handler` as entries of its `event` argument
deploy_event_inputs = {
    "model_name": pipeline_model_name,
    "model_package_arn": register_step.steps[0].properties.ModelPackageArn,
    "endpoint_config_name": endpoint_config_name,
    "endpoint_name": endpoint_name,
    "role": sagemaker_role,
    "instance_type": deploy_instance_type_param,
    "instance_count": deploy_instance_count_param,
}

lambda_deploy_step = LambdaStep(
    name="LambdaStepRealTimeDeploy",
    lambda_func=func,
    inputs=deploy_event_inputs,
)

In [162]:
# Accuracy value read from the evaluation report written by the evaluation step
evaluated_accuracy = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="classification_metrics.accuracy.value",
)

# Condition: evaluated accuracy >= the AccuracyThreshold pipeline parameter
cond_gte = ConditionGreaterThanOrEqualTo(left=evaluated_accuracy, right=accuracy_threshold_param)

# Create, register and deploy the model only when the condition holds; otherwise do nothing
condition_step = ConditionStep(
    name="CheckStrokeDetectionXGBEvaluation",
    conditions=[cond_gte],
    if_steps=[create_model_step, register_step, lambda_deploy_step],
    else_steps=[],
)

In [163]:
# Parameters exposed by the pipeline
pipeline_parameters = [
    process_instance_type_param,
    train_instance_type_param,
    train_instance_count_param,
    deploy_instance_type_param,
    deploy_instance_count_param,
    model_approval_status_param,
    accuracy_threshold_param,
]

# Top-level steps; the create/register/deploy steps are nested inside the condition step
pipeline_steps = [process_step, train_step, evaluation_step, condition_step]

# Assemble the pipeline from its parameters and steps
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sess,
)

In [164]:
# Create the pipeline, or update it if it already exists
pipeline.upsert(role_arn=sagemaker_role)

# Parse the stored pipeline definition (a JSON string) and display it
pipeline_description = pipeline.describe()
pipeline_definition = json.loads(pipeline_description['PipelineDefinition'])
pipeline_definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'DeployInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m4.xlarge'},
  {'Name': 'DeployInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'AccuracyThreshold', 'Type': 'Float', 'DefaultValue': 0.75}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DataProcessing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.c5.xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'App

In [58]:
# Start the pipeline execution and wait for it to finish
execution = pipeline.start()
try:
    execution.wait()
finally:
    # Always report per-step status. When the wait fails, the waiter error alone does
    # not say which step failed or why, so print each step's failure reason as well.
    for step in execution.list_steps():
        print(step["StepName"], step["StepStatus"], step.get("FailureReason", ""))

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
# Delete the resources created by the pipeline: Lambda function, endpoint,
# endpoint configuration, model, and finally the pipeline itself.
cleanup_actions = [
    ("Lambda function", func.delete),
    ("endpoint", lambda: sm_client.delete_endpoint(EndpointName=endpoint_name)),
    ("endpoint config", lambda: sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)),
    ("model", lambda: sm_client.delete_model(ModelName=pipeline_model_name)),
    ("pipeline", lambda: sm_client.delete_pipeline(PipelineName=pipeline_name)),
]

# Attempt every deletion independently so that one missing resource (for example an
# endpoint that was never created because the pipeline failed) does not leave the
# remaining resources behind.
for label, action in cleanup_actions:
    try:
        action()
    # NOTE(review): the Lambda helper is assumed to report failures as ValueError -
    # confirm against the installed SageMaker SDK. Other exception types propagate.
    except (sm_client.exceptions.ClientError, ValueError) as err:
        print(f"Could not delete {label}: {err}")

In [None]:
# Run the TeleBot.py script through the shell
!python 'TeleBot.py'

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
2023-06-17 07:29:22,851 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot<REDACTED_TOKEN>/getMe "HTTP/1.1 200 OK"
2023-06-17 07:29:22,938 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot<REDACTED_TOKEN>/deleteWebhook "HTTP/1.1 200 OK"
2023-06-17 07:29:22,939 - telegram.ext.Application - INFO - Application started
2023-06-17 07:29:23,201 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot<REDACTED_TOKEN>/getUpdates "HTTP/1.1 200 OK"
2023-06-17 07:29:23,488 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot<REDACTED_TOKEN>/sendMessage "HTTP/1.1 200 OK"
2023-06-