# Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker

Amazon SageMaker gives Machine Learning application developers and Machine Learning operations engineers the ability to orchestrate SageMaker jobs and author reproducible Machine Learning pipelines, deploy custom-built models for low-latency real-time inference or for offline inference with Batch Transform, and track the lineage of artifacts. Through a simple interface, you can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage, while adhering to safety and best-practice paradigms for Machine Learning application development.

The SageMaker Pipelines service supports a SageMaker Machine Learning Pipeline Domain Specific Language (DSL), which is a declarative JSON specification. This DSL defines a Directed Acyclic Graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that are already familiar to engineers and scientists alike.
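
To make the DAG idea concrete, here is a minimal sketch that orders steps by their dependencies. The dictionary below is a heavily simplified, illustrative stand-in and not the actual DSL schema; the `DependsOn` layout and the `PreprocessAbaloneData` and `TrainAbaloneModel` step names are assumptions made for this example.

```python
from graphlib import TopologicalSorter

# Simplified, illustrative stand-in for a pipeline definition (assumption:
# the real JSON emitted by the SDK carries many more fields per step).
definition = {
    "Parameters": [
        {"Name": "ProcessingInstanceType", "Type": "String"},
        {"Name": "ModelApprovalStatus", "Type": "String"},
    ],
    "Steps": [
        {"Name": "PreprocessAbaloneData", "DependsOn": []},  # hypothetical name
        {"Name": "TrainAbaloneModel", "DependsOn": ["PreprocessAbaloneData"]},  # hypothetical name
        {"Name": "EvaluateAbaloneModel", "DependsOn": ["TrainAbaloneModel"]},
        {"Name": "CheckMSEAbaloneEvaluation", "DependsOn": ["EvaluateAbaloneModel"]},
        {"Name": "RegisterAbaloneModel-RegisterModel", "DependsOn": ["CheckMSEAbaloneEvaluation"]},
    ],
}


def execution_order(definition):
    """Return step names in an order that respects every dependency."""
    graph = {step["Name"]: set(step["DependsOn"]) for step in definition["Steps"]}
    # static_order() raises graphlib.CycleError if the graph is not acyclic.
    return list(TopologicalSorter(graph).static_order())


for name in execution_order(definition):
    print(name)
```

Because the graph must be acyclic, a valid ordering always exists; a cycle between two steps would be rejected rather than silently executed.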

The SageMaker Model Registry is where trained models are stored, versioned, and managed. Data Scientists and Machine Learning Engineers can compare model versions, approve models for deployment, and deploy models from different AWS accounts, all from a single Model Registry. SageMaker helps customers follow MLOps best practices from the start: a full end-to-end MLOps system can be stood up with a single API call.

## SageMaker Pipelines

Amazon SageMaker Pipelines supports the following activities:

* Pipelines - A Directed Acyclic Graph of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing Job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training Job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Conditional step execution - Provides conditional execution of branches in a pipeline.
* Registering Models - Creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* Creating Model steps - Creates a model for use in transform steps or later publication as an endpoint.
* Parameterized Pipeline executions - Allows pipeline executions to vary by supplied parameters.
* Transform Job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from your dataset, get inferences from large datasets, and run inference when you don't need a persistent endpoint.

## Layout of the SageMaker ModelBuild Project Template

The template provides a starting point for bringing your SageMaker Pipeline development to production.

```
|-- CONTRIBUTING.md
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
|-- README.md
|-- sagemaker-pipelines-project.ipynb
|-- setup.cfg
|-- setup.py
|-- tests
|   `-- test_pipelines.py
`-- tox.ini
```

A description of some of the artifacts is provided below:
<br/><br/>
Your pipeline artifacts, which include a pipeline module defining the required `get_pipeline` method that returns an instance of a SageMaker pipeline, a preprocessing script that is used in feature engineering, and a model evaluation script that measures the Mean Squared Error of the model trained by the pipeline:

```
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py

```

Any additional subfolders containing code or artifacts that the pipeline needs must be packaged by the `setup.py` file. For example, to package a `pipelines/my_pipeline/src` folder:

* Include an `__init__.py` file within the `src` folder.
* Add it to the `setup.py` file's `package_data` like so:

```
...
    packages=setuptools.find_packages(),
    include_package_data=True,
    package_data={"pipelines.my_pipeline.src": ["*.txt"]},
    python_requires=">=3.6",
    install_requires=required_packages,
    extras_require=extras,
...
```
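
Since `setuptools.find_packages()` only picks up directories that contain an `__init__.py` file, a quick pre-flight check can catch a folder that would be left out of the package. The helper below is a sketch using only the standard library; `missing_init_dirs` is a hypothetical name and is not part of the project template.

```python
from pathlib import Path
import tempfile


def missing_init_dirs(package_root):
    """List subdirectories under package_root that lack an __init__.py file."""
    root = Path(package_root)
    missing = []
    for directory in sorted(p for p in root.rglob("*") if p.is_dir()):
        if directory.name == "__pycache__":
            continue  # bytecode caches are never packages
        if not (directory / "__init__.py").exists():
            missing.append(directory.relative_to(root).as_posix())
    return missing


# Demonstration on a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    pipelines = Path(tmp) / "pipelines"
    (pipelines / "my_pipeline" / "src").mkdir(parents=True)
    (pipelines / "__init__.py").touch()
    (pipelines / "my_pipeline" / "__init__.py").touch()
    demo_result = missing_init_dirs(pipelines)

print(demo_result)  # ['my_pipeline/src']
```

An empty list means every subfolder is importable as a package; data files such as `*.txt` still need the `package_data` entry shown above.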

<br/><br/>
Utility modules for getting the pipeline definition JSON and running pipelines:

```
|-- pipelines
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
```
<br/><br/>
Python package artifacts:
```
|-- setup.cfg
|-- setup.py
```
<br/><br/>
A stubbed testing module for testing your pipeline as you develop:
```
|-- tests
|   `-- test_pipelines.py
```
<br/><br/>
The `tox` testing framework configuration:
```
`-- tox.ini
```

### A SageMaker Pipeline

The pipeline that we create follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and conditional model registration and publication, if the quality of the model is sufficient.

![A typical ML Application pipeline](img/pipeline-full.png)
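
The conditional part of this pattern can be sketched in plain Python: compute the Mean Squared Error on held-out data, then register the model only if the error is low enough. The threshold of `6.0`, the `<=` comparison, and the sample values are assumptions for illustration; the actual condition lives in `pipelines/abalone/pipeline.py`.

```python
def mean_squared_error(y_true, y_pred):
    """Average of squared differences between labels and predictions."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def should_register(mse, threshold=6.0):
    """Gate registration on model quality (threshold is a hypothetical value)."""
    return mse <= threshold


# Made-up labels and predictions, used only to exercise the functions.
labels = [9.0, 7.0, 12.0, 10.0]
predictions = [8.0, 7.5, 11.0, 12.0]

mse = mean_squared_error(labels, predictions)
print(f"mse={mse:.4f} register={should_register(mse)}")  # mse=1.5625 register=True
```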

### Getting some constants

We get some constants from the local execution environment.

In [None]:
import boto3
import sagemaker


region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker.session.Session().default_bucket()

# Change these to reflect your project/business name, or to keep your ModelPackageGroup/Pipeline separate from the rest of your team's
model_package_group_name = "AbaloneModelPackageGroup-Example"
pipeline_name = "AbalonePipeline-Example"

# Experiment Tracking and Feature Store configuration
experiment_name = f"{pipeline_name}-Experiment"
feature_group_name = "Abalone-feature-group"


### Get the pipeline instance

Here we get the pipeline instance from your pipeline module so that we can work with it.

In [2]:
from pipelines.abalone.pipeline import get_pipeline


pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
    experiment_name=experiment_name,
    feature_group_name=feature_group_name,
    enable_feature_store=True,
)


INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


### Experiment Tracking and Feature Store

The pipeline now includes:

1. **Experiment Tracking**: Each pipeline execution is automatically tracked as a trial in SageMaker Experiments. You can:
   - Compare different pipeline runs
   - Track hyperparameters and metrics
   - View lineage of artifacts

2. **Feature Store Integration**: Features are automatically ingested into SageMaker Feature Store during preprocessing, enabling:
   - Feature reuse across multiple models
   - Point-in-time feature retrieval
   - Online feature serving for real-time inference

Let's explore the experiment and feature store:


In [None]:
# View experiment trials
from sagemaker.experiments import Experiment

experiment = Experiment.load(experiment_name=experiment_name, sagemaker_boto_client=boto3.client("sagemaker"))
# list_trials() may return a one-shot iterator, so materialize it once and reuse the list
trials = list(experiment.list_trials())

print(f"Experiment: {experiment_name}")
print(f"Number of trials: {len(trials)}")
print("\nRecent trials:")
for trial in trials[:5]:
    print(f"  - {trial.trial_name}")
    print(f"    Created: {trial.creation_time}")


In [None]:
# Check Feature Store Feature Group
try:
    from sagemaker.feature_store.feature_group import FeatureGroup

    feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker.session.Session())

    # Describe the feature group once and reuse the response
    try:
        description = feature_group.describe()
        print(f"Feature Group '{feature_group_name}' exists.")
        print(f"Feature Group ARN: {description.get('FeatureGroupArn', 'N/A')}")
        print(f"Feature Group Status: {description.get('FeatureGroupStatus', 'N/A')}")
        print(f"Online Store Enabled: {description.get('OnlineStoreConfig', {}).get('EnableOnlineStore', 'N/A')}")
    except Exception as e:
        print(f"Feature Group '{feature_group_name}' may not exist yet or is still being created.")
        print(f"Error: {e}")
        print("Note: Feature Group will be created during pipeline execution if it doesn't exist.")
except ImportError:
    print("SageMaker Feature Store SDK not available. Install with: pip install sagemaker[feature-store]")
except Exception as e:
    print(f"Error checking Feature Store: {e}")


### Viewing Experiment Metrics and Parameters

You can view detailed metrics and parameters for each trial in the SageMaker Studio Experiments UI, or programmatically:


In [None]:
# Get the latest trial from the experiment
trials_list = list(experiment.list_trials())
if trials_list:
    latest_trial = trials_list[0]
    print(f"Latest Trial: {latest_trial.trial_name}")
    
    # List trial components (training jobs, processing jobs, etc.)
    trial_components = list(latest_trial.list_trial_components())
    print(f"\nTrial Components ({len(trial_components)}):")
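    for component in trial_components[:5]:
        print(f"  - {component.trial_component_name}")
        # Summary objects expose attributes rather than dict keys; fall back to 'N/A' if absent
        source = getattr(component, "trial_component_source", None)
        print(f"    Type: {getattr(source, 'source_type', 'N/A')}")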
else:
    print("No trials found yet. Run the pipeline to create a trial.")


### Submit the pipeline to SageMaker and start execution

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps.

In [3]:
pipeline.upsert(role_arn=role)


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:335481609370:pipeline/AbalonePipeline-Example',
 'PipelineVersionId': 4,
 'ResponseMetadata': {'RequestId': 'f76db16f-4962-4e6c-bcfc-e5e38dbbc474',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f76db16f-4962-4e6c-bcfc-e5e38dbbc474',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '113',
   'date': 'Tue, 25 Nov 2025 06:16:43 GMT'},
  'RetryAttempts': 0}}
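
The `PipelineArn` in this response follows the general ARN layout `arn:partition:service:region:account-id:resource`, so its parts can be pulled out for logging or cross-account checks. The helper below is a small sketch; `parse_sagemaker_arn` is a hypothetical name, not an SDK function.

```python
def parse_sagemaker_arn(arn):
    """Split an ARN into its components using the standard colon-separated layout."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    _, partition, service, region, account_id, resource = parts
    # The resource part looks like "<type>/<name>", e.g. "pipeline/<pipeline name>".
    resource_type, _, resource_name = resource.partition("/")
    return {
        "partition": partition,
        "service": service,
        "region": region,
        "account_id": account_id,
        "resource_type": resource_type,
        "resource_name": resource_name,
    }


parsed = parse_sagemaker_arn(
    "arn:aws:sagemaker:us-east-1:335481609370:pipeline/AbalonePipeline-Example"
)
print(parsed["region"], parsed["resource_type"], parsed["resource_name"])
```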

We'll start the pipeline, accepting all the default parameters.

Values can also be passed to these pipeline parameters when the pipeline is started; this is covered later.

In [4]:
execution = pipeline.start()


### Pipeline Operations: examining and waiting for pipeline execution

Now we describe the execution instance and list the steps in the execution to find out more about it.

In [5]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:335481609370:pipeline/AbalonePipeline-Example',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:335481609370:pipeline/AbalonePipeline-Example/execution/zqn8am6nwx8q',
 'PipelineExecutionDisplayName': 'execution-1764051408626',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'AbalonePipeline-Example',
  'TrialName': 'zqn8am6nwx8q'},
 'CreationTime': datetime.datetime(2025, 11, 25, 6, 16, 48, 554000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 11, 25, 6, 24, 33, 400000, tzinfo=tzlocal()),
 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::335481609370:assumed-role/AmazonSageMaker-ExecutionRole-20251125T093295/SageMaker',
   'PrincipalId': 'AROAU4HCDDSNOC2HVVREB:SageMaker'}},
 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::335481609370:assumed-role/AmazonSageMaker-ExecutionRole-20251125T093295/SageMaker',
   'PrincipalId': 'AROAU4HCDDSNOC2HVVREB:SageMaker'}},
 'Pipelin

We can wait for the execution by invoking `wait()` on the execution:

In [None]:
execution.wait()
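
Conceptually, waiting amounts to polling `execution.describe()` until `PipelineExecutionStatus` reaches a terminal value. The sketch below illustrates that idea against a stand-in object so it can run without AWS access; `wait_for_terminal_status` and `FakeExecution` are hypothetical names, and this is not the SDK's own implementation of `wait()`.

```python
import time

# Statuses after which a pipeline execution no longer changes.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_terminal_status(execution, delay_seconds=30, max_attempts=60, sleep=time.sleep):
    """Poll execution.describe() until the execution reaches a terminal status."""
    for attempt in range(max_attempts):
        status = execution.describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        if attempt < max_attempts - 1:
            sleep(delay_seconds)
    raise TimeoutError(f"execution still running after {max_attempts} polls")


class FakeExecution:
    """Stand-in for a pipeline execution, used only for this demonstration."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe(self):
        return {"PipelineExecutionStatus": next(self._statuses)}


fake = FakeExecution(["Executing", "Executing", "Succeeded"])
final_status = wait_for_terminal_status(fake, delay_seconds=0)
print(final_status)  # Succeeded
```

Checking the returned status matters because a terminal status can also be `Failed` or `Stopped`.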

We can list the execution steps to check out the status and artifacts:

In [6]:
execution.list_steps()

[{'StepName': 'RegisterAbaloneModel-RegisterModel',
  'StartTime': datetime.datetime(2025, 11, 25, 6, 24, 32, 12000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 11, 25, 6, 24, 33, 59000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:335481609370:model-package/AbaloneModelPackageGroup-Example/1'}},
  'AttemptCount': 1},
 {'StepName': 'CheckMSEAbaloneEvaluation',
  'StartTime': datetime.datetime(2025, 11, 25, 6, 24, 31, 498000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 11, 25, 6, 24, 31, 676000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'True'}},
  'AttemptCount': 1},
 {'StepName': 'EvaluateAbaloneModel',
  'StartTime': datetime.datetime(2025, 11, 25, 6, 21, 54, 580000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 11, 25, 6, 24, 30, 859000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:
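
The raw step list is verbose, so it can help to reduce it to name, status, and duration. The sketch below assumes the dictionary shape shown in the output above (`StepName`, `StepStatus`, `StartTime`, `EndTime`); `summarize_steps` is a hypothetical helper, and the sample data copies the timestamps above without time zone information.

```python
import datetime


def summarize_steps(steps):
    """Return (name, status, duration in seconds) tuples, oldest step first."""
    rows = []
    for step in sorted(steps, key=lambda s: s["StartTime"]):
        end = step.get("EndTime")  # absent while a step is still running
        duration = (end - step["StartTime"]).total_seconds() if end else None
        rows.append((step["StepName"], step["StepStatus"], duration))
    return rows


# Sample data mirroring the output above (tzinfo omitted for brevity).
sample_steps = [
    {
        "StepName": "RegisterAbaloneModel-RegisterModel",
        "StartTime": datetime.datetime(2025, 11, 25, 6, 24, 32, 12000),
        "EndTime": datetime.datetime(2025, 11, 25, 6, 24, 33, 59000),
        "StepStatus": "Succeeded",
    },
    {
        "StepName": "CheckMSEAbaloneEvaluation",
        "StartTime": datetime.datetime(2025, 11, 25, 6, 24, 31, 498000),
        "EndTime": datetime.datetime(2025, 11, 25, 6, 24, 31, 676000),
        "StepStatus": "Succeeded",
    },
    {
        "StepName": "EvaluateAbaloneModel",
        "StartTime": datetime.datetime(2025, 11, 25, 6, 21, 54, 580000),
        "EndTime": datetime.datetime(2025, 11, 25, 6, 24, 30, 859000),
        "StepStatus": "Succeeded",
    },
]

summary = summarize_steps(sample_steps)
for name, status, seconds in summary:
    print(f"{name:<40} {status:<10} {seconds}")
```

With a live execution, the same function could be called as `summarize_steps(execution.list_steps())`.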

### Parameterized Executions

We can run additional executions of the pipeline with different pipeline parameters. The `parameters` argument is a dictionary whose keys are the parameter names and whose values are the primitive values that override the defaults.

Of particular note, based on the performance of the model, we may want to kick off another pipeline execution, this time on a compute-optimized instance type and with the model approval status automatically set to "Approved". This means that the model package version generated by the `RegisterModel` step will automatically be ready for deployment through CI/CD pipelines, such as with SageMaker Projects.

In [8]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)
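
A misspelled parameter name or an invalid approval status is easier to catch before the execution is submitted. The sketch below validates an override dictionary locally; the default values shown are assumptions for illustration (the real defaults are defined in `pipelines/abalone/pipeline.py`), and `resolve_parameters` is a hypothetical helper.

```python
# Values accepted for a model package approval status.
ALLOWED_APPROVAL_STATUSES = {"Approved", "Rejected", "PendingManualApproval"}

# Hypothetical defaults, used only for this example.
DEFAULT_PARAMETERS = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(overrides, defaults=DEFAULT_PARAMETERS):
    """Merge overrides onto defaults, rejecting unknown names and bad statuses."""
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError(f"unknown pipeline parameters: {unknown}")
    resolved = {**defaults, **overrides}
    if resolved["ModelApprovalStatus"] not in ALLOWED_APPROVAL_STATUSES:
        raise ValueError(f"invalid approval status: {resolved['ModelApprovalStatus']!r}")
    return resolved


resolved = resolve_parameters(
    {"ProcessingInstanceType": "ml.c5.xlarge", "ModelApprovalStatus": "Approved"}
)
print(resolved)
```

The resolved dictionary could then be passed as `pipeline.start(parameters=resolved)`.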


In [9]:
execution.wait()

In [10]:
execution.list_steps()
