
# Automated Machine Learning Pipeline with Amazon SageMaker

## Overview

This notebook demonstrates how to build an automated ML pipeline using Amazon SageMaker. We'll showcase:
- Automated data processing using SageMaker Processing Jobs
- Model training using SageMaker Training Jobs
- Model deployment using SageMaker Endpoints
- Automated monitoring using Model Monitor
- Automated retraining using SageMaker Pipelines

![ML Pipeline](https://github.com/calmrocks/master-machine-learning-engineer/blob/main/MLOps/Diagrams/MLPipeline.png?raw=1)

## Setup

First, let's set up our SageMaker environment and required dependencies.


In [2]:
!pip install boto3 sagemaker --upgrade



In [1]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.model_monitor import DataCaptureConfig, ModelMonitor

# Setup SageMaker session
session = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name





## Data Processing Step

Here we define our data processing step using a SageMaker Processing Job.

In [None]:
from sagemaker.processing import ScriptProcessor

# Create processor for data preprocessing
preprocessor = ScriptProcessor(
    base_job_name='preprocessing-job',
    image_uri='<your-preprocessing-image>',
    command=['python3'],  # interpreter used to run the script inside the container
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge'
)

# Define processing step
processing_step = ProcessingStep(
    name="PreprocessData",
    processor=preprocessor,
    inputs=[
        ProcessingInput(
            source='s3://your-bucket/raw-data',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='train',
            source='/opt/ml/processing/train'
        ),
        ProcessingOutput(
            output_name='test',
            source='/opt/ml/processing/test'
        )
    ],
    code='preprocess.py'
)
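
The processing step runs `preprocess.py`, which the notebook references but does not show. The following is a minimal sketch of what such a script might look like, using only the standard library. The input file name `data.csv`, the output file names, the 80/20 split, and the helper names `split_rows` and `write_csv` are assumptions for illustration; only the container directories come from the step definition above.

```python
import csv
import random
from pathlib import Path

# Container paths taken from the ProcessingInput/ProcessingOutput definitions above
INPUT_DIR = Path("/opt/ml/processing/input")
TRAIN_DIR = Path("/opt/ml/processing/train")
TEST_DIR = Path("/opt/ml/processing/test")


def split_rows(rows, test_fraction=0.2, seed=42):
    """Shuffle rows reproducibly and split them into (train, test)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_fraction)
    return shuffled[n_test:], shuffled[:n_test]


def write_csv(path, header, rows):
    """Write a header and rows to path, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


def main(input_dir=INPUT_DIR, train_dir=TRAIN_DIR, test_dir=TEST_DIR):
    # Assumption: the raw data is a single file named data.csv with a header row
    with (input_dir / "data.csv").open(newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    train, test = split_rows(rows)
    write_csv(train_dir / "train.csv", header, train)
    write_csv(test_dir / "test.csv", header, test)


# Only run automatically inside the processing container,
# where the input directory exists
if __name__ == "__main__" and INPUT_DIR.exists():
    main()
```

Fixing the shuffle seed keeps the split reproducible across pipeline runs, which makes evaluation results comparable between executions.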

## Training Step

Define the model training step using SageMaker's built-in algorithms or custom containers.

In [None]:
from sagemaker.estimator import Estimator

# Create estimator
estimator = Estimator(
    image_uri='<your-training-image>',
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path='s3://your-bucket/model-output'
)

# Define training step
training_step = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs={
        'training': sagemaker.inputs.TrainingInput(
            # S3Output is a structure; the location itself is its S3Uri field
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri
        )
    }
)

## Model Evaluation Step

Create a step to evaluate model performance and decide if retraining is needed.

In [None]:
evaluation_processor = ScriptProcessor(
    base_job_name='evaluation-job',
    image_uri='<your-evaluation-image>',
    command=['python3'],  # interpreter used to run the script inside the container
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge'
)

evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/model'
        ),
        ProcessingInput(
            # S3Output is a structure; the location itself is its S3Uri field
            source=processing_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation',
            source='/opt/ml/processing/evaluation'
        )
    ],
    code='evaluate.py'
)
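
The evaluation step runs `evaluate.py`, which is also not shown. Since the retraining condition later in the notebook reads a value called `DriftMetric`, the script presumably has to write that value somewhere the pipeline can find it. The sketch below shows one way this could be done. The metric itself (relative shift of the mean), the file name `evaluation.json`, and the helper names are assumptions, not the notebook's actual method; loading the model and scoring the test set is framework-specific and omitted.

```python
import json
from pathlib import Path
from statistics import mean

# Output directory taken from the ProcessingOutput definition above
EVALUATION_DIR = Path("/opt/ml/processing/evaluation")


def drift_metric(baseline, current):
    """Hypothetical drift score: relative shift of the mean between two samples."""
    base_mean = mean(baseline)
    if base_mean == 0:
        # Avoid division by zero; fall back to the absolute shift
        return abs(mean(current))
    return abs(mean(current) - base_mean) / abs(base_mean)


def write_report(metric, output_dir=EVALUATION_DIR):
    """Write the metric as JSON under the key a downstream condition would read."""
    output_dir.mkdir(parents=True, exist_ok=True)
    report_path = output_dir / "evaluation.json"  # file name is an assumption
    report_path.write_text(json.dumps({"DriftMetric": metric}))
    return report_path
```

Inside the container, the script would end with something like `write_report(drift_metric(baseline_values, current_values))`, where both samples are numeric lists derived from the test data.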

## Model Deployment Step

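Create the model from the training artifacts and define the data capture settings used when the endpoint is deployed.

In [None]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

# A PipelineSession defers model.create() until the pipeline runs
pipeline_session = PipelineSession()

# Create model from the artifacts produced by the training step
model = Model(
    image_uri='<your-inference-image>',
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session
)

# Data capture settings to pass to model.deploy() when the endpoint is created
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri='s3://your-bucket/captured-data'
)

# ModelStep accepts the result of model.create() or model.register(), not model.deploy().
# SageMaker Pipelines has no built-in endpoint step, so the endpoint itself is created
# outside the pipeline (or from a LambdaStep) with initial_instance_count=1,
# instance_type='ml.m5.xlarge', and data_capture_config.
deployment_step = ModelStep(
    name="DeployModel",
    step_args=model.create(instance_type='ml.m5.xlarge')
)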
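
With data capture enabled, the endpoint writes requests and responses to the configured S3 location as JSON Lines files. The sketch below shows how one such record might be parsed, assuming the record layout described in the Model Monitor data capture documentation (`captureData`, `endpointInput`, `endpointOutput`, `eventMetadata`). The sample record and the helper names `decode_payload` and `parse_capture_line` are made up for illustration.

```python
import base64
import json


def decode_payload(section):
    """Return the payload text of an endpointInput or endpointOutput section."""
    data = section["data"]
    if section.get("encoding") == "BASE64":
        return base64.b64decode(data).decode("utf-8")
    return data


def parse_capture_line(line):
    """Extract request, response, and timestamp from one JSON Lines record."""
    record = json.loads(line)
    capture = record["captureData"]
    return {
        "request": decode_payload(capture["endpointInput"]),
        "response": decode_payload(capture["endpointOutput"]),
        "inference_time": record["eventMetadata"]["inferenceTime"],
    }


# Fabricated sample record, for illustration only
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {
            "observedContentType": "text/csv",
            "mode": "INPUT",
            "data": "5.1,3.5,1.4,0.2",
            "encoding": "CSV",
        },
        "endpointOutput": {
            "observedContentType": "text/csv",
            "mode": "OUTPUT",
            "data": base64.b64encode(b"0.12").decode("ascii"),
            "encoding": "BASE64",
        },
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2024-01-01T00:00:00Z"},
    "eventVersion": "0",
})

parsed = parse_capture_line(sample_line)
print(parsed["request"], "->", parsed["response"])
```

Handling the `BASE64` encoding explicitly matters because payloads are not always captured as plain text.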

## Automated Monitoring Setup

Configure automated monitoring for the deployed model.

In [None]:
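# Placeholders: the deployed endpoint's name and the baseline files produced by a
# baselining job. The schedule is created immediately, outside the pipeline, so
# pipeline step properties cannot be used here.
endpoint_name = '<your-endpoint-name>'
baseline_stats = 's3://your-bucket/baseline/statistics.json'
baseline_constraints = 's3://your-bucket/baseline/constraints.json'

# Create model monitor (the generic ModelMonitor needs a monitoring container image)
model_monitor = ModelMonitor(
    role=role,
    image_uri='<your-monitoring-image>',
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600
)

# Schedule monitoring
model_monitor.create_monitoring_schedule(
    monitor_schedule_name='model-monitor-schedule',
    endpoint_input=endpoint_name,
    statistics=baseline_stats,
    constraints=baseline_constraints,
    schedule_cron_expression='cron(0 * ? * * *)'  # Hourly monitoring
)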
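
Each monitoring execution can produce a constraint violations report. The sketch below summarizes such a report by check type, assuming the `constraint_violations.json` layout used by the built-in Model Monitor container (a top-level `violations` list whose entries carry `feature_name`, `constraint_check_type`, and `description`). A custom monitoring image may write a different format. The sample report and the helper name `summarize_violations` are illustrative.

```python
import json
from collections import Counter


def summarize_violations(report_text):
    """Count violations per constraint check type in a violations report."""
    report = json.loads(report_text)
    violations = report.get("violations", [])
    return Counter(v["constraint_check_type"] for v in violations)


# Fabricated sample report, for illustration only
sample_report = json.dumps({
    "violations": [
        {
            "feature_name": "age",
            "constraint_check_type": "data_type_check",
            "description": "example description",
        },
        {
            "feature_name": "income",
            "constraint_check_type": "baseline_drift_check",
            "description": "example description",
        },
        {
            "feature_name": "balance",
            "constraint_check_type": "baseline_drift_check",
            "description": "example description",
        },
    ]
})

summary = summarize_violations(sample_report)
print(dict(summary))
```

A count like this could feed an alert or a decision to start the pipeline, depending on which check types are treated as serious.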

## Automated Retraining Trigger

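Use the drift metric reported by the evaluation step to decide whether the retrained model is promoted.

In [None]:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile

# Expose the evaluation report to the pipeline.
# Assumption: evaluate.py writes evaluation.json containing a "DriftMetric" key.
drift_report = PropertyFile(
    name='DriftReport',
    output_name='evaluation',
    path='evaluation.json'
)
evaluation_step.property_files.append(drift_report)

# Define condition on the drift metric
retraining_condition = ConditionGreaterThan(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=drift_report,
        json_path='DriftMetric'
    ),
    right=0.1  # Threshold for drift
)

# Conditional step. Steps listed at the top level of the pipeline cannot be repeated
# in if_steps, and the condition reads the output of evaluation_step, so only the
# deployment step is gated here.
conditional_training_step = ConditionStep(
    name="CheckDriftAndRetrain",
    conditions=[retraining_condition],
    if_steps=[deployment_step],
    else_steps=[]
)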

## Pipeline Definition

Create and execute the complete pipeline.

In [None]:
# Define pipeline
pipeline = Pipeline(
    name="AutoMLPipeline",
    steps=[
        processing_step,
        training_step,
        evaluation_step,
        conditional_training_step
    ]
)

# Execute pipeline
pipeline.upsert(role_arn=role)
execution = pipeline.start()
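
Once an execution has started, it helps to check which steps failed and why. The helper below is a small sketch that filters a step list of the shape returned by `execution.list_steps()`: a list of dictionaries with `StepName`, `StepStatus`, and, for failed steps, `FailureReason`. The function name `failed_steps` and the sample data are illustrative.

```python
def failed_steps(steps):
    """Return (step name, failure reason) pairs for steps with status 'Failed'."""
    return [
        (step["StepName"], step.get("FailureReason", ""))
        for step in steps
        if step.get("StepStatus") == "Failed"
    ]


# Fabricated step list, for illustration only
sample_steps = [
    {"StepName": "PreprocessData", "StepStatus": "Succeeded"},
    {"StepName": "TrainModel", "StepStatus": "Failed", "FailureReason": "example reason"},
]

print(failed_steps(sample_steps))
```

In the notebook this would typically follow `execution.wait()`, as in `failed_steps(execution.list_steps())`.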

## Pipeline Scheduling

Set up automated pipeline execution.

In [None]:
import boto3

events_client = boto3.client('events')

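# Look up the pipeline ARN (the Pipeline object has no `arn` attribute)
pipeline_arn = pipeline.describe()['PipelineArn']

# Create EventBridge (formerly CloudWatch Events) rule
response = events_client.put_rule(
    Name='MLPipelineSchedule',
    ScheduleExpression='rate(1 day)',  # Run daily
    State='ENABLED'
)

# Add target to trigger pipeline.
# The role must be assumable by events.amazonaws.com and allow
# sagemaker:StartPipelineExecution on the pipeline.
events_client.put_targets(
    Rule='MLPipelineSchedule',
    Targets=[{
        'Id': 'MLPipelineTarget',
        'Arn': pipeline_arn,
        'RoleArn': role
    }]
)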
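
The `ScheduleExpression` above uses EventBridge's rate syntax, `rate(value unit)`, where the unit is singular when the value is 1 and plural otherwise. A small helper can prevent malformed expressions when the schedule is made configurable. The function name `rate_expression` is hypothetical.

```python
def rate_expression(value, unit):
    """Build an EventBridge rate expression such as 'rate(1 day)' or 'rate(12 hours)'."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError("unit must be 'minute', 'hour', or 'day'")
    if not isinstance(value, int) or value < 1:
        raise ValueError("value must be a positive integer")
    suffix = "" if value == 1 else "s"
    return f"rate({value} {unit}{suffix})"


print(rate_expression(1, "day"))
print(rate_expression(12, "hour"))
```

The result can be passed directly as `ScheduleExpression` in `put_rule`.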

## Conclusion

This notebook demonstrated how to:

- Create an automated ML pipeline using SageMaker Pipelines
- Set up automated data processing and model training
- Configure model monitoring and drift detection
- Implement automated retraining triggers
- Schedule regular pipeline execution

Key Benefits:

- Fully automated workflow
- Built-in monitoring and retraining
- Scalable and reproducible
- Integrated with AWS services
- A foundation for a production implementation once the placeholder images, buckets, and scripts are filled in
