# Load Pipeline Libraries

In [1]:
import sagemaker
import boto3
import os
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, ProcessingStep, CacheConfig
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.pytorch import PyTorch, PyTorchModel
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.drift_check_baselines import DriftCheckBaselines

from botocore.exceptions import ClientError

sagemaker.config INFO - Fetched defaults config from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


# Intilise Global Variables

In [2]:
# Get SageMaker session, default bucket, and execution role
role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
default_bucket = sess.default_bucket()
# Project Prefix needs to be the location in S3 where your project resides
project_prefix = "/".join(sess.default_bucket_prefix.split("/")[0:2])

input_s3_uri = f"s3://{default_bucket}/{project_prefix}/Datasets/Cityscape_Dataset/"

pipeline_name = "Detectron2ModelPipeline-Pankaj"

#  Model Registry Creation for Model
model_package_group_name = "maskrcnn-R-50FPN-inst-seg-det2"
# Change to description of your model and purpose
model_package_group_desc = "Detectron 2 Instance Segementaion Model Fine-tuned on Cityscape"

py_version = "py3"
pytorch_version = "1.6.0"

# Instead of getting them here, we'll define them as parameters

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


# Create Model Package Group (Required)

In [3]:
# Create SageMaker client
sm_client = boto3.client('sagemaker', region_name=region)
print(f"SageMaker client created for region: {region}")

# Attempt to create the Model Package Group
try:
    model_package_group_input_dict = {
        "ModelPackageGroupName": model_package_group_name,
        "ModelPackageGroupDescription": model_package_group_desc,
    }
    print(f"Attempting to create Model Package Group: {model_package_group_name}...")
    create_model_package_group_response = sm_client.create_model_package_group(
        **model_package_group_input_dict
    )
    print(f"Successfully created Model Package Group.")
    print(f"ModelPackageGroup Arn : {create_model_package_group_response['ModelPackageGroupArn']}")

except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code')
    error_message = e.response.get('Error', {}).get('Message', '')

    # Check if the error is because the group already exist
    # 'ResourceInUseException' is the typical error code for this scenario
    if error_code == 'ResourceInUseException' or 'already exists' in error_message.lower():
        print(f"Model Package Group '{model_package_group_name}' already exists.\nSkipping creation.")
        try:
            describe_response = sm_client.describe_model_package_group(ModelPackageGroupName=model_package_group_name)
            print(f"Existing ModelPackageGroup Arn : {describe_response['ModelPackageGroupArn']}")
        except ClientError as desc_e:
             print(f"Could not describe existing group '{model_package_group_name}': {desc_e}")

    else:
        print(f"Error creating Model Package Group '{model_package_group_name}': {error_message} (Code: {error_code})")
        raise e

SageMaker client created for region: ap-southeast-1
Attempting to create Model Package Group: maskrcnn-R-50FPN-inst-seg-det2...
Model Package Group 'maskrcnn-R-50FPN-inst-seg-det2' already exists.
Skipping creation.
Existing ModelPackageGroup Arn : arn:aws:sagemaker:ap-southeast-1:654654454011:model-package-group/maskrcnn-R-50FPN-inst-seg-det2


# Define Pipeline Parameters (Customise as Required)

## Pipeline Parameters

In [4]:
param_processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
param_processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge" # Default, can override
)
param_training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
param_training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.g4dn.xlarge"
)

## Data Parameters

In [5]:
param_input_data_s3_uri = ParameterString(
    name="InputDataS3Uri",
    default_value=f"s3://{default_bucket}/{project_prefix}/Dataset/"
)

param_eval_data_s3_uri = ParameterString(
    name="EvalDataS3Uri",
    default_value="" # Set if evaluation requires separate data
)

## Model & Training Parameters

In [6]:
param_model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

### Project Specific Parameters

In [7]:
param_d2_model_config_name = ParameterString(
    name="Detectron2ModelConfig",
    default_value="COCO-InstanceSegmentation/mask_rcnn_R_101_DC5_3x.yaml",
)

param_d2_dataset_train_name = ParameterString(
    name="Detectron2DatasetTrainName",
    default_value="cityspace_train",
)

param_d2_learning_rate = ParameterFloat(name="LearningRate", default_value=0.00025)
param_d2_max_iter = ParameterInteger(name="MaxIter", default_value=400)
param_d2_ims_per_batch = ParameterInteger(name="ImagesPerBatch", default_value=2)
param_d2_num_workers = ParameterInteger(name="NumWorkers", default_value=2)
param_d2_checkpoint_period = ParameterInteger(name="CheckpointPeriod", default_value=500)
param_d2_freeze_at = ParameterInteger(name="FreezeAt", default_value=2)
param_d2_roi_batch_size_per_image = ParameterInteger(name="RoiBatchSizePerImage", default_value=256)
param_d2_resume_training = ParameterBoolean(name="ResumeTraining", default_value=True)

# Training Step

In [8]:
source_dir = "./" # Location of all the scripts

# Define the Pytorch Estimator
pytorch_estimator = PyTorch(
    entry_point="train.py",
    source_dir=source_dir,
    role=role,
    framework_version=pytorch_version,
    py_version=py_version,
    instance_count=param_training_instance_count,
    instance_type=param_training_instance_type,
    hyperparameters={
        "model-config-name": param_d2_model_config_name,
        "dataset-train-name": param_d2_dataset_train_name,
        "learning-rate": param_d2_learning_rate,
        "max-iter": param_d2_max_iter,
        "ims-per-batch": param_d2_ims_per_batch,
        "num-workers": param_d2_num_workers,
        "checkpoint-period": param_d2_checkpoint_period,
        "freeze-at": param_d2_freeze_at,
        "roi-batch-size-per-image": param_d2_roi_batch_size_per_image,
        "resume": param_d2_resume_training
    },
    # The output path for model artifacts is automatically managed by SageMaker
    # train.py should save the final model to os.environ['SM_MODEL_DIR']
    sagemaker_session=PipelineSession(),
    requirements_file=os.path.join(source_dir, "requirements.txt")
    # checkpoint_s3_uri: If resuming, SageMaker might handle this if configured correctly,
    # otherwise train.py needs logic to load from a specific path (e.g., passed via hyperparameter)
    # checkpoint_local_path="/opt/ml/checkpoints", # Default checkpoint location inside container
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds


In [9]:
# Define Training Step
step_train = TrainingStep(
    name="TrainDetectron2Model",
    estimator=pytorch_estimator,
    inputs={
        # Define training data input. train.py will access this via /opt/ml/input/data/training
        "training": TrainingInput(
            s3_data=param_input_data_s3_uri,
            distribution="FullyReplicated",
            content_type="application/octet-stream", # Adjust if needed
            s3_data_type="S3Prefix",
        )
        # Add more channels if train.py expects them (e.g., 'validation')
    },
    cache_config=CacheConfig(enable_caching=True, expire_after="30d") # Optional: Cache step results
)

# Evaluation Step

In [10]:
eval_processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version=pytorch_version,
    py_version=py_version,
    role=role,
    instance_count=param_processing_instance_count,
    instance_type=param_processing_instance_type,
    sagemaker_session=PipelineSession(),
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds


In [11]:
# Define PropertyFile for capturing evaluation results
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation", # Must match output name in ProcessingStep
    path="evaluation.json",    # Must match the filename saved by evaluate.py
)

In [12]:
# Define Evaluation Processing Step
step_eval = ProcessingStep(
    name="EvaluateModel",
    processor=eval_processor,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts, # Get model from training step
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=param_eval_data_s3_uri, # Pass training data (or specific eval data)
            destination="/opt/ml/processing/input_data",
            input_name="input_data",
            s3_data_type="S3Prefix",
        ),
        # Add separate evaluation data input if needed
        # ProcessingInput(
        #     source=param_eval_data_s3_uri,
        #     destination="/opt/ml/processing/eval_data",
        #     input_name="eval_data",
        #     s3_data_type="S3Prefix",
        # ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation", # Must match PropertyFile output_name
            source="/opt/ml/processing/evaluation", # evaluate.py saves report here
            destination=f"s3://{default_bucket}/{project_prefix}/evaluation"
        ),
    ],
    code=os.path.join(source_dir, "evaluate.py"), # Path to evaluation script
    property_files=[evaluation_report], # Link the PropertyFile
    job_arguments=[ # Pass arguments to evaluate.py if needed
        "--model-config-name", param_d2_model_config_name,
        "--dataset-train-name", param_d2_dataset_train_name, # Or a specific eval dataset name
    ]
)

# Register Model Step

In [13]:
from sagemaker.model import Model
from sagemaker.workflow.properties import PropertyFile

model = PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    entry_point="inference.py",
    source_dir=source_dir,
    dependencies=["requirements.txt"],
    image_uri=pytorch_estimator.training_image_uri(),
    sagemaker_session=PipelineSession()
)

# # Create model step
# step_model_create = ModelStep(
#     name="MyModelCreationStep",
#     step_args=model.create()
# )

# Define model metrics
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

# Register model step
register_model_step_args = model.register(
    content_types=["image/jpeg", "image/png", "application/x-npy"],
    response_types=["image/jpeg"],
    inference_instances=["ml.g4dn.xlarge"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status="PendingManualApproval",
    description="Fine-tuned Detectron2 Instance Segmentation Model for Cityscapes"
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.Model.VpcConfig


# Define Pipeline

In [14]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        param_processing_instance_count,
        param_processing_instance_type,
        param_training_instance_count,
        param_training_instance_type,
        param_input_data_s3_uri,
        param_eval_data_s3_uri,
        param_model_approval_status,
        param_d2_model_config_name,
        param_d2_dataset_train_name,
        param_d2_learning_rate,
        param_d2_max_iter,
        param_d2_ims_per_batch,
        param_d2_num_workers,
        param_d2_checkpoint_period,
        param_d2_freeze_at,
        param_d2_roi_batch_size_per_image,
        param_d2_resume_training,
    ],
    steps=[step_train, step_eval],# step_register], # Define the sequence of steps
    sagemaker_session=sagemaker.Session(boto_session=boto3.Session(region_name=region)),
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


# Describe Pipeline

In [15]:
print("Pipeline Definition:")
print(pipeline.definition()) # Print JSON definition

Pipeline Definition:
{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}, {"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "TrainingInstanceCount", "Type": "Integer", "DefaultValue": 1}, {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.g4dn.xlarge"}, {"Name": "InputDataS3Uri", "Type": "String", "DefaultValue": "s3://amazon-sagemaker-654654454011-ap-southeast-1-f5cec0caca51/dzd_aoermbikpd92e9/bciv16651x7di9/Dataset/"}, {"Name": "EvalDataS3Uri", "Type": "String", "DefaultValue": ""}, {"Name": "ModelApprovalStatus", "Type": "String", "DefaultValue": "PendingManualApproval"}, {"Name": "Detectron2ModelConfig", "Type": "String", "DefaultValue": "COCO-InstanceSegmentation/mask_rcnn_R_101_DC5_3x.yaml"}, {"Name": "Detectron2DatasetTrainName", "Type": "String", "DefaultValue": "cityspace_train"}, {"Name": "LearningRate", "Type": "Float", "DefaultValue

In [16]:
print(f"Upserting pipeline {pipeline_name}...")

Upserting pipeline Detectron2ModelPipeline-Pankaj...


In [17]:
pipeline.upsert(
    role_arn=role,
    description="Detectron2 Fine-tuning Pipeline",
)
print("Pipeline upsert complete.")

Pipeline upsert complete.


In [18]:
# Example of how to start an execution
execution = pipeline.start()