# MLOps workshop with Amazon SageMaker

## Module 04: Automate the whole dataset preparation and model training pipeline with SageMaker Pipelines

In the last module of this workshop, we will create a repeatable production workflow of the kind that typically runs outside notebooks. To demonstrate automating the workflow, we'll use [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines) for workflow orchestration. Purpose-built for machine learning (ML), SageMaker Pipelines helps you automate different steps of the ML workflow, including data processing, model training, and batch prediction (scoring), and apply conditions such as approvals for model quality. It also includes a model registry and model lineage tracker.

## Workflow Automation with SageMaker Pipelines <a class="anchor" id="WorkflowAutomation">

In the previous modules of this workshop, we prototyped various steps of a TensorFlow project within notebooks, with some steps running on external SageMaker resources (hosted training, model tuning, hosted endpoints). Notebooks are great for prototyping, but they are generally not used in production-ready machine learning pipelines.

A very simple pipeline in SageMaker includes processing the dataset to get it ready for training, performing the actual training, and then using the model to perform some form of inference such as batch prediction (scoring). We'll use SageMaker Pipelines to automate these steps, keeping the pipeline simple for now: it can easily be extended into a far more complex pipeline.

### Pipeline parameters <a class="anchor" id="PipelineParameters">

Before we begin to create the pipeline itself, we should think about how to parameterize it.  For example, we may use different instance types for different purposes, such as CPU-based types for data processing and GPU-based or more powerful types for model training.  These are all "knobs" of the pipeline that we can parameterize.  Parameterizing enables custom pipeline executions and schedules without having to modify the pipeline definition.

In [None]:
import os
import sagemaker

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 

raw_s3 = f"s3://{bucket}/tf-2-workflow/data/raw"  # assumes the raw data was uploaded in an earlier module, e.g. sess.upload_data(path='./data/raw/', key_prefix='tf-2-workflow/data/raw')

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# batch inference step parameters
batch_instance_type = ParameterString(name="BatchInstanceType", default_value="ml.c5.xlarge")
batch_instance_count = ParameterInteger(name="BatchInstanceCount", default_value=1)
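Each execution can override these defaults by name; once the pipeline is defined, this is done with something like `pipeline.start(parameters={"TrainingInstanceType": "ml.c5.4xlarge"})`. The toy function below is only a plain-Python sketch of that override rule, not SageMaker code: `resolve_parameters` is hypothetical, and treating undeclared names as an error is an assumption of this sketch.

```python
# Toy model of how pipeline parameters resolve at execution time.
# `resolve_parameters` is hypothetical and is NOT a SageMaker SDK function.

DEFAULTS = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.c5.2xlarge",
    "TrainingInstanceCount": 1,
    "BatchInstanceType": "ml.c5.xlarge",
    "BatchInstanceCount": 1,
}


def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one execution."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Assumption for this sketch: undeclared names are treated as an error.
        raise ValueError(f"Undeclared parameters: {sorted(unknown)}")
    resolved = dict(defaults)   # copy so the defaults are never mutated
    resolved.update(overrides)  # per-execution values win over defaults
    return resolved


# Example: a larger training instance for a single run, everything else default.
effective = resolve_parameters(DEFAULTS, {"TrainingInstanceType": "ml.c5.4xlarge"})
print(effective["TrainingInstanceType"])    # ml.c5.4xlarge
print(effective["ProcessingInstanceType"])  # ml.m5.xlarge
```

The point of the design is that the pipeline definition stays fixed while each run picks its own "knob" settings.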

### Processing Step <a class="anchor" id="ProcessingStep">

The first step in the pipeline will preprocess the data to prepare it for training. We create a `SKLearnProcessor` object similar to the one used in an earlier module, but now parameterized so that we can separately track and change the job configuration as needed, for example to increase the instance type size and count to accommodate a growing dataset.

In [None]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
    
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    scaler = StandardScaler()
    x_train = np.load(os.path.join('/opt/ml/processing/input', 'x_train.npy'))
    scaler.fit(x_train)
    for file in input_files:
        raw = np.load(file)
        # only transform feature columns
        if 'y_' not in file:
            transformed = scaler.transform(raw)
        if 'train' in file:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/train', 'y_train.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TRAINING DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/test', 'y_test.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TEST DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TEST DATA FILE\n')

In [None]:
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tf-2-workflow-process",
    sagemaker_session=sess,
    role=role,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="TF2Process",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="./preprocessing.py",
)
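With `s3_data_distribution_type='ShardedByS3Key'`, each S3 object under the input prefix is copied to only one processing instance, whereas `FullyReplicated` copies every object to every instance. With the default `ProcessingInstanceCount` of 1 the two behave the same. However, `preprocessing.py` always loads `x_train.npy` to fit the scaler, so raising the count while sharding could leave some instances without that file. The sketch below illustrates the difference. The round-robin assignment is an assumption for illustration only, since the exact object-to-instance mapping is not something this notebook relies on, and `distribute` is a hypothetical helper.

```python
def distribute(keys, instance_count, mode):
    """Return one list of S3 keys per instance (illustrative only)."""
    if mode == "FullyReplicated":
        # Every instance receives every object.
        return [list(keys) for _ in range(instance_count)]
    if mode == "ShardedByS3Key":
        # Assumption: round-robin; each key goes to exactly one instance.
        shards = [[] for _ in range(instance_count)]
        for i, key in enumerate(sorted(keys)):
            shards[i % instance_count].append(key)
        return shards
    raise ValueError(f"Unknown distribution type: {mode}")


keys = ["x_train.npy", "y_train.npy", "x_test.npy", "y_test.npy"]
for n in (1, 2):
    shards = distribute(keys, n, "ShardedByS3Key")
    missing = [i for i, s in enumerate(shards) if "x_train.npy" not in s]
    print(n, shards, "instances without x_train.npy:", missing)
```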

### Training and Model Creation Steps <a class="anchor" id="TrainingModelCreation">

The following code sets up a pipeline step for a training job. We start by specifying which SageMaker prebuilt TensorFlow 2 training container to use for the job.

In [None]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


tensorflow_version = '2.3.1'
python_version = 'py37'

image_uri_train = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    py_version=python_version,
    instance_type=training_instance_type,
    image_scope="training"
)

Next, we specify an `Estimator` object and define a `TrainingStep` to insert the training job into the pipeline, with inputs from the previous SageMaker Processing step. Notice that the hyperparameters are taken from the best training job of the tuning job we ran in an earlier module. When this workshop was written, Automatic Model Tuning was not yet integrated with SageMaker Pipelines; once it is available (newer versions of the SageMaker Python SDK provide a `TuningStep`), it should be used to find the best model within the pipeline itself.

In [None]:
import time

model_path = f"s3://{bucket}/TF2WorkflowTrain"
training_parameters = {'epochs': 21, 'batch_size': 247, 'learning_rate': 0.138448, 'for_pipeline': 'true'}

estimator = TensorFlow(
    image_uri=image_uri_train,
    source_dir='code',
    entry_point='train.py',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    base_job_name="tf-2-workflow-train",
    output_path=model_path,
    hyperparameters=training_parameters
)
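`code/train.py` is not shown in this module. In SageMaker TensorFlow script mode, the `hyperparameters` dictionary is passed to the entry point as command-line arguments (for example `--epochs 21`), so a training script typically parses them with `argparse`. The sketch below is a hypothetical parser for the four keys used above; its default values are made up, and how the real `train.py` uses `for_pipeline` is not shown here. Note that `for_pipeline` arrives as the string `'true'` and needs an explicit conversion.

```python
import argparse


def str2bool(value):
    """Convert 'true'/'false'-style strings to bool (bool('false') is True, so type=bool would be wrong)."""
    return str(value).strip().lower() in ("true", "1", "yes")


def parse_hyperparameters(argv=None):
    # Hypothetical parser; defaults are illustrative only.
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    parser.add_argument("--for_pipeline", type=str2bool, default=False)
    # parse_known_args tolerates extra arguments the platform may add, e.g. --model_dir
    args, _unknown = parser.parse_known_args(argv)
    return args


# Simulate the arguments produced from training_parameters above.
args = parse_hyperparameters(
    ["--epochs", "21", "--batch_size", "247",
     "--learning_rate", "0.138448", "--for_pipeline", "true"]
)
print(args.epochs, args.batch_size, args.learning_rate, args.for_pipeline)  # 21 247 0.138448 True
```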

In [None]:
step_train = TrainingStep(
    name="TF2WorkflowTrain",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        )
    },
)
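At definition time, `step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri` is only a placeholder. SageMaker fills it in at run time from the processing job's description, whose shape follows the `DescribeProcessingJob` response. The toy resolver below mimics that lookup on a hand-written, truncated response; the dictionary contents (including `example-bucket`) and the `resolve_output_uri` helper are illustrative, not real job output.

```python
# Hand-written stand-in for part of a DescribeProcessingJob response.
describe_processing_job = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {"OutputName": "train",
             "S3Output": {"S3Uri": "s3://example-bucket/job/output/train",
                          "LocalPath": "/opt/ml/processing/train",
                          "S3UploadMode": "EndOfJob"}},
            {"OutputName": "test",
             "S3Output": {"S3Uri": "s3://example-bucket/job/output/test",
                          "LocalPath": "/opt/ml/processing/test",
                          "S3UploadMode": "EndOfJob"}},
        ]
    }
}


def resolve_output_uri(description, output_name):
    """Find the S3 URI of a named output, like Outputs[name].S3Output.S3Uri."""
    for output in description["ProcessingOutputConfig"]["Outputs"]:
        if output["OutputName"] == output_name:
            return output["S3Output"]["S3Uri"]
    raise KeyError(output_name)


# The training channels end up pointing at the processing step's outputs.
channels = {name: resolve_output_uri(describe_processing_job, name) for name in ("train", "test")}
print(channels)
```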

As another step, we create a SageMaker `Model` object to wrap the model artifact, and associate it with a separate SageMaker prebuilt TensorFlow Serving inference container to potentially use later.

In [None]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

image_uri_inference = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    py_version=python_version,
    instance_type=batch_instance_type,
    image_scope="inference"
)
model = Model(
    image_uri=image_uri_inference,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

inputs_model = CreateModelInput(
    instance_type=batch_instance_type
)

step_create_model = CreateModelStep(
    name="TF2WorkflowCreateModel",
    model=model,
    inputs=inputs_model,
)

### Batch Scoring Step <a class="anchor" id="BatchScoringStep">
    
The next step in this pipeline is offline batch scoring (inference/prediction). The inputs to this step are the model we trained earlier and the test data. A simple, ordinary Python script is all we need to do the actual batch inference.

In [None]:
%%writefile batch-score.py

import os
import subprocess
import sys
import numpy as np
import pathlib
import tarfile
import json

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

if __name__ == "__main__":
    
    install('tensorflow==2.3.1')
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, 'r:gz') as tar:
        tar.extractall('./model')
    import tensorflow as tf
    model = tf.keras.models.load_model('./model/1')
    test_path = "/opt/ml/processing/test/"
    x_test = np.load(os.path.join(test_path, 'x_test.npy'))
    y_test = np.load(os.path.join(test_path, 'y_test.npy'))
    scores = model.evaluate(x_test, y_test, verbose=2)
    print("\nTest MSE :", scores)
    report_dict = {
        "metrics": {
            "mse": {
                "value": scores
            },
        },
    }
    
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, 'w') as writer:
        writer.write(json.dumps(report_dict))
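The script above extracts `model.tar.gz` with `tarfile.extractall`, which trusts every path stored in the archive. That is acceptable for an artifact your own training job produced, but a defensive variant checks each member before extracting. The stdlib sketch below builds a tiny stand-in archive with the `1/` directory the script loads (the file name `1/saved_model.pb` holds placeholder bytes, not a real model) and extracts it only if no member would escape the target directory. `safe_extract` and `add_file` are hypothetical helpers; the `filter="data"` argument is used only on Python versions whose `tarfile` supports extraction filters.

```python
import io
import os
import tarfile
import tempfile


def safe_extract(tar_path, dest):
    """Extract tar_path into dest, refusing members that resolve outside dest or are links."""
    dest = os.path.realpath(dest)
    with tarfile.open(tar_path, "r:gz") as tar:
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(dest, member.name))
            if os.path.commonpath([dest, target]) != dest:
                raise ValueError(f"Unsafe path in archive: {member.name}")
            if member.issym() or member.islnk():
                raise ValueError(f"Links are not allowed: {member.name}")
        if hasattr(tarfile, "data_filter"):
            tar.extractall(dest, filter="data")  # newer Pythons: built-in safety filter
        else:
            tar.extractall(dest)


def add_file(tar, name, data):
    """Add an in-memory file to an open tar archive."""
    info = tarfile.TarInfo(name)
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))


workdir = tempfile.mkdtemp()
archive = os.path.join(workdir, "model.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
    add_file(tar, "1/saved_model.pb", b"placeholder")  # stand-in, not a real SavedModel

out_dir = os.path.join(workdir, "model")
safe_extract(archive, out_dir)
print(os.listdir(os.path.join(out_dir, "1")))  # ['saved_model.pb']
```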

SageMaker offers several ways to perform batch scoring, including SageMaker Processing and SageMaker Batch Transform. We'll use SageMaker Processing here. The `PropertyFile` below registers `evaluation.json` from the step's `evaluation` output so that later steps, such as the condition step, can read values from it. After pipeline execution, you can also examine the resulting evaluation.json yourself.

In [None]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

batch_scorer = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=batch_instance_type,
    instance_count=batch_instance_count,
    base_job_name="tf-2-workflow-batch",
    sagemaker_session=sess,
    role=role
)

step_batch = ProcessingStep(
    name="TF2WorkflowBatchScoring",
    processor=batch_scorer,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="./batch-score.py",
    property_files=[evaluation_report]
)

### Define a Register Model Step to Create a Model Package

Use the estimator instance specified in the training step to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.

The construction of RegisterModel is similar to an estimator instance's register method in the Python SDK.

Specifically, pass in `S3ModelArtifacts` from the properties of the `TrainingStep`, `step_train`. The TrainingStep properties attribute matches the object model of the DescribeTrainingJob response object.

Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects.

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel

model_package_group_name = f"TF2ModelPackageGroup"
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved"
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_batch.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

step_register = RegisterModel(
    name="TF2WorkflowRegisterModel",
    estimator=estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    #model_metrics=model_metrics,
)
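Each successful `RegisterModel` execution adds a new version to the model package group named by `model_package_group_name`, with versions numbered from 1, and sets the approval status (one of `Approved`, `Rejected`, or `PendingManualApproval`) from the `ModelApprovalStatus` parameter. The in-memory classes below are a toy model of that bookkeeping, meant only to make the group/version relationship concrete. They are not the SageMaker Model Registry API, the default status is a choice made for this toy, and the artifact URIs are made up.

```python
from dataclasses import dataclass, field

VALID_STATUSES = {"Approved", "Rejected", "PendingManualApproval"}


@dataclass
class ModelPackage:
    version: int
    model_data: str
    approval_status: str


@dataclass
class ModelPackageGroup:
    name: str
    packages: list = field(default_factory=list)

    def register(self, model_data, approval_status="PendingManualApproval"):
        """Append a new version; versions are numbered 1, 2, 3, ... per group."""
        if approval_status not in VALID_STATUSES:
            raise ValueError(f"Invalid approval status: {approval_status}")
        package = ModelPackage(len(self.packages) + 1, model_data, approval_status)
        self.packages.append(package)
        return package

    def latest_approved(self):
        """Return the newest approved version, or None if nothing is approved."""
        approved = [p for p in self.packages if p.approval_status == "Approved"]
        return approved[-1] if approved else None


group = ModelPackageGroup("TF2ModelPackageGroup")
group.register("s3://example-bucket/run-1/model.tar.gz", "Approved")
group.register("s3://example-bucket/run-2/model.tar.gz", "PendingManualApproval")
print([(p.version, p.approval_status) for p in group.packages])
print(group.latest_approved().version)  # 1
```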

### Define a Condition Step to Check the MSE and Conditionally Create and Register the Model in the Model Registry

In this step, the model is created and registered only if its mean squared error (MSE) on the test data, as computed by the batch scoring step `step_batch`, is less than or equal to a specified value. A ConditionStep enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties.

In the following section, you:

- Define a ConditionLessThanOrEqualTo on the MSE value found in the output of the batch scoring step. We're using a large MSE threshold (100.0) to make sure any model will pass.
- Use the condition in the list of conditions in a ConditionStep.
- Pass the CreateModelStep and the RegisterModel step collection into the if_steps of the ConditionStep, which are executed only if the condition evaluates to True.

In [None]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_batch,
        property_file=evaluation_report,
        json_path="metrics.mse.value",
    ),
    right=100.0
)

step_cond = ConditionStep(
    name="TF2WorkflowMSECond",
    conditions=[cond_lte],
    if_steps=[step_create_model, step_register],
    else_steps=[], 
)
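The condition above reads `metrics.mse.value` from `evaluation.json` (via `JsonGet` and the `EvaluationReport` property file) and compares it with `100.0`. The plain-Python sketch below mirrors that decision on sample reports with the same structure `batch-score.py` writes. `json_get` and `should_register` are hypothetical helpers, the dotted-path lookup is a simplification of the JSON path expression, and the MSE values are made up.

```python
import json


def json_get(document, dotted_path):
    """Follow a dotted path such as 'metrics.mse.value' through nested dicts."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]
    return value


def should_register(report_text, threshold=100.0):
    """Mirror ConditionLessThanOrEqualTo(left=mse, right=threshold)."""
    report = json.loads(report_text)
    return json_get(report, "metrics.mse.value") <= threshold


good_report = json.dumps({"metrics": {"mse": {"value": 21.5}}})
bad_report = json.dumps({"metrics": {"mse": {"value": 250.0}}})
print(should_register(good_report))  # True  -> if_steps run
print(should_register(bad_report))   # False -> else_steps (empty here) run
```

One caveat worth checking in your own runs: Keras `model.evaluate` returns a single number only when the model was compiled without extra metrics; otherwise it returns a list, and the stored `value` would no longer be comparable with a float threshold.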

### Creating and executing the pipeline <a class="anchor" id="CreatingExecutingPipeline">

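With all of the pipeline steps now defined, we can define the pipeline itself as a `Pipeline` object comprising a series of those steps. The model creation and registration steps are not listed separately because they run as the `if_steps` of the condition step. Steps that do not depend on each other can also run in parallel.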

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"TF2Workflow"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_type, 
        processing_instance_count, 
        training_instance_type, 
        training_instance_count,
        model_approval_status,
        batch_instance_type,
        batch_instance_count
    ],
    steps=[step_process, 
           step_train, 
           step_batch,
           step_cond
          ],
    sagemaker_session=sess
)
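The order of the `steps` list is not what sequences the work: SageMaker Pipelines builds a DAG from the data dependencies between steps (for example, `step_train` consumes outputs of `step_process`). The stdlib sketch below encodes this pipeline's dependencies, as read from the property references in the cells above, and asks `graphlib.TopologicalSorter` (Python 3.9+) for a valid order. Modeling the condition's `if_steps` as dependents of `step_cond` is a simplification, and this is only an illustration, not how SageMaker schedules steps internally.

```python
from graphlib import TopologicalSorter

# Each key lists the steps it depends on.
dependencies = {
    "step_process": set(),
    "step_train": {"step_process"},                    # consumes train/test outputs
    "step_batch": {"step_train", "step_process"},      # model artifact + test data
    "step_cond": {"step_batch"},                       # reads evaluation.json
    "step_create_model": {"step_cond", "step_train"},  # if-branch of the condition
    "step_register": {"step_cond", "step_train"},      # if-branch of the condition
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```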

We can inspect the pipeline definition in JSON format:

In [None]:
import json

definition = json.loads(pipeline.definition())
definition

After upserting its definition, we can start the pipeline with the `Pipeline` object's `start` method:

In [None]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

We can now confirm that the pipeline is executing.  In the log output below, confirm that `PipelineExecutionStatus` is `Executing`.

In [None]:
execution.describe()

#### Review the Pipeline

After the pipeline starts executing, you can view the pipeline run in SageMaker Studio.

To do so, choose the SageMaker Components and registries button.
In the Components and registries drop-down, select Pipelines.

![sageMakers_components_and_registries_button](./img/pipelines_execution_1.png)


Click the `TF2Workflow` pipeline, and then double click on the execution.

![click_the_pipeline_execution](./img/pipelines_execution_2.png)


Now you can see the pipeline executing. Click on `TF2Process` step to see additional details.

![view_pipeline_execution](./img/pipelines_execution_3.png)


On this specific step, you'll be able to see its output, logs, and other information.

![view_step_details](./img/pipelines_execution_4.png)

Typically this pipeline should take about 10 minutes to complete.  We can wait for completion by invoking `wait()`. After execution is complete, we can list the status of the pipeline steps.

In [None]:
execution.wait()
execution.list_steps()

### Check the score report

After the batch scoring job in the pipeline is complete, the batch scoring report is uploaded to S3. For simplicity, this report only states the test MSE, but in general reports can include as much detail as desired. This pipeline already uses the report for conditional execution: the condition step defined earlier lets model creation and registration proceed only if the MSE is at or below a threshold.

In [None]:
report_path = f"{step_batch.outputs[0].destination}/evaluation.json"
!aws s3 cp {report_path} ./evaluation.json --quiet && cat evaluation.json

## ML Lineage Tracking <a class="anchor" id="LineageOfPipelineArtifacts">

SageMaker ML Lineage Tracking creates and stores information about the steps of an ML workflow, from data preparation to model deployment. With the tracking information, you can reproduce the workflow steps, track model and dataset lineage, and establish model governance and audit standards.

Let's now check out the lineage of the model generated by the pipeline above.  The lineage table identifies the resources used in training, including the timestamped train and test data sources, and the specific version of the TensorFlow 2 container in use during the training job.  

In [None]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'TF2WorkflowTrain':
        display(viz.show(pipeline_execution_step=execution_step))

## Extensions <a class="anchor" id="Extensions">

We've covered a lot of content in this workshop: SageMaker Processing for data transformation, Automatic Model Tuning, and SageMaker hosted training and inference. These are central elements of most deep learning workflows in SageMaker. Additionally, we examined how SageMaker Pipelines helps automate deep learning workflows once the prototyping phase of a project is complete.

Besides all of the SageMaker features explored above, there are many other features that may be applicable to your project.  For example, to handle common problems during deep learning model training such as vanishing or exploding gradients, **SageMaker Debugger** is useful.  To manage common problems such as data drift after a model is in production, **SageMaker Model Monitor** can be applied.