# 03 - CI/CD Pipelines

## Install Required Packages

In [2]:
!pip install pyathena
!pip install -U sagemaker

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Import Libraries

In [3]:
import boto3 # aws sdk for python
import sagemaker # machine learning platform
import json # encoder and decoder
import numpy as np # array manipulation
import os # operating system interfaces
import pandas as pd # python data analysis
import re # regular expressions
from pyathena import connect # athena client
from sagemaker.pytorch.estimator import PyTorch # PyTorch estimator
from sagemaker.pytorch.model import PyTorchModel # PyTorch model
from time import gmtime, strftime, sleep # time-related functions

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


## Set Up Environment

In [4]:
from sagemaker.workflow.pipeline_context import PipelineSession

# set up sagemaker session
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

# initialize a pipeline session
pipeline_session = PipelineSession()

# define name for model package group
model_package_group_name = f"SafetyModelPackageGroupName"

# define prefixes for the safety catalog and data directories
prefix_catalog = 'safety/catalog'
prefix_data = 'safety/data'

## Create Input Data Using Catalog

In [None]:
# create a 'data' directory for input data
!mkdir -p data

In [5]:
# define database name
database_name = 'safetydb'

# define table name
table_name_csv = 'catalog_csv'

# set s3 temporary staging directory
s3_staging_dir = "s3://{0}/athena/staging".format(default_bucket)

# define connection parameters
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

# define sql query statement
statement = """SELECT * FROM {}.{}
    WHERE img_filename like '%.jpg'
    AND label_filename like '%.txt'
    LIMIT 100""".format(
    database_name, table_name_csv
)

# print sql statement for review before executing
print('SQL query SELECT statement:\n', statement)

SQL query SELECT statement:
 SELECT * FROM safetydb.catalog_csv
    WHERE img_filename like '%.jpg'
    AND label_filename like '%.txt'
    LIMIT 100


In [None]:
# execute sql query
df_catalog_query = pd.read_sql(statement, conn)

# convert to csv format, store in 'data' directory
df_catalog_query.to_csv(f"data/catalog_query.csv")

## Create Pipeline

### Define Input Data

In [6]:
# define path to local csv file with catalog query
local_path = "data/catalog_query.csv"

# use boto3 to interact with s3 resource
s3 = boto3.resource("s3")

# define uri to store catalog in s3
catalog_uri = f"s3://{default_bucket}/{prefix_catalog}"

# upload input/catalog data in s3
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=catalog_uri,
)

# print s3 location for uploaded catalog csv file
print(input_data_uri)

s3://sagemaker-us-east-1-414754026690/safety/catalog/catalog_query.csv


### Define Batch Data

In [7]:
# define uri for batch data (i.e., images), to be used for batch transform job
batch_data_uri = f"s3://{default_bucket}/{prefix_data}/split_cicd/batch/images"

# print batch data uri
print(batch_data_uri)

s3://sagemaker-us-east-1-414754026690/safety/data/split_cicd/batch/images


### Define Pipeline Parameters

In [8]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

# mean average precision threshold value, to be used during condition step
map_threshold = ParameterFloat(name="mAPThreshold", default_value=0.001)

### Define a Processing Step for Feature Engineering

In [None]:
# create a 'code' directory to store custom requirements and scripts
!mkdir -p code

#### Create Custom Preprocessing Script

In [None]:
%%writefile code/preprocessing.py
import argparse
import numpy as np
import pandas as pd
import subprocess

# define function to copy files for respective data splits to corresponding s3 destinations
# provide 'split_name' as either 'train', 'val', 'test', or 'batch'
# provide 'split_list' as either 'train_list', 'val_list', 'test_list', or 'batch_list'
def split_dataset(split_name, split_list):
    
    # counter for printing copy progress
    num_files_copied = 0

    # iterate through each sample in split
    for index, sample in data[split_list].iterrows():

        # source/destination variables for individual sample
        cp_image_source = f"{s3_images_source}{sample['img_filename']}"
        cp_image_dest = f"{s3_split_dest}{split_name}/images/"
        cp_label_source = f"{s3_labels_source}{sample['label_filename']}"
        cp_label_dest = f"{s3_split_dest}{split_name}/labels/"

        # copy from source to destination
        subprocess.run(f"aws s3 cp {cp_image_source} {cp_image_dest} --exclude '*' --include '*.jpg' --only-show-errors", shell=True)
        subprocess.run(f"aws s3 cp {cp_label_source} {cp_label_dest} --exclude '*' --include '*.txt' --only-show-errors", shell=True)
        
        # increment counter
        num_files_copied += 1

        # print status after every 500 files copied
        if num_files_copied % 500 == 0:
            print(f"{num_files_copied} files copied.")


if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()

    parser.add_argument('--s3-images-source', type=str)
    parser.add_argument('--s3-labels-source', type=str)
    parser.add_argument('--s3-split-dest', type=str)
    
    # obtain arguments
    args, _ = parser.parse_known_args()
    
    # base directory for processing step
    base_dir = "/opt/ml/processing"
    
    # read in catalog query for input data
    data = pd.read_csv(
        f"{base_dir}/input/catalog_query.csv"
    )
    
    # data split in four sets - training, validation, test, and batch inference
    rand_split = np.random.rand(len(data))
    train_list = rand_split < 0.4
    val_list = (rand_split >= 0.4) & (rand_split < 0.5)
    test_list = (rand_split >= 0.5) & (rand_split < 0.6)
    batch_list = rand_split >= 0.6 # "production" data

    # print data splits
    print('Data Splits:')
    print('------------')
    print(f"Train :   {sum(train_list)} samples")
    print(f"Val   :   {sum(val_list)} samples")
    print(f"Test  :   {sum(test_list)} samples")
    print(f"Batch :   {sum(batch_list)} samples")
    
    # define and print source s3 locations
    s3_images_source = args.s3_images_source
    s3_labels_source = args.s3_labels_source
    print('Images source directory location:', s3_images_source)
    print('Labels source directory location:', s3_labels_source, '\n')

    # define and print destination s3 location for data splits
    s3_split_dest = args.s3_split_dest
    print('Split destination directory location:', s3_split_dest)
    
    # perform data copies
    print('Beginning TRAIN data split copies.')
    split_dataset(split_name='train', split_list=train_list)
    print('Completed TRAIN data split copies.\n')

    print('Beginning VAL data split copies.')
    split_dataset(split_name='val', split_list=val_list)
    print('Completed VAL data split copies.\n')

    print('Beginning TEST data split copies.')
    split_dataset(split_name='test', split_list=test_list)
    print('Completed TEST data split copies.\n')

    print('Beginning BATCH data split copies.')
    split_dataset(split_name='batch', split_list=batch_list)
    print('Completed BATCH data split copies.')

#### Create PyTorch Processor

In [9]:
from sagemaker.pytorch.processing import PyTorchProcessor

#Initialize the PyTorchProcessor
pytorch_processor = PyTorchProcessor(
    framework_version='2.1.0',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=processing_instance_count,
    py_version='py310',
    base_job_name='pytorch-safety-process',
    sagemaker_session=pipeline_session,
)

In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# define processor args
processor_args = pytorch_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    arguments=["--s3-images-source", f"s3://{default_bucket}/{prefix_data}/images/",
               "--s3-labels-source", f"s3://{default_bucket}/{prefix_data}/labels/",
               "--s3-split-dest", f"s3://{default_bucket}/{prefix_data}/split_cicd/"
    ],
    code="preprocessing.py", # custom preprocessing py script
    source_dir='code',
)

# create process step using processor args
step_process = ProcessingStep(name="SafetyProcess", step_args=processor_args)



### Define a Training Step to Train a Model

#### Create Custom Train Script

In [None]:
%%writefile code/train.py
import argparse
import os
import shutil
import torch
from ultralytics import settings
from ultralytics import YOLO

if __name__ =='__main__':

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument('--data', type=str, default='data.yaml') # yaml config file for custom dataset
    parser.add_argument('--epochs', type=int, default=3) # number of training epochs
    parser.add_argument('--batch', type=int, default=-1) # batch size, -1 for AutoBatch
    parser.add_argument('--yolo-model', type=str, default='yolov8n.pt') # pretrained base model
    parser.add_argument('--saved-model-weights', type=str, default='model.pt') # saved model weights
    
    # data and model directories
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])

    # obtain arguments
    args, _ = parser.parse_known_args()

    # build model
    model = YOLO(args.yolo_model)

    # train model
    results = model.train(data=args.data, epochs=args.epochs, batch=args.batch)
    
    # define soure/dest paths for best model weights
    path_best_model_source = f"{settings['runs_dir']}/detect/train/weights/best.pt"
    path_best_model_dest = os.path.join(args.model_dir, args.saved_model_weights)
    
    # copy best model weights file for packaging
    shutil.copy(path_best_model_source, path_best_model_dest)
    
    # evaluate model on test dataset
    print('EVALUATING MODEL ON TEST DATASET...')
    model_val = YOLO(path_best_model_dest)
    metrics = model_val.val(data=args.data, split='test')
    
    # print evaluation metric to compare performance between models
    print('-------------')
    print('-------------')
    print('-------------')
    print('MODEL EVALUATION METRIC:')
    print('mAP50:', round(metrics.box.map50, 4))
    print('-------------')
    print('-------------')
    print('-------------')

#### Create PyTorch Estimator

In [11]:
#from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# define job name and output location, 'cicd' for pipeline jobs
job_name = 'yolov8-cicd'
output_location = "s3://{}/{}/output/{}".format(default_bucket, prefix_data, job_name)

# build a PyTorch estimator
pytorch_estimator = PyTorch(
    role=role,
    entry_point='train.py', # custom training script, locate in code directory
    framework_version='2.1.0', # training - CPU - Python 3.10
    py_version='py310',
    source_dir='code',
    instance_count=1,
    instance_type=instance_type,
    output_path=output_location,
    sagemaker_session=pipeline_session, # pipeline session
    hyperparameters = {'data': 'data.yaml', # yaml config file for custom dataset
                       'epochs': 1, # number of training epochs
                       'batch': 32, # batch size, -1 for AutoBatch
                       'yolo_model': 'yolov8n.pt', # pretrained base model
                       'saved_model_weights': 'model.pt' # saved model weights
                      }
)

# s3 location where training data is saved
inputs = f"s3://{default_bucket}/{prefix_data}/split_cicd"

# begin training job
train_args = pytorch_estimator.fit(inputs=inputs, job_name=job_name, logs='All')

In [12]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# create training step using train args
step_train = TrainingStep(
    name="SafetyTrain",
    step_args=train_args,
)

### Define a Model Evaluation Step to Evaluate the Trained Model

#### Create Custom Evaluation Script

In [None]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile
import argparse
import joblib
import numpy as np
import pandas as pd
from ultralytics import YOLO


if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()

    parser.add_argument('--data', type=str, default='data-eval.yaml') # yaml config file for custom dataset

    # obtain arguments
    args, _ = parser.parse_known_args()
    
    # define path to model and extract
    model_path = '/opt/ml/processing/model/model.tar.gz'
    with tarfile.open(model_path) as tar:
        tar.extractall(path="/opt/ml/processing/model/")
    
    # evaluate model on test dataset
    print('EVALUATING MODEL ON TEST DATASET...')
    model_val = YOLO('/opt/ml/processing/model/model.pt')
    metrics = model_val.val(data=args.data, split='test')
    
    # print evaluation metric to compare performance between models
    print('-------------')
    print('-------------')
    print('-------------')
    print('MODEL EVALUATION METRIC:')
    print('mAP50:', round(metrics.box.map50, 4))
    print('-------------')
    print('-------------')
    print('-------------')
    
    # obtain mAP50 metric, round to 4 decimal places
    map50 = round(metrics.box.map50, 4)

    # create report dict with metric value
    report_dict = {
        "detection_metrics": {
            "mAP50": {"value": map50},
        },
    }

    # output evaluation in json format
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

#### Create PyTorch Processor

In [13]:
#Initialize the PyTorchProcessor
pytorch_processor_eval = PyTorchProcessor(
    framework_version='2.1.0',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=processing_instance_count,
    py_version='py310',
    base_job_name='pytorch-safety-eval',
    sagemaker_session=pipeline_session,
)

# define args for evaluation step
eval_args = pytorch_processor_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts, # model artifacts from train step
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=inputs, # images
            destination="/opt/ml/processing/input/code/datasets",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="evaluation.py", # custom evaluation py script
    source_dir='code',
)

In [14]:
from sagemaker.workflow.properties import PropertyFile

# define json evaluation report
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# create evaluation step using eval args
step_eval = ProcessingStep(
    name="SafetyEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

### Define a Create Model Step to Create a Model

In [19]:
# trained model artifacts
model_data = step_train.properties.ModelArtifacts.S3ModelArtifacts

# image uri for use with PyTorch model, needed for compatibility
image_uri = sagemaker.image_uris.retrieve(framework='pytorch',
                                          region=region, version='2.1.0',
                                          py_version='py310',
                                          image_scope='inference',
                                          instance_type='ml.m5.xlarge'
)

# create a PyTorch model
pytorch_model = PyTorchModel(
    model_data=model_data, # trained model artifacts
    role=role,
    entry_point='inference.py', # custom inference script, locate in code directory
    image_uri=image_uri,
    source_dir='code',
    sagemaker_session=pipeline_session,
)

In [20]:
from sagemaker.workflow.model_step import ModelStep

# create model step
step_create_model = ModelStep(
    name="SafetyCreateModel",
    step_args=pytorch_model.create(instance_type="ml.m5.xlarge", accelerator_type="ml.eia1.medium"),
)

### Define a Transform Step to Perform Batch Transformation

In [None]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import BatchDataCaptureConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig

transformer_output = f"s3://{default_bucket}/SafetyTransform" # transformer job results

# Define the S3 URI where captured data will be stored
data_capture_destination = f"{transformer_output}/captured_data"

# create a transformer from the PyTorch model
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=transformer_output,
    accept='application/json',
    max_payload=10
)

In [22]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

# create transform step using transformer and batch data as input
step_transform = TransformStep(
    name="SafetyTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

### Define a Register Model Step to Create a Model Package

In [23]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# define model metrics source for use when registering model
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

# define args for register model step
register_args = pytorch_model.register(
    content_types=["image/jpeg"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    domain="COMPUTER_VISION"
)

# create register model step using register args
step_register = ModelStep(name="SafetyRegisterModel", step_args=register_args)

### Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

In [24]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join


# create a fail step for when mAP is less than the defined threshold
step_fail = FailStep(
    name="SafetyMAPFail",
    error_message=Join(on=" ", values=["Execution failed due to mAP <", map_threshold]),
)

### Define a Condition Step based on mAP Threshold Value

In [25]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# create gte condition
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="detection_metrics.mAP50.value",
    ),
    right=map_threshold,
)

# create condition step
# if gte - register, create, and transform
# otherwise - execute fail step
step_cond = ConditionStep(
    name="SafetyMAPCond",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

### Define a Pipeline of Parameters, Steps, and Conditions

In [26]:
from sagemaker.workflow.pipeline import Pipeline


# define pipeline
pipeline_name = f"SafetyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        map_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

### Examine the Pipeline Definition

In [27]:
import json


# examine pipeline definition
definition = json.loads(pipeline.definition())
definition

INFO:sagemaker.processing:Uploaded code to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/f66c1f24b36759d0859fb2723e09ba04/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/54f0ef6bee583ff9186b762aaf572190/runproc.sh
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.processing:Uploaded code to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/b3646e43958aa8a5814aced796721ea0/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-414754026690/safety/catalog/catalog_query.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-414754026690/safety/data/split_cicd/batch/images'},
  {'Name': 'mAPThreshold', 'Type': 'Float', 'DefaultValue': 0.001}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'SafetyProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {

## Submit the Pipeline to SageMaker, Start Execution

In [28]:
# submit pipeline
pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded code to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/f66c1f24b36759d0859fb2723e09ba04/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/54f0ef6bee583ff9186b762aaf572190/runproc.sh
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.processing:Uploaded code to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/b3646e43958aa8a5814aced796721ea0/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh
INFO:sagemaker.processing:Uploaded code to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/f66c1f24b36759d0859fb2723e09ba04/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-414754026690/SafetyPipeline/code/54f0ef6bee583ff9186b762a

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:414754026690:pipeline/SafetyPipeline',
 'ResponseMetadata': {'RequestId': '436f584f-d5d8-4d87-987c-701f6f38c1d8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '436f584f-d5d8-4d87-987c-701f6f38c1d8',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Sun, 25 Feb 2024 23:22:31 GMT'},
  'RetryAttempts': 0}}

In [29]:
# start pipeline
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

In [30]:
# describe pipeline execution
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:414754026690:pipeline/SafetyPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:414754026690:pipeline/SafetyPipeline/execution/mu3iiq3yysqe',
 'PipelineExecutionDisplayName': 'execution-1708903355094',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'safetypipeline',
  'TrialName': 'mu3iiq3yysqe'},
 'CreationTime': datetime.datetime(2024, 2, 25, 23, 22, 35, 33000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 25, 23, 22, 35, 33000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:414754026690:user-profile/d-kv43uh6emydw/jraimondi',
  'UserProfileName': 'jraimondi',
  'DomainId': 'd-kv43uh6emydw'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:414754026690:user-profile/d-kv43uh6emydw/jraimondi',
  'UserProfileName': 'jraimondi',
  'DomainId': 'd-kv43uh6emydw'},
 'ResponseMetadata': {'RequestId': '59f5513f-5f8d-4a66-9

In [None]:
# wait for pipeline execution to complete
execution.wait()

In [37]:
# list steps in the pipeline execution
execution.list_steps()

[{'StepName': 'SafetyTransform',
  'StartTime': datetime.datetime(2024, 2, 25, 23, 33, 25, 889000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 25, 23, 38, 40, 642000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:414754026690:transform-job/pipelines-mu3iiq3yysqe-SafetyTransform-3WvsFL62zq'}},
  'AttemptCount': 1},
 {'StepName': 'SafetyCreateModel-CreateModel',
  'StartTime': datetime.datetime(2024, 2, 25, 23, 33, 23, 656000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 25, 23, 33, 25, 91000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:414754026690:model/pipelines-mu3iiq3yysqe-safetycreatemodel-cr-gmljxcgxkf'}},
  'AttemptCount': 1},
 {'StepName': 'SafetyRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2024, 2, 25, 23, 33, 22, 742000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 25, 23, 33, 24, 572

## Monitors

### Monitor Baseline Suggestion Job

In [38]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Define the S3 URI where captured data will be stored
baseline_results_uri = f"{transformer_output}/baseline_results"

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    #volume_size_in_gb=20,
    #max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=input_data_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2024-02-25-23-40-05-297


...

KeyboardInterrupt: 

### Create and Schedule Data Quality Monitor

In [39]:
from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import BatchTransformInput
from sagemaker.model_monitor.dataset_format import MonitoringDatasetFormat


# define s3 uri for monitor output report
s3_report_uri = f"{transformer_output}/report"

# create data quality monitor
data_quality_model_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
)

# create data quality monitor schedule
schedule = data_quality_model_monitor.create_monitoring_schedule(
    monitor_schedule_name='SafetyDataQuality',
    batch_transform_input=BatchTransformInput(
        data_captured_destination_s3_uri=batch_data_uri,
        destination="/opt/ml/processing/input",
        dataset_format=MonitoringDatasetFormat.csv(header=False),
    ),
    output_s3_uri=s3_report_uri,
    statistics= f"{baseline_results_uri}/statistics.json",
    constraints = f"{baseline_results_uri}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: SafetyDataQuality
ERROR:sagemaker.model_monitor.model_monitoring:Failed to create monitoring schedule.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/sagemaker/model_monitor/model_monitoring.py", line 2050, in create_monitoring_schedule
    self._create_monitoring_schedule_from_job_definition(
  File "/opt/conda/lib/python3.10/site-packages/sagemaker/model_monitor/model_monitoring.py", line 1594, in _create_monitoring_schedule_from_job_definition
    self.sagemaker_session.sagemaker_client.create_monitoring_schedule(
  File "/opt/conda/lib/python3.10/site-packages/botocore/client.py", line 553, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/conda/lib/python3.10/site-packages/bo

ResourceInUse: An error occurred (ResourceInUse) when calling the CreateMonitoringSchedule operation: Monitoring Schedule arn:aws:sagemaker:us-east-1:414754026690:monitoring-schedule/safetydataquality already exists