# Bird Object Detection - SageMaker Pipelines

Now that we have trained models and deployed them with SageMaker, we will look into how we can create parametrizable and reproducible ML pipelines with SageMaker Pipelines.

Amazon SageMaker Model Building Pipelines lets machine learning (ML) application developers and operations engineers orchestrate SageMaker jobs and author reproducible ML pipelines. It also lets them deploy custom-built models for real-time, low-latency inference, run offline inference with Batch Transform, and track the lineage of artifacts. Through a simple interface, they can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage, in line with safety and best-practice paradigms for ML application development.

In [None]:
import sagemaker
from sagemaker import get_execution_role

sess = sagemaker.Session()
# this will create a 'default' sagemaker bucket if it doesn't exist (sagemaker-region-accountid)
bucket = sess.default_bucket()

prefix = "DEMO-ObjectDetection-birds"

role = get_execution_role()

## Pipeline parameters

Define Parameters to Parametrize Pipeline Execution

The supported parameter types include:

* ParameterString - represents a str Python type

* ParameterInteger - represents an int Python type

* ParameterFloat - represents a float Python type

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

batch_data_uri = "s3://{}/{}".format(bucket, prefix + '/batch')

processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.p3.8xlarge")

batch_data_input = ParameterString(name="BatchDataInput", default_value=batch_data_uri)
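
To make the role of `default_value` concrete, here is a minimal sketch of how a parameter's default could be overridden when an execution starts. It is plain Python and does not call SageMaker: `PipelineParam` and `resolve_parameters` are hypothetical names introduced only for illustration, and the bucket name is a placeholder.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class PipelineParam:
    """Hypothetical stand-in for a typed pipeline parameter (not the SageMaker class)."""
    name: str
    py_type: type
    default_value: Any


def resolve_parameters(params: List[PipelineParam], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge execution-time overrides onto defaults, rejecting unknown names and wrong types."""
    known = {p.name: p for p in params}
    unknown = set(overrides) - set(known)
    if unknown:
        raise KeyError(f"Unknown parameter(s): {sorted(unknown)}")
    resolved = {}
    for name, param in known.items():
        value = overrides.get(name, param.default_value)
        if not isinstance(value, param.py_type):
            raise TypeError(f"{name} expects {param.py_type.__name__}, got {type(value).__name__}")
        resolved[name] = value
    return resolved


params = [
    PipelineParam("ProcessingInstanceType", str, "ml.m5.large"),
    PipelineParam("TrainingInstanceType", str, "ml.p3.8xlarge"),
    # Placeholder URI; the notebook builds the real one from bucket and prefix.
    PipelineParam("BatchDataInput", str, "s3://example-bucket/DEMO-ObjectDetection-birds/batch"),
]

# Only the processing instance type is overridden; the others keep their defaults.
resolved = resolve_parameters(params, {"ProcessingInstanceType": "ml.c5.xlarge"})
print(resolved)
```

Parameters that are not overridden fall back to their defaults, which is why an execution can be started with only a subset of the parameters.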

## Define a Processing Step for Data Preparation

Here we reuse the code we worked on in the first notebook, data preparation, and launch it as a SageMaker Processing job. Have a look at src/preprocess.py and note how it does the same things we did cell by cell: downloading the birds dataset and preparing the training and validation datasets.

In [None]:
from sagemaker.mxnet.estimator import MXNet
from sagemaker.processing import FrameworkProcessor

from sagemaker import image_uris

training_image = image_uris.retrieve("mxnet", sess.boto_region_name, instance_type="ml.m5.xlarge", image_scope="training", version="1.8")

mxnet_processor = FrameworkProcessor(
    MXNet,
    '1.8',
    image_uri=training_image,
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="mxnet-processor",
    role=role,
    command=["python3"]
)

Below is how we would run the processing job outside of the pipeline. Note how the same parameters passed to run() are also passed to the ProcessingStep.

```python
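from sagemaker.processing import ProcessingOutput

# Run the pre-processing script directly, outside of any pipeline
mxnet_processor.run(
    code='preprocess.py',
    source_dir='src',
    outputs=[
        ProcessingOutput(source='/opt/ml/processing/output/train'),
        ProcessingOutput(source='/opt/ml/processing/output/validation'),
    ]
)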
```

When using the MXNet image as a processing container in a pipeline, the ProcessingStep does not recognize the 'command' parameter of the FrameworkProcessor and does not change the entrypoint from /bin/bash to python3. To work around this, we first copy our Python pre-processing script (together with the utils functions) to a location in S3. We then set the entrypoint `code` to a bash script that downloads those scripts from S3 and executes them.

In [None]:
!aws s3 cp ./src/preprocess.py s3://$bucket/$prefix/pipeline_script/
!aws s3 cp ./src/utils.py s3://$bucket/$prefix/pipeline_script/

We also define a new 'cell magic' in this Jupyter Notebook, `writetemplate`, which works like `writefile` but substitutes variables, so that we can pass in our bucket and prefix values.

In [None]:
from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def writetemplate(line, cell):
    with open(line, 'w') as f:
        f.write(cell.format(**globals()))

In [None]:
%%writetemplate src/preprocess_pipeline.sh
#!/bin/bash
aws s3 cp s3://{bucket}/{prefix}/pipeline_script/ . --recursive
python3 preprocess.py
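
The substitution performed by the magic is ordinary `str.format`. The sketch below reproduces it outside Jupyter with a plain dictionary instead of `globals()`; the bucket value is a placeholder, not the real bucket name.

```python
# Template text as it would appear in the cell body; {bucket} and {prefix} are placeholders.
template = (
    "#!/bin/bash\n"
    "aws s3 cp s3://{bucket}/{prefix}/pipeline_script/ . --recursive\n"
    "python3 preprocess.py\n"
)

# Illustrative values; the notebook takes these from globals().
values = {"bucket": "example-bucket", "prefix": "DEMO-ObjectDetection-birds"}

rendered = template.format(**values)
print(rendered)

# Literal braces (for example bash ${VAR}) must be doubled to survive str.format.
escaped = "echo ${{HOME}} {bucket}".format(**values)
print(escaped)
```

One consequence of this design is that any literal `{` or `}` in the script has to be written as `{{` or `}}`, otherwise `format` will try to treat it as a placeholder.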

We are now ready to define our pipeline ProcessingStep. Note the **cache_config** parameter: it lets pipeline steps cache their results and skip re-running when the step arguments are unchanged across executions. This makes it possible, for example, to iterate quickly on the training job parameters without re-processing the data. The cache is set to expire after a certain time, in this case 30 days.

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, CacheConfig

step_process = ProcessingStep(
    name="BirdDetectionDataPreparation",
    processor=mxnet_processor,
    cache_config=CacheConfig(enable_caching=True, expire_after='p30d'),
    code='./src/preprocess_pipeline.sh',
    outputs=[
        ProcessingOutput(output_name="train", source='/opt/ml/processing/output/train'),
        ProcessingOutput(output_name="validation", source='/opt/ml/processing/output/validation')
    ]
)
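
As a conceptual model only, the caching behaviour can be sketched as a lookup keyed by the step's type and arguments, with entries that stop being valid after an expiry window. This is not SageMaker's implementation: `cache_key` and `StepCache` are hypothetical names, the argument dictionary is simplified, and the sketch assumes a cache hit requires identical arguments.

```python
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple


def cache_key(step_type: str, arguments: dict) -> str:
    """Hash the step type and its arguments; any change produces a different key."""
    payload = json.dumps({"type": step_type, "arguments": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class StepCache:
    """Hypothetical in-memory cache keyed by step arguments, with an expiry window."""

    def __init__(self, expire_after: timedelta) -> None:
        self.expire_after = expire_after
        self._entries: Dict[str, Tuple[datetime, dict]] = {}

    def store(self, key: str, outputs: dict, now: datetime) -> None:
        self._entries[key] = (now, outputs)

    def lookup(self, key: str, now: datetime) -> Optional[dict]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        created, outputs = entry
        if now - created > self.expire_after:
            return None  # entry is older than the expiry window
        return outputs


cache = StepCache(expire_after=timedelta(days=30))
args = {"InstanceType": "ml.c5.xlarge", "Code": "preprocess_pipeline.sh"}
start = datetime(2024, 1, 1)
cache.store(cache_key("Processing", args), {"train": "s3://example-bucket/train"}, now=start)

# Same arguments one day later: the stored outputs are reused.
same_args_hit = cache.lookup(cache_key("Processing", args), now=start + timedelta(days=1))

# A changed argument yields a different key, so the step would run again.
changed_args = dict(args, InstanceType="ml.m5.large")
changed_args_hit = cache.lookup(cache_key("Processing", changed_args), now=start + timedelta(days=1))

# Same arguments after 31 days: the entry has expired.
expired_hit = cache.lookup(cache_key("Processing", args), now=start + timedelta(days=31))
```

Under this model, changing only a training parameter leaves the processing step's key untouched, so its outputs can be reused while the training step runs again.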

## Define a Training Step

Now let's use our Object Detection estimator to define a training step that takes the outputs of the previous step as its inputs.


In [None]:
from sagemaker import image_uris

training_image = image_uris.retrieve( "object-detection", sess.boto_region_name, version="1")

s3_output_location = "s3://{}/{}/output".format(bucket, prefix)

# Note: num_epochs is not referenced below; the "epochs" hyperparameter is set explicitly.
num_epochs, lr_steps = (100, "33,67")

od_model = sagemaker.estimator.Estimator(
    training_image,
    role,
    instance_count=1,
    instance_type=training_instance_type,
    volume_size=50,
    max_run=3600,
    input_mode="File",
    output_path=s3_output_location,
    sagemaker_session=sess,
    hyperparameters={
        "base_network": "resnet-50",
        "use_pretrained_model": 1,
        "num_classes": 5,
        "mini_batch_size": 16,
        "epochs": 70,
        "learning_rate": 0.01,
        "lr_scheduler_step": lr_steps,
        "lr_scheduler_factor": 0.1,
        "optimizer": "sgd",
        "momentum": 0.9,
        "weight_decay": 0.0005,
        "overlap_threshold": 0.5,
        "nms_threshold": 0.45,
        "image_shape": 512,
        "label_width": 350,
        "num_training_samples": 238}
)
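
To see what the `learning_rate`, `lr_scheduler_step` and `lr_scheduler_factor` values above imply, here is a small sketch of a step schedule. The helper `step_learning_rate` is hypothetical, and whether the reduction takes effect at the listed epoch or just after it is an assumption here; the built-in algorithm may handle the boundary differently.

```python
from typing import List


def step_learning_rate(epoch: int, base_lr: float, steps: List[int], factor: float) -> float:
    """Multiply base_lr by factor once for every step boundary that the epoch has reached."""
    reductions = sum(1 for boundary in steps if epoch >= boundary)
    return base_lr * factor ** reductions


# Same string format as the lr_scheduler_step hyperparameter.
lr_steps = "33,67"
boundaries = [int(s) for s in lr_steps.split(",")]

for epoch in (0, 32, 33, 66, 67, 69):
    lr = step_learning_rate(epoch, base_lr=0.01, steps=boundaries, factor=0.1)
    print(f"epoch {epoch:2d}: lr = {lr:.6f}")
```

With 70 epochs, this gives roughly three phases: 0.01 up to the first boundary, 0.001 up to the second, and 0.0001 for the last few epochs.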

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="BirdDetectionTrain",
    estimator=od_model,
    cache_config=CacheConfig(enable_caching=True, expire_after='p30d'),
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="application/x-recordio",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="application/x-recordio",
        ),
    },
)

## And a Batch Transform job

Let's move our 'unseen' bird pictures to S3 and trigger a Batch Transform job.

In [None]:
unseen_image_input = sess.upload_data(path="./unseen/", bucket=bucket, key_prefix=prefix + '/unseen')

Before we can 'deploy' our model in a Batch fashion, we need to register it as a model with SageMaker (currently it's only an S3 artifact that the Training Job returned). To register it as a deployable model, we need to specify the allowed instance types and accelerator types to be used for model deployment.

In [None]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

# we are using the same image for 'hosting' the model as we used for training it, since the image works for both use-cases
model = Model(
    image_uri=training_image,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

inputs = CreateModelInput(
    instance_type=["ml.m5.large", "ml.m5.xlarge", "ml.c5.large"],
    accelerator_type="ml.eia1.medium"
)
step_create_model = CreateModelStep(
    name="BirdDetectionCreateModel",
    model=model,
    inputs=inputs,
)

Now we create the Transformer and the TransformStep. Note that the transformer will use the image specified in our model object above.

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

from sagemaker.transformer import Transformer

# define a location for the Batch Job outputs
batch_image_output = f"s3://{bucket}/{prefix}/pipeline_batch_out"

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=batch_image_output,
)

step_transform = TransformStep(
    name="BirdDetectionBatchTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data_input)
)

## Finally Let's define and create our pipeline

Let's put the steps above together and create our pipeline. The steps don't need to be listed in order, as the dependencies between them are resolved in the final Pipeline graph.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "BirdDetectionPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        batch_data_input
    ],
    steps=[step_process, step_train, step_create_model, step_transform]
)
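
The idea that step order is derived from dependencies rather than from list position can be illustrated with a topological sort. This is only an illustration using the standard library's `graphlib` (Python 3.9+), not how SageMaker resolves the graph internally. The dependency map is written by hand from the property references used in the steps above.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# The entries are deliberately listed out of order.
dependencies = {
    "BirdDetectionBatchTransform": {"BirdDetectionCreateModel"},
    "BirdDetectionCreateModel": {"BirdDetectionTrain"},
    "BirdDetectionTrain": {"BirdDetectionDataPreparation"},
    "BirdDetectionDataPreparation": set(),
}

# static_order() yields every step after all of the steps it depends on.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each step here depends on exactly one predecessor, there is only one valid order, regardless of how the steps are listed.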

In [None]:
pipeline.upsert(role_arn=role)

## Running our Pipeline

If you navigate to the pipeline in the SageMaker Studio 'Resources' tab -> 'Pipelines', you will see the following:

![SageMaker Pipeline](./images/sm-pipeline.png)

We can start an execution here or from the SageMaker Studio Pipelines view. When starting an execution, we can also define the values for the parameters of our pipeline.

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        BatchDataInput=unseen_image_input,
    )
)

execution.describe()

In [None]:
execution.wait()

You can view and start pipeline executions from the Studio -> SageMaker Resources -> Pipelines -> BirdDetectionPipeline.

Since we enabled caching, if we now re-run the pipeline with a changed value for a training job parameter, the processing step will reuse the outputs generated previously.

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        BatchDataInput=unseen_image_input,
        TrainingInstanceType="ml.p3.2xlarge",
    )
)

execution.wait()

## Visualizing Pipeline Results

As before, we can now review the results of our batch processing on the unseen images, hopefully with much better confidence scores thanks to the better choice of the learning_rate parameter.

In [None]:
import boto3
from src import utils
import pandas as pd

s3_client = boto3.client('s3')

prediction_files = s3_client.list_objects(Bucket=bucket,
                               Prefix=prefix+'/pipeline_batch_out')['Contents']
for prediction_file in prediction_files:

    predictions = pd.read_json('s3://{}/{}'.format(bucket, prediction_file['Key']))

    filename = prediction_file['Key'].rsplit('/', 1)[1].rsplit('.', 1)[0]

    utils.visualize_detection(
        './unseen/'+filename,
        predictions['prediction'].tolist(),
        ["017.Cardinal", "036.Northern_Flicker", "047.American_Goldfinch", "068.Ruby_throated_Hummingbird", "073.Blue_Jay"],
        thresh=0.7
    )
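
The loop above relies on two conventions: Batch Transform writes each result to a key ending in `.out`, and each detection is a list of the form `[class_index, score, xmin, ymin, xmax, ymax]`. The sketch below applies both to made-up data. `input_filename` and `confident_detections` are hypothetical helpers, and what `utils.visualize_detection` does internally is not shown in this notebook, so the `>=` comparison against the threshold is an assumption.

```python
from typing import Dict, List

CLASS_NAMES = ["017.Cardinal", "036.Northern_Flicker", "047.American_Goldfinch",
               "068.Ruby_throated_Hummingbird", "073.Blue_Jay"]


def input_filename(output_key: str) -> str:
    """Recover the input file name from a Batch Transform output key ('<name>.out')."""
    return output_key.rsplit("/", 1)[1].rsplit(".", 1)[0]


def confident_detections(predictions: List[List[float]], thresh: float) -> List[Dict]:
    """Keep detections whose score meets the threshold and attach the class name."""
    kept = []
    for class_index, score, xmin, ymin, xmax, ymax in predictions:
        if score >= thresh:
            kept.append({
                "label": CLASS_NAMES[int(class_index)],
                "score": score,
                "box": (xmin, ymin, xmax, ymax),
            })
    return kept


# Made-up sample in the assumed layout: [class_index, score, xmin, ymin, xmax, ymax].
sample = {"prediction": [
    [0.0, 0.93, 0.10, 0.20, 0.55, 0.80],
    [4.0, 0.41, 0.30, 0.25, 0.60, 0.70],
]}

# Hypothetical output key for an input image named cardinal.jpg.
key = "DEMO-ObjectDetection-birds/pipeline_batch_out/cardinal.jpg.out"
print(input_filename(key))
print(confident_detections(sample["prediction"], thresh=0.7))
```

Lowering `thresh` keeps more boxes at the cost of more false positives, which is the trade-off behind the 0.7 value used above.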

## Lineage Tracking

Notice how each step of the pipeline appears in the 'Experiments and trials' tab as a separate 'Trial', and how this makes it possible to track lineage.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sess)
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

In [None]:
execution.list_steps()[0]

In [None]:
step_train.properties.FinalMetricDataList.__dict__