In [1]:
import boto3

# Initialize the S3 client
s3_client = boto3.client('s3')

# Define the bucket name and prefix
bucket_name = 'housing-dataset-5435v3'
prefix = 'models/'

# List the objects in the specified S3 path
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

# Check if the specified path contains any objects
if 'Contents' in response:
    for obj in response['Contents']:
        print(obj['Key'])
else:
    print(f"No objects found in s3://{bucket_name}/{prefix}")

'''
import boto3

# Initialize the S3 client
s3_client = boto3.client('s3')

# Define the bucket name and prefix
bucket_name = 'housing-dataset-5435v3'
prefix = 'models/'

# List the objects in the specified S3 path
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

# Check if the specified path contains any objects
if 'Contents' in response:
    delete_objects = {'Objects': [{'Key': obj['Key']} for obj in response['Contents']]}
    s3_client.delete_objects(Bucket=bucket_name, Delete=delete_objects)
    print(f"Deleted all objects in s3://{bucket_name}/{prefix}")
else:
    print(f"No objects found in s3://{bucket_name}/{prefix}")

'''

No objects found in s3://housing-dataset-5435v3/models/


'\nimport boto3\n\n# Initialize the S3 client\ns3_client = boto3.client(\'s3\')\n\n# Define the bucket name and prefix\nbucket_name = \'housing-dataset-5435v3\'\nprefix = \'models/\'\n\n# List the objects in the specified S3 path\nresponse = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)\n\n# Check if the specified path contains any objects\nif \'Contents\' in response:\n    delete_objects = {\'Objects\': [{\'Key\': obj[\'Key\']} for obj in response[\'Contents\']]}\n    s3_client.delete_objects(Bucket=bucket_name, Delete=delete_objects)\n    print(f"Deleted all objects in s3://{bucket_name}/{prefix}")\nelse:\n    print(f"No objects found in s3://{bucket_name}/{prefix}")\n\n'

In [2]:
!pip install -U sagemaker



In [3]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

%store -r bucket_name
%store -r role
%store -r image_uri
%store -r model_data

# Define S3 paths for data and model artifacts
train_data = f's3://{bucket_name}/processed/housing/train.csv'
validation_data = f's3://{bucket_name}/processed/housing/validation.csv'
test_data = f's3://{bucket_name}/processed/housing/test.csv'
model_output = f's3://{bucket_name}/models/'


default_bucket = bucket_name
model_package_group_name = f"HousingModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [4]:
!mkdir -p data

In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

train_data = ParameterString(
    name="TrainData",
    default_value=train_data,
)
validation_data = ParameterString(
    name="ValidationData",
    default_value=validation_data,
)
test_data = ParameterString(
    name="TestData",
    default_value=test_data,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=700.0)

In [6]:
!mkdir -p code

In [7]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [8]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/HousingTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(s3_data=train_data, content_type="csv"),
        "validation": TrainingInput(s3_data=validation_data, content_type="csv"),
    }
)



In [9]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="HousingTrain",
    step_args=train_args,
)

In [10]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
'''
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)
'''
    df = pd.read_csv(test_path)
    y_test = df['saleprice'].to_numpy()
    df.drop(columns=['saleprice'], axis=1, inplace=True]
    X_test = xgboost.DMatric(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluation.py


Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.

In [11]:
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-housing-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=test_data,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

In [12]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="HousingEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

In [13]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

Define the `ModelStep` by providing the return values from `model.create()` as the step arguments.

In [14]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="HousingCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

In [15]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/HousingTransform",
)

In [16]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="HousingRegisterModel", step_args=register_args)



In [17]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="HousingMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

In [18]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="HousingMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model],
    else_steps=[step_fail],
)

In [19]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"HousingPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        train_data,
        validation_data,
        test_data,
        mse_threshold,
    ],
    steps=[step_train, step_eval, step_cond],
)

### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

In [20]:
import json


definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'TrainData',
   'Type': 'String',
   'DefaultValue': 's3://housing-dataset-5435v3/processed/housing/train.csv'},
  {'Name': 'ValidationData',
   'Type': 'String',
   'DefaultValue': 's3://housing-dataset-5435v3/processed/housing/validation.csv'},
  {'Name': 'TestData',
   'Type': 'String',
   'DefaultValue': 's3://housing-dataset-5435v3/processed/housing/test.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 700.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'HousingTrain',
   'Type': 'Training',
   'Arguments': {'A

In [21]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:952054755114:pipeline/HousingPipeline',
 'ResponseMetadata': {'RequestId': '6edaa255-ed0e-4316-aad6-0c2d9b5b999d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6edaa255-ed0e-4316-aad6-0c2d9b5b999d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Mon, 24 Jun 2024 01:36:56 GMT'},
  'RetryAttempts': 0}}

Start the pipeline and accept all the default parameters.

In [22]:

pipeline_response = pipeline.upsert(role_arn=role)

# Print the pipeline ARN
pipeline_arn = pipeline_response['PipelineArn']

print("Pipeline ARN:", pipeline_arn)
# Initialize the SageMaker client
client = pipeline_session.sagemaker_client

# Start the pipeline execution
execution_response = client.start_pipeline_execution(
    PipelineName=pipeline_arn.split('/')[-1]  # Extract the pipeline name from the ARN
)

# Retrieve the pipeline execution ARN
pipeline_execution_arn = execution_response['PipelineExecutionArn']

# Print the pipeline execution ARN
print("Pipeline Execution ARN:", pipeline_execution_arn)

import time

def wait_for_training_completion(client, pipeline_execution_arn):
    while True:
        steps_response = client.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)
        training_step = next((step for step in steps_response['PipelineExecutionSteps'] if step['StepName'] == 'HousingTrain'), None)
        
        if training_step and training_step['StepStatus'] in ['Succeeded', 'Failed']:
            return training_step['StepStatus'] == 'Succeeded'
        
        print("Waiting 30 seconds")
        time.sleep(30)  # Wait for 30 seconds before polling again


# Wait for the training step to complete
if wait_for_training_completion(client, pipeline_execution_arn):
    print("Training step completed successfully.")
else:
    print("Training step failed.")

# List steps in the pipeline execution
steps_response = client.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

# Find the training job step and extract the training job ARN
training_job_arn = None
for step in steps_response['PipelineExecutionSteps']:
    if step['StepName'] == 'HousingTrain' and 'Metadata' in step and 'TrainingJob' in step['Metadata']:
        training_job_arn = step['Metadata']['TrainingJob']['Arn']
        break

# Extract the job name from the training job ARN
if training_job_arn:
    training_job_name = training_job_arn.split('/')[-1]
    model_artifact_path = f"s3://housing-dataset-5435v3/HousingTrain/{training_job_name}/output/model.tar.gz"
    print("Model Artifact Path:", model_artifact_path)
else:
    print("Training job not found.")




Pipeline ARN: arn:aws:sagemaker:us-east-1:952054755114:pipeline/HousingPipeline
Pipeline Execution ARN: arn:aws:sagemaker:us-east-1:952054755114:pipeline/HousingPipeline/execution/4cnoo7s31mv9
Waiting 30 seconds
Waiting 30 seconds
Waiting 30 seconds
Waiting 30 seconds
Waiting 30 seconds
Training step completed successfully.
Model Artifact Path: s3://housing-dataset-5435v3/HousingTrain/pipelines-4cnoo7s31mv9-HousingTrain-PmKHqZN7gh/output/model.tar.gz


In [23]:
# Store the model data path for later use
pipeline_definition = definition
%store pipeline_definition
model_data = model_artifact_path
%store model_data

Stored 'pipeline_definition' (dict)
Stored 'model_data' (str)


In [24]:
execution = pipeline.start()

In [25]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:952054755114:pipeline/HousingPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:952054755114:pipeline/HousingPipeline/execution/23881kni69e0',
 'PipelineExecutionDisplayName': 'execution-1719193170799',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 6, 24, 1, 39, 30, 734000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 6, 24, 1, 39, 30, 734000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:952054755114:user-profile/d-lrdmbhhktifc/wacuna',
  'UserProfileName': 'wacuna',
  'DomainId': 'd-lrdmbhhktifc',
  'IamIdentity': {'Arn': 'arn:aws:sts::952054755114:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROA53KXCV4VD4SFVT6WB:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:952054755114:user-profile/d-lrdmbhhktifc/wacuna',
  'UserProfileName': 'wacuna',
  'DomainId': 'd-lrdmbhhktifc',
  'IamIdentity': {'Arn': '

Wait for the execution to complete.

In [26]:
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [None]:
execution.list_steps()

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report.

In [None]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

In [None]:
execution = pipeline.start(
    parameters=dict(
        ModelApprovalStatus="Approved",
    )
)

In [None]:
execution.wait()

In [None]:
execution.list_steps()

Apart from that, you might also want to adjust the MSE threshold to a smaller value and raise the bar for the accuracy of the registered model. In this case you can override the MSE threshold like the following:

In [None]:
execution = pipeline.start(parameters=dict(MseThreshold=10.0))

If the MSE threshold is not satisfied, the pipeline execution enters the `FailStep` and is marked as failed.

In [None]:
try:
    execution.wait()
except Exception as error:
    print(error)

In [None]:
execution.list_steps()