# SageMaker Local pipeline

This notebook demonstrates how to orchestrate SageMaker jobs locally using SageMaker Pipelines. 
The notebook uses a parameter `run_locally` to define the `pipeline_session` to either be a `LocalPipelineSession` object or a `PipelineSession` object.
Depending on how the `run_locally` parameter is set, the pipeline will run on your local machine or in the cloud. 

**Note**: You can run this notebook on SageMaker Classic Notebook instances or in your local IDE.
It also runs in the new SageMaker Studio (e.g. Code Editor or JupyterLab). It does not run in SageMaker Studio Classic, which does not support Docker.

# Prerequisites for SageMaker Studio (Code Editor or JupyterLab)

To run this notebook in the new SageMaker Studio experience (e.g. Code Editor or JupyterLab), you need to install Docker as described in the [Local mode support in Amazon SageMaker Studio docs](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-local.html).

### Enable Docker in the SageMaker Studio Domain

Open a shell with AWS CLI access (e.g. AWS CloudShell) and update the domain to enable Docker access. Replace `region` and `domain-id` with your own values.

```sh
# update domain
aws --region region \
    sagemaker update-domain --domain-id domain-id \
    --domain-settings-for-update '{"DockerSettings": {"EnableDockerAccess": "ENABLED"}}'
```

### Install Docker packages

This notebook cell writes a file `sagemaker-ubuntu-jammy-docker-cli-install.sh`, which contains the Docker CLI installation script.

In [1]:
%%writefile sagemaker-ubuntu-jammy-docker-cli-install.sh
#!/bin/bash

# This script targets Ubuntu Jammy Jellyfish. To use another release, set the
# VERSION_CODENAME environment variable before running it. DOCKER_HOST defaults
# to the socket location used by SageMaker Studio; if that location changes,
# set DOCKER_HOST explicitly.
apt-get update
apt-get install ca-certificates curl gnupg -y
install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg
chmod a+r /etc/apt/keyrings/docker.gpg

echo \
  "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
  "$(. /etc/os-release && echo "${VERSION_CODENAME:-jammy}")" stable" | \
  tee /etc/apt/sources.list.d/docker.list > /dev/null
apt-get update

# pick the latest patch from:
# apt-cache madison docker-ce | awk '{ print $3 }' | grep -i 20.10
VERSION_STRING=5:20.10.24~3-0~ubuntu-${VERSION_CODENAME:-jammy}
apt-get install docker-ce-cli=$VERSION_STRING docker-compose-plugin -y

# validate the Docker Client is able to access Docker Server at [unix:///docker/proxy.sock]

if [ -z "${DOCKER_HOST}" ]; then
  export DOCKER_HOST="unix:///docker/proxy.sock"
fi

docker version

Overwriting sagemaker-ubuntu-jammy-docker-cli-install.sh


## SageMaker Pipelines Local Mode

SageMaker Pipelines Local Mode supports the following activities, which are demonstrated in this notebook:

* ProcessingStep
* TrainingStep
* ConditionStep
* ModelStep
* TransformStep

#### Install the latest version of the SageMaker Python SDK

In [2]:
%pip install 'sagemaker' --upgrade

Collecting sagemaker
  Using cached sagemaker-2.224.1-py3-none-any.whl.metadata (15 kB)
Using cached sagemaker-2.224.1-py3-none-any.whl (1.5 MB)
Installing collected packages: sagemaker
  Attempting uninstall: sagemaker
    Found existing installation: sagemaker 2.219.0
    Uninstalling sagemaker-2.219.0:
      Successfully uninstalled sagemaker-2.219.0
Successfully installed sagemaker-2.224.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
import sys
from os import path
import os
import yaml
import json
import pathlib as pl
import sagemaker
import sagemaker.session
from sagemaker.estimator import Estimator
from sagemaker.model import Model
from sagemaker.inputs import TrainingInput, TransformInput
from sagemaker.metadata_properties import MetadataProperties
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig, TuningStep, TransformStep
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet, Join
from sagemaker.tuner import (
    ContinuousParameter,
    HyperparameterTuner,
    WarmStartConfig,
    WarmStartTypes,
)
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.transformer import Transformer

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


The next cell sets the base directory that holds the task scripts and the `run_locally` flag, which determines whether the pipeline runs locally or in the cloud.

In [2]:
BASE_DIR = pl.Path(path.realpath('./tasks'))
run_locally = True

### Helper functions

* `load_config(config_path)`: loads settings from a YAML file.
* `is_file_empty(file_path)`: returns `True` if a file is missing or empty, which is useful for validating a script before passing it to a step.


In [3]:
def load_config(config_path):
    """Load configuration from a YAML file."""
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)
    return config

def is_file_empty(file_path):
    """Return True if the file does not exist or is empty."""
    if not os.path.exists(file_path):
        return True
    elif os.stat(file_path).st_size == 0:
        return True
    return False
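
As a quick check of the behaviour described above, the following sketch exercises `is_file_empty` against a missing file, a zero-byte file, and a file with content. The function is restated in a compact form so the block runs on its own; the file names are hypothetical.

```python
import os
import tempfile


def is_file_empty(file_path):
    """Return True if the file does not exist or is empty."""
    return not os.path.exists(file_path) or os.stat(file_path).st_size == 0


with tempfile.TemporaryDirectory() as tmp_dir:
    missing = os.path.join(tmp_dir, "missing.py")  # never created
    empty = os.path.join(tmp_dir, "empty.py")
    script = os.path.join(tmp_dir, "train.py")

    open(empty, "w").close()  # zero-byte file
    with open(script, "w") as f:  # file with content
        f.write("print('hello')\n")

    results = {
        "missing": is_file_empty(missing),
        "empty": is_file_empty(empty),
        "script": is_file_empty(script),
    }

print(results)
```

Note that a missing file is reported as empty rather than raising an error, so callers cannot tell the two cases apart.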

### Define session

In [4]:
if run_locally:
    pipeline_session = LocalPipelineSession()
else:
    pipeline_session = PipelineSession()

### Defining Variables
The pipeline reads its settings from `config.yaml` and its deployment-specific values from environment variables, so the same notebook can run in different environments without code changes.

In [5]:
# Load configuration from config.yaml
config_path = 'config.yaml'
config = load_config(config_path)
instance_type = config["INSTANCE_TYPE"] #ml.m5.xlarge
instance_count = config["INSTANCE_COUNT"] #1
base_job_name = config["BASE_JOB_NAME"] #"XGBTraffic"
input_data_s3_uri = config["INPUT_DATA_S3_URI"] #"s3://og-407-temp-test-data/austin-traffic/Radar_Traffic_Counts_20240528.csv"
output_path = '/opt/ml/processing'  # container path, hard-coded instead of config["OUTPUT_PATH"]
xgboost_framework_version = config["XGBOOST_FRAMEWORK_VERSION"] #"1.7-1"
xgboost_py_version = config["XGBOOST_PY_VERSION"] #"py3"
evaluation_threshold = config["EVALUATION_THRESHOLD"] #"1"
hyperparameters = config["HYPERPARAMETERS"]
cache_preprocess = False #config["CACHE_PREPROCESS"]
cache_training = False #config["CACHE_TRAINING"]
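
Because the cell above indexes the config dict directly, a missing key surfaces as a bare `KeyError` on the first lookup that fails. A minimal sketch of an up-front check follows; `REQUIRED_KEYS` and `missing_config_keys` are hypothetical helpers, not part of the notebook, and the example values are taken from the inline comments above.

```python
# Keys that the notebook reads from config.yaml without a fallback.
REQUIRED_KEYS = (
    "INSTANCE_TYPE",
    "INSTANCE_COUNT",
    "BASE_JOB_NAME",
    "INPUT_DATA_S3_URI",
    "XGBOOST_FRAMEWORK_VERSION",
    "XGBOOST_PY_VERSION",
    "EVALUATION_THRESHOLD",
    "HYPERPARAMETERS",
)


def missing_config_keys(config):
    """Return the required keys that are absent from the loaded config."""
    return [key for key in REQUIRED_KEYS if key not in config]


# Hypothetical config, shaped like the dict returned by load_config().
# Two required keys are left out on purpose.
example_config = {
    "INSTANCE_TYPE": "ml.m5.xlarge",
    "INSTANCE_COUNT": 1,
    "BASE_JOB_NAME": "XGBTraffic",
    "XGBOOST_FRAMEWORK_VERSION": "1.7-1",
    "XGBOOST_PY_VERSION": "py3",
    "EVALUATION_THRESHOLD": 1,
}

missing = missing_config_keys(example_config)
print(missing)
```

Running a check like this right after `load_config` reports every missing key at once instead of one per run.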

In [6]:
# Define env variables
aws_region = os.environ.get("AWS_DEFAULT_REGION", pipeline_session.boto_region_name)
role = os.environ.get("SM_PIPELINE_ROLE_ARN", sagemaker.session.get_execution_role())
kms_key_alias = os.environ.get("SM_STUDIO_KMS_KEY_ALIAS", None)
sm_project_name = os.environ.get("SM_PROJECT_NAME", "local_project")
sm_project_id = os.environ.get("SM_PROJECT_ID", "p-012345")
mg_name = os.environ.get("MPG_NAME", "local_model_group")
commit = os.environ.get("COMMIT_ID", "0123456789")
repo_name = os.environ.get("REPO_NAME", "model-local")
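
One detail of the cell above is worth noting: Python evaluates the default argument of `os.environ.get(name, default)` before the lookup, so `sagemaker.session.get_execution_role()` is called even when `SM_PIPELINE_ROLE_ARN` is set. The sketch below shows one way to defer the fallback. `env_or` and `expensive_default` are hypothetical names, and the ARNs are placeholders.

```python
import os


def env_or(name, fallback):
    """Return os.environ[name] if set, otherwise call fallback() for a default."""
    value = os.environ.get(name)
    return value if value is not None else fallback()


calls = []


def expensive_default():
    # Hypothetical stand-in for a lookup such as get_execution_role().
    calls.append("called")
    return "arn:aws:iam::111122223333:role/example-role"


# With the variable set, the fallback is never invoked.
os.environ["SM_PIPELINE_ROLE_ARN"] = "arn:aws:iam::111122223333:role/from-env"
role_from_env = env_or("SM_PIPELINE_ROLE_ARN", expensive_default)

# With the variable unset, the fallback runs exactly once.
del os.environ["SM_PIPELINE_ROLE_ARN"]
role_from_default = env_or("SM_PIPELINE_ROLE_ARN", expensive_default)

print(role_from_env, role_from_default, len(calls))
```

This matters mostly outside SageMaker, for example in a local IDE, where resolving the execution role may be slow or may fail.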

## Define a Processing Step for Feature Engineering

First, develop a preprocessing script that is specified in the Processing step.

This notebook cell writes a file `preprocess.py`, which contains the preprocessing script. You can update the script, and rerun this cell to overwrite. 
<br>
The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.
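
The preprocessing script in the next cell normalizes lane values, one-hot encodes them, and then lowercases the resulting column names. The sketch below mirrors that naming logic with the standard library only, using Python's `re.sub` as a stand-in for DuckDB's `regexp_replace`. The raw lane values are hypothetical and chosen for illustration.

```python
import re


def normalize_lane(lane):
    """Insert an underscore before a trailing run of digits, e.g. 'SB2' -> 'SB_2'."""
    return re.sub(r"(\d+$)", r"_\1", lane)


def dummy_column_name(column, value):
    """Build a one-hot column name, then lowercase it and replace spaces."""
    raw_name = f"{column}_{value}"
    return "_".join(raw_name.lower().split())


# Hypothetical raw lane values.
lanes = ["SB2", "NB3", "EB_in", "SB out"]
columns = [dummy_column_name("lane", normalize_lane(lane)) for lane in lanes]
print(columns)
```

The resulting names follow the same pattern as the entries in `FEATURE_COLUMNS`, such as `lane_sb_2` and `lane_nb_3`.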

In [7]:
%%writefile tasks/preprocess.py
import argparse
import logging
import os
import pathlib
import sys
import subprocess

subprocess.check_call([
    sys.executable, "-m", "pip", "install", "duckdb",
])

import boto3
import duckdb
import numpy as np
import pandas as pd

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

S3_PATH = os.environ.get(
    "S3_PATH",
    "s3://og-407-temp-test-data/austin-traffic/Radar_Traffic_Counts_20240528.csv",
)
DB_PATH = os.environ.get("DB_PATH", "data/traffic.duckdb")
# CONN = duckdb.connect(DB_PATH)

TARGET_COLUMN = "volume"
FEATURE_COLUMNS = [
    "intersection_name",
    "month",
    "day",
    "year",
    "hour",
    "minute",
    "day_of_week",
    "time_bin",
    "lane_nb_out",
    "lane_sb",
    "lane_nb",
    "lane_eb_mid",
    "lane_nb_4",
    "lane_sb_out",
    "lane_wb_in",
    "lane_wb_out",
    "lane_nb_mid",
    "lane_sb_mid",
    "lane_sb_1",
    "lane_nb_in",
    "lane_sb_rturn",
    "lane_sb_lturn",
    "lane_eb_out",
    "lane_sb_2",
    "lane_nb_3",
    "lane_sb_in",
    "lane_eb_in",
    "direction_sb2",
    "direction_wb",
    "direction_nb3",
    "direction_eb",
    "direction_nb4",
    "direction_sb",
    "direction_nb",
    "direction_sb1",
]


def clean_directions(table: duckdb.DuckDBPyRelation) -> duckdb.DuckDBPyRelation:
    """Remove rows with invalid travel direction."""
    logger.debug("Cleaning rows with invalid travel directions.")
    # '%' is a wildcard only with LIKE, so match 'None' as a pattern.
    return table.filter("direction not like '%None%'")


def clean_lanes(table: duckdb.DuckDBPyRelation) -> duckdb.DuckDBPyRelation:
    """Remove rows with invalid lane values."""
    logger.debug("Cleaning rows with invalid lane values.")
    return table.filter("lane not like '%Lane%'")


def normalize_lanes(table: duckdb.DuckDBPyRelation) -> duckdb.DuckDBPyRelation:
    """Normalize lane values to a consistent snake_case format."""
    logger.debug("Normalizing lane values to consistent format.")
    reformat_expr = duckdb.FunctionExpression(
        "regexp_replace",
        duckdb.ColumnExpression("lane"),
        duckdb.ConstantExpression(r"(\d+$)"),
        duckdb.ConstantExpression(r"_\1"),
    )
    return table.select(
        duckdb.StarExpression(exclude=["lane"]),
        reformat_expr.alias("lane"),
    )


def apply_dummies(
    table: duckdb.DuckDBPyRelation, column: str, exclude_original: bool = True
) -> duckdb.DuckDBPyRelation:
    """Convert a categorical column into dummy columns (i.e. one-hot encoding)."""

    logger.debug(f"Applying one-hot encoding to column '{column}'.")

    unique_values = table.select(column).distinct().fetchall()
    logger.debug(f"Unique values in column '{column}': {unique_values}")

    dummy_expressions = []

    for val_row in unique_values:
        (val,) = val_row
        dummy_expressions.append(
            duckdb.CaseExpression(
                condition=duckdb.ColumnExpression(column)
                == duckdb.ConstantExpression(val),
                value=duckdb.ConstantExpression(1),
            )
            .otherwise(duckdb.ConstantExpression(0))
            .alias(f"{column}_{val}")
        )

    return table.select(
        duckdb.StarExpression(exclude=[column] if exclude_original else []),
        *dummy_expressions,
    )


def downcase_columns(table: duckdb.DuckDBPyRelation) -> duckdb.DuckDBPyRelation:
    """Convert column names to lowercase and replace spaces with underscores."""
    return table.select(
        *[
            duckdb.ColumnExpression(col).alias("_".join(col.lower().split()))
            for col in table.columns
        ]
    )


def main():
    """Command line interface for preprocessing script."""
    logging.info("Starting preprocessing.")

    parser = argparse.ArgumentParser(
        description="Preprocess austin traffic data for XGBoost."
    )
    parser.add_argument(
        "--input-data",
        type=str,
        required=True,
        default=S3_PATH,
        help="Path to input data.",
    )
    parser.add_argument(
        "--random-seed",
        type=int,
        default=42,
        help="Seed for random number generation.",
    )
    args = parser.parse_args()

    input_data = args.input_data
    random_seed = args.random_seed

    base_dir = os.path.join("/", "opt", "ml", "processing")
    # base_dir = os.path.join("opt", "ml", "processing")
    data_dir = os.path.join(base_dir, "data")
    pathlib.Path(data_dir).mkdir(parents=True, exist_ok=True)

    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])
    
    logging.info(f"Downloading data from bucket: {bucket}, key: {key}")
    
    file_name = os.path.join(data_dir, "traffic-dataset.csv")
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, file_name)
        
    logging.debug(f"Reading downloaded data from {file_name}")
    table = duckdb.read_csv(file_name, header=True)
    # os.unlink(file_name)

    logger.debug("Applying transformations.")
    table = clean_directions(table)
    table = clean_lanes(table)
    table = normalize_lanes(table)
    table = apply_dummies(table, "direction")
    table = apply_dummies(table, "lane")
    table = downcase_columns(table)
    y = table.select(TARGET_COLUMN).fetchdf()
    X_pre = table.select(*FEATURE_COLUMNS).fetchdf().to_numpy()
    y_pre = y.to_numpy().reshape(len(y), 1)

    # Put the target in the first column, as train.py and evaluation.py expect.
    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting data into train, validation, and test sets.")
    np.random.seed(random_seed)
    np.random.shuffle(X)
    train, val, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.debug(f"Writing processed data to {base_dir}.")
    
    train_path = os.path.join(base_dir, "train")
    val_path = os.path.join(base_dir, "validation")
    test_path = os.path.join(base_dir, "test")
    pathlib.Path(train_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(val_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(test_path).mkdir(parents=True, exist_ok=True)

    # NOTE: All duckdb tasks combined take less time than one of the pandas writes

    pd.DataFrame(train).to_csv(
        os.path.join(train_path, "train.csv"), header=False, index=False
    )
    pd.DataFrame(val).to_csv(
        os.path.join(val_path, "validation.csv"), header=False, index=False
    )
    pd.DataFrame(test).to_csv(
        os.path.join(test_path, "test.csv"), header=False, index=False
    )


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.exception(e)
    # finally:
        # CONN.close()

Overwriting tasks/preprocess.py
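
The script shuffles the rows with a fixed seed and cuts them at 70% and 85% to obtain the train, validation, and test sets. The following sketch reproduces that idea with the standard library instead of NumPy, so it is an illustration of the technique rather than the script's exact code. `split_rows` is a hypothetical helper.

```python
import random


def split_rows(rows, seed=42, train_cut=0.7, val_cut=0.85):
    """Shuffle a copy of rows with a fixed seed, then cut it at 70% and 85%."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    train_end = int(train_cut * len(shuffled))
    val_end = int(val_cut * len(shuffled))
    return shuffled[:train_end], shuffled[train_end:val_end], shuffled[val_end:]


rows = list(range(100))
train, val, test = split_rows(rows)
print(len(train), len(val), len(test))

# The same seed yields the same split on every call.
print(split_rows(rows) == (train, val, test))
```

Seeding matters here because the evaluation step compares runs against a fixed threshold; an unseeded shuffle would change the test set on every execution.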


Next, create a `SKLearnProcessor` instance and use it in the `ProcessingStep`.

In [8]:
#Preprocess Step
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type=instance_type,
    instance_count=instance_count,
    base_job_name=f"{sm_project_name}/{base_job_name}-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# NOTE: cache_config is only evaluated when cache_preprocess is True;
# define a CacheConfig before enabling caching.
step_process = ProcessingStep(
    name=f"{base_job_name}Preprocess",
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(output_name="train", source=f"{output_path}/train"),
        ProcessingOutput(output_name="validation", source=f"{output_path}/validation"),
        ProcessingOutput(output_name="test", source=f"{output_path}/test"),
    ],
    code=str(BASE_DIR.joinpath("preprocess.py")),
    job_arguments=["--input-data", input_data_s3_uri],
    cache_config=cache_config if cache_preprocess else None,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


## Define Training Step for XGBoost Model

First, develop a training script that is specified in the Training step.

This notebook cell writes a file `train.py`, which contains the training script. You can update the script, and rerun this cell to overwrite. 
<br>
The Training step uses the preprocessed training features and labels to train a model. 

In [9]:
%%writefile tasks/train.py

import argparse
import json
import logging
import os
import pathlib
import pickle as pkl
import tarfile


import numpy as np
import pandas as pd
import xgboost as xgb



logging.basicConfig(level=logging.INFO)

TRAIN_VALIDATION_FRACTION = 0.2
RANDOM_STATE_SAMPLING = 200


def prepare_data(train_dir, validation_dir):
    """Read the train and validation channels and return features and targets.

    Args:
        train_dir (str): directory that contains train.csv.
        validation_dir (str): directory that contains validation.csv.

    Returns:
        Tuple of training features, training target, validation features, validation target.
    """
    df_train = pd.read_csv(
        os.path.join(train_dir, "train.csv"),
        header=None,
    )
    df_train = df_train.iloc[np.random.permutation(len(df_train))]
    df_train = pd.get_dummies(df_train)
    df_train.columns = ["target"] + [f"feature_{x}" for x in range(df_train.shape[1] - 1)]
    
    try:
        df_validation = pd.read_csv(
            os.path.join(validation_dir, "validation.csv"),
            header=None,
        )
        df_validation = pd.get_dummies(df_validation)
        df_validation.columns = ["target"] + [
            f"feature_{x}" for x in range(df_validation.shape[1] - 1)
        ]
    except FileNotFoundError:  # when validation data is not available in the directory
        logging.info(
            f"Validation data is not found. {TRAIN_VALIDATION_FRACTION * 100}% of training data is "
            f"randomly selected as validation data. The seed for random sampling is {RANDOM_STATE_SAMPLING}."
        )
        df_validation = df_train.sample(
            frac=TRAIN_VALIDATION_FRACTION,
            random_state=RANDOM_STATE_SAMPLING,
        )
        df_train.drop(df_validation.index, inplace=True)
        df_validation.reset_index(drop=True, inplace=True)
        df_train.reset_index(drop=True, inplace=True)

    X_train, y_train = df_train.iloc[:, 1:], df_train.iloc[:, :1]
    X_val, y_val = df_validation.iloc[:, 1:], df_validation.iloc[:, :1]

    return X_train.values, y_train.values, X_val.values, y_val.values


def main():
    """Run training."""
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--max_depth",
        type=int,
    )
    parser.add_argument("--eta", type=float)
    parser.add_argument("--gamma", type=int)
    parser.add_argument("--min_child_weight", type=int)
    parser.add_argument("--subsample", type=float)
    parser.add_argument("--verbosity", type=int)
    parser.add_argument("--objective", type=str)
    parser.add_argument("--num_round", type=int)
    parser.add_argument("--tree_method", type=str, default="auto")
    parser.add_argument("--predictor", type=str, default="auto")
    parser.add_argument("--learning_rate", type=str, default="auto")
    parser.add_argument("--output_data_dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
    parser.add_argument("--sm_hosts", type=str, default=os.environ.get("SM_HOSTS"))
    parser.add_argument("--sm_current_host", type=str, default=os.environ.get("SM_CURRENT_HOST"))

    args, _ = parser.parse_known_args()

    X_train, y_train, X_val, y_val = prepare_data(args.train, args.validation)

    # create datasets for xgboost (currently unused, kept for reference)
    #dtrain = xgb.DMatrix(data=X_train, label=y_train)
    #dval = xgb.DMatrix(data=X_val, label=y_val)
    #watchlist = [(dtrain, "train"), (dval, "validation")]

    # specify your configurations as a dict
    params = {
        "booster": "gbtree",
        "objective": args.objective,
        "learning_rate": args.learning_rate,
        "gamma": args.gamma,
        "min_child_weight": args.min_child_weight,
        "max_depth": args.max_depth,
        "subsample": args.subsample,
        "colsample_bytree": 1,
        "reg_lambda": 1,
        "reg_alpha": 0,
        "eval_metric": "rmse",
    }

    # NOTE: `params` above is not passed to the regressor, so it trains with defaults.
    bst = xgb.XGBRegressor(objective="reg:squarederror")
    bst.fit(X_train, y_train)

    model_location = os.path.join(args.model_dir, "xgboost-model")
    with open(model_location, "wb") as f:
        pkl.dump(bst, f)
    logging.info("Stored trained model at {}".format(model_location))


if __name__ == "__main__":
    main()

Overwriting tasks/train.py
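
The training script relies on two conventions: hyperparameters arrive as command-line arguments, and paths such as the model directory default to `SM_*` environment variables. The sketch below isolates that pattern with a reduced argument set. The argument list and the `--not_declared` flag are hypothetical, and the environment variable is set by hand here, whereas in a training job the container is expected to provide it.

```python
import argparse
import os


def parse_args(argv):
    """Parse a subset of the training arguments, collecting unknown ones separately."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_depth", type=int)
    parser.add_argument("--eta", type=float)
    parser.add_argument("--objective", type=str)
    # The default comes from the environment, as with the SM_* variables in train.py.
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    return parser.parse_known_args(argv)


os.environ["SM_MODEL_DIR"] = "/opt/ml/model"

# Hypothetical argument list; "--not_declared" is not defined on the parser.
argv = [
    "--max_depth", "5",
    "--eta", "0.2",
    "--objective", "reg:squarederror",
    "--not_declared", "1",
]
args, unknown = parse_args(argv)
print(args.max_depth, args.eta, args.objective, args.model_dir, unknown)
```

Using `parse_known_args` instead of `parse_args` keeps the script from exiting when it receives a hyperparameter it does not declare.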


In [10]:
# Training Step
# An entry_point script is required; built-in algorithms cannot be used in local mode
model_path = f"s3://{pipeline_session.default_bucket()}/{sm_project_name}/{base_job_name}-train"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=aws_region,
    version=xgboost_framework_version,
    py_version=xgboost_py_version,
    instance_type=instance_type,
)

xgb_train = Estimator(
    entry_point=str(BASE_DIR.joinpath("train.py")) if not is_file_empty(str(BASE_DIR.joinpath("train.py"))) else None,
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=instance_count,
    output_path=model_path,
    output_kms_key=kms_key_alias,
    base_job_name=f"{sm_project_name}/{base_job_name}-train",
    sagemaker_session=pipeline_session,
    role=role
)

xgb_train.set_hyperparameters(**hyperparameters)

step_train = TrainingStep(
    name=f"{base_job_name}TrainModel",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config=cache_config if cache_training else None,
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


#### Training Step Limitation of Local Pipeline Session
- No built-in algorithm support: You cannot use built-in algorithms provided by SageMaker directly within a local pipeline session. You must define your own training script.

## Define Evaluation Step to Evaluate the Trained Model
First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.

After pipeline execution, you can examine the resulting `evaluation.json` for analysis.

In [18]:
%%writefile tasks/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    with open("xgboost-model", "rb") as f:
        model = pickle.load(f)

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    df = pd.get_dummies(df)
    # The first column is the target; the remaining columns are the features,
    # matching the layout that train.py assumes.
    y_test = df.iloc[:, 0].to_numpy()
    X_test = df.iloc[:, 1:].values

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }
    
#    report_dict = {
#        "regression_metrics": {
#            "mse": {"value": 2.0, "standard_deviation": 0.0},
#        }
#    }
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting tasks/evaluation.py
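
To make the contents of `evaluation.json` concrete, the following sketch computes the same two metrics with the standard library and prints the report as JSON. The targets and predictions are hypothetical, and `regression_report` is an illustrative helper rather than part of the evaluation script.

```python
import json
import statistics


def regression_report(y_true, y_pred):
    """Build a report dict with the MSE and the standard deviation of the residuals."""
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(r * r for r in residuals) / len(residuals)
    # pstdev is the population standard deviation, matching np.std's default (ddof=0).
    std = statistics.pstdev(residuals)
    return {"regression_metrics": {"mse": {"value": mse, "standard_deviation": std}}}


# Hypothetical targets and predictions.
y_true = [3.0, 5.0, 7.0, 9.0]
y_pred = [2.0, 5.0, 8.0, 9.0]
report = regression_report(y_true, y_pred)
print(json.dumps(report))
```

For these inputs the residuals are 1, 0, -1, and 0, so the MSE is 0.5. The nesting of the keys is what the pipeline later addresses with the path `regression_metrics.mse.value`.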


In [12]:
# Evaluation Step
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=instance_type,
    instance_count=instance_count,
    base_job_name=f"{sm_project_name}/{base_job_name}-eval",
    sagemaker_session=pipeline_session,
    role=role,
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name=f"{base_job_name}Eval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination=f"{output_path}/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination=f"{output_path}/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source=f"{output_path}/evaluation"),
    ],
    code=str(BASE_DIR.joinpath("evaluation.py")),
    property_files=[evaluation_report],
)

## Define Create Model Step to Create a Model

In order to perform batch transformation using the model, create a SageMaker model. 

In [13]:
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session
)

step_create_model = ModelStep(
    name=f"{base_job_name}CreateModel", step_args=model.create(instance_type=instance_type)
)



## Define Transform Step to Perform Batch Transformation

Now that a model instance is defined, create a `Transformer` instance with the appropriate model type, compute instance type, and desired output S3 URI.

In [14]:
transformer_path = f"s3://{pipeline_session.default_bucket()}/{sm_project_name}/{base_job_name}-transform"

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type=instance_type,
    instance_count=instance_count,
    output_path=transformer_path,
    sagemaker_session=pipeline_session,
)

transform_data = Join(
    on="/",
    values=[
        step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        "test.csv",
    ],
)

transform_args = transformer.transform(transform_data, content_type="text/csv")

step_transform = TransformStep(name=f"{base_job_name}Transform", step_args=transform_args)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation Or Terminate the Execution in Failed State

In this step, the model is created and the batch transformation runs only if the mean squared error (MSE) of the model, as determined by the evaluation step `step_eval`, is less than or equal to a specified threshold. Otherwise, the pipeline execution fails and terminates. A `ConditionStep` enables conditional execution in the pipeline DAG based on step properties.

In the following section, you:

* Define a `ConditionLessThanOrEqualTo` on the MSE value found in the output of the evaluation step, `step_eval`.
* Use the condition in the list of conditions in a `ConditionStep`.
* Pass the `ModelStep` and `TransformStep` steps into the `if_steps` of the `ConditionStep`, which are only executed if the condition evaluates to `True`.
* Pass the `FailStep` step into the `else_steps` of the `ConditionStep`, which is only executed if the condition evaluates to `False`.

In [15]:
# Conditional Step

step_fail = FailStep(
    name=f"{base_job_name}MSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", evaluation_threshold]),
)

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=evaluation_threshold
)

step_cond = ConditionStep(
    name=f"{base_job_name}MSECond",
    conditions=[cond_lte],
    if_steps=[step_create_model, step_transform],
    else_steps=[step_fail],
)
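
The condition reads one value out of the evaluation report by its path and compares it with the threshold. The sketch below is a local stand-in for that check, written in plain Python; it is not how the SDK evaluates `JsonGet` or `ConditionLessThanOrEqualTo`, and both reports are hypothetical.

```python
def get_by_path(document, json_path):
    """Walk a nested dict using a dot-separated path such as 'a.b.c'."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def passes_threshold(report, threshold, json_path="regression_metrics.mse.value"):
    """Return True when the metric at json_path is less than or equal to threshold."""
    return get_by_path(report, json_path) <= float(threshold)


# Hypothetical evaluation reports.
good_report = {"regression_metrics": {"mse": {"value": 0.8, "standard_deviation": 0.1}}}
bad_report = {"regression_metrics": {"mse": {"value": 2.0, "standard_deviation": 0.0}}}

print(passes_threshold(good_report, threshold=1))
print(passes_threshold(bad_report, threshold=1))
```

A check like this can be run against a downloaded `evaluation.json` to predict which branch of the `ConditionStep` an execution will take.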

## Define a Pipeline

In this section, combine the steps into a Pipeline so it can be executed. Depending on the `pipeline_session` variable the steps in the pipeline will run either locally on your machine or in the cloud.

In [16]:
pipeline_name = "LocalModelPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'XGBTrafficPreprocess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
     'ContainerArguments': ['--input-data',
      's3://407etr-360834319396-ca-central-1-sm-domain-resources/sample-project-data/Radar_Traffic_Counts_20240528.csv'],
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocess.py']},
    'RoleArn': 'arn:aws:iam::360834319396:role/407ETR_Role_sm-studio-execution-role',
    'ProcessingInputs': [{'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sa
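
The full definition is verbose, so a compact summary of the steps is often easier to read. The sketch below lists the name and type of each top-level step. The definition string is a hypothetical, heavily trimmed example that keeps only the `Name` and `Type` fields seen in the output above; a real definition carries full `Arguments` for every step.

```python
import json

# Hypothetical, trimmed definition; step names follow the notebook's naming scheme.
definition_json = """
{
  "Version": "2020-12-01",
  "Steps": [
    {"Name": "XGBTrafficPreprocess", "Type": "Processing", "Arguments": {}},
    {"Name": "XGBTrafficTrainModel", "Type": "Training", "Arguments": {}},
    {"Name": "XGBTrafficEval", "Type": "Processing", "Arguments": {}},
    {"Name": "XGBTrafficMSECond", "Type": "Condition", "Arguments": {}}
  ]
}
"""


def summarize_steps(definition):
    """Return (name, type) pairs for the top-level steps of a pipeline definition."""
    return [(step["Name"], step["Type"]) for step in definition["Steps"]]


summary = summarize_steps(json.loads(definition_json))
for name, step_type in summary:
    print(f"{step_type:<12}{name}")
```

In the notebook, the same function could be applied to the `definition` dict returned by `json.loads(pipeline.definition())`.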

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [19]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker.local.entities:Starting execution for pipeline LocalModelPipeline. Execution ID is 86c942e6-7972-4788-ba0b-43b36b65e241
INFO:sagemaker.local.entities:Starting pipeline step: 'XGBTrafficPreprocess'

 Container 2var2zyjnh-sagemaker-local  Creating
 Container 2var2zyjnh-sagemaker-local  Created
Attaching to 2var2zyjnh-sagemaker-local
2var2zyjnh-sagemaker-local  | Collecting duckdb
2var2zyjnh-sagemaker-local  |   Downloading duckdb-1.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (762 bytes)
2var2zyjnh-sagemaker-local  | Downloading duckdb-1.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.5 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 18.5/18.5 MB 53.6 MB/s eta 0:00:00
2var2zyjnh-sagemaker-local  | Installing collected packages: duckdb
2var2zyjnh-sagemaker-local  | Successfully installed duckdb-1.0.0
2var2zyjnh-sagemaker-local  | Starting preprocessing.
2var2zyjnh-sagemaker-local  | Downloading data from bucket: 407etr-360834319396-ca-central-1-sm-domain-resources, key: sample-project-data/Radar_Traffic_Counts_20240528.csv
2var2zyjnh-sagemaker-local  | Changing event name f

INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'XGBTrafficPreprocess' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'XGBTrafficTrainModel'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting training job
INFO:sagemaker.local.image:docker compose file: 
services:
  sagemaker-local:
    command: train
    container_name: 0rsmfy7sfw-sagemaker-local
    environment:
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Mas

 Container 0rsmfy7sfw-sagemaker-local  Creating
 Container 0rsmfy7sfw-sagemaker-local  Created
Attaching to 0rsmfy7sfw-sagemaker-local
0rsmfy7sfw-sagemaker-local  |   from pandas import MultiIndex, Int64Index
0rsmfy7sfw-sagemaker-local  | [2024-06-26 19:33:30.742 default:1 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None
0rsmfy7sfw-sagemaker-local  | [2024-06-26 19:33:30.761 default:1 INFO profiler_config_parser.py:111] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
0rsmfy7sfw-sagemaker-local  | [2024-06-26:19:33:31:INFO] Imported framework sagemaker_xgboost_container.training
0rsmfy7sfw-sagemaker-local  | [2024-06-26:19:33:31:INFO] Failed to parse hyperparameter objective value reg:squarederror to Json.
0rsmfy7sfw-sagemaker-local  | Returning the value itself
0rsmfy7sfw-sagemaker-local  | [2024-06-26:19:33:31:INFO] No GPUs detected (normal if no gpus installed)
0rsmfy7sfw-sagemaker-local  | [2024-06-26:19:33:31:INFO] Invoking user trainin

INFO:root:creating /home/sagemaker-user/tmp/tmpfmq3uxl7/artifacts/output/data
INFO:root:copying /home/sagemaker-user/tmp/tmpfmq3uxl7/model/xgboost-model -> /home/sagemaker-user/tmp/tmpfmq3uxl7/artifacts/model


0rsmfy7sfw-sagemaker-local  | INFO:root:Stored trained model at /opt/ml/model/xgboost-model
0rsmfy7sfw-sagemaker-local exited with code 0
Aborting on container exit...
 Container 0rsmfy7sfw-sagemaker-local  Stopping
 Container 0rsmfy7sfw-sagemaker-local  Stopped


INFO:sagemaker.local.image:===== Job Complete =====
INFO:sagemaker.local.entities:Pipeline step 'XGBTrafficTrainModel' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'XGBTrafficEval'
INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting processing job
INFO:sagemaker.local.image:docker compose file: 
services:
  sagemaker-local:
    container_name: n4zh6qtjex-sagemaker-local
    entrypoint:
    - python3
    - /opt/ml/processing/input/code/evaluation.py
    environment:
    - '[Masked]'
    -

 Container n4zh6qtjex-sagemaker-local  Creating
 Container n4zh6qtjex-sagemaker-local  Created
Attaching to n4zh6qtjex-sagemaker-local
n4zh6qtjex-sagemaker-local  |   from pandas import MultiIndex, Int64Index


INFO:sagemaker.local.image:===== Job Complete =====


n4zh6qtjex-sagemaker-local exited with code 0
Aborting on container exit...
 Container n4zh6qtjex-sagemaker-local  Stopping
 Container n4zh6qtjex-sagemaker-local  Stopped


INFO:sagemaker.local.entities:Pipeline step 'XGBTrafficEval' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'XGBTrafficMSECond'
INFO:sagemaker.local.entities:Pipeline step 'XGBTrafficMSECond' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'XGBTrafficCreateModel-CreateModel'
INFO:sagemaker.local.entities:Pipeline step 'XGBTrafficCreateModel-CreateModel' SUCCEEDED.
INFO:sagemaker.local.entities:Starting pipeline step: 'XGBTrafficTransform'

Attaching to yiu6pb58hx-sagemaker-local
yiu6pb58hx-sagemaker-local  |   from pandas import MultiIndex, Int64Index
yiu6pb58hx-sagemaker-local  | [2024-06-26:19:50:06:INFO] No GPUs detected (normal if no gpus installed)
yiu6pb58hx-sagemaker-local  | [2024-06-26:19:50:06:INFO] No GPUs detected (normal if no gpus installed)
yiu6pb58hx-sagemaker-local  | [2024-06-26:19:50:06:INFO] nginx config: 
yiu6pb58hx-sagemaker-local  | worker_processes auto;
yiu6pb58hx-sagemaker-local  | daemon off;
yiu6pb58hx-sagemaker-local  | pid /tmp/nginx.pid;
yiu6pb58hx-sagemaker-local  | error_log  /dev/stderr;
yiu6pb58hx-sagemaker-local  | 
yiu6pb58hx-sagemaker-local  | worker_rlimit_nofile 4096;
yiu6pb58hx-sagemaker-local  | 
yiu6pb58hx-sagemaker-local  | events {
yiu6pb58hx-sagemaker-local  |   worker_connections 2048;
yiu6pb58hx-sagemaker-local  | }
yiu6pb58hx-sagemaker-local  | 
yiu6pb58hx-sagemaker-local  | http {
yiu6pb58hx-sagemaker-local  |   include /etc/nginx/mime.types;
yiu6pb58hx-sagemaker-local  |

INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 10
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 15
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 20
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 25
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 30
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 35
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemaker.local.entities:Checking if serving container is up, attempt: 40
INFO:sagemaker.local.entities:Container still not up, got: -1
INFO:sagemake

In [None]:
# Get output files from processing job
processing_job_name = steps["PipelineExecutionSteps"][0]["Metadata"]["ProcessingJob"]["Arn"]
outputs = pipeline_session.sagemaker_client.describe_processing_job(
    ProcessingJobName=processing_job_name
)["ProcessingOutputConfig"]["Outputs"]
for key in outputs:
    print(outputs[key]["S3Output"]["S3Uri"])
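
The loop above indexes `outputs` by key, which implies a mapping keyed by output name. The boto3 `describe_processing_job` response documents `Outputs` as a list instead, so the same cell may fail when `run_locally` is off. A minimal sketch of a helper that accepts either shape follows; the function name `output_uris` and the sample bucket are hypothetical and not part of the SageMaker SDK.

```python
def output_uris(outputs):
    """Return {output_name: s3_uri} for a dict- or list-shaped `Outputs` value."""
    if isinstance(outputs, dict):
        # Mapping shape: the key is the output name.
        pairs = outputs.items()
    else:
        # List shape: each entry carries its own "OutputName".
        pairs = ((entry["OutputName"], entry) for entry in outputs)
    return {name: entry["S3Output"]["S3Uri"] for name, entry in pairs}


# Hypothetical sample data, for illustration only.
as_mapping = {"train": {"S3Output": {"S3Uri": "s3://example-bucket/train"}}}
as_list = [{"OutputName": "train", "S3Output": {"S3Uri": "s3://example-bucket/train"}}]

for name, uri in output_uris(as_mapping).items():
    print(name, uri)
```

With this helper, the loop in the cell above could become `for name, uri in output_uris(outputs).items(): print(name, uri)`.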

In [None]:
# Get output from training job

training_job_name = steps["PipelineExecutionSteps"][1]["Metadata"]["TrainingJob"]["Arn"]
outputs = pipeline_session.sagemaker_client.describe_training_job(
    TrainingJobName=training_job_name
)
print("Model location : ", outputs["ModelArtifacts"]["S3ModelArtifacts"])
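
The cells in this section pass each step's `Arn` value straight into a `describe_*` call as the job name. That works only if the value is already a bare job name, which is what these cells assume for the local session. With a regular `PipelineSession` the field would normally hold a full ARN, while the describe APIs expect the job name. The sketch below shows one way to handle both cases; `job_name_from_arn` is a hypothetical helper and the ARN is made-up sample data.

```python
def job_name_from_arn(arn_or_name):
    """Return the job name at the end of an ARN; bare names pass through unchanged."""
    # SageMaker job ARNs end in "<resource-type>/<job-name>".
    return arn_or_name.rsplit("/", 1)[-1]


# Hypothetical values, for illustration only.
cloud_arn = "arn:aws:sagemaker:us-east-1:123456789012:training-job/example-training-job"
local_value = "example-training-job"

print(job_name_from_arn(cloud_arn))
print(job_name_from_arn(local_value))
```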

In [None]:
# Get output of ModelStep
model_name = steps["PipelineExecutionSteps"][-2]["Metadata"]["Model"]["Arn"]
outputs = pipeline_session.sagemaker_client.describe_model(ModelName=model_name)
print(outputs)

In [None]:
# Get output from the TransformStep

transform_job_name = steps["PipelineExecutionSteps"][-1]["Metadata"]["TransformJob"]["Arn"]
outputs = pipeline_session.sagemaker_client.describe_transform_job(
    TransformJobName=transform_job_name
)
print(outputs)
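
The cells above select steps by position (`[0]`, `[1]`, `[-2]`, `[-1]`), which depends on the order in which the steps are listed. A sketch of a lookup by step name follows, assuming each entry carries a `StepName` key as in the `ListPipelineExecutionSteps` response. The helper name and sample response are hypothetical; the step names are taken from the log output above.

```python
def index_steps_by_name(steps_response):
    """Map each step name to its entry in a list-steps response."""
    return {step["StepName"]: step for step in steps_response["PipelineExecutionSteps"]}


# Hypothetical sample response, for illustration only.
sample_steps = {
    "PipelineExecutionSteps": [
        {
            "StepName": "XGBTrafficTransform",
            "Metadata": {"TransformJob": {"Arn": "example-transform-job"}},
        },
        {
            "StepName": "XGBTrafficPreprocess",
            "Metadata": {"ProcessingJob": {"Arn": "example-processing-job"}},
        },
    ]
}

by_name = index_steps_by_name(sample_steps)
print(by_name["XGBTrafficPreprocess"]["Metadata"]["ProcessingJob"]["Arn"])
```

Looking steps up by name keeps the cells working if the listing order differs between local and cloud executions.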

## Not Supported Steps
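Certain step types, such as `RegisterModel` and `Tuning`, are not supported in local mode. Starting a local execution that includes one fails with a `ClientError` (`ValidationException`), so the cells below are kept commented out for reference.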
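
One way to keep a single pipeline definition for both modes is to add the unsupported steps only when `run_locally` is off. The sketch below illustrates the idea with plain strings standing in for step objects. `select_steps` and the step lists are hypothetical and do not reflect how this notebook builds its pipeline.

```python
def select_steps(run_locally, common_steps, cloud_only_steps):
    """Return the steps to pass to the pipeline for the chosen mode."""
    if run_locally:
        # Local mode: leave out step types the local executor rejects.
        return list(common_steps)
    return list(common_steps) + list(cloud_only_steps)


# Strings stand in for real step objects in this illustration.
common = ["preprocess", "train", "evaluate"]
cloud_only = ["register_model", "tune_model"]

print(select_steps(True, common, cloud_only))
print(select_steps(False, common, cloud_only))
```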

In [None]:
# Model Register Step
#ClientError: An error occurred (ValidationException) when calling the start_pipeline_execution operation: Step type RegisterModel is not supported in local mode.
#step_register = RegisterModel(
#    name=f"{base_job_name}RegisterModel",
#    estimator=xgb_train,
#    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
#    content_types=["text/csv"],
#    response_types=["text/csv"],
#    model_package_group_name=mg_name,
#    approval_status="PendingManualApproval",
#    description=f'commit_id={commit}',
#    customer_metadata_properties={"SM_PROJECT_NAME": sm_project_name,
#                                  "SM_PROJECT_ID": sm_project_id,
#                                  "INSTANCE_TYPE": instance_type,
#                                  "INSTANCE_COUNT": str(instance_count),
#                                  "ENV_EXAMPLE": "VariablesStartWithENV_PassedToEndpointRuntime"
#                                  },
#    metadata_properties=MetadataProperties(commit_id=commit, project_id=sm_project_id, repository=repo_name)
#)

In [None]:
# Tuning Step
#ClientError: An error occurred (ValidationException) when calling the start_pipeline_execution operation: Step type Tuning is not supported in local mode.
#objective_metric_name = "validation:rmse"

#hyperparameter_ranges = {
#    "alpha": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),
#    "lambda": ContinuousParameter(0.01, 10, scaling_type="Logarithmic"),
#}

#tuner = HyperparameterTuner(
#    xgb_train,
#    objective_metric_name,
#    hyperparameter_ranges,
#    max_jobs=3,
#    max_parallel_jobs=3,
#    strategy="Random",
#    objective_type="Minimize",
#)

#step_tuning = TuningStep(
#    name=f"{base_job_name}TuneModel",
#    tuner=tuner,
#    inputs={
#        "train": TrainingInput(
#            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
#            content_type="text/libsvm"
#        ),
#        "validation": TrainingInput(
#            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
#            content_type="text/libsvm"
#        )
#    },
#    cache_config=cache_config if cache_training else None,
#)

# This example illustrates how the name of the tuning job from the previous step can be used as the parent tuning job.
# In practice, it is unlikely that the parent job would run before the warm start job on every run. Typically, the first tuning job
# would run once, and the pipeline would then be altered to use warm start tuning jobs with that first job as the parent.

#parent_tuning_job_name = (
#    step_tuning.properties.HyperParameterTuningJobName
#)  # Use the parent tuning job specific to the use case

#warm_start_config = WarmStartConfig(
#    WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM, parents={parent_tuning_job_name}
#)

#tuner_lwarm_start = HyperparameterTuner(
#    xgb_train,
#    objective_metric_name,
#    hyperparameter_ranges,
#    max_jobs=3,
#    max_parallel_jobs=3,
#    strategy="Random",
#    objective_type="Minimize",
#    warm_start_config=warm_start_config,
#)

#TuningStep(
#    name=f"{base_job_name}TuneModelWarmStart",
#    tuner=tuner_lwarm_start,
#    inputs={
#        "train": TrainingInput(
#            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
#            content_type="text/libsvm"
#        ),
#        "validation": TrainingInput(
#            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
#            content_type="text/libsvm"
#        )
#    },
#    cache_config=cache_config if cache_training else None,
#)