# Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines


## Dataset

In [2]:
!pip install opendatasets

Collecting opendatasets
  Using cached opendatasets-0.1.22-py3-none-any.whl.metadata (9.2 kB)
Collecting kaggle (from opendatasets)
  Using cached kaggle-1.6.14-py3-none-any.whl
Using cached opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: kaggle, opendatasets
Successfully installed kaggle-1.6.14 opendatasets-0.1.22
[0m

In [3]:
!pip install imblearn

Collecting imblearn
  Using cached imblearn-0.0-py2.py3-none-any.whl.metadata (355 bytes)
Collecting imbalanced-learn (from imblearn)
  Using cached imbalanced_learn-0.12.3-py3-none-any.whl.metadata (8.3 kB)
Using cached imblearn-0.0-py2.py3-none-any.whl (1.9 kB)
Using cached imbalanced_learn-0.12.3-py3-none-any.whl (258 kB)
Installing collected packages: imbalanced-learn, imblearn
Successfully installed imbalanced-learn-0.12.3 imblearn-0.0
[0m

In [4]:
import sys
import os
import boto3
import sagemaker
import pandas as pd
from sagemaker.workflow.pipeline_context import PipelineSession

from imblearn.combine import SMOTEENN
import opendatasets as od
from sklearn.model_selection import train_test_split

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"DiabetesModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


Now, upload the data into the default bucket. 

In [5]:
!mkdir -p data

In [6]:
od.download('https://www.kaggle.com/alexteboul/diabetes-health-indicators-dataset')
os.listdir('diabetes-health-indicators-dataset')

In [9]:
import time
import logging
from sagemaker.feature_store.feature_group import FeatureGroup

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to check if the feature group is active
def wait_for_feature_group(feature_group, max_wait=600, interval=10):
    elapsed_time = 0
    while elapsed_time < max_wait:
        status = feature_group.describe().get("FeatureGroupStatus")
        if status == "Created":
            logger.info("Feature group is now active.")
            return True
        logger.info(f"Waiting for feature group to be active... {elapsed_time} seconds elapsed. Current status: {status}")
        time.sleep(interval)
        elapsed_time += interval
    raise TimeoutError(f"Feature group did not become active within {max_wait} seconds. Final status: {status}")

## Creating the Feature Group

In [12]:
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# Define the feature group name
feature_group_name = "diabetes-feature-group"

# Define the feature definitions
feature_definitions = [
    FeatureDefinition(feature_name="record_id", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="event_time", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="HighBP", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="HighChol", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="CholCheck", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="BMI", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Smoker", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Stroke", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="HeartDiseaseorAttack", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="PhysActivity", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Fruits", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Veggies", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="HvyAlcoholConsump", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="AnyHealthcare", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="NoDocbcCost", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="GenHlth", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="MentHlth", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="PhysHlth", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="DiffWalk", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Sex", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Age", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Education", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Income", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="Diabetes_binary", feature_type=FeatureTypeEnum.INTEGRAL),
]

# Define the record identifier and event time feature names
record_identifier_name = "record_id"
event_time_feature_name = "event_time"

# Define the FeatureGroup object
feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=pipeline_session,
    feature_definitions=feature_definitions
)

# Create the Feature Group
feature_group.create(
    s3_uri=f"s3://{default_bucket}/feature-store",
    record_identifier_name=record_identifier_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    description="Feature group for diabetes dataset",
    enable_online_store=True,
    tags=[{"Key": "Project", "Value": "Diabetes"}]
)

# Wait for the Feature Group to be created and become active
wait_for_feature_group(feature_group)


INFO:__main__:Waiting for feature group to be active... 0 seconds elapsed. Current status: Creating
INFO:__main__:Waiting for feature group to be active... 10 seconds elapsed. Current status: Creating
INFO:__main__:Feature group is now active.


True

In [13]:
# Since we get a headerless CSV file, we specify the column names here.
feature_names = [
    'HighBP', 
    'HighChol', 
    'CholCheck', 
    'BMI', 
    'Smoker',
    'Stroke', 
    'HeartDiseaseorAttack', 
    'PhysActivity', 
    'Fruits', 
    'Veggies',
    'HvyAlcoholConsump', 
    'AnyHealthcare', 
    'NoDocbcCost', 
    'GenHlth',
    'MentHlth', 
    'PhysHlth', 
    'DiffWalk', 
    'Sex', 
    'Age', 
    'Education',
    'Income'
]

dataset = pd.read_csv('./diabetes-health-indicators-dataset/diabetes_binary_health_indicators_BRFSS2015.csv')

dataClean = dataset.drop_duplicates()

X = dataClean.drop(['Diabetes_binary'], axis=1).values
y = dataClean['Diabetes_binary']

# SMOTEENN
smoteenn = SMOTEENN(random_state=42)
X_resampled_smoteenn, y_resampled_smoteenn = smoteenn.fit_resample(X, y)

X_batch, _ = train_test_split(X_resampled_smoteenn, train_size=0.10, random_state=42)
batch_df = pd.DataFrame(X_batch, columns=feature_names)

## Ingesting Data into Feature Group

In [14]:
# Adding ingestion step after data processing
record_ids = range(len(X_resampled_smoteenn))
event_times = pd.Series([pd.Timestamp.now().strftime('%Y-%m-%dT%H:%M:%SZ') for _ in range(len(X_resampled_smoteenn))])
ingestion_data = pd.DataFrame(X_resampled_smoteenn, columns=feature_names)
ingestion_data["record_id"] = record_ids
ingestion_data["event_time"] = event_times
ingestion_data["Diabetes_binary"] = y_resampled_smoteenn

# Ensure the correct data types
for col in feature_names:
    if col in ["HighBP", "HighChol", "CholCheck", "Smoker", "Stroke", "HeartDiseaseorAttack", "PhysActivity", "Fruits", "Veggies", 
               "HvyAlcoholConsump", "AnyHealthcare", "NoDocbcCost", "GenHlth", "MentHlth", "PhysHlth", "DiffWalk", "Sex", "Age", "Education", "Income"]:
        ingestion_data[col] = ingestion_data[col].astype(int)
    elif col == "BMI":
        ingestion_data[col] = ingestion_data[col].astype(float)

ingestion_data["record_id"] = ingestion_data["record_id"].astype(int)
ingestion_data["event_time"] = ingestion_data["event_time"].astype(str)
ingestion_data["Diabetes_binary"] = ingestion_data["Diabetes_binary"].astype(int)

feature_group.ingest(data_frame=ingestion_data, max_workers=3, wait=True)


INFO:sagemaker.feature_store.feature_group:Started ingesting index 23 to 46
INFO:sagemaker.feature_store.feature_group:Started ingesting index 46 to 67
INFO:sagemaker.feature_store.feature_group:Started ingesting index 0 to 23
INFO:sagemaker.feature_store.feature_group:Successfully ingested row 23 to 46
INFO:sagemaker.feature_store.feature_group:Successfully ingested row 46 to 67
INFO:sagemaker.feature_store.feature_group:Successfully ingested row 0 to 23


IngestionManagerPandas(feature_group_name='diabetes-feature-group', feature_definitions={'record_id': {'FeatureName': 'record_id', 'FeatureType': 'Integral'}, 'event_time': {'FeatureName': 'event_time', 'FeatureType': 'String'}, 'HighBP': {'FeatureName': 'HighBP', 'FeatureType': 'Fractional'}, 'HighChol': {'FeatureName': 'HighChol', 'FeatureType': 'Fractional'}, 'CholCheck': {'FeatureName': 'CholCheck', 'FeatureType': 'Fractional'}, 'BMI': {'FeatureName': 'BMI', 'FeatureType': 'Fractional'}, 'Smoker': {'FeatureName': 'Smoker', 'FeatureType': 'Fractional'}, 'Stroke': {'FeatureName': 'Stroke', 'FeatureType': 'Fractional'}, 'HeartDiseaseorAttack': {'FeatureName': 'HeartDiseaseorAttack', 'FeatureType': 'Fractional'}, 'PhysActivity': {'FeatureName': 'PhysActivity', 'FeatureType': 'Fractional'}, 'Fruits': {'FeatureName': 'Fruits', 'FeatureType': 'Fractional'}, 'Veggies': {'FeatureName': 'Veggies', 'FeatureType': 'Fractional'}, 'HvyAlcoholConsump': {'FeatureName': 'HvyAlcoholConsump', 'Featur

In [15]:
dataset = pd.read_csv('./diabetes-health-indicators-dataset/diabetes_binary_health_indicators_BRFSS2015.csv')

dataClean = dataset.drop_duplicates()

X = dataClean.drop(['Diabetes_binary'], axis = 1).values
y = dataClean['Diabetes_binary']

# SMOTEENN
smoteenn = SMOTEENN(random_state=42)
X_resampled_smoteenn, y_resampled_smoteenn = smoteenn.fit_resample(X, y)

X_batch, _ = train_test_split(X_resampled_smoteenn, train_size=0.10, random_state=42)
batch_df = pd.DataFrame(X_batch, columns=feature_names)

In [16]:
# Convert y_resampled_smoteenn to a DataFrame
y_resampled_df = pd.DataFrame(y_resampled_smoteenn, columns=['Diabetes_binary'])

# Convert X_resampled_smoteenn to a DataFrame
X_resampled_df = pd.DataFrame(X_resampled_smoteenn, columns=feature_names)

# Concatenate features and labels
df = pd.concat([y_resampled_df, X_resampled_df], axis=1)

# Use 'csv' format to store the data
# The first column is expected to be the output column
df.to_csv('data/diabetes-dataset.csv', header=False)
batch_df.to_csv('data/diabetes-dataset-batch.csv', index=False, header=False)

In [17]:
local_path = "data/diabetes-dataset.csv"
base_uri = f"s3://{default_bucket}/diabetes"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-066897737437/diabetes/diabetes-dataset.csv


Download a second dataset for batch transformation after model creation.

In [18]:
local_path = "data/diabetes-dataset-batch.csv"
base_uri = f"s3://{default_bucket}/diabetes"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-066897737437/diabetes/diabetes-dataset-batch.csv


## Define Parameters to Parametrize Pipeline Execution

In [19]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

![Define Parameters](img/pipeline-1.png)

## Define a Processing Step for Feature Engineering

In [20]:
!mkdir -p code

In [21]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    'HighBP', 
    'HighChol', 
    'CholCheck', 
    'BMI', 
    'Smoker',
    'Stroke', 
    'HeartDiseaseorAttack', 
    'PhysActivity', 
    'Fruits', 
    'Veggies',
    'HvyAlcoholConsump', 
    'AnyHealthcare', 
    'NoDocbcCost', 
    'GenHlth',
    'MentHlth', 
    'PhysHlth', 
    'DiffWalk', 
    'Sex', 
    'Age', 
    'Education',
    'Income'
]
label_column = "Diabetes_binary"

feature_columns_dtype = {
    'HighBP': np.float64, 
    'HighChol': np.float64, 
    'CholCheck': np.float64, 
    'BMI': np.float64, 
    'Smoker': np.float64,
    'Stroke': np.float64, 
    'HeartDiseaseorAttack': np.float64, 
    'PhysActivity': np.float64, 
    'Fruits': np.float64, 
    'Veggies': np.float64,
    'HvyAlcoholConsump': np.float64, 
    'AnyHealthcare': np.float64, 
    'NoDocbcCost': np.float64, 
    'GenHlth': np.float64,
    'MentHlth': np.float64, 
    'PhysHlth': np.float64, 
    'DiffWalk': np.float64, 
    'Sex': np.float64, 
    'Age': np.float64, 
    'Education': np.float64,
    'Income': np.float64
}
label_column_dtype = {"Diabetes_binary": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/diabetes-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    numeric_features = list(feature_columns_names)
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
        ]
    )

    y = df.pop("Diabetes_binary")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting code/preprocessing.py


### Create an instance of a `SKLearnProcessor`

In [22]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-diabetes-process",
    role=role,
    sagemaker_session=pipeline_session,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [23]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="DiabetesProcess", step_args=processor_args)



## Retrieving Features for Training

In [24]:
# Define the record IDs for the data to be retrieved
record_ids = list(range(len(dataClean)))

# Create a boto3 client for SageMaker Feature Store runtime
featurestore_runtime = boto3.client('sagemaker-featurestore-runtime')

# Function to retrieve features from Feature Store
def retrieve_feature_data(record_id):
    response = featurestore_runtime.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString=str(record_id),
    )
    return response

# Retrieve data for all records
retrieved_data = [retrieve_feature_data(record_id) for record_id in record_ids]

# Convert the retrieved data to a DataFrame
retrieved_df = pd.DataFrame([record['Record'] for record in retrieved_data if 'Record' in record])


## Define a Training Step to Train a Model

In [25]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/DiabetesTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [26]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="DiabetesTrain",
    step_args=train_args,
)

## Define a Model Evaluation Step to Evaluate the Trained Model

First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.

After pipeline execution, you can examine the resulting `evaluation.json` for analysis.

The evaluation script uses `xgboost` to do the following:

* Load the model.
* Read the test data.
* Issue predictions against the test data.
* Build a classification report, including accuracy and ROC curve.
* Save the evaluation report to the evaluation directory.

In [27]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluation.py


Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.

In [28]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-diabetes-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

Use the processor's arguments returned by `.run()` to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline invokes pipeline execution.

In [29]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="DiabetesEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Define a Create Model Step to Create a Model

In [30]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

Define the `ModelStep` by providing the return values from `model.create()` as the step arguments.

In [31]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="DiabetesCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

## Define a Transform Step to Perform Batch Transformation

In [32]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/DiabetesTransform",
)

Pass in the transformer instance and the `TransformInput` with the `batch_data` pipeline parameter defined earlier.

In [33]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="DiabetesTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

## Define a Register Model Step to Create a Model Package

In [34]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="DiabetesRegisterModel", step_args=register_args)



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

In [35]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="DiabetesMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State

In [36]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="DiabetesMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

## Define a Pipeline of Parameters, Steps, and Conditions

In [37]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"DiabetesPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

### Examining the pipeline definition

In [38]:
import json


definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-066897737437/diabetes/diabetes-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-066897737437/diabetes/diabetes-dataset-batch.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DiabetesProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Ge

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [39]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:066897737437:pipeline/DiabetesPipeline',
 'ResponseMetadata': {'RequestId': '2ad9b110-b713-4573-ac07-a9130ce4c6a6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2ad9b110-b713-4573-ac07-a9130ce4c6a6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Sun, 30 Jun 2024 22:34:09 GMT'},
  'RetryAttempts': 0}}

Start the pipeline and accept all the default parameters.

In [40]:
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [41]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:066897737437:pipeline/DiabetesPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:066897737437:pipeline/DiabetesPipeline/execution/gpyidnvizhz8',
 'PipelineExecutionDisplayName': 'execution-1719786850301',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 6, 30, 22, 34, 10, 229000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 6, 30, 22, 34, 10, 229000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:066897737437:user-profile/d-ifgytezau7fl/default-1719458027914',
  'UserProfileName': 'default-1719458027914',
  'DomainId': 'd-ifgytezau7fl',
  'IamIdentity': {'Arn': 'arn:aws:sts::066897737437:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROAQ7E3KO3ORCGSZVS2U:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:066897737437:user-profile/d-ifgytezau7fl/default-1719458027914',
  'UserProfileName': 'default-1719458027

Wait for the execution to complete.

In [42]:
try:
    execution.wait()
except Exception as error:
    print(error)

Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"


List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [43]:
execution.list_steps()

[{'StepName': 'DiabetesTransform',
  'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 33, 125000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 30, 22, 46, 43, 331000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': 'ClientError: ClientError: See job logs for more information',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:transform-job/pipelines-gpyidnvizhz8-DiabetesTransform-BZq0IGAgFI'}},
  'AttemptCount': 1},
 {'StepName': 'DiabetesCreateModel-CreateModel',
  'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 31, 128000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 30, 22, 41, 32, 595000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:model/pipelines-gpyidnvizhz8-DiabetesCreateModel--P0kZ0GM49r'}},
  'AttemptCount': 1},
 {'StepName': 'DiabetesRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 31

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report.

In [44]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))



{'regression_metrics': {'mse': {'standard_deviation': 1.3112616042139182,
                                'value': 2.0849036882680285}}}


### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [45]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'DiabetesProcess', 'StartTime': datetime.datetime(2024, 6, 30, 22, 34, 11, 314000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 36, 44, 110000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:processing-job/pipelines-gpyidnvizhz8-DiabetesProcess-WOYdOnmtZu'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...171de73ed36b4bac27aa0eb/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://...6897737437/diabetes/diabetes-dataset.csv,Input,DataSet,ContributedTo,artifact
2,68331...com/sagemaker-scikit-learn:1.2-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...gpyidnvizhz8/DiabetesProcess/output/test,Output,DataSet,Produced,artifact
4,s3://...vizhz8/DiabetesProcess/output/validation,Output,DataSet,Produced,artifact
5,s3://...pyidnvizhz8/DiabetesProcess/output/train,Output,DataSet,Produced,artifact


{'StepName': 'DiabetesTrain', 'StartTime': datetime.datetime(2024, 6, 30, 22, 36, 45, 53000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 38, 55, 862000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:training-job/pipelines-gpyidnvizhz8-DiabetesTrain-DbcZmzXSwa'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...vizhz8/DiabetesProcess/output/validation,Input,DataSet,ContributedTo,artifact
1,s3://...pyidnvizhz8/DiabetesProcess/output/train,Input,DataSet,ContributedTo,artifact
2,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...etesTrain-DbcZmzXSwa/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'DiabetesEval', 'StartTime': datetime.datetime(2024, 6, 30, 22, 38, 56, 664000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 41, 29, 670000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:processing-job/pipelines-gpyidnvizhz8-DiabetesEval-lRRaZAGrU4'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...1efdc9e2d552feb7e97dadd942/evaluation.py,Input,DataSet,ContributedTo,artifact
1,s3://...gpyidnvizhz8/DiabetesProcess/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...etesTrain-DbcZmzXSwa/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...024-06-30-22-34-08-686/output/evaluation,Output,DataSet,Produced,artifact


{'StepName': 'DiabetesMSECond', 'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 30, 192000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 41, 30, 549000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Condition': {'Outcome': 'True'}}, 'AttemptCount': 1}


None

{'StepName': 'DiabetesRegisterModel-RegisterModel', 'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 31, 128000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 41, 32, 443000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:model-package/DiabetesModelPackageGroupName/1'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...etesTrain-DbcZmzXSwa/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,DiabetesModelPackageGroupName-1-PendingManualA...,Input,Approval,ContributedTo,action
3,DiabetesModelPackageGroupName-1719787292-aws-m...,Output,ModelGroup,AssociatedWith,context


{'StepName': 'DiabetesCreateModel-CreateModel', 'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 31, 128000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 41, 32, 595000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:model/pipelines-gpyidnvizhz8-DiabetesCreateModel--P0kZ0GM49r'}}, 'AttemptCount': 1}


None

{'StepName': 'DiabetesTransform', 'StartTime': datetime.datetime(2024, 6, 30, 22, 41, 33, 125000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 6, 30, 22, 46, 43, 331000, tzinfo=tzlocal()), 'StepStatus': 'Failed', 'FailureReason': 'ClientError: ClientError: See job logs for more information', 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:transform-job/pipelines-gpyidnvizhz8-DiabetesTransform-BZq0IGAgFI'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...etesTrain-DbcZmzXSwa/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,s3://...7437/diabetes/diabetes-dataset-batch.csv,Input,DataSet,ContributedTo,artifact
3,s3://...us-east-1-066897737437/DiabetesTransform,Output,DataSet,Produced,artifact


### Parametrized Executions

In [46]:
execution = pipeline.start(
    parameters=dict(
        ModelApprovalStatus="Approved",
    )
)

In [47]:
try:
    execution.wait()
except Exception as error:
    print(error)

Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"


In [48]:
execution.list_steps()

[{'StepName': 'DiabetesTransform',
  'StartTime': datetime.datetime(2024, 6, 30, 22, 55, 18, 115000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 30, 23, 0, 29, 181000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': 'ClientError: ClientError: See job logs for more information',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:transform-job/pipelines-kh9ia4a5g9kk-DiabetesTransform-7BTsf5mNNc'}},
  'AttemptCount': 1},
 {'StepName': 'DiabetesCreateModel-CreateModel',
  'StartTime': datetime.datetime(2024, 6, 30, 22, 55, 16, 627000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 30, 22, 55, 17, 678000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:model/pipelines-kh9ia4a5g9kk-DiabetesCreateModel--BD4e8PuQOJ'}},
  'AttemptCount': 1},
 {'StepName': 'DiabetesRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2024, 6, 30, 22, 55, 16,

Apart from that, you might also want to adjust the MSE threshold to a smaller value and raise the bar for the accuracy of the registered model. In this case you can override the MSE threshold like the following:

In [49]:
execution = pipeline.start(parameters=dict(MseThreshold=3.0))

If the MSE threshold is not satisfied, the pipeline execution enters the `FailStep` and is marked as failed.

In [50]:
try:
    execution.wait()
except Exception as error:
    print(error)

Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"


In [51]:
execution.list_steps()

[{'StepName': 'DiabetesTransform',
  'StartTime': datetime.datetime(2024, 6, 30, 23, 8, 43, 380000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 30, 23, 14, 0, 910000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': 'ClientError: ClientError: See job logs for more information',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:transform-job/pipelines-o7t1ydylfy0s-DiabetesTransform-V0Pi1kzrfq'}},
  'AttemptCount': 1},
 {'StepName': 'DiabetesCreateModel-CreateModel',
  'StartTime': datetime.datetime(2024, 6, 30, 23, 8, 41, 115000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 30, 23, 8, 42, 485000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:066897737437:model/pipelines-o7t1ydylfy0s-DiabetesCreateModel--RuzLTFHFP9'}},
  'AttemptCount': 1},
 {'StepName': 'DiabetesRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2024, 6, 30, 23, 8, 41, 115