## Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipeline

This notebook demonstrates an end-to-end SageMaker Pipeline for a Natural Gas demand forecasting project. The pipeline covers preprocessing, model training, evaluation, conditional execution, batch transform, and model registration.

**A SageMaker Pipeline**

The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, batch transformation, and model registration. In this notebook, we use Pipelines to build a repeatable workflow for Natural Gas demand forecasting.

**Dataset: Natural Gas Demand Forecasting**

This project uses a Natural Gas dataset to build a regression model that predicts gas demand based on historical observations and related explanatory variables.

- **Problem type:** Supervised regression. The goal is to predict a continuous numerical target variable representing natural gas demand.

- **Target variable:**
  - Natural gas demand (e.g., daily or monthly consumption)
  - Measured as a continuous numeric value
  - Evaluated using regression metrics such as Mean Squared Error (MSE)

The pipeline later uses MSE as the primary model quality metric in the Condition step to determine whether the model passes the quality threshold.
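
As a small illustration of the metric itself, here is a minimal sketch of how MSE is computed. The demand values and forecasts are made-up numbers for illustration only, and the helper name `mean_squared_error_plain` is hypothetical (the notebook itself uses scikit-learn's `mean_squared_error`).

```python
def mean_squared_error_plain(actual, predicted):
    """Average of squared differences between actual and predicted values."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and the same length")
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


# Hypothetical monthly demand values and forecasts (illustrative numbers only).
actual = [100.0, 120.0, 90.0, 110.0]
predicted = [110.0, 115.0, 95.0, 100.0]

# Squared errors are 100, 25, 25, 100, so the mean is 62.5.
mse = mean_squared_error_plain(actual, predicted)
print(mse)
```

Because errors are squared, MSE is expressed in squared target units, which is why a threshold for this metric can look large compared with the demand values themselves.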







In [1]:
import sys, time

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_sess = sagemaker.session.Session()
region = sagemaker_sess.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_sess.default_bucket()
model_package_group_name = "NaturalGasModelPackage"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
import sagemaker

sess = sagemaker.Session()
bucket = sess.default_bucket()
region = sess.boto_region_name

local_path = "data.csv"

base_uri = f"s3://{bucket}/natural-gas/input"

input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)

print("input_data_uri:", input_data_uri)

input_data_uri: s3://sagemaker-us-east-1-137225474160/natural-gas/input/data.csv


In [3]:
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.workflow.steps import CacheConfig

# Define a cache that expires after 1 hour (PT1H)
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
# Set up data capture so endpoint requests and responses are stored in S3 for Model Monitor
data_capture_prefix = "sagemaker/natural-gas/datacapture"
s3_capture_upload_path = f"s3://{default_bucket}/{data_capture_prefix}"

data_capture_config = DataCaptureConfig(
    enable_capture=True, 
    sampling_percentage=100, 
    destination_s3_uri=s3_capture_upload_path
)

## Define Parameters to Parametrize Pipeline Execution

Pipeline parameters allow you to run the same pipeline multiple times with different values (e.g., instance types, data locations, thresholds, approval status).

In this project, key parameters include:
- Input data location (S3)
- Processing / Training instance types and counts
- Model approval status (if using Model Registry)
- MSE threshold used in the quality gate (Condition step)
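
To make the idea of overriding parameters concrete, the following is a rough sketch in plain Python. The `build_overrides` helper and the `PIPELINE_DEFAULTS` dictionary are hypothetical and not part of the SageMaker SDK; the parameter names and defaults mirror the ones defined in the next cell, with the `InputData` URI replaced by a placeholder.

```python
# Defaults mirror the parameters this notebook defines. The InputData value is
# a placeholder here; in the notebook it is the uploaded S3 URI.
PIPELINE_DEFAULTS = {
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "InputData": "s3://<bucket>/natural-gas/input/data.csv",
    "MseThreshold": 10000000.0,
}


def build_overrides(**overrides):
    """Validate override names and types against the defaults (hypothetical helper)."""
    for name, value in overrides.items():
        if name not in PIPELINE_DEFAULTS:
            raise KeyError(f"Unknown pipeline parameter: {name}")
        expected = type(PIPELINE_DEFAULTS[name])
        if not isinstance(value, expected):
            raise TypeError(f"{name} expects {expected.__name__}, got {type(value).__name__}")
    return dict(overrides)


overrides = build_overrides(MseThreshold=400000.0, ModelApprovalStatus="Approved")
effective = {**PIPELINE_DEFAULTS, **overrides}
print(effective["MseThreshold"])

# In the notebook, the overrides would be passed at execution time as:
#   pipeline.start(parameters=overrides)
```

Any parameter that is not overridden keeps its default value for that execution.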

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)


instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

mse_threshold = ParameterFloat(name="MseThreshold", default_value=10000000.0)

In [5]:
import os

os.getcwd()

'/home/sagemaker-user/NG-demand-forecasting'

## Define a Processing Step for Feature Engineering

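This Processing step performs dataset-specific preprocessing and feature engineering, and writes artifacts back to S3 for downstream steps.

Outputs:
- `train/` (training data)
- `validation/` (validation data used during training)
- `test/` (held-out test data for evaluation)
- `batch/` (features-only file for Batch Transform)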
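
The preprocessing script in the next cell splits rows chronologically rather than randomly, so the most recent months are held out for testing. The sketch below reproduces only that index arithmetic in plain Python; the function name `time_based_split` is hypothetical, and the defaults follow the script's `--test-months` and `--val-months` arguments.

```python
def time_based_split(n_rows, test_months=12, val_months=6):
    """Return (train, validation, test) index ranges for chronologically sorted rows."""
    # Same guard as the script: require a few training rows beyond the holdouts.
    if n_rows < test_months + val_months + 5:
        raise ValueError("not enough rows for the requested split")
    test_start = n_rows - test_months
    val_start = test_start - val_months
    return range(0, val_start), range(val_start, test_start), range(test_start, n_rows)


# Five years of monthly rows: 42 for training, 6 for validation, 12 for testing.
train_idx, val_idx, test_idx = time_based_split(n_rows=60)
print(len(train_idx), len(val_idx), len(test_idx))
```

Keeping the split ordered in time avoids training on observations that come after the ones being predicted.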

In [6]:
%%writefile /home/sagemaker-user/NG-demand-forecasting/code/preprocessing.py

#!/usr/bin/env python3
import argparse
import os
import glob
import pandas as pd
import numpy as np


def parse_args():
    parser = argparse.ArgumentParser()

    # Input directory/file (SageMaker Processing input)
    parser.add_argument("--input-dir", type=str, default="/opt/ml/processing/input")
    parser.add_argument("--input-file", type=str, default="data.csv")

    # Output directories (SageMaker Processing outputs)
    parser.add_argument("--train-output", type=str, default="/opt/ml/processing/train")
    parser.add_argument("--validation-output", type=str, default="/opt/ml/processing/validation")
    parser.add_argument("--test-output", type=str, default="/opt/ml/processing/test")
    parser.add_argument("--batch-output", type=str, default="/opt/ml/processing/batch")

    # Separate metadata output (IMPORTANT: keep non-CSV out of train/val/test)
    parser.add_argument("--metadata-output", type=str, default="/opt/ml/processing/metadata")

    # Column configuration (for your Kaggle natural gas dataset)
    parser.add_argument("--year-col", type=str, default="year")
    parser.add_argument("--month-col", type=str, default="month")
    parser.add_argument("--target-col", type=str, default="value")
    parser.add_argument("--units-col", type=str, default="units")

    # Slice selection (recommended for time-series lags; otherwise auto-picks most frequent)
    parser.add_argument("--series-col", type=str, default="series")
    parser.add_argument("--series-filter", type=str, default="")
    parser.add_argument("--duoarea-col", type=str, default="duoarea")
    parser.add_argument("--duoarea-filter", type=str, default="")

    # Feature engineering settings
    parser.add_argument("--max-lag", type=int, default=12)
    parser.add_argument("--roll-window-1", type=int, default=3)
    parser.add_argument("--roll-window-2", type=int, default=12)

    # Time splits
    parser.add_argument("--test-months", type=int, default=12)
    parser.add_argument("--val-months", type=int, default=6)

    # Batch transform window (features only). If 0 -> same as test window.
    parser.add_argument("--batch-months", type=int, default=0)

    # Output formatting for built-in XGBoost (NO header when flag is set)
    parser.add_argument("--no-header", action="store_true")

    return parser.parse_args()


def ensure_dirs(*dirs):
    for d in dirs:
        os.makedirs(d, exist_ok=True)


def safe_to_numeric(s):
    return pd.to_numeric(s, errors="coerce")


def choose_most_frequent_value(df, col):
    vc = df[col].value_counts(dropna=True)
    if vc.empty:
        raise ValueError(f"Cannot auto-select {col}: no non-null values.")
    return vc.index[0]


def add_time_features(df, ds_col="ds"):
    df["year"] = df[ds_col].dt.year
    df["month"] = df[ds_col].dt.month
    df["quarter"] = df[ds_col].dt.quarter
    df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12.0)
    df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12.0)
    return df


def add_lag_features(df, y_col="y", max_lag=12):
    for lag in range(1, max_lag + 1):
        df[f"lag_{lag}"] = df[y_col].shift(lag)
    return df


def add_rolling_features(df, y_col="y", windows=(3, 12)):
    # Shift by one step so each window covers only past values; rolling over
    # the unshifted target would leak the current label into its own features.
    past = df[y_col].shift(1)
    for w in windows:
        df[f"roll_mean_{w}"] = past.rolling(window=w).mean()
        df[f"roll_std_{w}"] = past.rolling(window=w).std()
        df[f"roll_min_{w}"] = past.rolling(window=w).min()
        df[f"roll_max_{w}"] = past.rolling(window=w).max()
    return df


def resolve_input_path(input_dir: str, input_file: str) -> str:
    # Prefer explicit file name if present
    candidate = os.path.join(input_dir, input_file)
    if os.path.exists(candidate):
        return candidate

    # Otherwise, pick first CSV in directory
    csvs = sorted(glob.glob(os.path.join(input_dir, "*.csv")))
    if not csvs:
        raise FileNotFoundError(
            f"No CSV found in {input_dir}. Expected {candidate} or any *.csv file."
        )
    return csvs[0]


def main():
    args = parse_args()

    # Resolve input path robustly
    input_path = resolve_input_path(args.input_dir, args.input_file)
    df = pd.read_csv(input_path)

    # Validate columns exist
    required = {args.year_col, args.month_col, args.target_col, args.series_col, args.duoarea_col}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}. Found: {list(df.columns)}")

    # Basic cleaning
    df[args.year_col] = safe_to_numeric(df[args.year_col])
    df[args.month_col] = safe_to_numeric(df[args.month_col])
    df[args.target_col] = safe_to_numeric(df[args.target_col])

    df = df.dropna(subset=[args.year_col, args.month_col, args.target_col]).copy()
    df[args.year_col] = df[args.year_col].astype(int)
    df[args.month_col] = df[args.month_col].astype(int)
    df = df[(df[args.month_col] >= 1) & (df[args.month_col] <= 12)].copy()

    # Build ds and y
    df["ds"] = pd.to_datetime(
        dict(year=df[args.year_col], month=df[args.month_col], day=1),
        errors="coerce",
    )
    df = df.dropna(subset=["ds"]).copy()
    df["y"] = df[args.target_col].astype(float)

    # Select ONE series slice
    if args.series_filter.strip():
        df = df[df[args.series_col].astype(str) == args.series_filter.strip()].copy()
        selected_series = args.series_filter.strip()
    else:
        selected_series = choose_most_frequent_value(df, args.series_col)
        df = df[df[args.series_col] == selected_series].copy()

    if args.duoarea_filter.strip():
        df = df[df[args.duoarea_col].astype(str) == args.duoarea_filter.strip()].copy()
        selected_duoarea = args.duoarea_filter.strip()
    else:
        selected_duoarea = choose_most_frequent_value(df, args.duoarea_col)
        df = df[df[args.duoarea_col] == selected_duoarea].copy()

    # Optionally ensure a single unit (helps consistency)
    selected_unit = None
    if args.units_col in df.columns:
        df[args.units_col] = df[args.units_col].astype(str)
        selected_unit = choose_most_frequent_value(df, args.units_col)
        df = df[df[args.units_col] == selected_unit].copy()

    # Aggregate duplicates by month
    df = (
        df.groupby("ds", as_index=False)
          .agg(y=("y", "mean"))
          .sort_values("ds")
          .reset_index(drop=True)
    )

    # Feature engineering
    df_feat = df.copy()
    df_feat = add_time_features(df_feat, ds_col="ds")
    df_feat = add_lag_features(df_feat, y_col="y", max_lag=args.max_lag)
    df_feat = add_rolling_features(df_feat, y_col="y", windows=(args.roll_window_1, args.roll_window_2))

    # Drop rows created by lags/rolling
    df_feat = df_feat.dropna().reset_index(drop=True)

    # Ensure enough rows
    test_months = int(args.test_months)
    val_months = int(args.val_months)
    if df_feat.shape[0] < (test_months + val_months + 5):
        raise ValueError(
            f"Not enough rows after feature engineering. Got {df_feat.shape[0]} rows. "
            f"Need at least {test_months + val_months + 5}. "
            f"Reduce lags/windows or choose a series with longer history."
        )

    # Split (time-based)
    n = df_feat.shape[0]
    test_start = n - test_months
    val_start = test_start - val_months

    train_df = df_feat.iloc[:val_start].copy()
    val_df = df_feat.iloc[val_start:test_start].copy()
    test_df = df_feat.iloc[test_start:].copy()

    # Features (exclude ds and y)
    feature_cols = [c for c in df_feat.columns if c not in ["ds", "y"]]

    X_train, y_train = train_df[feature_cols], train_df["y"]
    X_val, y_val = val_df[feature_cols], val_df["y"]
    X_test, y_test = test_df[feature_cols], test_df["y"]

    # Batch (features only)
    batch_months = int(args.batch_months) if int(args.batch_months) > 0 else test_months
    batch_df = df_feat.iloc[-batch_months:].copy()
    X_batch = batch_df[feature_cols].copy()

    # Write ONLY CSVs into train/val/test/batch folders
    ensure_dirs(args.train_output, args.validation_output, args.test_output, args.batch_output, args.metadata_output)

    header = not args.no_header

    def write_label_first_csv(out_dir, filename, X, y):
        # label first column for built-in XGBoost
        out_path = os.path.join(out_dir, filename)
        out = pd.concat([y.reset_index(drop=True), X.reset_index(drop=True)], axis=1)
        out.to_csv(out_path, index=False, header=header)

    def write_features_only_csv(out_dir, filename, X):
        out_path = os.path.join(out_dir, filename)
        X.to_csv(out_path, index=False, header=header)

    write_label_first_csv(args.train_output, "train.csv", X_train, y_train)
    write_label_first_csv(args.validation_output, "validation.csv", X_val, y_val)
    write_label_first_csv(args.test_output, "test.csv", X_test, y_test)
    write_features_only_csv(args.batch_output, "batch.csv", X_batch)

    # Metadata goes to metadata folder ONLY (keeps XGBoost channels clean)
    meta = {
        "input_path": input_path,
        "selected_series": selected_series,
        "selected_duoarea": selected_duoarea,
        "selected_unit": selected_unit,
        "rows_after_filter": int(df.shape[0]),
        "rows_after_feat_dropna": int(df_feat.shape[0]),
        "train_rows": int(train_df.shape[0]),
        "val_rows": int(val_df.shape[0]),
        "test_rows": int(test_df.shape[0]),
        "batch_rows": int(X_batch.shape[0]),
        "feature_count": int(len(feature_cols)),
        "max_lag": int(args.max_lag),
        "roll_windows": [int(args.roll_window_1), int(args.roll_window_2)],
        "test_months": int(test_months),
        "val_months": int(val_months),
        "no_header": bool(args.no_header),
    }

    pd.Series(meta).to_json(os.path.join(args.metadata_output, "preprocess_meta.json"))
    pd.Series(feature_cols).to_csv(
        os.path.join(args.metadata_output, "feature_columns.csv"),
        index=False,
        header=["feature"],
    )

    print("Preprocessing complete.")
    print(meta)


if __name__ == "__main__":
    main()

Overwriting /home/sagemaker-user/NG-demand-forecasting/code/preprocessing.py
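
The lag and cyclical month features built by the script can be illustrated without pandas. This is a simplified sketch with made-up values; `lag_rows` and `month_cyclical` are hypothetical helper names, and the script itself uses `Series.shift` and NumPy for the same ideas.

```python
import math


def month_cyclical(month):
    """Encode month 1..12 as (sin, cos) so December and January end up adjacent."""
    angle = 2 * math.pi * month / 12.0
    return math.sin(angle), math.cos(angle)


def lag_rows(values, max_lag):
    """Build one row per time step with lag_1..lag_max_lag, skipping incomplete rows."""
    rows = []
    for t in range(max_lag, len(values)):
        lags = [values[t - k] for k in range(1, max_lag + 1)]
        rows.append({"y": values[t], "lags": lags})
    return rows


series = [10.0, 12.0, 11.0, 15.0, 14.0]  # illustrative values only
rows = lag_rows(series, max_lag=2)
print(rows[0])  # target 11.0 with lag_1 = 12.0 and lag_2 = 10.0

sin12, cos12 = month_cyclical(12)
```

The first `max_lag` observations have no complete history, which corresponds to the rows the script removes with `dropna()` after feature engineering.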


**Creating an SKLearnProcessor instance**

In [7]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=instance_type,                 # shared ParameterString (named "TrainingInstanceType")
    instance_count=processing_instance_count,
    base_job_name="sklearn-natgas-process",
    role=role,
    sagemaker_session=pipeline_session,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


**Passing the output of the processor's run method as arguments to the ProcessingStep**

The preprocessing script generates a batch feature file from the most recent window of data, and this step exposes it as ProcessingOutput(output_name="batch"). This is why the TransformStep can read its input directly from this step without a second data source.

In [8]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="batch", source="/opt/ml/processing/batch"),
    ],
    code="/home/sagemaker-user/NG-demand-forecasting/code/preprocessing.py",
    arguments=["--no-header"],                   
)

step_process = ProcessingStep(name="NatGasProcess", step_args=processor_args, cache_config=cache_config)



## Define a Training Step to Train a Model

This step trains a regression model using the processed training dataset produced by the previous step.

The trained model artifacts are saved to S3 (e.g., `model.tar.gz`) and are later consumed by evaluation, CreateModel, and/or RegisterModel steps.
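
The preprocessing step writes training data as headerless CSV with the label in the first column, which is the layout the script's comments describe for the built-in XGBoost container. Below is a minimal sketch of that layout using only the standard library; the helper name `to_label_first_csv` and the sample values are assumptions for illustration.

```python
import csv
import io


def to_label_first_csv(labels, feature_rows):
    """Serialize rows as headerless CSV with the label in the first column."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for label, features in zip(labels, feature_rows):
        writer.writerow([label, *features])
    return buffer.getvalue()


labels = [11.0, 15.0]
features = [[12.0, 10.0], [11.0, 12.0]]
payload = to_label_first_csv(labels, features)
print(payload)
```

The batch file written for Batch Transform uses the same feature order but omits the label column.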

In [9]:
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Store model artifacts here
model_path = f"s3://{default_bucket}/NaturalGasTrain"

# Built-in XGBoost container
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=instance_type,
)

# Estimator
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)

# Hyperparameters (regression)
xgb_train.set_hyperparameters(
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,

)

# Fit args (these are pipeline step args, not an immediate training job)
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

# Training step
step_train = TrainingStep(
    name="NaturalGasTrain",
    step_args=train_args,
    cache_config=cache_config
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


## Define a Model Evaluation Step to Evaluate the Trained Model

This Processing step evaluates the trained model on the test dataset and writes an evaluation report (JSON) to S3.

For this project, we track regression metrics such as Mean Squared Error (MSE). The downstream Condition step reads the metric value from the evaluation report and decides whether to proceed or fail the pipeline.
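
As a sketch of the report layout and of how a dotted path such as `regression_metrics.mse.value` resolves against it, consider the following plain-Python example. `build_report` and `get_by_path` are hypothetical helpers and only stand in for what the evaluation script and the pipeline's JSON lookup do; the metric values are made up.

```python
import json


def build_report(mse, std):
    """Build a report with the same nesting that evaluation.py writes."""
    return {
        "regression_metrics": {
            "mse": {"value": float(mse), "standard_deviation": float(std)},
        }
    }


def get_by_path(document, json_path):
    """Follow a dot-separated path through nested dicts (simplified lookup)."""
    node = document
    for key in json_path.split("."):
        node = node[key]
    return node


# Round-trip through JSON text, as the report is stored as a file in S3.
report_text = json.dumps(build_report(mse=62.5, std=7.9))
value = get_by_path(json.loads(report_text), "regression_metrics.mse.value")
print(value)
```

If the nesting written by the evaluation script and the path used by the Condition step ever drift apart, the lookup fails, so the two need to be changed together.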

In [10]:
%%writefile /home/sagemaker-user/NG-demand-forecasting/code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    # Model artifact from TrainingStep
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # Built-in XGBoost training writes the model file named "xgboost-model"
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    # Test data from ProcessingStep output: headerless, label-first
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    X_test = df.iloc[:, 1:].to_numpy()

    dtest = xgboost.DMatrix(X_test)
    preds = model.predict(dtest)

    mse = mean_squared_error(y_test, preds)
    std = float(np.std(y_test - preds))

    report_dict = {
        "regression_metrics": {
            "mse": {"value": float(mse), "standard_deviation": std},
        }
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        json.dump(report_dict, f)

Overwriting /home/sagemaker-user/NG-demand-forecasting/code/evaluation.py


**Creating a ScriptProcessor instance and using it in the ProcessingStep**

In [11]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

script_eval = ScriptProcessor(
    image_uri=image_uri,  # reuse the same xgboost image_uri
    command=["python3"],
    instance_type=instance_type,   # same instance type parameter as the other steps
    instance_count=1,
    base_job_name="script-natgas-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="/home/sagemaker-user/NG-demand-forecasting/code/evaluation.py",
)

**Using the arguments returned by the processor's .run() (inputs, outputs, and the code to execute) to construct a ProcessingStep that runs when the pipeline executes**



In [12]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="NaturalGasEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Define a Create Model Step to Create a Model

This step creates a SageMaker Model object from the training artifacts (model.tar.gz) and the inference container image. This model can be used for Batch Transform (offline inference) or endpoint deployment.

In [13]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
    name=f"natural-gas-model-{int(time.time())}"
)

step_create_model = ModelStep(
    name="NaturalGasCreateModel",
    step_args=model.create(
        instance_type=instance_type,   # or use transform_instance_type if you have it
    ),
)

## Define a Transform Step to Perform Batch Transformation

Batch Transform runs offline inference on a dataset stored in S3 (for example, a batch input created during preprocessing).

This is useful when you want predictions on large datasets without hosting a persistent endpoint.
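
Conceptually, line-based splitting and assembly means the input file is broken into records by line, each record is scored, and the predictions are joined back one per line. The sketch below illustrates only that idea in plain Python; it is not how SageMaker implements Batch Transform, and `mean_predictor` is a hypothetical stand-in for the trained model.

```python
def batch_transform_lines(payload, predict_row):
    """Split a CSV payload by line, score each record, and join results by line."""
    outputs = []
    for line in payload.splitlines():
        if not line.strip():
            continue  # skip blank lines
        features = [float(x) for x in line.split(",")]
        outputs.append(str(predict_row(features)))
    return "\n".join(outputs) + "\n"


def mean_predictor(features):
    """Hypothetical stand-in for the model: returns the mean of the features."""
    return sum(features) / len(features)


result = batch_transform_lines("12.0,10.0\n11.0,13.0\n", mean_predictor)
print(result)
```

Because input and output stay aligned line by line, predictions can later be matched back to the rows of the batch file.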

In [14]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type=instance_type,  # or use transform_instance_type if you have a separate parameter
    instance_count=1,
    output_path=f"s3://{default_bucket}/NaturalGasTransform",
    assemble_with="Line",
)

step_transform = TransformStep(
    name="NaturalGasTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=step_process.properties.ProcessingOutputConfig.Outputs["batch"].S3Output.S3Uri,
        content_type="text/csv",
        split_type="Line", # standard for CSV

    ),
)

## Define a Register Model Step to Create a Model Package

A model package groups all artifacts needed for inference (model data + inference image + metadata). Model packages can be versioned in a Model Package Group, enabling you to track and approve models over time.

When the quality gate passes, the trained model is registered as a new version in the project's Model Package Group.

In [15]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.model_step import ModelStep

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],   # keep simple / commonly available
    transform_instances=["ml.m5.large", "ml.m5.xlarge"],  # keep what your lab allows
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name="NaturalGasRegisterModel",
    step_args=register_args,
)



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

The Condition step defined later reads the evaluation report and checks whether the model meets a quality threshold (MSE <= threshold).

- If the condition passes: proceed to RegisterModel, CreateModel, and Batch Transform
- If the condition fails: execute the Fail step defined here, with a clear error message, so the pipeline run is marked as Failed
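
The branching described above can be sketched in plain Python. This is only an illustration of the decision logic; `quality_gate` is a hypothetical function, the branch labels are shorthand rather than the actual step names, and the MSE value is made up.

```python
def quality_gate(report, threshold):
    """Return the branch taken for a given evaluation report and MSE threshold."""
    mse = report["regression_metrics"]["mse"]["value"]
    if mse <= threshold:
        # Passing branch: the model is registered, created, and used for batch inference.
        return ["RegisterModel", "CreateModel", "Transform"]
    # Failing branch: the execution stops with an explanatory message.
    return [f"Fail: Execution failed due to MSE > {threshold}"]


report = {"regression_metrics": {"mse": {"value": 350000.0}}}
print(quality_gate(report, threshold=400000.0))
print(quality_gate(report, threshold=100000.0))
```

Since the threshold is a pipeline parameter, the same trained model can pass or fail the gate depending on the value supplied for a given execution.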

In [16]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="NaturalGasMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

In [17]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker import image_uris

# Retrieve the model monitor image
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=pipeline_session
)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


## Define a Condition Step to Check MSE and Either Create a Model, Run a Batch Transformation, and Register the Model, or Terminate the Execution in a Failed State

In [18]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="NaturalGasMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

## Define a Pipeline of Parameters, Steps, and Conditions

In this section we combine the following into a single Pipeline DAG that can be created or updated and then executed in SageMaker:
- Parameters
- Processing / Training / Evaluation steps
- Condition + Fail logic
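
The execution order of the top-level steps follows from the data dependencies between them (each step consumes properties of earlier steps). As a rough illustration, the sketch below derives that order with the standard library's `graphlib` (Python 3.9+). The dependency map is written out by hand from the step definitions in this notebook; it is an assumption for illustration and is not read from the pipeline definition.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps whose outputs it consumes.
dependencies = {
    "NatGasProcess": set(),
    "NaturalGasTrain": {"NatGasProcess"},
    "NaturalGasEval": {"NaturalGasTrain", "NatGasProcess"},
    "NaturalGasMSECond": {"NaturalGasEval"},
}

# static_order() yields every step after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The steps inside the Condition step's branches are not listed separately in the pipeline's `steps` argument because they are reached through the Condition step.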

In [19]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "NaturalGasPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

**Submit the pipeline to SageMaker and start execution**

In [20]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:137225474160:pipeline/NaturalGasPipeline',
 'ResponseMetadata': {'RequestId': '64569090-28c6-4931-a522-e2a834f89dc2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '64569090-28c6-4931-a522-e2a834f89dc2',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '108',
   'date': 'Mon, 23 Feb 2026 16:50:35 GMT'},
  'RetryAttempts': 0}}

In [21]:
execution = pipeline.start()

**Pipeline Operations: Examining and Waiting for Pipeline Execution**

In [22]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:137225474160:pipeline/NaturalGasPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:137225474160:pipeline/NaturalGasPipeline/execution/yrc71tnivyr0',
 'PipelineExecutionDisplayName': 'execution-1771865435902',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2026, 2, 23, 16, 50, 35, 846000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2026, 2, 23, 16, 50, 35, 846000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:137225474160:user-profile/d-u2y08ppawvnm/default-1771745007294',
  'UserProfileName': 'default-1771745007294',
  'DomainId': 'd-u2y08ppawvnm',
  'IamIdentity': {'Arn': 'arn:aws:sts::137225474160:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROAR742GSBYNSR3W5KWG:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:137225474160:user-profile/d-u2y08ppawvnm/default-1771745007294',
  'UserProfileName': 'default-177174

In [None]:
execution.wait()

In [None]:
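# Deploy outside the pipeline. The `model` object above is bound to a
# PipelineSession and a pipeline property for model_data, so build a separate
# Model from the artifact that the finished training step produced.
train_step = next(s for s in execution.list_steps() if s["StepName"] == "NaturalGasTrain")
training_job_name = train_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
model_data_uri = sagemaker_sess.describe_training_job(training_job_name)["ModelArtifacts"]["S3ModelArtifacts"]

deploy_model = Model(
    image_uri=image_uri,
    model_data=model_data_uri,
    role=role,
    sagemaker_session=sagemaker_sess,
)

endpoint_name = f"gas-endpoint-{int(time.time())}"

# Model.deploy() returns None unless predictor_cls is set, so keep the name ourselves.
deploy_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    data_capture_config=data_capture_config,
    endpoint_name=endpoint_name,
)

print(f"Endpoint deployed: {endpoint_name}")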

In [None]:
# ARN of the earlier execution, used as the source for selective execution
from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig

failed_execution_arn = execution.arn
failed_execution_arn

In [None]:
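selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn=failed_execution_arn,
    # Names must match the steps in the pipeline definition: the condition step is
    # "NaturalGasMSECond", and a ModelStep expands to "<name>-CreateModel".
    selected_steps=["NaturalGasEval", "NaturalGasMSECond", "NaturalGasCreateModel-CreateModel"],
)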

In [None]:
selective_execution = pipeline.start(
    selective_execution_config=selective_execution_config,
    parameters={"MseThreshold": 400000.0}
)

print(f"Selective Execution started: {selective_execution.arn}")

In [None]:
selective_execution.wait()

In [None]:
execution.list_steps()

In [None]:
selective_execution.list_steps()

**Examining the Evaluation**

Examine the resulting model evaluation after the pipeline completes.

In [None]:
import json
from pprint import pprint
import sagemaker

evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

In [None]:
from sagemaker.model_monitor import EndpointInput

# Set the schedule to run every 10 minutes
# Note: Use standard cron expression for 10-minute intervals
cron_10_min = "cron(0/10 * * * ? *)"

model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name="NaturalGas-Quality-Monitor-10Min",
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,  # endpoint deployed earlier in this notebook
        destination="/opt/ml/processing/input/endpoint",
    ),
    output_s3_uri=f"s3://{bucket}/sagemaker/natural-gas/monitoring_reports",
    # Baseline files: these two variables are not defined in this notebook;
    # set them to the S3 URIs of your baseline statistics and constraints first.
    statistics=s3_baseline_statistics_uri,
    constraints=s3_baseline_constraints_uri,
    schedule_cron_expression=cron_10_min,
    enable_cloudwatch_metrics=True,
)