In [30]:
# Update SageMaker to the latest version
!pip install -U sagemaker



In [31]:
# Import libraries and setup session
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = "arn:aws:iam::619071320705:role/service-role/AmazonSageMaker-ExecutionRole-20250204T212210"
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"AbaloneModelPackageGroupName"

In [32]:
# Create directories for data and code
!mkdir -p data
!mkdir -p code

In [33]:
# Download and upload dataset to S3
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-example-files-prod-{region}").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-ap-southeast-1-619071320705/abalone/abalone-dataset.csv


In [34]:
# Download and upload batch dataset to S3
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-ap-southeast-1-619071320705/abalone/abalone-dataset-batch


In [35]:
# Define pipeline parameters
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

In [36]:
# Create directory for code
!mkdir -p code

In [37]:
# Setup SKLearnProcessor
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [38]:
# Define processing step
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="AbaloneProcess", step_args=processor_args)

In [39]:
# Setup XGBoost Estimator
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [40]:
# Define training step
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    step_args=train_args,
)

In [41]:
# Setup ScriptProcessor for evaluation
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

In [42]:
# Define evaluation step
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

In [43]:
# Create model
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [44]:
# Define model step
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="AbaloneCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

In [45]:
# Setup transformer
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)


In [46]:
# Define transform step
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

In [47]:
# Define model metrics and register model
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="AbaloneRegisterModel", step_args=register_args)

In [None]:
# Deploy the model to a SageMaker Endpoint
# predictor = model.deploy(
#     initial_instance_count=1,
#     instance_type="ml.m5.large",  # Adjust instance type if needed
#     endpoint_name="sm-pipeline-model-endpoint"
# )
# step_deploy = ModelStep(name="AbaloneDeployModel", endpoint_name=predictor.endpoint_name)

In [48]:
# Define fail step
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

In [49]:
# Define condition step
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

In [57]:
# Create IAM role for Lambda function
from iam_helper import create_sagemaker_lambda_role

lambda_role = create_sagemaker_lambda_role("deploy-model-lambda-role")

Waiting 30 seconds for the IAM role to propagate


In [61]:
# Define Lambda step for model deployment
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda
import time

current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
endpoint_config_name = "tf2-california-housing-endpoint-config"
endpoint_name = "sm-pipeline-endpoint-" + current_time

deploy_model_lambda_function_name = "sagemaker-deploy-model-lambda-" + current_time

deploy_model_lambda_function = Lambda(
    function_name=deploy_model_lambda_function_name,
    execution_role_arn=lambda_role,
    script="deploy_model_lambda.py",
    handler="deploy_model_lambda.lambda_handler",
)

step_lower_mse_deploy_model_lambda = LambdaStep(
    name="Deploy-California-Housing-Model-To-Endpoint",
    lambda_func=deploy_model_lambda_function,
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "endpoint_instance_type": "ml.m5.large",
    },
)

In [67]:
# Create IAM role for Lambda function to send email
from iam_helper import create_s3_lambda_role

lambda_role = create_s3_lambda_role("send-email-to-ds-team-lambda-role")

Waiting 30 seconds for the IAM role to propagate


In [None]:
# Define Lambda step to send email
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

evaluation_s3_uri = "{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

send_email_lambda_function_name = (
    "sagemaker-send-email-to-ds-team-lambda-" + current_time
)

send_email_lambda_function = Lambda(
    function_name=send_email_lambda_function_name,
    execution_role_arn=lambda_role,
    script="send_email_lambda.py",
    handler="send_email_lambda.lambda_handler",
)

step_higher_mse_send_email_lambda = LambdaStep(
    name="Send-Email-To-DS-Team",
    lambda_func=send_email_lambda_function,
    inputs={"evaluation_s3_uri": evaluation_s3_uri},
)

In [62]:
# Define pipeline
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[
        step_process,
        step_train,
        step_eval,
        step_cond,
        step_lower_mse_deploy_model_lambda,
        step_higher_mse_send_email_lambda,
    ],
)

In [None]:
# Lambda function to send email
%%writefile send_email_lambda.py

"""
This Lambda function sends an E-Mail to the Data Science team with the MSE from model evaluation step. 
The evaluation.json location in S3 is provided via the `event` argument
"""
# import json
# def lambda_handler(event, context):
#     print(f"Received Event: {event}")

#     evaluation_s3_uri = event["evaluation_s3_uri"]
#     path_parts = evaluation_s3_uri.replace("s3://", "").split("/")
#     bucket = path_parts.pop(0)
#     key = "/".join(path_parts)

#     content = s3_client.get_object(Bucket=bucket, Key=key)
#     text = content["Body"].read().decode()
#     evaluation_json = json.loads(text)
#     mse = evaluation_json["regression_metrics"]["mse"]["value"]

#     subject_line = "Please check high MSE ({}) detected on model evaluation".format(mse)
#     print(f"Sending E-Mail to Data Science Team with subject line: {subject_line}")

#     # TODO - ADD YOUR CODE TO SEND EMAIL...

#     return {"statusCode": 200, "body": json.dumps("E-Mail Sent Successfully")}

In [63]:
# Print pipeline definition
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-southeast-1-619071320705/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-southeast-1-619071320705/abalone/abalone-dataset-batch'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'G

In [64]:
# Upsert pipeline
pipeline.upsert(role_arn=role)


{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-1:619071320705:pipeline/AbalonePipeline',
 'ResponseMetadata': {'RequestId': '72260e10-0f1d-42da-980b-9d7000a749ce',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '72260e10-0f1d-42da-980b-9d7000a749ce',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Thu, 06 Feb 2025 01:54:38 GMT'},
  'RetryAttempts': 0}}

In [65]:
# Start pipeline execution
execution = pipeline.start()
