# Build a SageMaker pipeline to orchestrate ML project steps

In [1]:
!pip install -U sagemaker -q

## setup 

In [2]:
import os
import numpy as np
import sagemaker, boto3, json
from sagemaker.session import Session

sagemaker_session = Session()
account_id = sagemaker_session.account_id()
aws_role = sagemaker_session.get_caller_identity_arn()
aws_region = boto3.Session().region_name
sess = sagemaker.Session()

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [3]:
bucket_name = f"sagemaker-cv-bootcamp-{aws_region}-{account_id}"
# If your labeling job name is different, update the following variable.
labeling_job_name = "cv-bootcamp-manuf-at-scale-cap-no-cap"

s3_manifest_file_path = (f"s3://{bucket_name}/labeled-data/{labeling_job_name}/manifests/output/output.manifest")

In [4]:
model_filename = "ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz"

## Pipeline parameters

In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

In [6]:
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.large")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.g4dn.xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

## Prepare the data processing job to be part of a pipeline

- data processing:
    - data_processor = TensorFlowProcessor(..),
    - Create a script that contains the data processing logic
    - We configured the ProcessingInput and ProcessingOutput
    - We configured and run the processor using data_processor.run(..)

In [7]:
import boto3
import sagemaker
from sagemaker import get_execution_role

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline_context import PipelineSession

In [8]:
pipeline_session = PipelineSession()
role = get_execution_role()
region = sagemaker.Session().boto_region_name

In [9]:
from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

# Initialize the TensorFlowProcessor
data_preprocessor = TensorFlowProcessor(
    framework_version='2.3',
    role=get_execution_role(),
    instance_type='ml.m5.large',
    instance_count=1,
    base_job_name="img-data-preprocessing",
    py_version='py37',
    sagemaker_session=pipeline_session
)

In [10]:
# Run the processing job
data_processor_args = data_preprocessor.run(
    code='preprocessing.py',
    source_dir='preprocess',
    inputs=[
        ProcessingInput(
            input_name='manifest',
            source=s3_manifest_file_path,
            destination='/opt/ml/processing/input/manifest'
        ),
        ProcessingInput(
            input_name='images',
            source=f"s3://{bucket_name}/raw-data/manufacturing-at-scale-cap-no-cap/",
            destination='/opt/ml/processing/input/images'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='manifests',
            source='/opt/ml/processing/output/manifests',
            destination=f's3://{bucket_name}/prepared_data/manifests/',
            s3_upload_mode="Continuous"
        ),
        ProcessingOutput(
            output_name='augmented_train_images',
            source='/opt/ml/processing/output/augmented_train_images',
            destination=f's3://{bucket_name}/prepared_data/train/images/',
            s3_upload_mode="Continuous"
        ),
        ProcessingOutput(
            output_name='augmented_validation_images',
            source='/opt/ml/processing/output/augmented_validation_images',
            destination=f's3://{bucket_name}/prepared_data/validation/images/',
            s3_upload_mode="Continuous"
        )
    ],
    arguments=[
        "--num_augmentations_per_img", str(5),
        "--output_s3_bucket_name", bucket_name,
        "--label_attribute_name", "cap-no-cap"
    ]
)



In [11]:
from sagemaker.workflow.steps import ProcessingStep

data_process_step = ProcessingStep(
    name="DataPreprocessing-ImageAugmentation",
    step_args=data_processor_args
)

## Prepare the training job to be part of a pipeline


- model training:
    - model_estimator = Estimator(..),
    - Create a script that contains the model training logic.
    - We configured the TrainingInput, SageMaker by default copies the trained model from a specific location to S3.
    - We configured and run the training job using model_estimator.fit({"train": .., "test": ..})

In [12]:
from sagemaker import image_uris

training_instance_type = "ml.p3.2xlarge"
# training_instance_type = "ml.g4dn.xlarge"
# training_instance_type = "ml.m5.xlarge"

# model version can be found in
# https://sagemaker.readthedocs.io/en/stable/doc_utils/pretrainedmodels.html
# available images: https://github.com/aws/deep-learning-containers/blob/master/available_images.md
# Retrieve the docker image
train_image_uri = image_uris.retrieve(
    region="us-east-1",
    framework="tensorflow",
    version="2.4",
    image_scope="training",
    instance_type=training_instance_type
)
train_image_uri

'763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-training:2.4-gpu-py37'

In [13]:
hyperparameters = {
    "batch_size": "3",
    "reinitialize_top_layer": "Auto",
    "train_only_top_layer": "False",
    "optimizer": "adam",
    "learning_rate": "0.001",
    "beta_1": "0.9",
    "beta_2": "0.999",
    "momentum": "0.9",
    "epsilon": "1e-07",
    "rho": "0.95",
    "initial_accumulator_value": "0.1",
    "early_stopping": "False",
    "early_stopping_patience": "5",
    "early_stopping_min_delta": "0.0",
    "epochs": "1"
}

In [14]:
!flake8 --max-line-length=120 --ignore=E402 train/custom_object_detection_training.py

train/custom_object_detection_training.py:420:1: W293 blank line contains whitespace


In [15]:
from sagemaker.estimator import Estimator
from sagemaker.utils import name_from_base
from sagemaker.tuner import HyperparameterTuner

training_job_name = name_from_base(f"cv-bootcamp-manuf1-model")

training_metric_definitions = [
    {"Name": "val_localization_loss", "Regex": "Val_localization=([0-9\\.]+)"},
    {"Name": "val_classification_loss", "Regex": "Val_classification=([0-9\\.]+)"},
    {"Name": "train_loss", "Regex": "loss=([0-9\\.]+)."},
]

# Create SageMaker Estimator instance
eager_tf_od_estimator = Estimator(
    entry_point="custom_object_detection_training.py",
    source_dir="train",
    role=aws_role,
    image_uri=train_image_uri,
    model_uri=f"s3://{bucket_name}/pre-trained-models/{model_filename}",
    instance_count=1,
    instance_type=training_instance_type,
    # instance_type="local",
    max_run=360000,
    volume_size=50,
    hyperparameters=hyperparameters,
    output_path=f"s3://{bucket_name}/manuf1-model/output/",
    base_job_name=training_job_name,
    metric_definitions=training_metric_definitions,
    sagemaker_session=pipeline_session
)

In [16]:
from sagemaker.inputs import TrainingInput

In [17]:
train_dataset_s3_path = f"s3://{bucket_name}/prepared_data/train/"
validation_dataset_s3_path = f"s3://{bucket_name}/prepared_data/validation/"
manifests_dataset_s3_path = f"s3://{bucket_name}/prepared_data/manifests/"

In [18]:
!aws s3 ls {manifests_dataset_s3_path}

In [19]:
training_data_input = TrainingInput(
    s3_data=data_process_step.properties.ProcessingOutputConfig.Outputs["augmented_train_images"].S3Output.S3Uri
)
validation_data_input = TrainingInput(
    s3_data=data_process_step.properties.ProcessingOutputConfig.Outputs["augmented_validation_images"].S3Output.S3Uri
)
manifests_data_input = TrainingInput(
    s3_data=data_process_step.properties.ProcessingOutputConfig.Outputs["manifests"].S3Output.S3Uri
)

In [20]:
%%time
# Launch a SageMaker Training job by passing s3 path of the training data
training_args = eager_tf_od_estimator.fit({
    "training": training_data_input,
    "validation": validation_data_input,
    "manifests": manifests_dataset_s3_path
#})
}, logs=False, wait=False)

CPU times: user 210 µs, sys: 0 ns, total: 210 µs
Wall time: 307 µs


In [21]:
from sagemaker.workflow.steps import TrainingStep

object_detection_model_training_step = TrainingStep(
    name="Manuf1-Eager-TF-Object-Detection",
    step_args=training_args
)

## Pipeline execution

In [22]:
from sagemaker.workflow.pipeline import Pipeline

In [23]:
eager_object_detection_pipeline = Pipeline(
    name="eager-object-detection-training-pipeline",
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_instance_count
    ],
    steps=[
        data_process_step,
        object_detection_model_training_step
    ]
)

In [24]:
eager_object_detection_pipeline.name

'eager-object-detection-training-pipeline'

In [25]:
eager_object_detection_pipeline.steps

[<sagemaker.workflow.steps.ProcessingStep at 0x7fd0de1d4520>,
 <sagemaker.workflow.steps.TrainingStep at 0x7fd0dee77610>]

In [26]:
eager_object_detection_pipeline.parameters

[ParameterString(name='ProcessingInstanceType', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='ml.m5.large'),
 ParameterInteger(name='ProcessingInstanceCount', parameter_type=<ParameterTypeEnum.INTEGER: 'Integer'>, default_value=1),
 'ml.p3.2xlarge',
 ParameterInteger(name='TrainingInstanceCount', parameter_type=<ParameterTypeEnum.INTEGER: 'Integer'>, default_value=1)]

In [27]:
import json

json.loads(eager_object_detection_pipeline.definition())

INFO:sagemaker.processing:Uploaded preprocess to s3://sagemaker-us-east-1-594841669104/eager-object-detection-training-pipeline/code/62a239d48b9848b8e1cde9200f9c6c18/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-594841669104/eager-object-detection-training-pipeline/code/54f0ef6bee583ff9186b762aaf572190/runproc.sh


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DataPreprocessing-ImageAugmentation',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.large',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-training:2.3-cpu-py37',
     'ContainerArguments': ['--num_augmentations_per_img',
      '5',
      '--output_s3_bucket_name',
      'sagemaker-cv-bootcamp-us-east-1-594841669104',
      '--label_attribute_name',
      'cap-no-cap']

### Creating, updating and starting a pipeline

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.



In [28]:
eager_object_detection_pipeline.upsert(role_arn=role)

INFO:sagemaker.processing:Uploaded preprocess to s3://sagemaker-us-east-1-594841669104/eager-object-detection-training-pipeline/code/62a239d48b9848b8e1cde9200f9c6c18/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-us-east-1-594841669104/eager-object-detection-training-pipeline/code/54f0ef6bee583ff9186b762aaf572190/runproc.sh


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:594841669104:pipeline/eager-object-detection-training-pipeline',
 'ResponseMetadata': {'RequestId': '3355121b-c96d-45b8-bc8e-2c0fb8f4e60e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3355121b-c96d-45b8-bc8e-2c0fb8f4e60e',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '108',
   'date': 'Wed, 14 Feb 2024 06:01:17 GMT'},
  'RetryAttempts': 0}}

In [29]:
execution = eager_object_detection_pipeline.start()

In [30]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:594841669104:pipeline/eager-object-detection-training-pipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:594841669104:pipeline/eager-object-detection-training-pipeline/execution/goomkbjgk4yn',
 'PipelineExecutionDisplayName': 'execution-1707890479492',
 'PipelineExecutionStatus': 'Failed',
 'PipelineExperimentConfig': {'ExperimentName': 'eager-object-detection-training-pipeline',
  'TrialName': 'goomkbjgk4yn'},
 'FailureReason': 'Step failure: One or multiple steps failed.',
 'CreationTime': datetime.datetime(2024, 2, 14, 6, 1, 19, 421000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 14, 6, 1, 21, 572000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:594841669104:user-profile/d-ullkugtmf0xx/default-1674085267568',
  'UserProfileName': 'default-1674085267568',
  'DomainId': 'd-ullkugtmf0xx'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:594841669104:u