# End-to-End Machine Learning CI/CD Pipeline

This notebook defines and executes a fully automated end-to-end machine learning CI/CD pipeline using Amazon SageMaker Pipelines. The workflow covers data preprocessing, model training, evaluation, conditional quality gating, and controlled model registration.

Raw input data is ingested and validated in a dedicated preprocessing step, where schema consistency is enforced, features are selected, and datasets are split into training, validation, and test sets. A managed XGBoost training job is then launched using the processed data, producing versioned model artifacts. The trained model is evaluated on a held-out test set, and classification performance metrics (with F1 score as the primary metric) are computed and persisted as structured evaluation artifacts.

Model promotion is governed by an explicit quality gate: if the evaluated F1 score meets or exceeds a configurable threshold, the model is registered in a SageMaker Model Package Group and prepared for deployment; otherwise, the pipeline fails deterministically. All steps are parameterized to support reproducibility, environment portability, and controlled experimentation.

Together, these components form a reproducible, auditable ML lifecycle that integrates model validation and governance directly into the CI/CD process, ensuring that only models meeting defined quality standards are eligible for deployment.

## Imports and constants

In [1]:
import sys
import logging
import warnings
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

warnings.filterwarnings('ignore')
logging.getLogger("sagemaker.workflow.utilities").setLevel(logging.ERROR)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
try:
    # Get AWS Account ID
    account_id = boto3.client("sts").get_caller_identity()["Account"]
    print(f"Successfully retrieved AWS Account ID: {account_id}")
except Exception as e:
    print(f"Cannot retrieve account information: {e}")
    raise

Successfully retrieved AWS Account ID: 697347838118


In [3]:
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

BASE_BUCKET_NAME = f"yelp-aai540-group6-{account_id}"
default_bucket = BASE_BUCKET_NAME


MODEL_PREFIX = "models/"
MODEL_DIR = f"{BASE_BUCKET_NAME}/{MODEL_PREFIX}" 

In [4]:
model_package_group_name = f"venuesignal-model-group-"

In [5]:
input_data_uri = f"s3://yelp-aai540-group6-{account_id}/feature-store/training-data/alldata.csv"

## Define Parameters to Parametrize Pipeline Execution
Define pipeline parameters to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without modifying the pipeline definition.

The supported parameter types include:

- `ParameterString` - represents a Python `str`
- `ParameterInteger` - represents a Python `int`
- `ParameterFloat` - represents a Python `float`

Each parameter accepts a default value, which can be overridden when the pipeline is executed. The default must be an instance of the parameter's type.

The parameters defined in this workflow are:

- `processing_instance_count` - The instance count of the processing job.
- `instance_type` - The ml.* instance type of the training job.
- `model_approval_status` - The approval status to register with the trained model for CI/CD purposes (`"PendingManualApproval"` is the default).
- `input_data` - The S3 URI of the input data.
- `f1_threshold` - The minimum F1 score the model must reach on the test set to pass the quality gate (`0.8` is the default).
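
The default-and-override behaviour can be pictured with a small plain-Python sketch. `resolve_parameters` is a hypothetical helper written for illustration only; it is not part of the SageMaker SDK and does not reflect how the service resolves parameters internally. The parameter names and defaults are taken from the cell that follows.

```python
# Hypothetical illustration: merge execution-time overrides into typed defaults.
DEFAULTS = {
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "F1Threshold": 0.8,
}


def resolve_parameters(defaults, overrides=None):
    """Return defaults updated with overrides, rejecting unknown names or wrong types."""
    resolved = dict(defaults)
    for name, value in (overrides or {}).items():
        if name not in defaults:
            raise KeyError(f"Unknown parameter: {name}")
        expected_type = type(defaults[name])
        if not isinstance(value, expected_type):
            raise TypeError(f"{name} expects {expected_type.__name__}")
        resolved[name] = value
    return resolved


# Only the threshold is overridden; every other parameter keeps its default.
resolved = resolve_parameters(DEFAULTS, {"F1Threshold": 0.85})
```

The point of the sketch is that an override replaces a single value and must match the declared type, while the pipeline definition itself stays unchanged.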

In [6]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
f1_threshold = ParameterFloat(name="F1Threshold", default_value=.8)

## Define Preprocessing step

This preprocessing script prepares raw review-level data for downstream XGBoost model training, validation, and testing within a SageMaker Pipeline. Its primary purpose is to enforce a stable schema, sanitize heterogeneous input data, and emit model-ready CSV artifacts in the format required by the XGBoost training container.

### Input Handling and Schema Enforcement

The script ingests a single CSV file (alldata.csv) containing all splits (train, validation, test, production). Because upstream data sources may inconsistently include headers, the script first detects whether a header row is present and conditionally assigns column names. This prevents schema drift and avoids hard failures caused by misaligned headers or unexpected string tokens in numeric fields.
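
The header heuristic can be tried in isolation. The sketch below re-implements the idea against in-memory text rather than a file path; the sample rows are made up for illustration, and `has_header` is a stand-in name, not the function used in the script.

```python
import csv
import io

EXPECTED_FIRST_COL = "review_id"


def has_header(csv_text, expected_first_col=EXPECTED_FIRST_COL):
    """Return True when the first cell of the first row equals the expected column name."""
    reader = csv.reader(io.StringIO(csv_text))
    first_row = next(reader, None)
    if not first_row:
        # Empty input or a blank first line: treat as "no header".
        return False
    return first_row[0].strip() == expected_first_col


with_header = "review_id,business_id\nr1,b1\n"
without_header = "r1,b1\nr2,b2\n"

header_flags = (has_header(with_header), has_header(without_header))
```

The check is deliberately narrow: it only inspects the first cell, so a file whose columns are reordered would be read as headerless.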

### Data Normalization and Type Coercion

To ensure consistent numeric dtypes, every column expected to be numeric is explicitly coerced using `pd.to_numeric(errors="coerce")`. Values that cannot be parsed are converted to NaN and subsequently imputed with zeros. This keeps the step deterministic and avoids runtime type errors in XGBoost.

The target label (is_highly_rated) is normalized to a binary integer representation (0/1), ensuring compatibility with binary classification objectives.
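
A plain-Python analogue of the coerce, impute, and binarize sequence is sketched below. It uses only the standard library instead of pandas, so it approximates the behaviour rather than reproducing it; the helper names and sample tokens are assumptions made for illustration.

```python
import math


def coerce_numeric(token):
    """Parse a token as float; missing or unparsable values become NaN."""
    try:
        return float(token)
    except (TypeError, ValueError):
        return math.nan


def impute_zero(value):
    """Replace NaN with 0.0, analogous to fillna(0)."""
    return 0.0 if math.isnan(value) else value


def to_binary_label(value):
    """Map any positive value to 1 and everything else to 0."""
    return 1 if value > 0 else 0


raw_tokens = ["4.5", "n/a", "", None, "0"]
cleaned = [impute_zero(coerce_numeric(token)) for token in raw_tokens]
labels = [to_binary_label(value) for value in cleaned]
```

One consequence of this ordering is that a missing or unparsable label ends up as 0 and is indistinguishable from a genuine negative example.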

### Dataset Partitioning

The dataset is partitioned into training, validation, test, and production subsets using the split column. Non-model fields (event_time, split) are removed after filtering to prevent data leakage and ensure consistent feature vectors across all splits.
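
As a rough sketch of the partitioning logic, the example below groups rows by a normalized `split` value and drops the non-model fields. It works on plain dictionaries rather than a DataFrame, and the rows are invented sample data.

```python
from collections import defaultdict

NON_MODEL_FIELDS = ("event_time", "split")


def partition_rows(rows):
    """Group rows by normalized split name and drop non-model fields."""
    partitions = defaultdict(list)
    for row in rows:
        # Normalize the split label the same way the script does: trim and lowercase.
        split = str(row["split"]).strip().lower()
        kept = {key: value for key, value in row.items() if key not in NON_MODEL_FIELDS}
        partitions[split].append(kept)
    return dict(partitions)


rows = [
    {"is_highly_rated": 1, "avg_review_stars": 4.2, "event_time": "t0", "split": "train"},
    {"is_highly_rated": 0, "avg_review_stars": 2.9, "event_time": "t1", "split": " Validation "},
    {"is_highly_rated": 1, "avg_review_stars": 4.8, "event_time": "t2", "split": "TEST"},
]
parts = partition_rows(rows)
```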

### Feature Selection and XGBoost Formatting

A curated subset of features is selected for model training, focusing on business attributes, parking-related signals, and engagement metrics. The output format strictly follows XGBoost conventions: the label column appears first, followed by feature columns, with no header row.
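
The label-first, headerless layout can be illustrated with the standard `csv` module. This is a minimal sketch with made-up rows and a two-feature subset; `to_xgb_csv` is a hypothetical helper, not the function the script uses.

```python
import csv
import io


def to_xgb_csv(rows, features, target="is_highly_rated"):
    """Serialize rows as headerless CSV with the label in the first column."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        # Missing feature keys are written as 0, loosely mirroring fillna(0).
        writer.writerow([row[target]] + [row.get(name, 0) for name in features])
    return buffer.getvalue()


rows = [
    {"is_highly_rated": 1, "avg_review_stars": 4.2, "is_restaurant": 1},
    {"is_highly_rated": 0, "avg_review_stars": 2.9},
]
csv_text = to_xgb_csv(rows, ["avg_review_stars", "is_restaurant"])
```

Because no header is written, column order is the only contract between preprocessing and training, so the feature list must be identical for every split.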

### Output Artifacts

The script writes three CSV artifacts to their respective SageMaker processing directories:

- `train/train.csv`
- `validation/validation.csv`
- `test/test.csv`

These artifacts are consumed directly by downstream training and evaluation steps, enabling reproducible and automated CI/CD execution of the machine learning pipeline.

### Role in the CI/CD Pipeline

This preprocessing step acts as a contract boundary between raw data ingestion and model training. By aggressively validating schema, coercing types, and standardizing outputs, it ensures that downstream steps operate on clean, predictable inputs, reducing pipeline brittleness and improving overall system reliability.

In [7]:
%%writefile code/preprocessing.py
import os

import numpy as np
import pandas as pd

feature_columns_names = [
    "review_id",
    "business_id",
    "user_id",
    "mentions_parking",
    "parking_positive",
    "parking_negative",
    "parking_type_lot",
    "parking_type_street",
    "parking_type_garage",
    "parking_type_valet",
    "parking_free",
    "parking_paid",
    "enhanced_parking_score",
    "business_stars",
    "business_review_count",
    "avg_review_stars",
    "std_review_stars",
    "total_reviews",
    "avg_engagement",
    "pct_highly_rated",
    "has_parking_data",
    "parking_sentiment",
    "review_stars",
    "useful",
    "funny",
    "cool",
    "engagement_score",
    "is_engaged",
    "review_year",
    "review_month",
    "review_quarter",
    "is_restaurant",
    "price_range_numeric",
    "event_time",
    "split",
]

label_column = "is_highly_rated"
all_columns = feature_columns_names + [label_column]

xgb_features = [
    # Business features
    "avg_review_stars",
    "std_review_stars",
    "business_review_count",
    "pct_highly_rated",
    # Parking features
    "enhanced_parking_score",
    "parking_positive",
    "parking_negative",
    "parking_sentiment",
    "has_parking_data",
    # Review engagement
    "avg_engagement",
    # Business attributes
    "is_restaurant",
    "price_range_numeric",
]

# Columns that must be numeric
numeric_cols = [
    "mentions_parking",
    "parking_positive",
    "parking_negative",
    "parking_type_lot",
    "parking_type_street",
    "parking_type_garage",
    "parking_type_valet",
    "parking_free",
    "parking_paid",
    "enhanced_parking_score",
    "business_stars",
    "business_review_count",
    "avg_review_stars",
    "std_review_stars",
    "total_reviews",
    "avg_engagement",
    "pct_highly_rated",
    "has_parking_data",
    "parking_sentiment",
    "review_stars",
    "useful",
    "funny",
    "cool",
    "engagement_score",
    "is_engaged",
    "review_year",
    "review_month",
    "review_quarter",
    "is_restaurant",
    "price_range_numeric",
    label_column,
]


def prepare_xgb_data(df: pd.DataFrame, features, target: str = "is_highly_rated") -> pd.DataFrame:
    """Prepare data in XGBoost format: target first, no header."""
    X = df[features].fillna(0)
    y = df[target]
    return pd.concat([y, X], axis=1)


def _detect_header(csv_path: str, expected_first_col: str = "review_id") -> bool:
    """Heuristic: if first cell equals the expected first column name, treat as header."""
    with open(csv_path, "r", encoding="utf-8") as f:
        first_line = f.readline().strip()

    if not first_line:
        return False

    first_cell = first_line.split(",")[0].strip().strip('"').strip("'")
    return first_cell == expected_first_col


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    csv_path = f"{base_dir}/input/alldata.csv"

    # 1) Read CSV without forcing dtypes at read-time
    has_header = _detect_header(csv_path, expected_first_col=feature_columns_names[0])
    if has_header:
        df = pd.read_csv(csv_path, header=0)
        # Ensure we have exactly the columns we expect, in the right order
        df = df[all_columns]
    else:
        df = pd.read_csv(csv_path, header=None, names=all_columns)

    # 2) Normalize split + event_time to strings (split is used for filtering).
    df["split"] = df["split"].astype(str).str.strip().str.lower()
    df["event_time"] = df["event_time"].astype(str)

    # 3) Coerce numeric columns safely; invalid tokens become NaN.
    for c in numeric_cols:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # 4) Impute missing numeric values (simple, deterministic).
    df[numeric_cols] = df[numeric_cols].fillna(0)

    # 5) Enforce label to 0/1 (robust if it was float/str).
    df[label_column] = (df[label_column] > 0).astype(np.int64)

    # 6) Split datasets and drop non-model fields.
    drop_cols = ["event_time", "split"]
    train_df = df[df["split"] == "train"].drop(columns=drop_cols)
    validation_df = df[df["split"] == "validation"].drop(columns=drop_cols)
    test_df = df[df["split"] == "test"].drop(columns=drop_cols)
    production_df = df[df["split"] == "production"].drop(columns=drop_cols)  # kept for parity

    # 7) Prepare XGBoost input (target first, then features; no header).
    train_xgb = prepare_xgb_data(train_df, xgb_features, target=label_column)
    val_xgb = prepare_xgb_data(validation_df, xgb_features, target=label_column)
    test_xgb = prepare_xgb_data(test_df, xgb_features, target=label_column)

    # 8) Write outputs.
    os.makedirs(f"{base_dir}/train", exist_ok=True)
    os.makedirs(f"{base_dir}/validation", exist_ok=True)
    os.makedirs(f"{base_dir}/test", exist_ok=True)

    train_xgb.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    val_xgb.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test_xgb.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting code/preprocessing.py


## Processing Infrastructure Configuration

This cell defines the processing infrastructure used to execute the preprocessing step within the SageMaker Pipeline. It instantiates an `SKLearnProcessor`, which provisions a managed, ephemeral compute environment for running scikit-learn–compatible processing scripts.

### Processor Type and Framework
The `SKLearnProcessor` is pinned to scikit-learn framework version `1.2-1`, so every pipeline execution runs the preprocessing script in the same container image. Pinning the version guards against unexpected behavior caused by upstream library changes.

### Compute Configuration
The processor runs on `ml.m5.xlarge` instances, a general-purpose instance type well-suited for data preprocessing workloads such as type coercion, feature selection, and dataset partitioning. The number of instances is parameterized via `processing_instance_count`, allowing the pipeline to scale horizontally without code changes.

### Execution Context
The processor is associated with:
- an IAM role (`role`) that grants access to required AWS resources (e.g., S3),
- a SageMaker Pipeline session (`pipeline_session`) that integrates the processing job into the broader CI/CD workflow,
- and a descriptive base job name (`sklearn-venue-signal-process`) to support traceability and observability in the SageMaker console.

### Role in the Pipeline
This processor serves as the execution environment for the preprocessing script that validates schema, cleans raw data, and produces model-ready artifacts. By isolating preprocessing in a managed processing step, the pipeline enforces a clean separation of concerns between data preparation, model training, and evaluation, improving reliability and debuggability.

In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-venue-signal-process",
    role=role,
    sagemaker_session=pipeline_session,
)

## Processing Step Definition

This cell defines a SageMaker Pipeline processing step responsible for executing the preprocessing logic and producing model-ready datasets. It wires the previously configured `SKLearnProcessor` to concrete inputs, outputs, and execution code, and registers the step within the pipeline graph.

### Input Configuration
The processing step consumes raw input data via a `ProcessingInput`, which maps the pipeline parameter `input_data` (typically an S3 URI) to the container path `/opt/ml/processing/input`. This standardized directory structure allows the preprocessing script to reliably locate incoming data regardless of execution context.

### Output Artifacts
The step declares three `ProcessingOutput` channels:
- `train` → `/opt/ml/processing/train`
- `validation` → `/opt/ml/processing/validation`
- `test` → `/opt/ml/processing/test`

Each output corresponds to a dataset split generated by the preprocessing script. These outputs are automatically uploaded to S3 and made available to downstream pipeline steps, such as training and evaluation, without manual data movement.

### Execution Logic
The preprocessing logic is defined in `code/preprocessing.py`, which is executed inside the managed scikit-learn processing container. The script performs schema validation, type coercion, feature selection, dataset splitting, and formatting required by the XGBoost training step.

### Pipeline Integration
The `ProcessingStep` named `VenueSignal` encapsulates the full preprocessing operation as a first-class pipeline component. By explicitly defining inputs, outputs, and execution code, this step enables reproducible, traceable, and fully automated data preparation within the CI/CD pipeline.

This step serves as the contract boundary between raw data ingestion and model training, ensuring downstream steps receive clean, validated, and consistently formatted data artifacts.

In [9]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="VenueSignal", step_args=processor_args)

## Model Training Configuration

This cell defines the model training step within the SageMaker Pipeline using the managed XGBoost algorithm. It configures the training container, hyperparameters, and data inputs, and connects the step to the outputs of the preprocessing stage.

### Training Image and Environment
The XGBoost training container is retrieved dynamically using `sagemaker.image_uris.retrieve`, specifying:
- the XGBoost framework,
- version `1.7-1`,
- Python 3 runtime,
- and the target instance type.

This ensures the training environment is fully managed, reproducible, and aligned with SageMaker’s optimized XGBoost implementation.

### Estimator Configuration
An `Estimator` is instantiated to define the training job:
- `instance_type` and `instance_count` control the compute resources used for training,
- `output_path` specifies the S3 location where trained model artifacts are stored,
- `role` provides permissions to access required AWS resources,
- `pipeline_session` integrates the estimator into the SageMaker Pipeline execution context.

### Hyperparameter Specification
The model is configured for binary classification using the `binary:logistic` objective. Key hyperparameters include:
- tree depth, learning rate (`eta`), and subsampling parameters to control model complexity,
- `eval_metric='auc'` to evaluate ranking performance during training,
- `early_stopping_rounds` to prevent overfitting by halting training when validation performance stops improving.

These hyperparameters are explicitly defined to ensure consistent and repeatable training behavior across pipeline runs.
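
To make the effect of `early_stopping_rounds` concrete, here is a simplified model of patience-based stopping on a metric that is maximized, such as AUC. It is an illustration only: the real logic runs inside the XGBoost container, and the validation scores below are invented.

```python
def stop_with_patience(val_scores, patience):
    """Return (stop_round, best_round) for a metric where higher is better."""
    best_score = float("-inf")
    best_round = -1
    for round_idx, score in enumerate(val_scores):
        if score > best_score:
            # Strict improvement resets the patience window.
            best_score, best_round = score, round_idx
        elif round_idx - best_round >= patience:
            # No improvement for `patience` consecutive rounds: stop here.
            return round_idx, best_round
    return len(val_scores) - 1, best_round


scores = [0.70, 0.74, 0.78, 0.78, 0.77, 0.78, 0.76]
stop_round, best_round = stop_with_patience(scores, patience=3)
```

In this toy run, training halts at round 5 because nothing has beaten the score from round 2 for three rounds; ties do not count as improvement.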

### Training Data Inputs
The training step consumes two datasets:
- a training dataset,
- and a validation dataset.

Both datasets are provided as CSV files produced by the preprocessing step and referenced directly via their S3 output URIs. This explicit dependency ensures that training always operates on the most recent, validated preprocessing outputs.

### Role in the Pipeline
This training configuration encapsulates the full model-fitting process as a deterministic pipeline step. By sourcing inputs from the preprocessing stage and emitting versioned model artifacts to S3, it enables automated retraining, evaluation, and downstream deployment as part of a CI/CD workflow.

In [10]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

xgb_output_path = f"s3://{MODEL_DIR}xgboost-output"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=xgb_output_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective='binary:logistic',
    num_round=100,
    max_depth=6,
    eta=0.3,
    gamma=0,
    min_child_weight=1,
    subsample=0.8,
    colsample_bytree=0.8,
    eval_metric='auc',
    early_stopping_rounds=10
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

## Training Step Definition 

This cell registers the model training operation as a first-class step within the SageMaker Pipeline. It wraps the previously configured XGBoost estimator and its training arguments into a `TrainingStep`, enabling the training job to participate in the pipeline’s dependency graph and execution flow.

### Step Construction
The `TrainingStep` is instantiated with a descriptive name (`VenueSignalTrain`) and the `step_args` produced by the estimator’s `fit` call. These arguments fully specify the training configuration, including compute resources, hyperparameters, input datasets, and output artifact locations.

### Pipeline Integration
By defining training as a pipeline step, SageMaker can:
- track the lineage between preprocessing outputs and trained model artifacts,
- automatically resolve execution order based on data dependencies,
- retry or re-execute training deterministically as part of CI/CD workflows.

### Role in the Pipeline
This step encapsulates the core model-fitting logic and produces versioned model artifacts that can be consumed by downstream evaluation, registration, and deployment steps. Treating training as an explicit pipeline step ensures reproducibility, traceability, and seamless integration with automated model lifecycle management.

In [11]:
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="VenueSignalTrain",
    step_args=train_args,
)

## Model Evaluation Script

This cell writes the evaluation script used by the pipeline to quantify model performance on the held-out test set and produce a machine-readable metrics artifact for downstream quality gating and reporting.

### Model Artifact Loading
The script reads the trained model from `/opt/ml/processing/model/model.tar.gz`, extracts its contents, and loads the model file (`xgboost-model`). It attempts to load the artifact as an XGBoost `Booster` (the standard format for SageMaker’s built-in XGBoost training output) and falls back to Python pickle loading only if the Booster load fails. This makes the evaluation step robust to different serialization formats.
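
The extraction step can be exercised locally without a trained model. The sketch below builds a throwaway `model.tar.gz` containing placeholder bytes and pulls out only the expected member; `extract_model` is a hypothetical helper, and extracting a single named member is a design choice of this sketch rather than what the evaluation script does.

```python
import io
import os
import tarfile
import tempfile


def extract_model(tar_path, dest_dir, member_name="xgboost-model"):
    """Extract one expected member from the archive and return its path."""
    os.makedirs(dest_dir, exist_ok=True)
    with tarfile.open(tar_path, "r:gz") as tar:
        member = tar.getmember(member_name)  # raises KeyError if the member is absent
        tar.extract(member, path=dest_dir)
    return os.path.join(dest_dir, member_name)


# Build a small archive with placeholder bytes standing in for a real model.
workdir = tempfile.mkdtemp()
tar_path = os.path.join(workdir, "model.tar.gz")
payload = b"placeholder-model-bytes"
with tarfile.open(tar_path, "w:gz") as tar:
    info = tarfile.TarInfo(name="xgboost-model")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

model_path = extract_model(tar_path, os.path.join(workdir, "extracted"))
```

Requesting the member by name fails fast when the archive layout is not what the evaluation step expects.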

### Test Data Ingestion
The script loads `/opt/ml/processing/test/test.csv`, which is expected to be in XGBoost-compatible format: the first column is the ground-truth label (`is_highly_rated`) and the remaining columns are model features. Labels are cast to integer (0/1), and features are converted into an `xgboost.DMatrix` for efficient prediction.

### Prediction and Thresholding
The model generates predicted probabilities for the positive class (consistent with `binary:logistic`). These probabilities are converted into class predictions using a fixed decision threshold of 0.5 (`probs >= 0.5`). This produces a deterministic mapping from probabilities to labels for metric computation. Note that this decision threshold is separate from the F1 quality-gate threshold (default 0.8) defined as a pipeline parameter.

### Metric Computation
The primary metric computed is F1 score, which summarizes the precision–recall tradeoff and is useful for imbalanced binary classification. Precision, recall, and accuracy are also computed as supporting diagnostics. If both classes are present in the test set, ROC-AUC is additionally computed using the predicted probabilities.
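
For readers who want to see the arithmetic behind these metrics, the sketch below thresholds probabilities at the 0.5 cutoff used in `code/evaluation.py` and computes precision, recall, F1, and accuracy from confusion counts. It is a plain-Python illustration, not the scikit-learn implementation the script calls, and the labels and probabilities are invented.

```python
def apply_threshold(probs, cutoff=0.5):
    """Convert positive-class probabilities into 0/1 predictions."""
    return [1 if p >= cutoff else 0 for p in probs]


def classification_metrics(y_true, y_pred):
    """Compute precision, recall, F1, and accuracy from confusion counts."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    # Undefined ratios fall back to 0.0, similar in spirit to zero_division=0.
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    accuracy = (tp + tn) / len(pairs) if pairs else 0.0
    return {"precision": precision, "recall": recall, "f1": f1, "accuracy": accuracy}


y_true = [1, 0, 1, 1, 0, 0]
probs = [0.9, 0.6, 0.4, 0.8, 0.2, 0.5]
y_pred = apply_threshold(probs)
metrics = classification_metrics(y_true, y_pred)
```

With these toy inputs there are 2 true positives, 2 false positives, and 1 false negative, giving precision 1/2, recall 2/3, and F1 4/7.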

### Metrics Artifact Output
All computed metrics are written to `/opt/ml/processing/evaluation/evaluation.json` in a structured JSON format. This evaluation artifact is designed to be consumed by subsequent pipeline steps (e.g., a Condition step enforcing an F1 threshold) and to support reproducible, auditable model assessment within the CI/CD workflow.
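
The shape of the metrics artifact is easiest to see by writing and reading one. The sketch below mirrors the nesting used by the script (`classification_metrics` → metric name → `value`) but writes to a temporary directory; the metric values are illustrative placeholders, not results from this pipeline.

```python
import json
import pathlib
import tempfile


def write_report(metrics, output_dir):
    """Write metrics as {"classification_metrics": {name: {"value": x}}} and return the path."""
    report = {
        "classification_metrics": {
            name: {"value": float(value)} for name, value in metrics.items()
        }
    }
    out_dir = pathlib.Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / "evaluation.json"
    path.write_text(json.dumps(report))
    return path


# Placeholder numbers, used only to show the file layout.
report_path = write_report({"f1": 0.83, "precision": 0.80}, tempfile.mkdtemp())
loaded = json.loads(report_path.read_text())
```

Keeping each metric under a `value` key leaves room to attach further fields to a metric later without changing the paths that downstream steps read.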

In [12]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, roc_auc_score


def load_xgb_model(model_filename: str):
    """Load XGBoost model (Booster preferred; pickle fallback)."""
    try:
        booster = xgboost.Booster()
        booster.load_model(model_filename)
        return booster
    except Exception:
        pass

    try:
        with open(model_filename, "rb") as f:
            return pickle.load(f)
    except Exception as e:
        raise RuntimeError(
            f"Unable to load model '{model_filename}'. Expected XGBoost Booster or pickle. Error: {e}"
        )


if __name__ == "__main__":
    # Extract model artifact
    model_tar_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_tar_path) as tar:
        tar.extractall(path=".")

    model_file = "xgboost-model"
    model = load_xgb_model(model_file)

    # Load test data (label in column 0)
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy().astype(int)
    X_test = df.iloc[:, 1:].to_numpy()

    dtest = xgboost.DMatrix(X_test)

    # Predict probabilities (binary:logistic)
    probs = model.predict(dtest)

    # Convert to class predictions
    y_pred = (probs >= 0.5).astype(int)

    # Metrics
    f1 = f1_score(y_test, y_pred, zero_division=0)
    precision = precision_score(y_test, y_pred, zero_division=0)
    recall = recall_score(y_test, y_pred, zero_division=0)
    accuracy = accuracy_score(y_test, y_pred)

    metrics = {
        "f1": {"value": float(f1)},
        "precision": {"value": float(precision)},
        "recall": {"value": float(recall)},
        "accuracy": {"value": float(accuracy)},
    }

    # AUC only valid if both classes present
    if len(np.unique(y_test)) == 2:
        try:
            auc = roc_auc_score(y_test, probs)
            metrics["auc"] = {"value": float(auc)}
        except Exception:
            pass

    report_dict = {
        "classification_metrics": metrics
    }

    # Write evaluation output
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        json.dump(report_dict, f)

Overwriting code/evaluation.py


## Model Evaluation Infrastructure and Execution (ScriptProcessor)

This cell defines the infrastructure and execution logic for the model evaluation step within the SageMaker Pipeline. It uses a `ScriptProcessor` to run a custom evaluation script against the trained model and the held-out test dataset, producing standardized performance metrics for downstream decision making.

### Processor Configuration
A `ScriptProcessor` is instantiated using the same XGBoost container image employed during training. This ensures that the evaluation environment matches the training runtime exactly, eliminating discrepancies caused by library or framework version mismatches.

The processor is configured to:
- run a Python entrypoint (`python3`),
- execute on an `ml.m5.xlarge` instance,
- use a single instance for deterministic evaluation,
- and operate under the same IAM role and pipeline session as other pipeline steps.

### Input Artifacts
The evaluation step consumes two inputs:

1. **Trained Model Artifact**  
   The model produced by the training step is passed in via `step_train.properties.ModelArtifacts.S3ModelArtifacts` and mounted at `/opt/ml/processing/model`. This artifact contains the serialized XGBoost model (`model.tar.gz`) generated during training.

2. **Test Dataset**  
   The test split generated by the preprocessing step is passed in via the `test` processing output and mounted at `/opt/ml/processing/test`. This ensures evaluation is always performed on data that has been processed with the same logic as training.

### Evaluation Execution
The evaluation logic is defined in `code/evaluation.py`, which:
- extracts and loads the trained model,
- generates predictions on the test set,
- computes classification metrics (including F1 score as the primary metric),
- and writes a structured metrics report to disk.

### Output Artifact
The evaluation results are written to `/opt/ml/processing/evaluation` and exposed as a named processing output (`evaluation`). This directory contains a machine-readable `evaluation.json` file that can be consumed by downstream pipeline steps, such as a conditional quality gate or model registration logic.

### Role in the Pipeline
This evaluation step formalizes model assessment as a first-class pipeline component. By explicitly tying evaluation to the outputs of both preprocessing and training, the pipeline ensures that model quality checks are reproducible, traceable, and fully automated as part of the CI/CD workflow.

In [13]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-venue-signal-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

## Evaluation Step Registration and Metrics Exposure

This cell registers the model evaluation operation as a first-class SageMaker Pipeline step and exposes the evaluation metrics as structured pipeline properties that can be referenced by downstream steps.

### PropertyFile Definition
A `PropertyFile` named `EvaluationReport` is defined to map a structured metrics artifact produced by the evaluation script. It specifies:
- the logical name of the property file (`EvaluationReport`),
- the processing output channel (`evaluation`),
- and the relative path to the metrics file (`evaluation.json`).

This configuration allows SageMaker Pipelines to parse the evaluation output and make individual metrics addressable via the pipeline’s property system.

### Evaluation Step Construction
The `ProcessingStep` named `VenueSignalEval` wraps the previously defined evaluation execution arguments (`eval_args`) and associates them with the declared `PropertyFile`. This formally registers the evaluation logic within the pipeline graph and binds its outputs to pipeline-accessible properties.

### Metrics Accessibility
By attaching the `PropertyFile` to the evaluation step, metrics such as F1 score, precision, recall, and AUC become accessible through expressions like:
`step_eval.properties.PropertyFiles["EvaluationReport"]`

These properties can be used directly in conditional logic (e.g., quality gates), model registration decisions, or reporting steps without manual parsing or data movement.
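
A quality gate built on these properties boils down to a path lookup followed by a comparison. The sketch below imitates that locally: `lookup_metric` is a hypothetical stand-in for the pipeline's JSON lookup (in SageMaker Pipelines this role is typically played by `JsonGet` from `sagemaker.workflow.functions`), and the report values are placeholders.

```python
def lookup_metric(report, json_path):
    """Resolve a dotted path such as 'classification_metrics.f1.value' in a nested dict."""
    node = report
    for key in json_path.split("."):
        node = node[key]
    return node


def passes_quality_gate(report, threshold=0.8,
                        json_path="classification_metrics.f1.value"):
    """Return True when the metric meets or exceeds the threshold."""
    return lookup_metric(report, json_path) >= threshold


# Placeholder reports, one exactly at the threshold and one just below it.
at_threshold = {"classification_metrics": {"f1": {"value": 0.80}}}
below_threshold = {"classification_metrics": {"f1": {"value": 0.79}}}

gate_results = (passes_quality_gate(at_threshold), passes_quality_gate(below_threshold))
```

The comparison is inclusive, matching the "meets or exceeds" wording used for the F1 threshold in the pipeline overview.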

### Role in the Pipeline
This step serves as the bridge between raw evaluation output and automated decision-making. By converting evaluation results into structured pipeline properties, it enables deterministic, auditable model quality enforcement as part of the CI/CD workflow, such as approving, rejecting, or registering models based on predefined performance thresholds.

In [14]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="VenueSignalEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Model Definition

This cell defines a SageMaker `Model` object that packages the trained model artifact together with its inference container configuration. It represents a deployable model entity that can be registered, approved, or deployed by downstream pipeline steps.

### Model Artifact Binding
The `model_data` parameter references the S3 location of the trained model artifacts produced by the training step (`step_train.properties.ModelArtifacts.S3ModelArtifacts`). This creates a direct lineage between the training output and the deployable model definition, ensuring traceability and reproducibility.

### Inference Environment
The model is associated with a specific container image via `image_uri`. In this case, the same XGBoost image used for training is reused for inference, guaranteeing consistency between training and serving environments and avoiding runtime incompatibilities.

### Execution Context
The model is instantiated with:
- an IAM role (`role`) that grants permissions required for deployment and inference,
- and a SageMaker Pipeline session (`pipeline_session`) that integrates the model into the pipeline’s execution context.

### Role in the Pipeline
This step formalizes the trained artifact as a SageMaker model object that can be consumed by subsequent steps such as model registration, conditional approval, or deployment. By separating model definition from training, the pipeline enables controlled promotion of models based on evaluation results and supports automated model lifecycle management within the CI/CD workflow.

In [15]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

## Model Creation Step

This cell defines a SageMaker Pipeline `ModelStep` that materializes the trained model as a deployable inference resource. It converts the abstract `Model` definition into a concrete SageMaker model entity that can be used for real-time or batch inference.

### Model Instantiation
The `ModelStep` invokes the `create` method on the previously defined SageMaker `Model` object. This operation binds together the trained model artifacts, the inference container image, and the execution role into a single SageMaker model resource.

### Inference Compute Configuration
The `create` call is given `instance_type="ml.m5.large"` and `accelerator_type="ml.eia1.medium"`. These values describe the intended serving environment: a cost-efficient general-purpose instance suited to low- to moderate-throughput inference, paired with an Elastic Inference accelerator as a cheaper alternative to GPU-backed instances. They do not provision any compute at this point; the instance configuration is chosen when the model is deployed to an endpoint or used in a batch transform job.

### Pipeline Integration
By registering model creation as a pipeline step, SageMaker ensures that model instantiation occurs only after successful completion of upstream steps such as training and evaluation. The step becomes part of the pipeline’s dependency graph, enabling deterministic execution, retry behavior, and lineage tracking.

### Role in the Pipeline
This step serves as the transition point from model development to deployment readiness. It produces a deployable SageMaker model that can be conditionally approved, registered in a model registry, or attached to downstream deployment steps as part of an automated CI/CD workflow.

In [16]:
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="VenueSignalCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

## Model Metrics Attachment and Registration

This cell defines the model registration logic for the SageMaker Pipeline. It attaches evaluation metrics to the trained model and registers the model in a SageMaker Model Package Group, enabling governance, versioning, and controlled promotion to deployment.

### Model Metrics Definition
A `ModelMetrics` object is created to associate quantitative evaluation results with the trained model. The metrics are sourced from the `evaluation.json` file produced by the evaluation processing step and stored in S3.

The `MetricsSource` explicitly specifies:
- the S3 URI of the evaluation artifact,
- and the content type (`application/json`).

By attaching metrics at registration time, SageMaker captures model performance characteristics as immutable metadata, supporting auditability and model comparison across versions.
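
As a rough illustration of what gets attached, the sketch below composes the report URI the same way the cell does and wraps it in a dictionary shaped like the `ModelMetrics` field of the `CreateModelPackage` API. The bucket and prefix are made-up placeholders and `build_model_quality_metrics` is a hypothetical helper; in the notebook the SDK's `ModelMetrics` object assembles this part of the request.

```python
def build_model_quality_metrics(evaluation_output_uri):
    """Return a ModelMetrics-shaped dict pointing at evaluation.json."""
    # Strip a trailing slash so the joined URI never contains "//evaluation.json".
    s3_uri = "{}/evaluation.json".format(evaluation_output_uri.rstrip("/"))
    return {
        "ModelQuality": {
            "Statistics": {
                "ContentType": "application/json",
                "S3Uri": s3_uri,
            }
        }
    }


# Placeholder bucket and prefix, not the notebook's real output location.
metrics_request = build_model_quality_metrics("s3://example-bucket/eval-output/")
print(metrics_request["ModelQuality"]["Statistics"]["S3Uri"])
```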

### Model Registration Configuration
The model is registered using the `model.register()` method, which packages the trained model artifacts, inference container, and metadata into a versioned model package. Registration parameters define:
- supported input (`content_types`) and output (`response_types`) formats,
- compatible instance types for real-time inference and batch transform,
- the target Model Package Group for version management,
- the initial approval status (`approval_status`, set from the `model_approval_status` pipeline parameter), which may be automated or manually controlled,
- and the associated evaluation metrics.

### Pipeline Integration
The registration logic is wrapped in a `ModelStep` named `VenueSignalRegisterModel`, making model registration a first-class pipeline step. This ensures that model versions are registered deterministically as part of the pipeline execution and only after successful completion of upstream steps.

### Role in the Pipeline
This step formalizes the transition from a trained model artifact to a governed, versioned model package. By registering the model with attached performance metrics, the pipeline enables automated model lifecycle management, supports approval workflows, and enforces quality standards as part of a robust CI/CD process.

In [17]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="VenueSignalRegisterModel", step_args=register_args)

## Quality Gate Failure Step

This cell defines a SageMaker Pipeline `FailStep` that explicitly terminates pipeline execution when the model fails to meet a predefined performance threshold. It is used as part of a conditional quality gate enforcing minimum acceptable model quality.

### Failure Condition Messaging
The `FailStep` is configured with a human-readable error message constructed using `Join`. This dynamically embeds the configured F1 threshold value into the failure message, producing a clear and informative explanation when the pipeline halts (e.g., indicating that the model’s F1 score fell below the required threshold).
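
`Join` is resolved by the pipeline service at execution time, so the message cannot be inspected in the notebook. The sketch below is a plain-Python stand-in that shows the intended result, assuming the default threshold of 0.5; `join_message` is a hypothetical helper, and the exact number formatting produced by the service may differ.

```python
def join_message(on, values):
    """Plain-Python stand-in for Join: stringify each value, then join them."""
    return on.join(str(value) for value in values)


f1_threshold_value = 0.5  # default F1Threshold in this notebook
message = join_message(" ", ["Execution failed due to F1 <", f1_threshold_value])
print(message)  # Execution failed due to F1 < 0.5
```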

### Pipeline Control Flow
This step is not executed unconditionally. It is referenced in the `else_steps` of the `ConditionStep` that evaluates the F1 score extracted from the evaluation report. If the condition evaluates to false (i.e., model performance does not meet the threshold), control is routed to this `FailStep`, which stops the pipeline and marks the execution as failed.

### Role in the Pipeline
The `FailStep` enforces model quality as a first-class CI/CD concern. By explicitly failing the pipeline when performance requirements are not satisfied, it prevents underperforming models from being registered, approved, or deployed. This ensures that only models meeting defined standards progress through the pipeline, supporting robust governance and automated quality assurance.

In [18]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="VenueSignalF1Fail",
    error_message=Join(on=" ", values=["Execution failed due to F1 <", f1_threshold]),
)

## Model Quality Gate and Conditional Routing

This cell defines the conditional logic that enforces a minimum model quality threshold within the SageMaker Pipeline. It evaluates the model’s F1 score produced during the evaluation step and deterministically routes execution based on whether the model meets the required performance standard.

### Condition Definition
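A `ConditionLessThanOrEqualTo` condition is defined with:
- the configured F1 performance threshold (`f1_threshold`) as the left operand,
- and the actual F1 score extracted from the evaluation output as the right operand.

The condition therefore evaluates to true when `f1_threshold <= F1`, that is, when the model meets or exceeds the threshold.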

The F1 score is retrieved dynamically using `JsonGet`, which reads the value at
`classification_metrics.f1.value` from the `evaluation.json` file exposed via the evaluation step’s `PropertyFile`. This allows the pipeline to reason over evaluation metrics without manual parsing or hard-coded values.
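
The lookup and comparison can be mimicked locally. The sketch below is a plain-Python stand-in, not the SDK's implementation: `json_get` is a hypothetical helper that follows a dot-separated path, and the report values are copied from the evaluation report printed near the end of this notebook.

```python
def json_get(document, json_path):
    """Follow a dot-separated path of keys through nested dictionaries."""
    node = document
    for key in json_path.split("."):
        node = node[key]
    return node


# Values copied from this run's evaluation report.
evaluation = {
    "classification_metrics": {
        "f1": {"value": 0.8396613648024629},
        "precision": {"value": 0.770752384316496},
    }
}
threshold = 0.5  # default F1Threshold

f1_score = json_get(evaluation, "classification_metrics.f1.value")
# Mirrors ConditionLessThanOrEqualTo(left=threshold, right=f1_score).
gate_passes = threshold <= f1_score
print(f1_score, gate_passes)
```

With the operands in this order, a passing model is one whose F1 is at least the threshold, which matches the `True` outcome recorded for `VenueSignalF1Cond` in this run.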

### Conditional Execution
The `ConditionStep` named `VenueSignalF1Cond` uses this condition to control pipeline flow:
- **If the model meets or exceeds the F1 threshold**, execution proceeds to:
  - model registration (`step_register`),
  - and model creation (`step_create_model`).
- **If the model fails to meet the threshold**, execution is routed to a failure step (`step_fail`), which terminates the pipeline with an explicit error message.

### Pipeline Governance Role
This conditional step serves as the formal quality gate in the CI/CD workflow. By enforcing performance requirements at pipeline runtime, it ensures that only models meeting predefined standards are registered or promoted. This approach enables automated, auditable model governance and prevents underperforming models from entering production systems.

### Role in the End-to-End Pipeline
The `ConditionStep` ties together evaluation, governance, and deployment readiness. It transforms raw evaluation metrics into actionable control flow, completing the automation loop from data ingestion through model validation and controlled promotion.

In [19]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=f1_threshold,
    right=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.f1.value",
    )
)

step_cond = ConditionStep(
    name="VenueSignalF1Cond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model],
    else_steps=[step_fail],
)

## Pipeline Definition and Orchestration

This cell defines the top-level SageMaker Pipeline that orchestrates the end-to-end machine learning workflow. It assembles all previously defined steps, parameters, and dependencies into a single, executable CI/CD pipeline.

### Pipeline Configuration
The pipeline is instantiated with a descriptive name (`VenueSignalPipeline`) to provide clear identification and traceability across executions. This name is used by SageMaker to version, visualize, and manage pipeline runs.

### Parameterization
The pipeline declares a set of runtime parameters that control its behavior without requiring code changes:
- `processing_instance_count` to scale preprocessing resources,
- `instance_type` to control training compute,
- `model_approval_status` to manage governance and approval workflows,
- `input_data` to specify the source dataset,
- `f1_threshold` to enforce a minimum model quality requirement.

By externalizing these values as parameters, the pipeline supports flexible configuration across environments (development, staging, production).
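
To illustrate how per-environment overrides combine with defaults, here is a small sketch. The defaults are taken from the compiled pipeline definition shown later in this notebook (`InputData` is left out for brevity); `resolve_parameters` and the override values are hypothetical. With the SDK, a dictionary like the override one can be passed as `pipeline.start(parameters=...)`.

```python
# Defaults as they appear in the compiled pipeline definition.
DEFAULT_PARAMETERS = {
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "F1Threshold": 0.5,
}


def resolve_parameters(overrides):
    """Merge overrides onto the defaults, rejecting unknown parameter names."""
    unknown = set(overrides) - set(DEFAULT_PARAMETERS)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return {**DEFAULT_PARAMETERS, **overrides}


# Hypothetical stricter gate for a staging run.
staging_run = resolve_parameters({"F1Threshold": 0.75})
print(staging_run)
```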

### Step Composition
The pipeline explicitly defines its execution graph via the ordered list of steps:
- `step_process`: data preprocessing and feature preparation,
- `step_train`: model training using XGBoost,
- `step_eval`: model evaluation and metric generation,
- `step_cond`: conditional quality gate controlling model registration and creation.

Although listed sequentially, SageMaker resolves actual execution order based on data and property dependencies, ensuring deterministic and correct execution.
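
The idea of deriving order from dependencies rather than list position can be sketched with the standard library (Python 3.9+). The dependency map below is an assumption read off the lineage tables at the end of this notebook: training consumes the preprocessing outputs, evaluation consumes the test split and the trained model, and the condition reads the evaluation report. It is not SageMaker's actual scheduler.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "VenueSignal": set(),
    "VenueSignalTrain": {"VenueSignal"},
    "VenueSignalEval": {"VenueSignal", "VenueSignalTrain"},
    "VenueSignalF1Cond": {"VenueSignalEval"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because every step here depends on the one before it, only one valid order exists; steps with no dependency between them could run in parallel.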

### Role in the CI/CD Workflow
This pipeline definition serves as the control plane for the entire ML lifecycle. It enables automated retraining, evaluation, governance, and promotion of models through a single, reproducible workflow. By encapsulating all stages into a SageMaker Pipeline, the system achieves traceability, repeatability, and enforceable quality standards as part of a robust ML CI/CD process.

In [20]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "VenueSignalPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        f1_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

This cell retrieves and displays the compiled JSON definition of the SageMaker Pipeline. The definition represents the fully resolved pipeline graph, including all parameters, steps, dependencies, and conditional logic, as it will be executed by SageMaker.

In [21]:
import json


definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://yelp-aai540-group6-697347838118/feature-store/training-data/alldata.csv'},
  {'Name': 'F1Threshold', 'Type': 'Float', 'DefaultValue': 0.5}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'VenueSignal',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-eas
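
Because the full definition is long, it can help to list only the step names and types. The sketch below walks a hand-trimmed stand-in for the definition rather than the real one. It assumes that a condition step nests its branches under `Arguments` as `IfSteps` and `ElseSteps`, and that the model steps appear with the `-RegisterModel` and `-CreateModel` suffixes seen in the step listing; both assumptions should be checked against the actual `definition` object.

```python
def iter_step_names(steps, branch="main"):
    """Yield (branch, name, type) for each step, descending into condition branches."""
    for step in steps:
        yield branch, step["Name"], step["Type"]
        if step["Type"] == "Condition":
            arguments = step.get("Arguments", {})
            yield from iter_step_names(arguments.get("IfSteps", []), "if")
            yield from iter_step_names(arguments.get("ElseSteps", []), "else")


# Hand-trimmed stand-in for the compiled definition (step arguments omitted).
sample_definition = {
    "Steps": [
        {"Name": "VenueSignal", "Type": "Processing"},
        {"Name": "VenueSignalTrain", "Type": "Training"},
        {"Name": "VenueSignalEval", "Type": "Processing"},
        {
            "Name": "VenueSignalF1Cond",
            "Type": "Condition",
            "Arguments": {
                "IfSteps": [
                    {"Name": "VenueSignalRegisterModel-RegisterModel", "Type": "RegisterModel"},
                    {"Name": "VenueSignalCreateModel-CreateModel", "Type": "Model"},
                ],
                "ElseSteps": [{"Name": "VenueSignalF1Fail", "Type": "Fail"}],
            },
        },
    ]
}

found = list(iter_step_names(sample_definition["Steps"]))
for branch, name, step_type in found:
    print(f"{branch:5} {step_type:14} {name}")
```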

## Pipeline Registration

This cell creates or updates the SageMaker Pipeline definition in AWS using the specified IAM role, ensuring that the latest pipeline configuration is registered and ready for execution.

In [22]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:697347838118:pipeline/VenueSignalPipeline',
 'ResponseMetadata': {'RequestId': '1e7cdd6a-3bd0-4215-8bf9-c087c71f9de9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1e7cdd6a-3bd0-4215-8bf9-c087c71f9de9',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '109',
   'date': 'Sat, 21 Feb 2026 22:31:17 GMT'},
  'RetryAttempts': 0}}

## Pipeline Execution

This cell starts a new execution of the SageMaker Pipeline, triggering the end-to-end workflow including preprocessing, training, evaluation, and conditional model registration based on the configured parameters and quality gates.

In [23]:
execution = pipeline.start()

## Pipeline Execution Status

This cell retrieves and displays metadata about the current pipeline execution, including its status, creation and last-modified times, and the identity that started it, allowing you to monitor progress while the run is in flight. Per-step details are available from `execution.list_steps()`.

In [24]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:697347838118:pipeline/VenueSignalPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:697347838118:pipeline/VenueSignalPipeline/execution/mibq5rtjzpby',
 'PipelineExecutionDisplayName': 'execution-1771711150457',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2026, 2, 21, 21, 59, 10, 382000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2026, 2, 21, 21, 59, 10, 382000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:697347838118:user-profile/d-ohmizsi7kzns/default-1768357685526',
  'UserProfileName': 'default-1768357685526',
  'DomainId': 'd-ohmizsi7kzns',
  'IamIdentity': {'Arn': 'arn:aws:sts::697347838118:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROA2EXJLSCTAN3QBZ4ST:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:697347838118:user-profile/d-ohmizsi7kzns/default-1768357685526',
  'UserProfileName': 'default-1768

## Pipeline Execution Wait

This cell blocks execution until the SageMaker Pipeline run has completed, ensuring that all steps finish before proceeding and making it easier to observe final outcomes or handle results synchronously.

In [25]:
execution.wait()
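
Conceptually, waiting amounts to polling the execution status until it reaches a terminal value. The sketch below illustrates that loop under stated assumptions: `wait_for_terminal_status` is a hypothetical helper, the delay and attempt limits are illustrative, and a simulated status sequence stands in for repeated `describe()` calls.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_terminal_status(get_status, delay_seconds=30, max_attempts=60):
    """Poll get_status() until it returns a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError("Pipeline execution did not reach a terminal status in time")


# Simulated status sequence standing in for repeated describe() calls.
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
final_status = wait_for_terminal_status(lambda: next(fake_statuses), delay_seconds=0)
print(final_status)
```

Against a real run, `get_status` could be `lambda: execution.describe()["PipelineExecutionStatus"]`, using the key shown in the `describe()` output above.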

## Pipeline Step Listing

This cell lists all steps executed as part of the current SageMaker Pipeline run, along with their statuses and metadata, allowing you to inspect progress, execution order, and identify any failed or skipped steps.

In [26]:
execution.list_steps()

[{'StepName': 'VenueSignalCreateModel-CreateModel',
  'StartTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 738000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 49, 327000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:model/pipelines-mibq5rtjzpby-VenueSignalCreateMod-5oCkUMXxse'}},
  'AttemptCount': 1},
 {'StepName': 'VenueSignalRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 738000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 49, 298000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:model-package/venuesignal-model-group-697347838118/1'}},
  'AttemptCount': 1},
 {'StepName': 'VenueSignalF1Cond',
  'StartTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 262000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 392000

```mermaid
flowchart LR
    %% Parameters block
    P[Parameters
    - processing_instance_count
    - instance_type
    - model_approval_status
    - input_data
    - f1_threshold]

    %% Main pipeline
    subgraph PIPELINE[Pipeline]
        A[Processing Step\nfeature engineering]
        B[Training Step\nmodel training]
        C[Processing Step\nmodel evaluation]
        D{Condition Step\nmodel F1 threshold}
    end

    %% If branch
    subgraph IFSTEPS[if steps]
        E[Create Model Step\nmodel]
        F[Register Model\nmodel package]
    end

    %% Else branch
    subgraph ELSESTEPS[else steps]
        H[Fail Step\nexecution failed]
    end

    %% Flow connections
    P --> A
    A --> B --> C --> D

    D -->|meets threshold| E
    D -->|meets threshold| F

    D -->|fails threshold| H
```


In [32]:
steps = execution.list_steps()

# Sort by start time
steps = sorted(steps, key=lambda x: x["StartTime"])

for step in steps:
    name = step["StepName"]
    status = step["StepStatus"]
    start = step["StartTime"]
    end = step.get("EndTime")

    duration = None
    if end:
        duration = (end - start).total_seconds()

    print(f"Step: {name}")
    print(f"  Status:   {status}")
    if duration is not None:
        print(f"  Duration: {duration:.1f} seconds")

    # Print useful metadata if present
    metadata = step.get("Metadata", {})
    if "ProcessingJob" in metadata:
        print(f"  ProcessingJob ARN: {metadata['ProcessingJob']['Arn']}")
    if "TrainingJob" in metadata:
        print(f"  TrainingJob ARN:   {metadata['TrainingJob']['Arn']}")
    if "RegisterModel" in metadata:
        print(f"  ModelPackage ARN:  {metadata['RegisterModel']['Arn']}")
    if "Model" in metadata:
        print(f"  Model ARN:         {metadata['Model']['Arn']}")
    if "Condition" in metadata:
        print(f"  Condition Outcome:{metadata['Condition']['Outcome']}")

    print("-" * 70)

Step: VenueSignal
  Status:   Succeeded
  Duration: 153.6 seconds
  ProcessingJob ARN: arn:aws:sagemaker:us-east-1:697347838118:processing-job/pipelines-mibq5rtjzpby-VenueSignal-JSxpfk0SA8
----------------------------------------------------------------------
Step: VenueSignalTrain
  Status:   Succeeded
  Duration: 148.2 seconds
  TrainingJob ARN:   arn:aws:sagemaker:us-east-1:697347838118:training-job/pipelines-mibq5rtjzpby-VenueSignalTrain-hOwI3V2EXu
----------------------------------------------------------------------
Step: VenueSignalEval
  Status:   Succeeded
  Duration: 153.0 seconds
  ProcessingJob ARN: arn:aws:sagemaker:us-east-1:697347838118:processing-job/pipelines-mibq5rtjzpby-VenueSignalEval-mI4vHSX47N
----------------------------------------------------------------------
Step: VenueSignalF1Cond
  Status:   Succeeded
  Duration: 0.1 seconds
  Condition Outcome:True
----------------------------------------------------------------------
Step: VenueSignalCreateModel-CreateMod

## Examining the Evaluation
After the pipeline completes, download the resulting `evaluation.json` file from S3 and print the report.

In [37]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

{'classification_metrics': {'accuracy': {'value': 0.7515157539012026},
                            'auc': {'value': 0.7507763739583867},
                            'f1': {'value': 0.8396613648024629},
                            'precision': {'value': 0.770752384316496},
                            'recall': {'value': 0.9221017044654176}}}
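
As a quick consistency check on the report, F1 for a binary classifier is the harmonic mean of precision and recall, so it can be recomputed from the other two values. The sketch below uses the numbers printed above and assumes all three metrics were computed for the same positive class.

```python
import math

# Values copied from the evaluation report above.
precision = 0.770752384316496
recall = 0.9221017044654176
reported_f1 = 0.8396613648024629

# F1 = 2 * P * R / (P + R)
recomputed_f1 = 2 * precision * recall / (precision + recall)
matches = math.isclose(recomputed_f1, reported_f1, abs_tol=1e-6)
print(round(recomputed_f1, 6), matches)
```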


## Lineage
Review the lineage of the artifacts generated by the pipeline.

In [39]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'VenueSignal', 'StartTime': datetime.datetime(2026, 2, 21, 21, 59, 11, 175000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 2, 21, 22, 1, 44, 806000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:processing-job/pipelines-mibq5rtjzpby-VenueSignal-JSxpfk0SA8'}}, 'AttemptCount': 1}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...aa087173b0f29d3e60255d2/preprocessing.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://.../feature-store/training-data/alldata.csv | Input | DataSet | ContributedTo | artifact |
| 2 | 68331...com/sagemaker-scikit-learn:1.2-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://...ine/mibq5rtjzpby/VenueSignal/output/test | Output | DataSet | Produced | artifact |
| 4 | s3://...bq5rtjzpby/VenueSignal/output/validation | Output | DataSet | Produced | artifact |
| 5 | s3://...ne/mibq5rtjzpby/VenueSignal/output/train | Output | DataSet | Produced | artifact |


{'StepName': 'VenueSignalTrain', 'StartTime': datetime.datetime(2026, 2, 21, 22, 1, 45, 228000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 2, 21, 22, 4, 13, 454000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:training-job/pipelines-mibq5rtjzpby-VenueSignalTrain-hOwI3V2EXu'}}, 'AttemptCount': 1}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...bq5rtjzpby/VenueSignal/output/validation | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...ne/mibq5rtjzpby/VenueSignal/output/train | Input | DataSet | ContributedTo | artifact |
| 2 | 68331...-1.amazonaws.com/sagemaker-xgboost:1.7-1 | Input | Image | ContributedTo | artifact |
| 3 | s3://...gnalTrain-hOwI3V2EXu/output/model.tar.gz | Output | Model | Produced | artifact |


{'StepName': 'VenueSignalEval', 'StartTime': datetime.datetime(2026, 2, 21, 22, 4, 13, 696000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 46, 722000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:processing-job/pipelines-mibq5rtjzpby-VenueSignalEval-mI4vHSX47N'}}, 'AttemptCount': 1}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...2171e5a541d6a670b4aff43561/evaluation.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...ine/mibq5rtjzpby/VenueSignal/output/test | Input | DataSet | ContributedTo | artifact |
| 2 | s3://...gnalTrain-hOwI3V2EXu/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 3 | 68331...-1.amazonaws.com/sagemaker-xgboost:1.7-1 | Input | Image | ContributedTo | artifact |
| 4 | s3://...026-02-21-21-59-08-972/output/evaluation | Output | DataSet | Produced | artifact |


{'StepName': 'VenueSignalF1Cond', 'StartTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 262000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 392000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Condition': {'Outcome': 'True'}}, 'AttemptCount': 1}


None

{'StepName': 'VenueSignalRegisterModel-RegisterModel', 'StartTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 738000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 49, 298000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:model-package/venuesignal-model-group-697347838118/1'}}, 'AttemptCount': 1}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...gnalTrain-hOwI3V2EXu/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 68331...-1.amazonaws.com/sagemaker-xgboost:1.7-1 | Input | Image | ContributedTo | artifact |
| 2 | venuesignal-model-group-697347838118-1-1771711... | Input | ModelLifeCycle | ContributedTo | action |
| 3 | venuesignal-model-group-697347838118-1-Pending... | Input | Approval | ContributedTo | action |
| 4 | venuesignal-model-group-697347838118-177171160... | Output | ModelGroup | AssociatedWith | context |


{'StepName': 'VenueSignalCreateModel-CreateModel', 'StartTime': datetime.datetime(2026, 2, 21, 22, 6, 47, 738000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 2, 21, 22, 6, 49, 327000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:697347838118:model/pipelines-mibq5rtjzpby-VenueSignalCreateMod-5oCkUMXxse'}}, 'AttemptCount': 1}


None