This script sets up a SageMaker pipeline that performs the following steps:

1. Uploads the training data to S3.
2. Uploads the test data to S3.
3. Adds a preprocessing step to the SageMaker pipeline. The preprocessor runs the code in the file "preprocessing.py".
4. Invokes the default XGBoost algorithm container from SageMaker to train an XGBoost model on the preprocessed data.
5. Adds a step to the pipeline that evaluates the performance of the trained model on the test dataset.
6. Adds a condition step that checks whether the performance of the model is within acceptable limits.
7. If the model passes the condition, it is registered in the SageMaker domain for the region.
8. Upon passing the condition, the trained model is also prepared for inference, and a "Transformer" step is added to the pipeline to make batch prediction possible.

Afterwards, the registered model is deployed from the SageMaker Studio UI, the resulting real-time inference endpoint is invoked with a single data point, and the model's prediction is recorded.
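
The ordering of the pipeline steps listed above can be sketched as a small dependency graph. This is only an illustration: the labels are shorthand rather than the real step names, the two uploads are left out because they run in the notebook before the pipeline is defined, and the edges are my reading of which step consumes which output. `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of steps it has to wait for.
# Shorthand labels, not the step names used in the pipeline definition.
dependencies = {
    "process": set(),
    "train": {"process"},
    "evaluate": {"process", "train"},
    "condition": {"evaluate"},
    "register": {"condition", "train"},
    "create_model": {"condition", "train"},
    "transform": {"create_model"},
}

# static_order() yields every step after all of its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
print(" -> ".join(order))
```

Any order printed here is one valid execution order; steps without a dependency between them, such as registering the model and creating it for batch transform, may run in either order.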


##### Error notification:
Step 8 in the pipeline, where a batch transform job for the model is supposed to be created, does not succeed. The reason for the failure is as follows:
  
  'FailureReason': "ClientError: Failed to invoke sagemaker:CreateTransformJob. Error Details: The account-level service limit 'ml.m5.large for transform job usage' is 0 Instances, with current utilization of 0 Instances and a request delta of 1 Instances. Please use AWS Service Quotas to request an increase for this quota. If AWS Service Quotas is not available, contact AWS support to request an increase for this quota.\nRetry not appropriate on execution of step with PipelineExecutionArn arn:aws:sagemaker:us-west-1:323732946862:pipeline/xgboostpipeline/execution/ht96q5q90w6x and StepId XgboostTransform. No retry policy configured for the exception type SAGEMAKER_RESOURCE_LIMIT.",
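
To make the quota message easier to act on, the numbers can be pulled out of the failure text. The following is a minimal sketch that assumes the message keeps the wording shown above; `parse_quota_error` is a hypothetical helper, not part of the SageMaker SDK.

```python
import re

# Pattern for the account-level quota sentence quoted above.
QUOTA_ERROR = re.compile(
    r"service limit '(?P<quota>[^']+)' is (?P<limit>\d+) Instances, "
    r"with current utilization of (?P<used>\d+) Instances "
    r"and a request delta of (?P<delta>\d+) Instances"
)


def parse_quota_error(failure_reason):
    """Return quota name, limit, usage and requested delta, or None if no match."""
    match = QUOTA_ERROR.search(failure_reason)
    if match is None:
        return None
    fields = match.groupdict()
    return {
        "quota": fields["quota"],
        "limit": int(fields["limit"]),
        "used": int(fields["used"]),
        "delta": int(fields["delta"]),
    }


reason = (
    "ClientError: Failed to invoke sagemaker:CreateTransformJob. Error Details: "
    "The account-level service limit 'ml.m5.large for transform job usage' is "
    "0 Instances, with current utilization of 0 Instances and a request delta "
    "of 1 Instances."
)
info = parse_quota_error(reason)
# Number of instances by which the request exceeds the quota.
shortfall = info["used"] + info["delta"] - info["limit"]
print(info["quota"], "is short by", shortfall, "instance(s)")
```

The quota name extracted this way is the one to look up in AWS Service Quotas when requesting an increase.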

In [2]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

pipeline_session = PipelineSession()

model_package_group_name = "SallaJobTestXgboostModel"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [3]:

local_path = "./financial_fraud_training_set.csv"


base_uri = f"s3://{default_bucket}/dataset/train"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-west-1-323732946862/dataset/train/financial_fraud_training_set.csv


In [4]:
local_path = "./financial_fraud_test_set.csv"

base_uri = f"s3://{default_bucket}/dataset/test"
test_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(test_data_uri)

s3://sagemaker-us-west-1-323732946862/dataset/test/financial_fraud_test_set.csv


In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

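# NOTE: this rebinds `region`, shadowing the plain string set in the first cell.
# The parameter is not added to the pipeline's parameter list, so only its
# default_value is used when the image URI is retrieved.
region = ParameterString(
    name='Region',
    default_value='us-west-1'
)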

# processing_instance_count = ParameterInteger(
#     name='ProcessingInstanceCount',
#     default_value=1
# )

# processing_instance_type = ParameterString(
#     name='ProcessingInstanceType',
#     default_value='ml.t2.medium'
# )

# training_instance_type = ParameterString(
#     name='TrainingInstanceType',
#     default_value='ml.m4.xlarge'
# )

# training_instance_count = ParameterInteger(
#     name='TrainingInstanceCount',
#     default_value=1
# )


model_approval_status = ParameterString(
    name='ModelApprovalStatus',
    default_value='PendingManualApproval'
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
test_data = ParameterString(
    name="TestData",
    default_value=test_data_uri,
)


In [6]:
from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',  # '1.2-1'
    role=role,
    instance_type='ml.t3.medium',  # processing_instance_type
    instance_count=1,  # processing_instance_count
)

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name='process-financial-fraud-data',
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name='min_max_scalar',
            source='/opt/ml/processing/output/scaler'),  # /min_max_scalar.gz
        ProcessingOutput(
            output_name='pre_processed_train_data',
            source='/opt/ml/processing/output/processed_data/train'),
        ProcessingOutput(
            output_name='pre_processed_val_data',
            source='/opt/ml/processing/output/processed_data/val'),
    ],
    code='./preprocessing.py',
    job_arguments=[
        '--filename', 'financial_fraud_training_set.csv'
    ]
)
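
The contents of "preprocessing.py" are not shown in this notebook. Judging only from the output names above (a min-max scaler plus train and validation sets), the script might do something along the following lines. This is a hedged, standard-library-only sketch on toy data: the helper names, the 75/25 split, and the seed are my assumptions, and the real script may differ. The one layout detail it relies on is that the built-in XGBoost container reads CSV training data with the label in the first column and no header row.

```python
import random


def fit_min_max(rows):
    """Per-column (min, max) over a list of equal-length feature rows."""
    return [(min(col), max(col)) for col in zip(*rows)]


def apply_min_max(rows, bounds):
    """Scale each feature with the fitted bounds; constant columns map to 0.0."""
    return [
        [
            (value - low) / (high - low) if high > low else 0.0
            for value, (low, high) in zip(row, bounds)
        ]
        for row in rows
    ]


def to_xgboost_csv(features, labels):
    """CSV lines with the label first and no header row."""
    return [
        ",".join(str(v) for v in [label, *row])
        for row, label in zip(features, labels)
    ]


# Toy data standing in for the fraud dataset (hypothetical values).
features = [[1.0, 10.0], [2.0, 30.0], [3.0, 20.0], [4.0, 40.0]]
labels = [0, 0, 1, 0]

# Hypothetical 75/25 train/validation split with a fixed seed.
indices = list(range(len(features)))
random.Random(0).shuffle(indices)
cut = int(0.75 * len(indices))
train_idx, val_idx = indices[:cut], indices[cut:]

# Fit the scaler on the training rows only, then reuse it for validation.
train_rows = [features[i] for i in train_idx]
bounds = fit_min_max(train_rows)
train_csv = to_xgboost_csv(apply_min_max(train_rows, bounds), [labels[i] for i in train_idx])
val_csv = to_xgboost_csv(
    apply_min_max([features[i] for i in val_idx], bounds),
    [labels[i] for i in val_idx],
)
```

Fitting the bounds on the training rows alone is what makes it necessary to save the scaler as a separate output: the evaluation step needs the same bounds to transform the test data.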

In [None]:
## The class imbalance is noted so that the value of the hyperparameter "scale_pos_weight" can be derived from the ratio of
# the number of negative-class instances to the number of positive-class instances.
import pandas as pd
train_df = pd.read_csv("./financial_fraud_training_set.csv")
train_df['Class'].value_counts()

## output:
# Class
# 0    16900
# 1     2129
# Name: count, dtype: int64
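
The ratio mentioned in the comment above can be worked out directly from these counts. A small sketch of the arithmetic, using only the numbers printed by `value_counts()`:

```python
# Class counts taken from the value_counts() output above.
negatives = 16900
positives = 2129

# Ratio of negative to positive instances, used as scale_pos_weight.
scale_pos_weight = negatives / positives
# Share of positive (fraud) rows in the training set.
positive_share = positives / (negatives + positives)

print(f"scale_pos_weight = {scale_pos_weight:.3f}")
print(f"positive share   = {positive_share:.1%}")
```

The ratio comes out at roughly 7.94, with positives making up about 11% of the rows, which matches the `16900/2129` expression passed to the estimator.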

In [9]:
from sagemaker.estimator import Estimator

model_path = f"s3://{default_bucket}/model_artifacts/"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m4.xlarge"
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m4.xlarge",
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="binary:logistic",
    learning_rate=0.1,  # NOTE: "eta" below is XGBoost's alias for learning_rate, so setting both is ambiguous
    max_depth=5,
    eta=0.2,
    gamma=0,
    alpha=0,
    scale_pos_weight=16900/2129,
    min_child_weight=1,
#     n_estimators=300,
    num_round=50,
    subsample=0.7,
    eval_metric='auc'
)

The input argument region of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [10]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "pre_processed_train_data"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "pre_processed_val_data"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
    },
)

step_train = TrainingStep(
    name="xgboost_train",
    step_args=train_args
)



In [11]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="xgboost_eval",
    sagemaker_session=pipeline_session,
    role=role,
)

In [12]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "min_max_scalar"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input/scaler"
        ),
        ProcessingInput(source=test_data, destination="/opt/ml/processing/input/test"),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="evaluation.py",
)

step_eval = ProcessingStep(
    name="xgboost_eval_step",
    step_args=eval_args,
    property_files=[evaluation_report],
)
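
The "evaluation.py" script is not shown in this notebook. As a rough illustration of what it has to produce, the sketch below computes ROC AUC on toy data and builds a report whose nesting matches the `classification_metrics.roc_auc.value` path read by the condition step. The pairwise AUC function and the sample labels and scores are my own assumptions; the real script most likely uses a library routine instead.

```python
import json


def roc_auc(labels, scores):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair. Quadratic in the number of rows,
    so suitable for illustration only.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy labels and predicted probabilities (hypothetical values).
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]

# Report layout that matches the JSON path used by the condition step.
report = {"classification_metrics": {"roc_auc": {"value": roc_auc(labels, scores)}}}
report_json = json.dumps(report)
print(report_json)
```

In the pipeline, the script would write this JSON to "evaluation.json" under "/opt/ml/processing/evaluation", which is the directory declared as the "evaluation" output above.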

In [13]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

from sagemaker.inputs import CreateModelInput


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

from sagemaker.workflow.steps import CreateModelStep


step_create_model = CreateModelStep(
    name="XgboostCreateModel",
    model=model,
    inputs=inputs,
)

In [14]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    output_path=f"s3://{default_bucket}/XgboostTransform"
)

from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="XgboostTransform",
    transformer=transformer,
    inputs=TransformInput(data=test_data)
)

In [15]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
step_register = RegisterModel(
    name="XgboostRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium","ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)



In [16]:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_roc_auc_gte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.roc_auc.value"
    ),
    right=0.2
)

step_cond = ConditionStep(
    name="XgboostModelDeploymentCondition",
    conditions=[cond_roc_auc_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)
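
To show what the condition above evaluates, here is a plain-Python sketch of the lookup and comparison. `json_get` is a hypothetical stand-in that only handles dotted keys, whereas the SDK's `JsonGet` resolves the path at execution time, and the 0.91 in the sample report is made up.

```python
def json_get(document, json_path):
    """Walk a dotted path such as 'a.b.c' through nested dicts."""
    node = document
    for key in json_path.split("."):
        node = node[key]
    return node


# Hypothetical report contents; the real values come from evaluation.py.
report = {"classification_metrics": {"roc_auc": {"value": 0.91}}}

roc_auc = json_get(report, "classification_metrics.roc_auc.value")
threshold = 0.2  # value used in the condition above
passes = roc_auc > threshold  # ConditionGreaterThan is a strict comparison
print(passes)
```

Since a classifier that guesses at random scores about 0.5 ROC AUC, a threshold of 0.2 lets practically any model through; a stricter value may be worth considering outside of a test setup.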

In [17]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "XgboostPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        model_approval_status,
        input_data,
        test_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [18]:
import json

json.loads(pipeline.definition())



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-1-323732946862/dataset/train/financial_fraud_training_set.csv'},
  {'Name': 'TestData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-1-323732946862/dataset/test/financial_fraud_test_set.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'process-financial-fraud-data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.t3.medium',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
     'ContainerArguments': ['--filename', 'financial_fr

In [19]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-west-1:323732946862:pipeline/XgboostPipeline',
 'ResponseMetadata': {'RequestId': 'a5234e27-526f-4857-8967-8f01dfc11c93',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a5234e27-526f-4857-8967-8f01dfc11c93',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Sun, 20 Oct 2024 01:04:50 GMT'},
  'RetryAttempts': 0}}

In [20]:
execution = pipeline.start()

In [21]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-1:323732946862:pipeline/XgboostPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-1:323732946862:pipeline/XgboostPipeline/execution/ht96q5q90w6x',
 'PipelineExecutionDisplayName': 'execution-1729386291352',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 10, 20, 1, 4, 51, 269000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 10, 20, 1, 4, 51, 269000, tzinfo=tzlocal()),
 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::323732946862:assumed-role/AmazonSageMaker-ExecutionRole-20241019T191278/SageMaker',
   'PrincipalId': 'AROAUWX74X6XGT7RKQ52Z:SageMaker'}},
 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::323732946862:assumed-role/AmazonSageMaker-ExecutionRole-20241019T191278/SageMaker',
   'PrincipalId': 'AROAUWX74X6XGT7RKQ52Z:SageMaker'}},
 'ResponseMetadata': {'RequestId': 'bad4fb52-f66b-400b-b744-262dcd51546a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requesti

In [22]:
execution.wait()

## NOTE: My service quota for transform jobs on ml.m5.large is 0 instances, so the "Transformer" step fails and the model is not deployed for batch inference.
## See the FailureReason in the output of execution.list_steps() below.

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
json.loads(evaluation_json)

In [23]:
execution.list_steps()

[{'StepName': 'XgboostTransform',
  'StartTime': datetime.datetime(2024, 10, 20, 1, 17, 31, 766000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 10, 20, 1, 17, 33, 122000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': "ClientError: Failed to invoke sagemaker:CreateTransformJob. Error Details: The account-level service limit 'ml.m5.large for transform job usage' is 0 Instances, with current utilization of 0 Instances and a request delta of 1 Instances. Please use AWS Service Quotas to request an increase for this quota. If AWS Service Quotas is not available, contact AWS support to request an increase for this quota.\nRetry not appropriate on execution of step with PipelineExecutionArn arn:aws:sagemaker:us-west-1:323732946862:pipeline/xgboostpipeline/execution/ht96q5q90w6x and StepId XgboostTransform. No retry policy configured for the exception type SAGEMAKER_RESOURCE_LIMIT.",
  'Metadata': {},
  'AttemptCount': 1},
 {'StepName': 'XgboostRegisterModel-Regis
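
Because the raw listing is long, it can help to condense it into statuses and failure reasons. The sketch below works on a list of dicts shaped like the output above; `summarize_steps` is a hypothetical helper, and the sample list is an abbreviated stand-in with assumed statuses rather than the actual output.

```python
def summarize_steps(steps):
    """Group step names by status and collect the reasons for failed steps."""
    by_status = {}
    failures = {}
    for step in steps:
        by_status.setdefault(step["StepStatus"], []).append(step["StepName"])
        if step["StepStatus"] == "Failed":
            failures[step["StepName"]] = step.get("FailureReason", "")
    return by_status, failures


# Abbreviated, hypothetical stand-in for the result of execution.list_steps().
steps = [
    {
        "StepName": "XgboostTransform",
        "StepStatus": "Failed",
        "FailureReason": "ClientError: Failed to invoke sagemaker:CreateTransformJob.",
    },
    {"StepName": "xgboost_train", "StepStatus": "Succeeded"},
    {"StepName": "xgboost_eval_step", "StepStatus": "Succeeded"},
]

by_status, failures = summarize_steps(steps)
for name, reason in failures.items():
    print(f"{name}: {reason}")
```

In the notebook, the same function could be called as `summarize_steps(execution.list_steps())`.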

### Model inference on endpoint

As the output of execution.list_steps() shows, the model was successfully registered. I then moved to SageMaker Studio to manually approve the model for deployment. The deployed model can now be checked for inference.

In [38]:
single_test_datapoint = "-4.81583414e-02, -1.02952475e-01,  1.50507724e+00, -1.02643563e-01,4.28030290e-01,  9.78738567e-01,  5.69525291e-01, -8.93545196e-02,1.05594161e+00,  5.00859415e-01,  3.57255991e-01, -8.55397120e-02,1.25618053e+00,  2.04618270e+00, -9.84367965e-01, -1.33800249e-01,1.02578001e+00,  1.04825074e-01,  1.60931623e-01, -1.42984202e-01,-1.42745571e-01,  2.34665992e-01, -1.51245953e-01, -5.77620800e-01,-4.68273695e-01, -6.13373917e-01,  1.19734034e-01,  3.17617507e-01,1.58102700e+04"

In [39]:
single_test_datapoint

'-4.81583414e-02, -1.02952475e-01,  1.50507724e+00, -1.02643563e-01,4.28030290e-01,  9.78738567e-01,  5.69525291e-01, -8.93545196e-02,1.05594161e+00,  5.00859415e-01,  3.57255991e-01, -8.55397120e-02,1.25618053e+00,  2.04618270e+00, -9.84367965e-01, -1.33800249e-01,1.02578001e+00,  1.04825074e-01,  1.60931623e-01, -1.42984202e-01,-1.42745571e-01,  2.34665992e-01, -1.51245953e-01, -5.77620800e-01,-4.68273695e-01, -6.13373917e-01,  1.19734034e-01,  3.17617507e-01,1.58102700e+04'

In [40]:
import boto3
client = boto3.client('sagemaker-runtime')
response = client.invoke_endpoint(
    EndpointName="SallaJobTestXgboostModel-1729387379776-Endpoint-20241020-012303",
    Body=single_test_datapoint,
    ContentType='text/csv')

print("prediction from the deployed model: " + str(json.loads(response['Body'].read())))

prediction from the deployed model: 0.2861855626106262
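
With the `binary:logistic` objective the endpoint returns a probability, so a cutoff is still needed to obtain a class label. The sketch below shows one way to build the CSV payload from a list of numbers and to turn the returned text into a label. Both helpers are hypothetical, the three feature values are just the first three from the data point above, and the 0.5 threshold is an assumption that may not suit an imbalanced fraud dataset.

```python
def to_csv_payload(values):
    """Join feature values into one comma-separated line without spaces."""
    return ",".join(repr(float(v)) for v in values)


def to_label(body_text, threshold=0.5):
    """Parse the probability returned by the endpoint and apply a cutoff."""
    probability = float(body_text)
    return probability, int(probability >= threshold)


# First three features of the data point above, for illustration only.
values = [-4.81583414e-02, -1.02952475e-01, 1.50507724e+00]
payload = to_csv_payload(values)

# Response text as returned for the single data point above.
probability, label = to_label("0.2861855626106262")
print(payload)
print(probability, label)
```

With the assumed cutoff, the prediction of about 0.29 maps to class 0 (not fraud).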


#### Triggering new model training upon data versioning

In [43]:
## upload new data to S3
local_path = "./financial_fraud_training_second_version.csv"


base_uri = f"s3://{default_bucket}/dataset/train"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-west-1-323732946862/dataset/train//financial_fraud_training_second_version.csv


In [46]:
## trigger the pipeline and change the value of InputData parameter
execution = pipeline.start(
    parameters=dict(
        InputData=input_data_uri,
    )
)

In [47]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-1:323732946862:pipeline/XgboostPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-1:323732946862:pipeline/XgboostPipeline/execution/rpa2caoz3zsu',
 'PipelineExecutionDisplayName': 'execution-1729390583548',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'xgboostpipeline',
  'TrialName': 'rpa2caoz3zsu'},
 'CreationTime': datetime.datetime(2024, 10, 20, 2, 16, 23, 494000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 10, 20, 2, 16, 23, 494000, tzinfo=tzlocal()),
 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::323732946862:assumed-role/AmazonSageMaker-ExecutionRole-20241019T191278/SageMaker',
   'PrincipalId': 'AROAUWX74X6XGT7RKQ52Z:SageMaker'}},
 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::323732946862:assumed-role/AmazonSageMaker-ExecutionRole-20241019T191278/SageMaker',
   'PrincipalId': 'AROAUWX74X6XGT7RKQ52Z:SageMaker'}},
 'ResponseMetadata': {'RequestId'

In [None]:
execution.wait()