# Machine Learning In Production Using SageMaker

This notebook creates a [SageMaker Pipeline](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html) to build an end-to-end Machine Learning system to solve the problem of classifying penguin species. With a SageMaker Pipeline, you can create, automate, and manage end-to-end Machine Learning workflows at scale.

You can find more information about Amazon SageMaker in the [Amazon SageMaker Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html). The [AWS Machine Learning Blog](https://aws.amazon.com/blogs/machine-learning/) is an excellent source to stay up-to-date with SageMaker.

This example uses the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data), the [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) library, and the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/). 

<img src='https://imgur.com/orZWHly.png' alt='Penguins dataset' width="800">

## Table of Contents

1. [Exploratory Data Analysis](#Exploratory-Data-Analysis) 
2. [Training Pipeline](#Training-Pipeline)
   * [Preprocessing](#Preprocessing)
   * [Training](#Training)   
   * [Tuning](#Tuning)
   * [Evaluation](#Evaluation)
   * [Inference Pipeline](#Inference-Pipeline)
   * [Data Quality Baseline](#Data-Quality-Baseline)
   * [Model Quality Baseline](#Model-Quality-Baseline)
   * [Registration](#Registration)
   * [Condition Step](#Condition-Step)
   * [Pipeline](#Pipeline)
   * [Quality Baselines](#Quality-Baselines)
3. [Model Deployment](#Model-Deployment)
   * [Lambda](#Lambda)
   * [EventBridge](#EventBridge)
   * [Predictions](#Predictions)
   * [Data Capture](#Data-Capture)
   * [Clean up](#Clean-up)
4. [Model Monitoring](#Model-Monitoring)
   * [Fake Traffic](#Fake-Traffic)
   * [Fake Labels](#Fake-Labels)
   * [Monitoring Jobs](#Monitoring-Jobs)

This notebook is part of the [Machine Learning School](https://www.ml.school) program.

In [269]:
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

import sys
from pathlib import Path

CODE_FOLDER = Path("code")
CODE_FOLDER.mkdir(parents=True, exist_ok=True)

sys.path.append(f"./{CODE_FOLDER}")



To run this notebook in **Local Mode**, set the `LOCAL_MODE` constant to `True`.

To build our system using SageMaker, we need access to `ml.m5.xlarge` instances. By default, the quota on a new AWS account is zero, so you need to request a quota increase. You can do that under Service Quotas > AWS Services > Amazon SageMaker. Find `ml.m5.xlarge` and request a quota increase for processing jobs, training jobs, transform jobs, and endpoint usage. In the meantime, you can use `ml.t3.large` as a substitute.



In [270]:
import os
import logging

# By default, the SageMaker SDK logs events related to the default
# configuration using the INFO level. To prevent these from cluttering
# the output of this notebook's cells, we can change the logging
# level to ERROR instead.
logging.getLogger("sagemaker.config").setLevel(logging.ERROR)

import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession

LOCAL_MODE = True
BUCKET = os.environ["BUCKET"]

role = os.environ["ROLE"]

# If you are running this notebook on an ARM64 machine, you will need
# to build a custom Docker image using the setup notebook. This is
# because SageMaker doesn't provide a TensorFlow image that supports
# the Apple M chips.
architecture = !(uname -m)
IS_APPLE_M_CHIP = architecture[0] == "arm64"

# We'll use these two variables to configure the steps that do not support
# Local Mode.
pipeline_session = PipelineSession(default_bucket=BUCKET) if not LOCAL_MODE else None

if LOCAL_MODE:
    config = {
        "session": LocalPipelineSession(default_bucket=BUCKET),
        "instance_type": "local",

        # We need to use a custom Docker image when we run the pipeline
        # in Local Mode on an ARM64 machine.
        "image": "sagemaker-tensorflow-training-toolkit-local" if IS_APPLE_M_CHIP else None,
        "framework_version": None if IS_APPLE_M_CHIP else "2.11",
        "py_version": None if IS_APPLE_M_CHIP else "py39",
    }
else:
    config = {
        "session": pipeline_session,
        "instance_type": "ml.m5.xlarge",
        "image": None,        
        "framework_version": "2.11",
        "py_version": "py39",
    }
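
The `!(uname -m)` line above relies on IPython's shell syntax, so it only works inside a notebook. If this setup ever moves into a plain Python script, a minimal sketch using the standard library's `platform.machine()` could look like the following. The `is_arm64` helper is a hypothetical name that is not part of the notebook, and treating `aarch64` the same as `arm64` is an assumption about Linux hosts (the notebook itself only checks for `arm64`).

```python
import platform


def is_arm64(machine=None):
    """Return True when the machine string identifies a 64-bit ARM CPU.

    `is_arm64` is an illustrative helper, not part of the notebook.
    """
    machine = platform.machine() if machine is None else machine
    # macOS on Apple silicon reports "arm64"; Linux on ARM usually
    # reports "aarch64". Treating both alike is an assumption.
    return machine.lower() in {"arm64", "aarch64"}


print(is_arm64("arm64"))   # True
print(is_arm64("x86_64"))  # False
```

Calling `is_arm64()` without arguments inspects the machine running the code.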

# Exploratory Data Analysis

Let's run Exploratory Data Analysis on the dataset. The goal of this section is to understand the data and the problem we are trying to solve.

The first step is to create an S3 bucket where we will store the data and every resource we are going to create.

<div class="alert" style="background-color:#6e420c; color: #fff"><strong>Note:</strong> 
    If you want to create a bucket in a region other than <strong>us-east-1</strong>, you need to use the "--create-bucket-configuration" argument when creating the bucket. You can see an example below.
</div>

Example of how to specify a region different from `us-east-1` when creating a bucket:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint="eu-west-1"
```

The `LocationConstraint` argument should specify the region where you want to create the bucket. The example above creates the bucket in the `eu-west-1` region.
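
The same rule applies if you create the bucket from Python with boto3 instead of the AWS CLI. The sketch below only builds the keyword arguments for the `create_bucket` call, so it runs without AWS credentials. The `create_bucket_arguments` helper and the bucket name are hypothetical and used for illustration only.

```python
def create_bucket_arguments(bucket, region):
    """Build the keyword arguments for boto3's S3 `create_bucket` call.

    `create_bucket_arguments` is an illustrative helper, not part of
    the notebook.
    """
    arguments = {"Bucket": bucket}

    # Buckets in us-east-1 are created without a location constraint.
    # Every other region needs an explicit `LocationConstraint`.
    if region != "us-east-1":
        arguments["CreateBucketConfiguration"] = {"LocationConstraint": region}

    return arguments


print(create_bucket_arguments("my-example-bucket", "us-east-1"))
print(create_bucket_arguments("my-example-bucket", "eu-west-1"))

# With boto3 installed and AWS credentials configured, the call would be:
#   import boto3
#   arguments = create_bucket_arguments("my-example-bucket", "eu-west-1")
#   boto3.client("s3", region_name="eu-west-1").create_bucket(**arguments)
```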

In [271]:
!aws s3api create-bucket --bucket $BUCKET

{
    "Location": "/mlschool"
}


After we have a bucket, we can download the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data) and store it in the bucket.

In [272]:
import boto3
import pandas as pd
import numpy as np
import urllib.request
from sagemaker.s3 import S3Uploader


S3_LOCATION = f"s3://{BUCKET}/penguins"
DATA_FILEPATH = CODE_FOLDER / "data.csv"

urllib.request.urlretrieve(
    "https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv", 
    DATA_FILEPATH
)

S3Uploader.upload(local_path=str(DATA_FILEPATH), desired_s3_uri=f"{S3_LOCATION}/data")

's3://mlschool/penguins/data/data.csv'

Let's load the Penguins dataset. The data contains the following columns:

1. `species`: The species of a penguin. This is the column we want to predict.
2. `island`: The island where the penguin was found
3. `culmen_length_mm`: The length of the penguin's culmen (bill) in millimeters
4. `culmen_depth_mm`: The depth of the penguin's culmen in millimeters
5. `flipper_length_mm`: The length of the penguin's flipper in millimeters
6. `body_mass_g`: The body mass of the penguin in grams
7. `sex`: The sex of the penguin

In [273]:
penguins = pd.read_csv(DATA_FILEPATH)
penguins.head()

| | species | island | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g | sex |
|---|---|---|---|---|---|---|---|
| 0 | Adelie | Torgersen | 39.1 | 18.7 | 181.0 | 3750.0 | MALE |
| 1 | Adelie | Torgersen | 39.5 | 17.4 | 186.0 | 3800.0 | FEMALE |
| 2 | Adelie | Torgersen | 40.3 | 18.0 | 195.0 | 3250.0 | FEMALE |
| 3 | Adelie | Torgersen | | | | | |
| 4 | Adelie | Torgersen | 36.7 | 19.3 | 193.0 | 3450.0 | FEMALE |


Now, let's get the summary statistics for the features in our dataset.

In [274]:
penguins.describe(include='all')

| | species | island | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g | sex |
|---|---|---|---|---|---|---|---|
| count | 344 | 344 | 342.0 | 342.0 | 342.0 | 342.0 | 334 |
| unique | 3 | 3 | | | | | 3 |
| top | Adelie | Biscoe | | | | | MALE |
| freq | 152 | 168 | | | | | 168 |
| mean | | | 43.92193 | 17.15117 | 200.915205 | 4201.754386 | |
| std | | | 5.459584 | 1.974793 | 14.061714 | 801.954536 | |
| min | | | 32.1 | 13.1 | 172.0 | 2700.0 | |
| 25% | | | 39.225 | 15.6 | 190.0 | 3550.0 | |
| 50% | | | 44.45 | 17.3 | 197.0 | 4050.0 | |
| 75% | | | 48.5 | 18.7 | 213.0 | 4750.0 | |


The distributions of the categories in our dataset are:

- `species`: There are 3 species of penguins in the dataset: Adelie (152), Gentoo (124), and Chinstrap (68).
- `island`: Penguins are from 3 islands: Biscoe (168), Dream (124), and Torgersen (52).
- `sex`: We have 168 male penguins, 165 female penguins, and 1 penguin with an ambiguous gender ('.').

In [275]:
species_distribution = penguins['species'].value_counts()
island_distribution = penguins['island'].value_counts()
sex_distribution = penguins['sex'].value_counts()

print(species_distribution)
print()
print(island_distribution)
print()
print(sex_distribution)

species
Adelie       152
Gentoo       124
Chinstrap     68
Name: count, dtype: int64

island
Biscoe       168
Dream        124
Torgersen     52
Name: count, dtype: int64

sex
MALE      168
FEMALE    165
.           1
Name: count, dtype: int64
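
Raw counts are easier to compare as proportions. Here is a minimal sketch that uses the species counts printed above, written in plain Python so it runs without the dataset:

```python
# Species counts copied from the output above.
species_counts = {"Adelie": 152, "Gentoo": 124, "Chinstrap": 68}

total = sum(species_counts.values())

# Share of each species as a percentage of the whole dataset.
proportions = {
    species: round(100 * count / total, 1)
    for species, count in species_counts.items()
}

print(total)        # 344
print(proportions)  # {'Adelie': 44.2, 'Gentoo': 36.0, 'Chinstrap': 19.8}
```

With the dataframe loaded, `penguins["species"].value_counts(normalize=True)` returns the same proportions as fractions.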


Let's replace the ambiguous value in the `sex` column with a null value.

In [276]:
penguins["sex"] = penguins["sex"].replace(".", np.nan)
penguins["sex"].value_counts()

sex
MALE      168
FEMALE    165
Name: count, dtype: int64

Next, let's check for any missing values in the dataset.

In [277]:
penguins.isnull().sum()

species               0
island                0
culmen_length_mm      2
culmen_depth_mm       2
flipper_length_mm     2
body_mass_g           2
sex                  11
dtype: int64

Let's get rid of the missing values. For now, we are going to replace the missing values with the most frequent value in the column. Later, we'll use a different strategy to replace missing numeric values.

In [278]:
from sklearn.impute import SimpleImputer

imputer = SimpleImputer(strategy="most_frequent")
penguins.iloc[:,:] = imputer.fit_transform(penguins)
penguins.isnull().sum()

species              0
island               0
culmen_length_mm     0
culmen_depth_mm      0
flipper_length_mm    0
body_mass_g          0
sex                  0
dtype: int64

Let's visualize the distribution of categorical features.

In [279]:
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots


fig = make_subplots(rows=3, cols=1)

fig.add_trace(go.Bar(x=species_distribution.index, y=species_distribution.values, name='species'), row=1, col=1)
fig.add_trace(go.Bar(x=island_distribution.index, y=island_distribution.values, name='island'), row=2, col=1)
fig.add_trace(go.Bar(x=sex_distribution.index, y=sex_distribution.values, name='sex'), row=3, col=1)

fig.update_layout(height=700, width=960, title_text="Distribution of Categorical Features")
fig.show()

Let's visualize the distribution of numerical columns.

In [280]:
fig = make_subplots(rows=2, cols=2)

fig.add_trace(go.Histogram(x=penguins['culmen_length_mm'], name='culmen_length_mm', nbinsx=20), row=1, col=1)
fig.add_trace(go.Histogram(x=penguins['culmen_depth_mm'], name='culmen_depth_mm', nbinsx=20), row=1, col=2)
fig.add_trace(go.Histogram(x=penguins['flipper_length_mm'], name='flipper_length_mm', nbinsx=20), row=2, col=1)
fig.add_trace(go.Histogram(x=penguins['body_mass_g'], name='body_mass_g', nbinsx=20), row=2, col=2)
fig.update_layout(height=700, width=960, title_text="Distribution of Numerical Features")

fig.show()

Let's display a scatter matrix with every numeric feature from our dataset.

In [281]:
fig = px.scatter_matrix(
    penguins, 
    dimensions=["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "body_mass_g"], 
    color="species")

fig.update_layout(height=800, width=960, title_text="Scatter Matrix of Numeric Features")

fig.show()

Let's display the covariance matrix of the dataset. The "covariance" measures how changes in one variable are associated with changes in a second variable. In other words, the covariance measures the degree to which two variables are linearly associated.

Here are three examples of what we get from interpreting the covariance matrix below:

1. Penguins that weigh more tend to have a longer culmen.
2. The more a penguin weighs, the shallower its culmen tends to be.
3. There's little variance in the culmen depth of penguins.

In [282]:
penguins.cov(numeric_only=True)

| | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g |
|---|---|---|---|---|
| culmen_length_mm | 29.679415 | -2.516984 | 50.260588 | 2596.971151 |
| culmen_depth_mm | -2.516984 | 3.877201 | -16.108849 | -742.66018 |
| flipper_length_mm | 50.260588 | -16.108849 | 197.269501 | 9792.552037 |
| body_mass_g | 2596.971151 | -742.66018 | 9792.552037 | 640316.716388 |


Let's now display the correlation matrix. "Correlation" measures both the strength and direction of the linear relationship between two variables.

Here are three examples of what we get from interpreting the correlation matrix below:

1. Penguins that weigh more tend to have larger flippers.
2. Penguins with a shallower culmen tend to have larger flippers.
3. The length and depth of the culmen have a slight negative correlation.

In [283]:
penguins.corr(numeric_only=True)

| | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g |
|---|---|---|---|---|
| culmen_length_mm | 1.0 | -0.234635 | 0.656856 | 0.59572 |
| culmen_depth_mm | -0.234635 | 1.0 | -0.582472 | -0.471339 |
| flipper_length_mm | 0.656856 | -0.582472 | 1.0 | 0.871302 |
| body_mass_g | 0.59572 | -0.471339 | 0.871302 | 1.0 |
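
The two matrices are directly related: a correlation is a covariance divided by the product of the two standard deviations, which removes the units and bounds the result between -1 and 1. As a quick check, this sketch recomputes the correlation between `flipper_length_mm` and `body_mass_g` from the entries of the covariance matrix shown earlier:

```python
import math

# Values copied from the covariance matrix above.
cov_flipper_mass = 9792.552037  # cov(flipper_length_mm, body_mass_g)
var_flipper = 197.269501        # var(flipper_length_mm)
var_mass = 640316.716388        # var(body_mass_g)

# corr(x, y) = cov(x, y) / (std(x) * std(y))
correlation = cov_flipper_mass / math.sqrt(var_flipper * var_mass)

print(round(correlation, 4))  # 0.8713
```

The result agrees with the 0.871302 entry in the correlation matrix.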


Let's display the distribution of species by island.

In [284]:
fig = px.histogram(penguins, x="island", color="species", nbins=50)

fig.update_layout(
    height=800, 
    width=960, 
    title_text="Distribution of Species by Island",
    xaxis_title_text='island',
    yaxis_title_text='count',
    bargap=0.2,
    bargroupgap=0.1
)

fig.show()

Let's display the distribution of species by sex.

In [285]:
fig = px.histogram(penguins, x="sex", color="species", nbins=50)

fig.update_layout(
    height=800, 
    width=960, 
    title_text="Distribution of Species by Sex",
    xaxis_title_text='sex',
    yaxis_title_text='count',
    bargap=0.2,
    bargroupgap=0.1
)

fig.show()

# Training Pipeline

In this section, we'll create a [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) to automate the process of building, evaluating, and registering a model. We'll use the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/) to create the Pipeline. Check the [SageMaker Pipelines Overview](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) for an introduction to the fundamental components of a SageMaker Pipeline.

We can define a caching policy to reuse the result of a previous successful run of a pipeline step. You can find more information about this topic in [Caching Pipeline Steps](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html).

In [286]:
import ipytest
from sagemaker.workflow.steps import CacheConfig


ipytest.autoconfig(raise_on_error=True)
sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

## Preprocessing

We'll use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) to split and transform the data.

Let's create the preprocessing script. The Processing Step will spin up a Processing Job and run this script inside a container.
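
Before reading the full script, it may help to see what its two feature transformers compute. The following is a rough plain-Python sketch, not the Scikit-Learn implementation: numeric columns have missing values replaced with the column mean and are then standardized, and the `island` column is one-hot encoded. The helper names `impute_and_scale` and `one_hot` are hypothetical.

```python
from statistics import fmean, pstdev


def impute_and_scale(values):
    """Fill missing entries (None) with the mean, then standardize."""
    mean = fmean([v for v in values if v is not None])
    filled = [mean if v is None else v for v in values]

    # Population standard deviation (ddof=0), which is what
    # StandardScaler uses.
    std = pstdev(filled)
    return [(v - mean) / std for v in filled]


def one_hot(values):
    """Encode each category as a 0/1 vector, one slot per sorted category."""
    categories = sorted(set(values))
    return [[1.0 if v == c else 0.0 for c in categories] for v in values]


scaled = impute_and_scale([39.1, None, 40.3])
encoded = one_hot(["Torgersen", "Biscoe", "Dream"])

print([round(v, 3) for v in scaled])  # [-1.225, 0.0, 1.225]
print(encoded[0])                     # [0.0, 0.0, 1.0]
```

Four standardized numeric columns plus three one-hot columns for `island` give the seven features the model will receive.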

In [287]:
%%writefile {CODE_FOLDER}/preprocessor.py

import os
import sys
import argparse
import json
import tarfile
import tempfile
import time
import joblib
import numpy as np
import pandas as pd

from io import StringIO
from pathlib import Path
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler, OrdinalEncoder


def preprocess(base_directory):
    """
    This function loads the supplied data, splits it and transforms it.
    """

    df = _read_data_from_input_csv_files(base_directory)
    
    target_transformer = ColumnTransformer(
        transformers=[("species", OrdinalEncoder(), [0])]
    )
    
    numeric_transformer = make_pipeline(
        SimpleImputer(strategy="mean"),
        StandardScaler()
    )

    categorical_transformer = make_pipeline(
        SimpleImputer(strategy="most_frequent"),
        OneHotEncoder()
    )
    
    features_transformer = ColumnTransformer(
        transformers=[
            ("numeric", numeric_transformer, make_column_selector(dtype_exclude="object")),
            ("categorical", categorical_transformer, ["island"]),
        ]
    )

    df_train, df_validation, df_test = _split_data(df)

    _save_baseline(base_directory, df_test)

    y_train = target_transformer.fit_transform(np.array(df_train.species.values).reshape(-1, 1))
    y_validation = target_transformer.transform(np.array(df_validation.species.values).reshape(-1, 1))
    y_test = target_transformer.transform(np.array(df_test.species.values).reshape(-1, 1))
    
    df_train = df_train.drop("species", axis=1)
    df_validation = df_validation.drop("species", axis=1)
    df_test = df_test.drop("species", axis=1)

    X_train = features_transformer.fit_transform(df_train)
    X_validation = features_transformer.transform(df_validation)
    X_test = features_transformer.transform(df_test)

    _save_splits(base_directory, X_train, y_train, X_validation, y_validation, X_test, y_test)
    _save_model(base_directory, target_transformer, features_transformer)
    

def _read_data_from_input_csv_files(base_directory):
    """
    This function reads every CSV file available and concatenates
    them into a single dataframe.
    """

    input_directory = Path(base_directory) / "input"
    files = [file for file in input_directory.glob("*.csv")]
    
    if len(files) == 0:
        raise ValueError(f"There are no CSV files in {str(input_directory)}/")
        
    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)
    
    # Shuffle the data
    return df.sample(frac=1, random_state=42)


def _split_data(df):
    """
    Splits the data into three sets: train, validation and test.
    """

    df_train, temp = train_test_split(df, test_size=0.3)
    df_validation, df_test = train_test_split(temp, test_size=0.5)

    return df_train, df_validation, df_test


def _save_baseline(base_directory, df_test):
    """
    This function saves the untransformed test split to disk. This file will
    be used later as a baseline to monitor the performance of the model.
    """

    baseline_path = Path(base_directory) / "baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)
    df_test.to_csv(baseline_path / "baseline.csv", header=False, index=False)


def _save_splits(base_directory, X_train, y_train, X_validation, y_validation, X_test, y_test):
    """
    This function concatenates the transformed features and the target variable, and
    saves each one of the split sets to disk.
    """

    train = np.concatenate((X_train, y_train), axis=1)
    validation = np.concatenate((X_validation, y_validation), axis=1)
    test = np.concatenate((X_test, y_test), axis=1)

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


def _save_model(base_directory, target_transformer, features_transformer):
    """
    This function creates a model.tar.gz file that contains the two transformation
    pipelines we built to transform the data.
    """

    with tempfile.TemporaryDirectory() as directory:
        joblib.dump(target_transformer, os.path.join(directory, "target.joblib"))
        joblib.dump(features_transformer, os.path.join(directory, "features.joblib"))
    
        model_path = Path(base_directory) / "model"
        model_path.mkdir(parents=True, exist_ok=True)
    
        with tarfile.open(f"{str(model_path / 'model.tar.gz')}", "w:gz") as tar:
            tar.add(os.path.join(directory, "target.joblib"), arcname="target.joblib")
            tar.add(os.path.join(directory, "features.joblib"), arcname="features.joblib")

    
if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")

Overwriting code/preprocessor.py
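
The two calls to `train_test_split` in `_split_data` produce a split of roughly 70% / 15% / 15%. The sketch below works out the resulting sizes for the 344 rows in the dataset. It assumes Scikit-Learn rounds a fractional test size up and gives the remaining rows to the other split. The `split_sizes` helper is a hypothetical name.

```python
import math


def split_sizes(n_samples, test_size):
    """Return (remaining, held_out) sizes for a fractional test size."""
    # Assumption: with a float `test_size`, the held-out split is
    # rounded up and the rest of the rows go to the other split.
    held_out = math.ceil(test_size * n_samples)
    return n_samples - held_out, held_out


# First split: 70% train, 30% temporary.
n_train, n_temp = split_sizes(344, 0.3)

# Second split: the temporary set is halved into validation and test.
n_validation, n_test = split_sizes(n_temp, 0.5)

print(n_train, n_validation, n_test)  # 240 52 52
```

These sizes agree with the training log that appears later in this notebook: 240 samples with a batch size of 32 take 8 steps per epoch, and the reported validation accuracy of 0.4807692307692308 equals 25/52. Since `train_test_split` is called without a `random_state`, the rows that land in each split change from run to run.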


Let's test the script to ensure everything is working as expected.

In [288]:
%%ipytest -s

import os
import shutil
import tarfile
import pytest
import tempfile
import joblib
from preprocessor import preprocess


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")
    
    directory = Path(directory)
    preprocess(base_directory=directory)
    
    yield directory
    
    shutil.rmtree(directory)


def test_preprocess_generates_data_splits(directory):
    output_directories = os.listdir(directory)
    
    assert "train" in output_directories
    assert "validation" in output_directories
    assert "test" in output_directories


def test_preprocess_generates_baseline(directory):
    output_directories = os.listdir(directory)

    assert "baseline" in output_directories


def test_preprocess_creates_two_models(directory):
    model_path = directory / "model"
    tar = tarfile.open(model_path / "model.tar.gz", "r:gz")

    assert "features.joblib" in tar.getnames()
    assert "target.joblib" in tar.getnames()


def test_splits_are_transformed(directory):
    train = pd.read_csv(directory / "train" / "train.csv", header=None)
    validation = pd.read_csv(directory / "validation" / "validation.csv", header=None)
    test = pd.read_csv(directory / "test" / "test.csv", header=None)

    # After transforming the data, the number of features should be 7:
    # * 3 - island (one-hot encoded)
    # * 1 - culmen_length_mm
    # * 1 - culmen_depth_mm
    # * 1 - flipper_length_mm
    # * 1 - body_mass_g
    number_of_features = 7

    # The transformed splits should have an additional column for the target
    # variable.
    assert train.shape[1] == number_of_features + 1
    assert validation.shape[1] == number_of_features + 1
    assert test.shape[1] == number_of_features + 1


def test_baseline_is_not_transformed(directory):
    baseline = pd.read_csv(directory / "baseline" / "baseline.csv", header=None)

    island = baseline.iloc[:, 1].unique()

    assert "Biscoe" in island
    assert "Torgersen" in island
    assert "Dream" in island

.....
5 passed in 0.30s


The first step we need in the pipeline is a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) to run the preprocessing script. This Processing Step will create a SageMaker Processing Job in the background, run the script, and upload the output to S3. You can use Processing Jobs to perform data preprocessing, post-processing, feature engineering, data validation, and model evaluation. Check the SageMaker SDK documentation for [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) for more information.

We can parameterize a SageMaker Pipeline to make it more flexible. To read more about these parameters, check [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html). The `dataset_location` parameter represents the location of the data we want to process in our pipeline. We can execute the pipeline with different datasets by changing the value of this parameter.

A processor gives the Processing Step information about the hardware and software that SageMaker should use to launch the Processing Job. To run the script, we need access to Scikit-Learn, so we can use the [SKLearnProcessor](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) processor that comes out-of-the-box with the SageMaker Python SDK. The [Data Processing with Framework Processors](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks.html) page discusses other built-in processors you can use. The [Docker Registry Paths and Example Code](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html) page contains information about the available framework versions for each region.

The [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) requires a list of the inputs that the preprocessing script needs. In this case, the input is the dataset we stored in S3. We also have a few outputs that we want SageMaker to capture when the Processing Job finishes.

In [289]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.parameters import ParameterString


dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

processor = SKLearnProcessor(
    base_job_name="split-and-transform-data",
    framework_version="1.2-1",

    # By default, a new account doesn't have access to `ml.m5.xlarge` instances.
    # If you haven't requested a quota increase yet, you can use an
    # `ml.t3.medium` instance type instead. This will work out of the box, but
    # the Processing Job will take significantly longer than it should have.
    # To get access to `ml.m5.xlarge` instances, you can request a quota 
    # increase under the Service Quotas section in your AWS account.
    instance_type=config["instance_type"],
    
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

split_and_transform_data_step = ProcessingStep(
    name="split-and-transform-data",
    step_args=processor.run(
        code=f"{CODE_FOLDER}/preprocessor.py",
        inputs=[
            ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),  
        ],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
            ProcessingOutput(output_name="model", source="/opt/ml/processing/model"),
            
            # The baseline output points to the test set before transforming the data. This set
            # will be helpful to generate a quality baseline for the model performance.
            ProcessingOutput(output_name="baseline", source="/opt/ml/processing/baseline"),
        ]
    ),
    cache_config=cache_config
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3

Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.



## Training

We'll use a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) to build a model.

The following script is responsible for training a neural network using the training data, validating the model, and saving it so we can use it later.
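
The network in the script ends with a three-unit softmax layer and is trained with the sparse categorical cross-entropy loss, which takes integer class labels like the ones the preprocessing step produces. As a rough illustration of what those two pieces compute for a single sample, here is a plain-Python sketch. It is not the Keras implementation, which also averages the loss over each batch.

```python
import math


def softmax(logits):
    """Turn raw scores into probabilities that sum to 1."""
    highest = max(logits)  # subtracting the max keeps exp() stable
    exps = [math.exp(x - highest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sparse_categorical_crossentropy(probabilities, label):
    """Loss for one sample: negative log-probability of the true class."""
    return -math.log(probabilities[label])


# An untrained model that scores every class equally has a loss of ln(3).
uniform = softmax([0.0, 0.0, 0.0])
print(round(sparse_categorical_crossentropy(uniform, label=2), 4))  # 1.0986

# The predicted class is the index with the highest probability.
confident = softmax([0.5, 0.1, 3.0])
print(confident.index(max(confident)))  # 2
```

A loss close to ln(3) ≈ 1.0986 after very little training is what we would expect from a three-class model that hasn't learned much yet.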

In [290]:
%%writefile {CODE_FOLDER}/train.py

import os
import argparse

import numpy as np
import pandas as pd
import tensorflow as tf

from pathlib import Path
from sklearn.metrics import accuracy_score

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD


def train(model_directory, train_path, validation_path, epochs=50, batch_size=32):
    train_files = [file for file in Path(train_path).glob("*.csv")]
    validation_files = [file for file in Path(validation_path).glob("*.csv")]
    
    if len(train_files) == 0 or len(validation_files) == 0:
        raise ValueError("There are no train or validation files")
        
    train_data = [pd.read_csv(file, header=None) for file in train_files]
    X_train = pd.concat(train_data)
    y_train = X_train[X_train.columns[-1]]
    X_train.drop(X_train.columns[-1], axis=1, inplace=True)
    
    
    validation_data = [pd.read_csv(file, header=None) for file in validation_files]
    X_validation = pd.concat(validation_data)
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation.drop(X_validation.columns[-1], axis=1, inplace=True)
    
    model = Sequential([
        Dense(10, input_shape=(X_train.shape[1],), activation="relu"),
        Dense(8, activation="relu"),
        Dense(3, activation="softmax"),
    ])
    
    model.compile(
        optimizer=SGD(learning_rate=0.01),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    model.fit(
        X_train, 
        y_train, 
        validation_data=(X_validation, y_validation),
        epochs=epochs, 
        batch_size=batch_size,
        verbose=2,
    )

    predictions = np.argmax(model.predict(X_validation), axis=-1)
    print(f"Validation accuracy: {accuracy_score(y_validation, predictions)}")
    
    model_filepath = Path(model_directory) / "001"
    model.save(model_filepath)    
    

if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to 
    # the entry point as script arguments. 
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)
    args, _ = parser.parse_known_args()
    

    train(
        # This is the location where we need to save our model. SageMaker will
        # create a model.tar.gz file with anything inside this directory when
        # the training script finishes.
        model_directory=os.environ["SM_MODEL_DIR"],

        # SageMaker creates one channel for each one of the inputs to the
        # Training Step.
        train_path=os.environ["SM_CHANNEL_TRAIN"],
        validation_path=os.environ["SM_CHANNEL_VALIDATION"],

        epochs=args.epochs,
        batch_size=args.batch_size,
    )

Overwriting code/train.py
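
The script uses `parse_known_args()` rather than `parse_args()` so that it tolerates arguments it does not declare. The sketch below shows that behavior with a hypothetical command line. The `--extra_flag` argument is made up for illustration and does not claim to be something SageMaker passes.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=50)
parser.add_argument("--batch_size", type=int, default=32)

# Hypothetical command line: two declared hyperparameters plus one
# argument the script knows nothing about.
arguments = ["--epochs", "10", "--batch_size", "16", "--extra_flag", "value"]
args, unknown = parser.parse_known_args(arguments)

print(args.epochs, args.batch_size)  # 10 16
print(unknown)                       # ['--extra_flag', 'value']

# Without any arguments, the defaults apply.
defaults, _ = parser.parse_known_args([])
print(defaults.epochs, defaults.batch_size)  # 50 32
```

With `parse_args()`, the undeclared argument would make the script exit with an error instead.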


Let's test the script to ensure everything is working as expected.

In [291]:
%%ipytest -s

import os
import shutil
import tarfile
import pytest
import tempfile
import joblib

from preprocessor import preprocess
from train import train


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    train(
        model_directory=directory / "model",
        train_path=directory / "train", 
        validation_path=directory / "validation",
        epochs=1
    )
    
    yield directory
    
    shutil.rmtree(directory)


def test_train_saves_a_folder_with_model_assets(directory):
    output = os.listdir(directory / "model")
    assert "001" in output
    
    assets = os.listdir(directory / "model" / "001")
    assert "saved_model.pb" in assets




8/8 - 0s - loss: 1.0332 - accuracy: 0.4417 - val_loss: 1.0067 - val_accuracy: 0.4808 - 202ms/epoch - 25ms/step
Validation accuracy: 0.4807692307692308
INFO:tensorflow:Assets written to: /var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmp04ai5la6/model/001/assets


.
1 passed in 0.45s


SageMaker uses the concept of an [Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) to handle end-to-end training tasks. For this example, we will use the built-in [TensorFlow Estimator](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-estimator) to run the training script we wrote before. The [Docker Registry Paths and Example Code](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html) page contains information about the available framework versions for each region. Here, you can also check the available SageMaker [Deep Learning Container images](https://github.com/aws/deep-learning-containers/blob/master/available_images.md).

In [292]:
from sagemaker.tensorflow import TensorFlow


estimator = TensorFlow(
    base_job_name="training",
    entry_point=f"{CODE_FOLDER}/train.py",
    
    # SageMaker will pass these hyperparameters as arguments
    # to the entry point of the training script.
    hyperparameters={
        "epochs": 50,
        "batch_size": 32,
    },

    # SageMaker will track these metrics as part of the experiment
    # associated with this pipeline. The metric definitions tell
    # SageMaker how to parse the values from the Training Job logs.
    metric_definitions=[
        {"Name": "loss", "Regex": "loss: ([0-9\\.]+)"},
        {"Name": "accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
        {"Name": "val_loss", "Regex": "val_loss: ([0-9\\.]+)"},
        {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
    ],

    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    disable_profiler=True,
    sagemaker_session=config["session"],
    role=role,
)
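To see what these regular expressions capture, the sketch below applies them to a Keras log line shaped like the ones printed during the local tests. It uses only Python's `re` module, so it says nothing about how SageMaker itself scans the logs, and the `sample_log_line` and `captured` names are just for illustration. Note that the unanchored `loss:` and `accuracy:` patterns also match inside `val_loss:` and `val_accuracy:`.

```python
import re

# A Keras progress line with the same shape as the ones in the test output.
sample_log_line = (
    "8/8 - 0s - loss: 1.0332 - accuracy: 0.4417 "
    "- val_loss: 1.0067 - val_accuracy: 0.4808"
)

# Same patterns as the estimator's metric_definitions.
metric_definitions = [
    {"Name": "loss", "Regex": "loss: ([0-9\\.]+)"},
    {"Name": "accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
    {"Name": "val_loss", "Regex": "val_loss: ([0-9\\.]+)"},
    {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
]

# Collect every match of each pattern in the sample line.
captured = {
    definition["Name"]: re.findall(definition["Regex"], sample_log_line)
    for definition in metric_definitions
}

for name, values in captured.items():
    print(name, values)
```

If the overlap matters, one option (assuming the log format stays as shown) is to require the preceding delimiter, for example `" - loss: ([0-9\\.]+)"`.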

We can now create a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training). This Training Step will create a SageMaker Training Job in the background, run the training script, and upload the output to S3. Check the [TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) documentation in the SageMaker SDK for more information.

This step will receive the train and validation split from the previous step as inputs.

Here, we are using two input channels, `train` and `validation`. SageMaker will automatically create an environment variable corresponding to each of these channels following the format `SM_CHANNEL_[channel_name]`:

* `SM_CHANNEL_TRAIN`: This environment variable will contain the path to the data in the `train` channel
* `SM_CHANNEL_VALIDATION`: This environment variable will contain the path to the data in the `validation` channel
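A training script typically picks these paths up as argument defaults. The following is a minimal sketch, not the notebook's actual `train.py`: the `parse_channel_arguments` helper, the argument names, and the simulated paths are assumptions chosen for illustration.

```python
import argparse
import os


def parse_channel_arguments(argv=None):
    # Hypothetical helper: the argument names are illustrative only.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--train_path",
        default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"),
    )
    parser.add_argument(
        "--validation_path",
        default=os.environ.get("SM_CHANNEL_VALIDATION", "/opt/ml/input/data/validation"),
    )
    # parse_known_args ignores extra arguments such as hyperparameters.
    args, _ = parser.parse_known_args(argv)
    return args


# Simulate the variables that the training container would set.
os.environ["SM_CHANNEL_TRAIN"] = "/tmp/channels/train"
os.environ["SM_CHANNEL_VALIDATION"] = "/tmp/channels/validation"

args = parse_channel_arguments([])
print(args.train_path, args.validation_path)
```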

In [293]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput


train_model_step = TrainingStep(
    name="train-model",
    step_args=estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv"
            ),
            "validation": TrainingInput(
                s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv"
            )
        }
    ),
    cache_config=cache_config
)


Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.



## Tuning

We can also use a [Tuning Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning) to build a model. This Tuning Step will create a SageMaker Hyperparameter Tuning Job in the background and use the training script to train different versions of the model to choose the best one.

Since we could use the Training or the Tuning Step to create the model, we'll define a constant `USE_TUNING_STEP` to indicate which approach we want to run.

In [294]:
USE_TUNING_STEP = False

The [TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) requires a [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html) reference to configure the Hyperparameter Tuning Job.

Here is the configuration that we'll use to find the best model:

1. `objective_metric_name`: This is the name of the metric the tuner will use to determine the best model. We'll use `val_accuracy` to select the model with the highest validation accuracy.
2. `objective_type`: This is the objective of the tuner. It can be `Minimize` or `Maximize`. Since we are using the validation accuracy of the model, we want the objective to be `Maximize`. If we were using the loss of the model, we would set the objective to `Minimize`.
3. `metric_definitions`: This defines how SageMaker will determine the metric's value by looking at the output logs of the training process.
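As a rough illustration of how `objective_metric_name` and `objective_type` interact, the sketch below picks the best job from a handful of made-up results. The job names and metric values are invented for the example, and `pick_best_job` is a hypothetical helper, not part of the SageMaker SDK.

```python
def pick_best_job(results, objective_metric_name, objective_type):
    """Return the result with the best objective metric value."""
    if objective_type not in ("Maximize", "Minimize"):
        raise ValueError(f"Unsupported objective type: {objective_type}")

    choose = max if objective_type == "Maximize" else min
    return choose(results, key=lambda result: result[objective_metric_name])


# Invented results for three training jobs.
results = [
    {"job": "job-a", "val_accuracy": 0.91, "val_loss": 0.31},
    {"job": "job-b", "val_accuracy": 0.95, "val_loss": 0.22},
    {"job": "job-c", "val_accuracy": 0.93, "val_loss": 0.18},
]

best_by_accuracy = pick_best_job(results, "val_accuracy", "Maximize")
best_by_loss = pick_best_job(results, "val_loss", "Minimize")
print(best_by_accuracy["job"], best_by_loss["job"])
```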

The tuner expects the list of hyperparameters you want to explore. You can use subclasses of the [ParameterRange](https://sagemaker.readthedocs.io/en/stable/api/training/parameter.html#sagemaker.parameter.ParameterRange) class to specify different types of hyperparameters. This example explores different values for the `epochs` hyperparameter.

You can control the number of jobs and how many of them will run in parallel using the following two arguments:

* `max_jobs`: Defines the maximum total number of training jobs to start for the hyperparameter tuning job.
* `max_parallel_jobs`: Defines the maximum number of parallel training jobs to start.

Finally, we'll create the Tuning Step using the tuner. Notice how the Tuning Step looks very similar to the Training Step.

In [295]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.parameter import IntegerParameter
from sagemaker.workflow.steps import TuningStep


tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="val_accuracy",
    objective_type="Maximize",
    hyperparameter_ranges={
        "epochs": IntegerParameter(10, 50),
    },
    metric_definitions=[
        {"Name": "val_accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"}
    ],
    max_jobs=3,
    max_parallel_jobs=3,
)

tune_model_step = TuningStep(
    name="tune-model",
    step_args=tuner.fit(
        inputs={
            "train": TrainingInput(
                s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv"
            ),
            "validation": TrainingInput(
                s3_data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv"
            )
        },
    ),
    cache_config=cache_config
)

## Evaluation

We'll use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) to evaluate the model.

Let's create the evaluation script. The Processing Step will spin up a Processing Job and run this script inside a container. This script is responsible for loading the model we created and evaluating it on the test set. Before finishing, this script will generate an evaluation report of the model.

In [296]:
%%writefile {CODE_FOLDER}/evaluation.py

import os
import json
import tarfile
import numpy as np
import pandas as pd

from pathlib import Path
from tensorflow import keras
from sklearn.metrics import accuracy_score


MODEL_PATH = "/opt/ml/processing/model/"
TEST_PATH = "/opt/ml/processing/test/"
OUTPUT_PATH = "/opt/ml/processing/evaluation/"


def evaluate(model_path, test_path, output_path):
    # The first step is to extract the model package so we can load 
    # it in memory.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
        
    model = keras.models.load_model(Path(model_path) / "001")
    
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test.drop(X_test.columns[-1], axis=1, inplace=True)
    
    predictions = np.argmax(model.predict(X_test), axis=-1)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Test accuracy: {accuracy}")

    # Let's create an evaluation report using the model accuracy.
    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            },
        },
    }
    
    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))
        
        
if __name__ == "__main__":
    evaluate(
        model_path=MODEL_PATH, 
        test_path=TEST_PATH,
        output_path=OUTPUT_PATH
    )

Overwriting code/evaluation.py


Let's test the script to ensure everything is working as expected.

In [297]:
%%ipytest -s

import os
import shutil
import tarfile
import pytest
import tempfile
import joblib
import json

from preprocessor import preprocess
from train import train
from evaluation import evaluate


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    
    train(
        model_directory=directory / "model",
        train_path=directory / "train", 
        validation_path=directory / "validation",
        epochs=1
    )
    
    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(directory / "model.tar.gz", "w:gz") as tar:
        tar.add(directory / "model" / "001", arcname="001")
        
    evaluate(
        model_path=directory, 
        test_path=directory / "test",
        output_path=directory / "evaluation",
    )

    yield directory / "evaluation"
    
    shutil.rmtree(directory)


def test_evaluate_generates_evaluation_report(directory):
    output = os.listdir(directory)
    assert "evaluation.json" in output


def test_evaluation_report_contains_accuracy(directory):
    with open(directory / "evaluation.json", 'r') as file:
        report = json.load(file)
        
    assert "metrics" in report
    assert "accuracy" in report["metrics"]
    



8/8 - 0s - loss: 1.7050 - accuracy: 0.2083 - val_loss: 1.5639 - val_accuracy: 0.1538 - 222ms/epoch - 28ms/step
Validation accuracy: 0.15384615384615385
INFO:tensorflow:Assets written to: /var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmpf79nisc5/model/001/assets


Test accuracy: 0.19607843137254902
.



8/8 - 0s - loss: 1.2137 - accuracy: 0.0458 - val_loss: 1.1446 - val_accuracy: 0.0769 - 220ms/epoch - 27ms/step
Validation accuracy: 0.07692307692307693
INFO:tensorflow:Assets written to: /var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmpd_5_ehu4/model/001/assets


Test accuracy: 0.0392156862745098
.
2 passed in 1.26s


To run the evaluation script, we'll use a Processing Step with a [TensorFlowProcessor](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks-tensorflow.html). 

We can use the `USE_TUNING_STEP` flag to determine whether we created the model using a Training Step or a Tuning Step. If we are using the Tuning Step, we can use the [TuningStep.get_top_model_s3_uri()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) function to get the model artifacts from the top-performing training job of the Hyperparameter Tuning Job.

The [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) lets us specify a list of [PropertyFile](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.properties.PropertyFile) instances from the output of the job. We can use this to map the evaluation report generated in the evaluation script. Check [How to Build and Manage Property Files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html) for more information.
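Once the report is mapped as a property file, later steps can address values inside it with a path such as `metrics.accuracy.value`. The sketch below mimics that lookup in plain Python to show which value such a path points at. The `resolve_json_path` helper is hypothetical and only handles dot-separated keys; it is not how the SDK implements the lookup, and the accuracy value is an invented example.

```python
import json


def resolve_json_path(document, json_path):
    """Follow a dot-separated path of keys through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Same structure as the report written by evaluation.py; the value is made up.
sample_report = json.loads('{"metrics": {"accuracy": {"value": 0.97}}}')

accuracy = resolve_json_path(sample_report, "metrics.accuracy.value")
print(accuracy)
```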

In [298]:
from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.workflow.properties import PropertyFile


tensorflow_processor = TensorFlowProcessor(
    base_job_name="evaluation-processor",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)

# We want to map the evaluation report that we generate inside
# the evaluation script so we can later reference it.
evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json"
)

# Notice how this step uses the model generated by the tuning or training
# step, and the test set generated by the preprocessing step.
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    step_args=tensorflow_processor.run(
        inputs=[
            ProcessingInput(
                source=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test"
            ),
            ProcessingInput(
                source=(
                    tune_model_step.get_top_model_s3_uri(top_k=0, s3_bucket=config["session"].default_bucket()) 
                    if USE_TUNING_STEP 
                    else train_model_step.properties.ModelArtifacts.S3ModelArtifacts
                ),
                destination="/opt/ml/processing/model",
            )
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        code=f"{CODE_FOLDER}/evaluation.py"
    ),
    property_files=[evaluation_report],
    cache_config=cache_config
)


Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.



## Inference Pipeline

Deploying the model we trained directly to an endpoint doesn't let us control the data that goes in and comes out of the endpoint. The TensorFlow model we trained requires the data to come in a specific format, which makes it hard to use from other applications. Fortunately, we can create an Inference Pipeline using SageMaker to control the data that goes in and comes out of the endpoint.

Our inference pipeline will have three components:

1. A preprocessing transformer that will transform the input data into the format the model expects. 
2. The TensorFlow model we trained.
3. A postprocessing transformer that will transform the output of the model into a human-readable format.

We want our endpoint to handle unprocessed data in CSV and JSON format and return the penguin's species. Here is an example of the payload input we want the endpoint to support:

```
{
    "island": "Biscoe",
    "culmen_length_mm": 48.6,
    "culmen_depth_mm": 16.0,
    "flipper_length_mm": 230.0,
    "body_mass_g": 5800.0
}
```

And here is an example of the output we'd like to get from the endpoint:

```
{
    "prediction": "Adelie", 
    "confidence": 0.802672
}
```
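The three components exchange JSON: the preprocessing script in this notebook wraps its output in an `instances` key, and the postprocessing script reads the model's response from a `predictions` key. The following sketch chains three stand-in functions to show that hand-off. Everything in it is a placeholder: the feature handling, the fixed probabilities, and the category order are invented, and the real components are the scripts and the trained model defined in this notebook.

```python
import json

CATEGORIES = ["Adelie", "Chinstrap", "Gentoo"]  # assumed order, for illustration


def fake_preprocess(payload):
    # Stand-in for the Scikit-Learn transformer: keep numeric fields only.
    record = json.loads(payload)
    features = [float(v) for v in record.values() if isinstance(v, (int, float))]
    return json.dumps({"instances": [features]})


def fake_model(request):
    # Stand-in for the TensorFlow model: one fixed probability vector per instance.
    instances = json.loads(request)["instances"]
    return json.dumps({"predictions": [[0.1, 0.1, 0.8] for _ in instances]})


def fake_postprocess(response):
    # Pick the most likely category and report its probability.
    predictions = json.loads(response)["predictions"]
    output = []
    for probabilities in predictions:
        best = max(range(len(probabilities)), key=probabilities.__getitem__)
        output.append({"prediction": CATEGORIES[best], "confidence": probabilities[best]})
    return output


payload = json.dumps({
    "island": "Biscoe",
    "culmen_length_mm": 48.6,
    "culmen_depth_mm": 16.0,
    "flipper_length_mm": 230.0,
    "body_mass_g": 5800.0,
})

result = fake_postprocess(fake_model(fake_preprocess(payload)))
print(result)
```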

Let's start by setting up a local folder where we will create the `inference.py` script.

In [299]:
INFERENCE_CODE_FOLDER = CODE_FOLDER / "inference"
Path(INFERENCE_CODE_FOLDER).mkdir(parents=True, exist_ok=True)
sys.path.append(f"./{INFERENCE_CODE_FOLDER}")

### Preprocessing Script

The first component of our inference pipeline is a transformer that will transform the input data into the format the model expects. We'll use the Scikit-Learn transformer we saved when we split and transformed the data. To deploy this transformer as part of an inference pipeline, we need to write a script that loads the transformer, uses it to modify the input data, and returns the output in the format the TensorFlow model expects.

In [300]:
%%writefile {INFERENCE_CODE_FOLDER}/preprocessing_component.py

import os
import numpy as np
import pandas as pd
import json
import joblib

from io import StringIO

try:
    from sagemaker_containers.beta.framework import encoders, worker
except ImportError:
    # We don't have access to the `worker` instance when testing locally. 
    # We'll set it to None so we can change the way functions create a response.
    worker = None


TARGET_COLUMN = "species"
FEATURE_COLUMNS = [
    "island",
    "culmen_length_mm",
    "culmen_depth_mm", 
    "flipper_length_mm",
    "body_mass_g",
    "sex"
]


def input_fn(input_data, content_type):
    """
    Parses the input payload and creates a Pandas DataFrame.
    
    This function will check whether the target column is present in the
    input data, and will remove it.
    """
    
    if content_type == "text/csv":
        df = pd.read_csv(StringIO(input_data), header=None, skipinitialspace=True)

        if len(df.columns) == len(FEATURE_COLUMNS) + 1:
            df = df.drop(df.columns[0], axis=1)
        
        df.columns = FEATURE_COLUMNS
        return df
    
    if content_type == "application/json":
        df = pd.DataFrame([json.loads(input_data)])
        
        if "species" in df.columns:
            df = df.drop("species", axis=1)
        
        return df
    
    else:
        raise ValueError(f"{content_type} is not supported!")


def output_fn(prediction, accept):
    """
    Formats the prediction output to generate a response.
    
    The default accept/content-type between containers for serial inference is JSON. 
    Since this model will precede a TensorFlow model, we want to return a JSON object
    following TensorFlow's input requirements.
    """
    
    if prediction is None:
        raise Exception("There was an error transforming the input data")

    if accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept) if worker else (prediction, accept)
    
    if accept == "application/json":
        instances = [p for p in prediction.tolist()]
        response = {"instances": instances}
        return worker.Response(json.dumps(response), mimetype=accept) if worker else (response, accept)

    raise Exception(f"{accept} accept type is not supported.")


def predict_fn(input_data, model):
    """
    Preprocess the input using the transformer.
    """
    
    try:
        response = model.transform(input_data)
        return response
    except ValueError as e:
        print("Error transforming the input data", e)
        return None


def model_fn(model_dir):
    """
    Deserializes the model that will be used in this container.
    """
    
    return joblib.load(os.path.join(model_dir, "features.joblib"))

Overwriting code/inference/preprocessing_component.py


Let's test the script to ensure everything is working as expected.

In [301]:
%%ipytest

import json

from preprocessing_component import input_fn, predict_fn, output_fn, model_fn


@pytest.fixture(scope="function", autouse=False)
def directory():
    directory = tempfile.mkdtemp()
    input_directory = Path(directory) / "input"
    input_directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(DATA_FILEPATH, input_directory / "data.csv")
    
    directory = Path(directory)
    
    preprocess(base_directory=directory)
    
    with tarfile.open(directory / "model" / "model.tar.gz") as tar:
        tar.extractall(path=directory / "model")
    
    yield directory / "model"
    
    shutil.rmtree(directory)



def test_input_csv_drops_target_column_if_present():
    input_data = """
    Adelie, Torgersen, 39.1, 18.7, 181, 3750, MALE
    """
    
    df = input_fn(input_data, "text/csv")
    assert len(df.columns) == 6 and "species" not in df.columns


def test_input_json_drops_target_column_if_present():
    input_data = json.dumps({
        "species": "Adelie", 
        "island": "Torgersen",
        "culmen_length_mm": 44.1,
        "culmen_depth_mm": 18.0,
        "flipper_length_mm": 210.0,
        "body_mass_g": 4000.0,
        "sex": "MALE"
    })
    
    df = input_fn(input_data, "application/json")
    assert len(df.columns) == 6 and "species" not in df.columns


def test_input_csv_works_without_target_column():
    input_data = """
    Torgersen, 39.1, 18.7, 181, 3750, MALE
    """
    
    df = input_fn(input_data, "text/csv")
    assert len(df.columns) == 6


def test_input_json_works_without_target_column():
    input_data = json.dumps({
        "island": "Torgersen",
        "culmen_length_mm": 44.1,
        "culmen_depth_mm": 18.0,
        "flipper_length_mm": 210.0,
        "body_mass_g": 4000.0,
        "sex": "MALE"
    })
    
    df = input_fn(input_data, "application/json")
    assert len(df.columns) == 6


def test_output_csv_raises_exception_if_prediction_is_none():
    with pytest.raises(Exception):
        output_fn(None, "text/csv")
    
    
def test_output_json_raises_exception_if_prediction_is_none():
    with pytest.raises(Exception):
        output_fn(None, "application/json")
    
    
def test_output_csv_returns_prediction():
    prediction = np.array([
        [-1.3944109908736013,1.15488062669371,-0.7954340636549508,-0.5536447804097907,0.0,1.0,0.0],
        [1.0557485835338234,0.5040085971987002,-0.5824506029515057,-0.5851840035995248,0.0,1.0,0.0]
    ])
    
    response = output_fn(prediction, "text/csv")
    
    assert response == (prediction, "text/csv")
    
    
def test_output_json_returns_tensorflow_ready_input():
    prediction = np.array([
        [-1.3944109908736013,1.15488062669371,-0.7954340636549508,-0.5536447804097907,0.0,1.0,0.0],
        [1.0557485835338234,0.5040085971987002,-0.5824506029515057,-0.5851840035995248,0.0,1.0,0.0]
    ])
    
    response = output_fn(prediction, "application/json")
    
    assert response[0] == {
        "instances": [
            [-1.3944109908736013,1.15488062669371,-0.7954340636549508,-0.5536447804097907,0.0,1.0,0.0],
            [1.0557485835338234,0.5040085971987002,-0.5824506029515057,-0.5851840035995248,0.0,1.0,0.0]
        ]
    }
    
    assert response[1] == "application/json"

    
def test_predict_transforms_data(directory):
    input_data = """
    Torgersen, 39.1, 18.7, 181, 3750, MALE
    """
    
    model = model_fn(str(directory))
    df = input_fn(input_data, "text/csv")
    response = predict_fn(df, model)
    assert type(response) is np.ndarray
    

def test_predict_returns_none_if_invalid_input(directory):
    input_data = """
    Invalid, 39.1, 18.7, 181, 3750, MALE
    """
    
    model = model_fn(str(directory))
    df = input_fn(input_data, "text/csv")
    assert predict_fn(df, model) is None

..........                                                                                   [100%]
10 passed in 0.06s


### Postprocessing Script

The final component of our inference pipeline is a transformer that will transform the output from the model into a human-readable format. We'll use the Scikit-Learn target transformer we saved when we split and transformed the data. To deploy this transformer as part of an inference pipeline, we need to write a script that loads the transformer, uses it to modify the output from the model, and returns a human-readable format.

In [302]:
%%writefile {INFERENCE_CODE_FOLDER}/postprocessing_component.py

import os
import numpy as np
import json
import joblib


try:
    from sagemaker_containers.beta.framework import encoders, worker
except ImportError:
    # We don't have access to the `worker` instance when testing locally. 
    # We'll set it to None so we can change the way functions create a response.
    worker = None


def input_fn(input_data, content_type):
    if content_type == "application/json":
        predictions = json.loads(input_data)["predictions"]
        return predictions
    
    else:
        raise ValueError(f"{content_type} is not supported!")


def output_fn(prediction, accept):
    if accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept) if worker else (prediction, accept)
    
    if accept == "application/json":
        # predict_fn returns (confidence, category) tuples, in that order.
        response = []
        for confidence, category in prediction:
            response.append({
                "prediction": category,
                "confidence": confidence
            })
            
        return worker.Response(json.dumps(response), mimetype=accept) if worker else (response, accept)
    
    raise RuntimeError(f"{accept} accept type is not supported.")


def predict_fn(input_data, model):
    """
    Transforms the prediction into its corresponding category.
    """
    
    # Each tuple keeps the category last so it ends up as the final
    # column of the CSV output.
    predictions = np.argmax(input_data, axis=-1)
    confidences = np.max(input_data, axis=-1)
    return [(confidence, model[prediction]) for confidence, prediction in zip(confidences, predictions)]


def model_fn(model_dir):
    """
    Deserializes the target model and returns the list of fitted categories.
    """
    
    model = joblib.load(os.path.join(model_dir, "target.joblib"))
    return model.named_transformers_["species"].categories_[0]

Overwriting code/inference/postprocessing_component.py


Let's test the script to ensure everything is working as expected.

In [303]:
%%ipytest

import json
import numpy as np

from postprocessing_component import predict_fn


def test_predict_returns_prediction_as_last_column():
    input_data = [
        [0.6, 0.2, 0.2], 
        [0.1, 0.8, 0.1],
        [0.2, 0.1, 0.7]
    ]
    
    categories = ["Adelie", "Gentoo", "Chinstrap"]
    
    response = predict_fn(input_data, categories)
    
    assert response == [
        (0.6, "Adelie"),
        (0.8, "Gentoo"),
        (0.7, "Chinstrap")
    ]

.                                                                                            [100%]
1 passed in 0.01s


### Pipeline Model

We can now create a [PipelineModel](https://sagemaker.readthedocs.io/en/stable/api/inference/pipeline.html#sagemaker.pipeline.PipelineModel) to define our inference pipeline. This Pipeline Model will create a SageMaker Model in the background and use the preprocessing and postprocessing scripts to transform the input and output of the model.

In [304]:
from sagemaker.workflow.functions import Join
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.pipeline import PipelineModel


# We'll use the model we generated from the first step of the
# pipeline as the input to the first and last components of the
# inference pipeline. This model.tar.gz file contains the two
# transformers we need to preprocess and postprocess the data.
transformation_pipeline_model = Join(
    on="/",
    values=[
        split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs[
            "model"
        ].S3Output.S3Uri,
        "model.tar.gz",
    ],
)

# This is the first component of the inference pipeline. It will
# preprocess the data before sending it to the TensorFlow model.
preprocessing_model = SKLearnModel(
    model_data=transformation_pipeline_model,
    entry_point="preprocessing_component.py",
    source_dir=str(INFERENCE_CODE_FOLDER),
    framework_version="1.2-1",
    sagemaker_session=config["session"],
    role=role,
)

tensorflow_model = TensorFlowModel(
    model_data=(
        tune_model_step.get_top_model_s3_uri(
            top_k=0, s3_bucket=config["session"].default_bucket()
        )
        if USE_TUNING_STEP
        else train_model_step.properties.ModelArtifacts.S3ModelArtifacts
    ),
    image_uri=config["image"],
    framework_version=config["framework_version"],
    sagemaker_session=config["session"],
    role=role,
)

# This is the last component of the inference pipeline. It will
# postprocess the output from the TensorFlow model before sending 
# it back to the user.
post_processing_model = SKLearnModel(
    model_data=transformation_pipeline_model,
    entry_point="postprocessing_component.py",
    source_dir=str(INFERENCE_CODE_FOLDER),
    framework_version="1.2-1",
    sagemaker_session=config["session"],
    role=role,
)

# We can now create the inference pipeline using the three models.
pipeline_model = PipelineModel(
    name="inference-model",
    models=[preprocessing_model, tensorflow_model, post_processing_model],
    sagemaker_session=config["session"],
    role=role,
)
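`Join` is used above because step properties are placeholders that only resolve when the pipeline runs, so ordinary string formatting can't be applied to them while the pipeline is being defined. Once resolved, the result is equivalent to joining plain strings, as in this sketch. The bucket and prefix are made up for the example.

```python
# Hypothetical S3 location that the "model" output might resolve to at run time.
resolved_output_uri = "s3://example-bucket/preprocessing/model"

# Plain-Python equivalent of Join(on="/", values=[<output uri>, "model.tar.gz"]).
model_data_uri = "/".join([resolved_output_uri, "model.tar.gz"])
print(model_data_uri)
```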

## Data Quality Baseline

We need to create a data quality baseline to compare against the real-time traffic our endpoint receives. This baseline will help us detect any data distribution shifts.

We'll use a [Quality Check Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-quality-check) to generate the baseline and configure the instance that will run the quality check using the [CheckJobConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#checkjobconfig) class.

In [305]:
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    QualityCheckStep,
)
from sagemaker.model_monitor.dataset_format import DatasetFormat


DATA_QUALITY_LOCATION = f"{S3_LOCATION}/monitoring/data-quality"


data_quality_baseline_step = QualityCheckStep(
    name="generate-data-quality-baseline",
    check_job_config=CheckJobConfig(
        instance_type="ml.c5.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=pipeline_session,
        role=role,
    ),
    quality_check_config=DataQualityCheckConfig(
        baseline_dataset=f"{S3_LOCATION}/data",
        dataset_format=DatasetFormat.csv(header=True, output_columns_position="END"),
        output_s3_uri=DATA_QUALITY_LOCATION,
    ),
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


## Model Quality Baseline

To monitor the performance of the model, we need to generate a baseline performance. This baseline will help us detect any performance degradation.

To create the baseline, we must generate predictions for the test set and compare them with the ground truth labels. We can do this by running a [Batch Transform Job](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html) to generate predictions for every sample from the test set, and we'll use a [Transform Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform) to run this job as part of the pipeline.

In [306]:
from sagemaker.workflow.steps import TransformStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer

# The Transform Step requires a model to generate predictions, 
# so we need to create the inference pipeline model.
create_model_step = ModelStep(
    name="create",
    display_name="create-model",
    step_args=pipeline_model.create(
        instance_type="ml.m5.xlarge"
    ),
)

transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_type=config["instance_type"],
    instance_count=1,
    strategy="MultiRecord",
    accept="text/csv",
    assemble_with="Line",
    output_path=f"{S3_LOCATION}/transform",
    sagemaker_session=config["session"]
)

generate_test_predictions_step = TransformStep(
    name="generate-test-predictions",
    step_args=transformer.transform(
        # We will use the baseline set we generated when we split the data.
        # This set corresponds to the test split before the transformation step.
        data=split_and_transform_data_step.properties.ProcessingOutputConfig.Outputs["baseline"].S3Output.S3Uri,
        
        join_source="Input",
        split_type="Line",
        content_type="text/csv",
        input_filter="$",
        
        # We want to output the first and the last field from the joined set.
        # The first field corresponds to the ground truth, and the last field
        # corresponds to the prediction.
        output_filter="$[0,-1]",
    ),
    cache_config=cache_config
)


Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
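To make the effect of `join_source="Input"` and `output_filter="$[0,-1]"` more concrete, here is a minimal sketch in plain Python that imitates what happens to a single CSV line. It doesn't call SageMaker. The sample row and the helper names (`join_with_input`, `apply_output_filter`) are hypothetical, and the sketch assumes the model returns a single predicted label per line.

```python
def join_with_input(input_line: str, prediction: str) -> list[str]:
    """Imitate join_source="Input": append the prediction to the input fields."""
    return input_line.strip().split(",") + [prediction]


def apply_output_filter(fields: list[str], indices: list[int]) -> str:
    """Imitate output_filter="$[0,-1]": keep only the selected fields."""
    return ",".join(fields[i] for i in indices)


# Hypothetical baseline row: the species (ground truth) is the first field.
input_line = "Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE"

joined = join_with_input(input_line, "Adelie")
output_line = apply_output_filter(joined, [0, -1])

# Only the ground truth and the prediction remain.
print(output_line)
```

Each line of the resulting file then holds a ground truth label followed by a prediction, which is the shape the model quality check needs.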


Let's now configure the [Quality Check Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-quality-check) and feed it the data we generated in the Transform Step.

We'll use a [Model Quality Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model-quality) to generate the baseline, and we'll configure the instance that will run the quality check using the [CheckJobConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html) class.

In [307]:
from sagemaker.workflow.quality_check_step import ModelQualityCheckConfig


MODEL_QUALITY_LOCATION = f"{S3_LOCATION}/monitoring/model-quality"

model_quality_baseline_step = QualityCheckStep(
    name="generate-model-quality-baseline",
    
    check_job_config = CheckJobConfig(
        instance_type="ml.c5.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=pipeline_session,
        role=role,
    ),
    
    quality_check_config = ModelQualityCheckConfig(
        # We are going to use the output of the Transform Step to generate
        # the model quality baseline.
        baseline_dataset=generate_test_predictions_step.properties.TransformOutput.S3OutputPath,
        dataset_format=DatasetFormat.csv(header=False),

        # We need to specify the problem type and the fields where the prediction
        # and groundtruth are so the process knows how to interpret the results.
        problem_type="MulticlassClassification",
        
        # Since the data doesn't have headers, SageMaker will autocreate headers for it.
        # _c0 corresponds to the first column, and _c1 corresponds to the second column.
        ground_truth_attribute="_c0",
        inference_attribute="_c1",

        output_s3_uri=MODEL_QUALITY_LOCATION,
    ),
    
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
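The model quality job reads each line of the Transform output as a pair made of the ground truth (`_c0`) and the prediction (`_c1`). The sketch below is a rough, plain-Python illustration of the kind of comparison this enables. The sample rows are made up, and the real job computes a richer set of metrics than the accuracy shown here.

```python
import csv
import io
from collections import Counter

# Made-up Transform output: ground truth first, prediction second, no header.
transform_output = "Adelie,Adelie\nGentoo,Gentoo\nChinstrap,Adelie\nGentoo,Gentoo\n"

rows = list(csv.reader(io.StringIO(transform_output)))

# Mirror the auto-generated column names used in the configuration.
records = [{"_c0": truth, "_c1": predicted} for truth, predicted in rows]

correct = sum(1 for record in records if record["_c0"] == record["_c1"])
accuracy = correct / len(records)

# Count (ground truth, prediction) pairs: a flat confusion matrix.
confusion = Counter((record["_c0"], record["_c1"]) for record in records)

print(f"accuracy = {accuracy:.2f}")
print(f"Chinstrap predicted as Adelie: {confusion[('Chinstrap', 'Adelie')]}")
```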


## Registration

We can now register the inference pipeline in the Model Registry.

When we register a model, we can specify a set of [ModelMetrics](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_metrics.ModelMetrics) that will be saved in the Model Registry. We'll use the metrics that we calculated using the Quality Check Steps.

In [308]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.drift_check_baselines import DriftCheckBaselines


model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    )
)

Let's use a [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) to register the model.

In [309]:
MODEL_PACKAGE_GROUP = "penguins"

register_model_step = ModelStep(
    name="register",
    display_name="register-model",
    step_args=pipeline_model.register(
        model_package_group_name=MODEL_PACKAGE_GROUP,
        model_metrics=model_metrics,
        drift_check_baselines=drift_check_baselines,
        approval_status="PendingManualApproval",
        
        # Our inference pipeline model supports two content 
        # types: text/csv and application/json.
        content_types=["text/csv", "application/json"],
        response_types=["text/csv", "application/json"],       

        # These are the suggested instance types for deploying
        # the model or using it as part of a batch transform
        # job.
        inference_instances=["ml.m5.xlarge"],
        transform_instances=["ml.m5.xlarge"],
        
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version=config["framework_version"],
    )
)

## Condition Step

We only want to register the model and generate the baseline predictions if the model's accuracy reaches a predefined threshold. We can use a [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) together with the evaluation report we generated to accomplish this. Check the [ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditionstep) documentation in the SageMaker SDK for more information. In this example, we will use a [ConditionGreaterThanOrEqualTo](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.conditions.ConditionGreaterThanOrEqualTo) condition to compare the model's accuracy with the threshold. The [Conditions](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions) section of the documentation lists the supported condition types.

If the model's accuracy is below our threshold, we will send the pipeline to a [Fail Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-fail) with the appropriate error message. Check the [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) documentation in the SageMaker SDK for more information.

We are going to use a new [Pipeline Parameter](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html) in our pipeline to specify the minimum accuracy that the model should reach for it to be registered.

In [310]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterFloat
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep


accuracy_threshold = ParameterFloat(name="accuracy_threshold", default_value=0.70)

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[
        # We want to check whether the accuracy of the model is greater than
        # or equal to the threshold we defined.
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=evaluate_model_step.name,
                property_file=evaluation_report,
                json_path="metrics.accuracy.value",
            ),
            right=accuracy_threshold,
        )
    ],
    if_steps=[]
    if LOCAL_MODE
    else [
        create_model_step,
        generate_test_predictions_step,
        model_quality_baseline_step,
        register_model_step,
    ],
    # If the condition is not met, we want to fail the execution
    # of the pipeline by sending it to a FailStep.
    else_steps=[
        FailStep(
            name="fail",
            error_message=Join(
                on=" ",
                values=[
                    "Execution failed because the model's accuracy was lower than",
                    accuracy_threshold,
                ],
            ),
        )
    ],
)
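Conceptually, the condition reads `metrics.accuracy.value` from the evaluation report and compares it with the threshold. The following sketch imitates that lookup in plain Python. The report contents and the helper `json_get` are hypothetical stand-ins, not the SDK's implementation.

```python
import json
from functools import reduce


def json_get(document: dict, json_path: str):
    """Walk a dot-separated path through nested dictionaries."""
    return reduce(lambda node, key: node[key], json_path.split("."), document)


# Hypothetical evaluation report with the structure the condition expects.
report = json.loads('{"metrics": {"accuracy": {"value": 0.93}}}')
accuracy_threshold = 0.70

accuracy = json_get(report, "metrics.accuracy.value")

# Greater than or equal to the threshold: continue. Otherwise: fail.
next_steps = "register" if accuracy >= accuracy_threshold else "fail"
print(next_steps)
```

Because the threshold is a pipeline parameter, it can be overridden for a single execution without changing the pipeline definition.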

## Pipeline

We can now create the SageMaker Pipeline and submit its definition to the SageMaker Pipelines service to create the pipeline if it doesn't exist or update it if it does.

In [311]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig


pipeline = Pipeline(
    name="cohort-pipeline",
    parameters=[
        dataset_location,
        accuracy_threshold,
    ],
    steps=[
        split_and_transform_data_step,
        train_model_step if not USE_TUNING_STEP else tune_model_step,
        evaluate_model_step,
        condition_step
    ],
    pipeline_definition_config=PipelineDefinitionConfig(use_custom_job_prefix=True),
    sagemaker_session=config["session"],
)

if not LOCAL_MODE:
    # SageMaker doesn't support running any of these steps in Local Mode.
    pipeline.steps.extend([data_quality_baseline_step])

pipeline.upsert(role_arn=role)

{'PipelineArn': 'cohort-pipeline'}

<div class="alert" style="background-color:#0066cc;">To run the pipeline, comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute the cell.</div>

In [312]:
# %%script false --no-raise-error

pipeline.start()

INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-23-222/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-23-222/source/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-23-989/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-23-989/source/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-24-565/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-24-565/source/runproc.sh


Starting execution for pipeline cohort-pipeline. Execution ID is ef2a475a-e7e7-4a54-8ff4-9e3b2cd4eac3


INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-25-535/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-25-535/source/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-26-086/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-26-086/source/runproc.sh
INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-26-534/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-26-534/source/runproc.sh


Starting pipeline step: 'split-and-transform-data'


INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting processing job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-11uyr:
    container_name: z95a9txez0-algo-1-11uyr
    entrypoint:
    - python3
    - /opt/ml/processing/input/code/preprocessor.py
    environment:
    - '[Masked]'
    - '[Masked]'
    image: 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3
    networks:
      sagemaker-local:
        aliases:
        - algo-1-11uyr
    stdin_open: true
    tty: true
    volumes:
    - /private/var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmp76takbc2/algo-1-11uyr/config:/opt/ml/config
    - /private/var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmp76takbc2/algo-1-11uyr/output:/opt/ml/output
    - /private/var/folders/4c/v1q3hy1x4mb5w0wpc

Container z95a9txez0-algo-1-11uyr  Creating
algo-1-11uyr The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested 
Container z95a9txez0-algo-1-11uyr  Created
Attaching to z95a9txez0-algo-1-11uyr
z95a9txez0-algo-1-11uyr exited with code 0
Aborting on container exit...
Container z95a9txez0-algo-1-11uyr  Stopping
Container z95a9txez0-algo-1-11uyr  Stopped
===== Job Complete =====
Pipeline step 'split-and-transform-data' SUCCEEDED.
Starting pipeline step: 'train-model'


INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting training job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-vb8is:
    command: train
    container_name: lyrronf17q-algo-1-vb8is
    environment:
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    - '[Masked]'
    image: sagemaker-tensorflow-training-toolkit-local
    networks:
      sagemaker-local:
        aliases:
        - algo-1-vb8is
    stdin_open: true
    tty: true
    volumes:
    - /private/var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmp8r1o388c/algo-1-vb8is/output/data:/opt/ml/output/data
    - /private/var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmp8r1o388c/algo-1-vb8is/input:/opt/ml/input
    - /private/var/folders/4c/v1q3hy1x4mb5w0wpc72zl3_w0000gp/T/tmp8r1o388c/algo-1-vb8is/output:/opt/ml/outpu

Container lyrronf17q-algo-1-vb8is  Creating
Container lyrronf17q-algo-1-vb8is  Created
Attaching to lyrronf17q-algo-1-vb8is
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,168 botocore.credentials INFO     Found credentials in environment variables.
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,467 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,471 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,481 sagemaker-training-toolkit INFO     instance_groups entry not present in resource_config
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,487 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,490 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)
lyrronf17q-algo-1-vb8is  | 2023-10-20 15:03:37,500 sagemaker-tr

INFO:sagemaker.processing:Uploaded None to s3://mlschool/evaluation-processor-2023-10-20-15-03-42-919/source/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://mlschool/evaluation-processor-2023-10-20-15-03-42-919/source/runproc.sh

Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.

INFO:sagemaker.local.image:'Docker Compose' found using Docker CLI.
INFO:sagemaker.local.local_session:Starting processing job
INFO:sagemaker.local.image:Using the long-lived AWS credentials found in session
INFO:sagemaker.local.image:docker compose file: 
networks:
  sagemaker-local:
    name: sagemaker-local
services:
  algo-1-2t8nd:
    container_name: ymgdk4sfdc-algo-1-2t8nd
    entrypoint:
    - /bin/bash
    - /opt/ml/processing/input/entrypoint/runproc.sh
    environment:
    - '[Masked]'
    - '[Masked]'
    image: sagemaker-tensorflow-training-toolkit-local
    networks:
      sagemaker-local:
        aliases:
        - algo-1-2t8nd
    

Container ymgdk4sfdc-algo-1-2t8nd  Creating
Container ymgdk4sfdc-algo-1-2t8nd  Created
Attaching to ymgdk4sfdc-algo-1-2t8nd
ymgdk4sfdc-algo-1-2t8nd  | Test accuracy: 1.0
ymgdk4sfdc-algo-1-2t8nd exited with code 0
Aborting on container exit...
Container ymgdk4sfdc-algo-1-2t8nd  Stopping
Container ymgdk4sfdc-algo-1-2t8nd  Stopped
===== Job Complete =====
Pipeline step 'evaluate-model' SUCCEEDED.
Starting pipeline step: 'check-model-accuracy'
Pipeline step 'check-model-accuracy' SUCCEEDED.
Pipeline execution ef2a475a-e7e7-4a54-8ff4-9e3b2cd4eac3 SUCCEEDED


<sagemaker.local.entities._LocalPipelineExecution at 0x31e4af6d0>

## Quality Baselines

Our pipeline generated data baseline statistics and constraints using our train set. We can take a look at what these values look like by downloading them from S3.

In [243]:
import json
from IPython.display import JSON
from sagemaker.s3 import S3Downloader


statistics = f"{DATA_QUALITY_LOCATION}/statistics.json"

response = None
try:
    response = json.loads(S3Downloader.read_file(statistics))
except Exception as e:
    pass

JSON(response or {})

<IPython.core.display.JSON object>

In [244]:
constraints = f"{DATA_QUALITY_LOCATION}/constraints.json"

response = None
try:
    response = json.loads(S3Downloader.read_file(constraints))
except Exception as e:
    pass

JSON(response or {})

<IPython.core.display.JSON object>

We also generated the baseline performance using the test set.

In [245]:
constraints = f"{MODEL_QUALITY_LOCATION}/constraints.json"

response = None
try:
    response = json.loads(S3Downloader.read_file(constraints))
except Exception as e:
    pass

JSON(response or {})

<IPython.core.display.JSON object>
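The three cells above repeat the same read-and-ignore-errors pattern. One way to factor it out is a small helper that takes the reading function as an argument, sketched below. The name `read_json_or_empty` is hypothetical, and the fake reader stands in for `S3Downloader.read_file` so the example runs without AWS access.

```python
import json
from typing import Callable


def read_json_or_empty(read_file: Callable[[str], str], uri: str) -> dict:
    """Return the parsed JSON at `uri`, or an empty dict if it can't be read."""
    try:
        return json.loads(read_file(uri))
    except Exception as error:
        # Report the failure instead of silently swallowing it.
        print(f"Could not read {uri}: {error!r}")
        return {}


def fake_read_file(uri: str) -> str:
    """Stand-in for S3Downloader.read_file, backed by an in-memory dict."""
    files = {"s3://example-bucket/constraints.json": '{"version": 0.0}'}
    return files[uri]


found = read_json_or_empty(fake_read_file, "s3://example-bucket/constraints.json")
missing = read_json_or_empty(fake_read_file, "s3://example-bucket/missing.json")
print(found, missing)
```

In the notebook, the call would look like `read_json_or_empty(S3Downloader.read_file, statistics)`. An empty result usually means the pipeline hasn't generated the baseline yet.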

# Model Deployment

To deploy the model, we will use [Amazon EventBridge](https://aws.amazon.com/pm/eventbridge/) to trigger a Lambda function that will deploy the model whenever its status changes to "Approved."


## Lambda 

Let's start by writing the Lambda function to take the model information and create a new endpoint.

We'll enable [Data Capture](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-capture.html) as part of the endpoint configuration. With Data Capture we can record the inputs and outputs of the endpoint to use them later for monitoring the model:
* `InitialSamplingPercentage` represents the percentage of traffic that we want to capture. 
* `DestinationS3Uri` specifies the S3 location where we want to store the captured data.


In [246]:
%%writefile {CODE_FOLDER}/lambda.py

import os
import json
import boto3
import time

sagemaker = boto3.client("sagemaker")

def lambda_handler(event, context):
    model_package_arn = event["detail"]["ModelPackageArn"]
    approval_status = event["detail"]["ModelApprovalStatus"]
    
    print(f'Model: "{model_package_arn}". Approval Status: "{approval_status}"')
    
    # We only want to deploy the model if its new approval
    # status is "Approved."
    if approval_status != "Approved":
        return {
            "statusCode": 200,
            "body": json.dumps(f'Skipping deployment. Approval status: "{approval_status}"')
        }    
    
    
    endpoint_name = os.environ["ENDPOINT"]
    data_capture_destination = os.environ["DATA_CAPTURE_DESTINATION"]
    role = os.environ["ROLE"]
    
    timestamp = time.strftime("%m%d%H%M%S", time.localtime())
    model_name = f"{endpoint_name}-model-{timestamp}"
    endpoint_config_name = f"{endpoint_name}-config-{timestamp}"

    sagemaker.create_model(
        ModelName=model_name, 
        ExecutionRoleArn=role, 
        Containers=[{
            "ModelPackageName": model_package_arn
        }] 
    )

    sagemaker.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[{
            "ModelName": model_name,
            "InstanceType": "ml.m5.xlarge",
            "InitialVariantWeight": 1,
            "InitialInstanceCount": 1,
            "VariantName": "AllTraffic",
        }],
        
        # We can enable Data Capture to record the inputs and outputs of the endpoint
        # to use them later for monitoring the model. 
        DataCaptureConfig={
            "EnableCapture": True,
            "InitialSamplingPercentage": 100,
            "DestinationS3Uri": data_capture_destination,
            "CaptureOptions": [
                {
                    "CaptureMode": "Input"
                },
                {
                    "CaptureMode": "Output"
                },
            ],
            "CaptureContentTypeHeader": {
                "CsvContentTypes": [
                    "text/csv",
                    "application/octet-stream"
                ],
                "JsonContentTypes": [
                    "application/json",
                    "application/octet-stream"
                ]
            }
        },
    )
    
    response = sagemaker.list_endpoints(NameContains=endpoint_name, MaxResults=1)

    if len(response["Endpoints"]) == 0:
        # If the endpoint doesn't exist, let's create it.
        sagemaker.create_endpoint(
            EndpointName=endpoint_name, 
            EndpointConfigName=endpoint_config_name,
        )
    else:
        # If the endpoint already exists, let's update it with the
        # new configuration.
        sagemaker.update_endpoint(
            EndpointName=endpoint_name, 
            EndpointConfigName=endpoint_config_name,
        )
    
    return {
        "statusCode": 200,
        "body": json.dumps("Endpoint deployed successfully")
    }

Overwriting code/lambda.py
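To see what the handler works with, here is a minimal sketch of the decision at the top of the function, run against hand-written events. Each event only contains the two fields the handler reads. The ARN is a made-up placeholder, the helper names are hypothetical, and real EventBridge events carry more fields.

```python
def should_deploy(event: dict) -> bool:
    """Mirror the handler's check: deploy only when the status is "Approved"."""
    return event["detail"]["ModelApprovalStatus"] == "Approved"


def make_event(status: str) -> dict:
    """Build a trimmed-down event with a placeholder model package ARN."""
    return {
        "detail": {
            "ModelPackageArn": "arn:aws:sagemaker:us-east-1:123456789012:model-package/penguins/1",
            "ModelApprovalStatus": status,
        }
    }


for status in ["PendingManualApproval", "Approved", "Rejected"]:
    print(status, "->", should_deploy(make_event(status)))
```

Only the "Approved" event reaches the code that creates the model, the endpoint configuration, and the endpoint.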


We need to ensure our Lambda function has permission to interact with SageMaker, so let's create a new role with the required policies attached. We'll use this role when we create the Lambda function.

In [247]:
ENDPOINT = "penguins-endpoint"
DATA_CAPTURE_DESTINATION = f"{S3_LOCATION}/monitoring/data-capture"


lambda_role_name = "lambda-deployment-role"
lambda_role_arn = None

try:
    response = iam_client.create_role(
        RoleName = lambda_role_name,
        AssumeRolePolicyDocument = json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": [
                            "lambda.amazonaws.com",
                            "events.amazonaws.com"
                        ]
                    },
                    "Action": "sts:AssumeRole",
                }
            ]
        }),
        Description="Lambda Endpoint Deployment"
    )

    lambda_role_arn = response["Role"]["Arn"]
    
    # Attach the managed policies to the role we just created.
    iam_client.attach_role_policy(
        RoleName=lambda_role_name,
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
    )
    
    iam_client.attach_role_policy(
        RoleName=lambda_role_name,
        PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
    )
    
    print(f'Role "{lambda_role_name}" created with ARN "{lambda_role_arn}".')
except iam_client.exceptions.EntityAlreadyExistsException:
    print(f"Role {lambda_role_name} already exists.")
    response = iam_client.get_role(RoleName=lambda_role_name)
    lambda_role_arn = response["Role"]["Arn"]

Role lambda-deployment-role already exists.


Let's create the Lambda function.

In [248]:
from sagemaker.lambda_helper import Lambda


deploy_lambda_fn = Lambda(
    function_name="deploy_fn",
    execution_role_arn=lambda_role_arn,
    script=str(CODE_FOLDER / "lambda.py"),
    handler="lambda.lambda_handler",
    timeout=600,
    session=sagemaker_session,
    runtime="python3.11",
    environment={
        "Variables": {
            "ENDPOINT": ENDPOINT,
            "DATA_CAPTURE_DESTINATION": DATA_CAPTURE_DESTINATION,
            "ROLE": role,
        }
    }
)

lambda_response = None
if not LOCAL_MODE:
    lambda_response = deploy_lambda_fn.upsert()

lambda_response

{'ResponseMetadata': {'RequestId': '1e676d09-bacc-4b88-af4b-086aed5abe16',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 19 Oct 2023 19:51:18 GMT',
   'content-type': 'application/json',
   'content-length': '1428',
   'connection': 'keep-alive',
   'x-amzn-requestid': '1e676d09-bacc-4b88-af4b-086aed5abe16'},
  'RetryAttempts': 0},
 'FunctionName': 'deploy_fn',
 'FunctionArn': 'arn:aws:lambda:us-east-1:325223348818:function:deploy_fn',
 'Runtime': 'python3.11',
 'Role': 'arn:aws:iam::325223348818:role/lambda-deployment-role',
 'Handler': 'lambda.lambda_handler',
 'CodeSize': 3142,
 'Description': '',
 'Timeout': 600,
 'MemorySize': 128,
 'LastModified': '2023-10-19T19:51:18.000+0000',
 'CodeSha256': 'GvIH6E8ZyghrSf4h2g/j+qIcE0HVvoFrno9erm8Qheo=',
 'Version': '$LATEST',
 'Environment': {'Variables': {'ROLE': 'arn:aws:iam::325223348818:role/service-role/AmazonSageMaker-ExecutionRole-20230312T160501',
   'DATA_CAPTURE_DESTINATION': 's3://mlschool/penguins/monitoring/data-captur

## EventBridge

Let's create an EventBridge rule that triggers the deployment process whenever a model approval status becomes "Approved". 

In [249]:
events_client = boto3.client("events")

event_pattern = f"""
{{
  "source": ["aws.sagemaker"],
  "detail-type": ["SageMaker Model Package State Change"],
  "detail": {{
    "ModelPackageGroupName": ["{MODEL_PACKAGE_GROUP}"],
    "ModelApprovalStatus": ["Approved"]
  }}
}}
"""

rule_response = None
if not LOCAL_MODE:
    rule_response = events_client.put_rule(
        Name="model-approval-rule",
        EventPattern=event_pattern,
        State="ENABLED",
        RoleArn=role,
    )

    response = events_client.put_targets(
        Rule="model-approval-rule",
        Targets=[
            {
                "Id": "1",
                "Arn": lambda_response["FunctionArn"],
            }
        ]
    )

rule_response

{'RuleArn': 'arn:aws:events:us-east-1:325223348818:rule/model-approval-rule',
 'ResponseMetadata': {'RequestId': 'a8d3fda6-0bf6-49f6-a28e-7cd6aac2b28f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a8d3fda6-0bf6-49f6-a28e-7cd6aac2b28f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '76',
   'date': 'Thu, 19 Oct 2023 19:51:18 GMT'},
  'RetryAttempts': 0}}
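An event pattern lists, for each field, the values that are allowed to match. The sketch below is a deliberately simplified matcher for exact-value patterns like the one above. It ignores the more advanced operators EventBridge supports, and the function `matches` and the sample events are hypothetical.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Return True when every field in the pattern matches the event.

    A list in the pattern means "any of these values". A nested dict
    is matched recursively against the same key in the event.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True


pattern = {
    "source": ["aws.sagemaker"],
    "detail-type": ["SageMaker Model Package State Change"],
    "detail": {
        "ModelPackageGroupName": ["penguins"],
        "ModelApprovalStatus": ["Approved"],
    },
}

approved = {
    "source": "aws.sagemaker",
    "detail-type": "SageMaker Model Package State Change",
    "detail": {"ModelPackageGroupName": "penguins", "ModelApprovalStatus": "Approved"},
}
rejected = {**approved, "detail": {**approved["detail"], "ModelApprovalStatus": "Rejected"}}

print(matches(pattern, approved), matches(pattern, rejected))
```

Fields that appear in the event but not in the pattern are ignored, which is why the pattern only needs to name the fields we care about.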

We need to give the event permissions to invoke the Lambda function.

In [250]:
lambda_client = boto3.client("lambda")

if not LOCAL_MODE:
    try:
        response = lambda_client.add_permission(
            Action="lambda:InvokeFunction",
            FunctionName=lambda_response["FunctionName"],
            Principal="events.amazonaws.com",
            SourceArn=rule_response["RuleArn"],
            StatementId="EventBridge",
        )
    except lambda_client.exceptions.ResourceConflictException as e:
        print(f'Function "{lambda_response["FunctionName"]}" already has the correct permissions.')

Function "deploy_fn" already has the correct permissions.


## Predictions

Let's now test the endpoint we deployed automatically with the pipeline. We will create a predictor with a CSV serializer and use it to send a few samples to the endpoint.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [251]:
%%script false --no-raise-error

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer


waiter = sagemaker_client.get_waiter("endpoint_in_service")
waiter.wait(
    EndpointName=ENDPOINT,
    WaiterConfig={
        "Delay": 10,
        "MaxAttempts": 30
    }
)

predictor = Predictor(
    endpoint_name=ENDPOINT, 
    serializer=CSVSerializer(),
    sagemaker_session=sagemaker_session
)

data = pd.read_csv(DATA_FILEPATH)
data = data.drop("species", axis=1)

payload = data.iloc[:3].to_csv(header=False, index=False)
response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
print(response.decode("utf-8"))
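The waiter polls the endpoint status every `Delay` seconds and gives up after `MaxAttempts` tries, so the configuration above waits for at most roughly 300 seconds. The loop below sketches that behavior with a fake status function. The names `wait_until_in_service` and `fake_status` are hypothetical, and the delay is shortened so the example finishes quickly.

```python
import time
from typing import Callable


def wait_until_in_service(get_status: Callable[[], str], delay: float, max_attempts: int) -> int:
    """Poll `get_status` until it returns "InService"; return the attempts used."""
    for attempt in range(1, max_attempts + 1):
        if get_status() == "InService":
            return attempt
        time.sleep(delay)
    raise TimeoutError(f"Not in service after {max_attempts} attempts")


# Fake endpoint that reports "Creating" twice before becoming available.
statuses = iter(["Creating", "Creating", "InService"])


def fake_status() -> str:
    return next(statuses)


attempts = wait_until_in_service(fake_status, delay=0.01, max_attempts=30)
print(f"Endpoint in service after {attempts} attempts")
```

If the endpoint needs longer than the configured limit to come up, increasing `MaxAttempts` is the simplest adjustment.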

## Data Capture

Let's check the S3 location where the endpoint stores the requests and responses that it receives.

Notice that it may take a few minutes for the first few files to show up in S3. Keep running the following cell until you get some.

In [252]:
files = S3Downloader.list(DATA_CAPTURE_DESTINATION)[:3]
files

['s3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/09/25/13/15-40-735-9cc3750d-ba42-472c-903d-969695d2096d.jsonl',
 's3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/09/27/15/45-31-289-001fc69f-c352-4da2-b57a-a3a69fe3fecf.jsonl',
 's3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/10/05/16/50-04-992-e16242d1-925c-4b07-9289-dffa0e026679.jsonl']

These files contain the data captured by the endpoint in a SageMaker-specific JSON-line format. Each inference request is captured in a single line in the `jsonl` file. The line contains both the input and output merged together.

Let's read the first line from the first file:

In [253]:
if len(files):
    lines = S3Downloader.read_file(files[0])
    print(json.dumps(json.loads(lines.split("\n")[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "Torgersen,39.1,18.7,181.0,3750.0,MALE\nTorgersen,39.5,17.4,186.0,3800.0,FEMALE\nTorgersen,40.3,18.0,195.0,3250.0,FEMALE\n",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "application/json",
      "mode": "OUTPUT",
      "data": "[{\"prediction\": \"Adelie\", \"confidence\": 0.775418103}, {\"prediction\": \"Adelie\", \"confidence\": 0.775709867}, {\"prediction\": \"Adelie\", \"confidence\": 0.67967391}]",
      "encoding": "JSON"
    }
  },
  "eventMetadata": {
    "eventId": "d33f9a23-5ae3-4403-9aa1-3759d7fa8015",
    "inferenceTime": "2023-09-25T13:15:40Z"
  },
  "eventVersion": "0"
}
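Since the input arrives as CSV text and the output as a JSON string, pairing each request row with its prediction takes a little parsing. Here is a minimal sketch, using a record shaped like the one above and trimmed to two samples:

```python
import json

# A capture record shaped like the one above, trimmed to two samples.
line = json.dumps({
    "captureData": {
        "endpointInput": {
            "observedContentType": "text/csv",
            "mode": "INPUT",
            "data": "Torgersen,39.1,18.7,181.0,3750.0,MALE\nTorgersen,39.5,17.4,186.0,3800.0,FEMALE\n",
            "encoding": "CSV",
        },
        "endpointOutput": {
            "observedContentType": "application/json",
            "mode": "OUTPUT",
            "data": json.dumps([
                {"prediction": "Adelie", "confidence": 0.775418103},
                {"prediction": "Adelie", "confidence": 0.775709867},
            ]),
            "encoding": "JSON",
        },
    }
})

record = json.loads(line)["captureData"]

# One CSV row per sample in the request.
inputs = record["endpointInput"]["data"].strip().split("\n")

# The output is itself a JSON document stored as a string.
outputs = json.loads(record["endpointOutput"]["data"])

pairs = [(row, output["prediction"]) for row, output in zip(inputs, outputs)]
for row, prediction in pairs:
    print(f"{row} -> {prediction}")
```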


## Clean up

Let's now delete the endpoint.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [254]:
%%script false --no-raise-error

predictor.delete_endpoint()

# Model Monitoring

In this session we'll set up a monitoring process to analyze the quality of the data our endpoint receives and the endpoint predictions. For this, we need to check the data received by the endpoint, generate ground truth labels, and compare them with a baseline performance.

To enable this functionality, we need a couple of steps:

1. Create baselines we can use to compare against real-time traffic.
2. Set up a schedule to continuously evaluate and compare against the baselines.

Notice that we use the baseline datasets we generated during the Processing Step. These baseline datasets are the same unprocessed data in JSON format. We do this because we need raw data to compare against the endpoint input.

Check [Amazon SageMaker Model Monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) for a brief explanation of how to use SageMaker's Model Monitoring functionality. [Monitor models for data and model quality, bias, and explainability](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) is a much more extensive guide to monitoring in Amazon SageMaker.

In [255]:
GROUND_TRUTH_LOCATION = f"{S3_LOCATION}/monitoring/groundtruth"

## Fake Traffic

To test the monitoring functionality, we need to generate traffic to the endpoint. We will repeatedly send every sample from the dataset to the endpoint to simulate real prediction requests.

The following function starts a background thread that sends one sample per second until we signal it to stop.

In [256]:
from time import sleep
from threading import Thread, Event


def generate_traffic(predictor):
    
    def _predict(data, predictor, stop_traffic_thread):
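        for index, row in data.iterrows():
            # Serialize the row as a single CSV line. We use a separate
            # name to avoid shadowing the `data` DataFrame we iterate over.
            payload = ",".join(map(str, row.tolist()))
            predictor.predict(payload, inference_id=str(index), initial_args={"ContentType": "text/csv"})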
            
            sleep(1)

            if stop_traffic_thread.is_set():
                break

    def _generate_prediction_data(data, predictor, stop_traffic_thread):
        while True:
            print(f"Generating {data.shape[0]} predictions...")
            _predict(data, predictor, stop_traffic_thread)
            
            if stop_traffic_thread.is_set():
                break

                
    stop_traffic_thread = Event()
    
    data = pd.read_csv(DATA_FILEPATH, header=0).dropna()
    data.drop(["species"], axis=1, inplace=True)
    
    traffic_thread = Thread(
        target=_generate_prediction_data,
        args=(data, predictor, stop_traffic_thread,)
    )
    
    traffic_thread.start()
    
    return stop_traffic_thread, traffic_thread
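The function above relies on a `threading.Event` to stop a background thread cooperatively. Here is a minimal, self-contained sketch of that pattern with a list standing in for the endpoint. The `start_worker` helper is hypothetical, and it swaps `sleep` for `Event.wait` so the thread reacts to the stop signal without finishing the full delay.

```python
from threading import Event, Thread
from time import sleep


def start_worker(items, handle, delay=0.01):
    """Call handle(item) for every item, over and over, until stopped.

    Assumes `items` is not empty.
    """
    stop = Event()

    def _run():
        while not stop.is_set():
            for item in items:
                handle(item)

                # wait() returns True as soon as the event is set, so
                # the thread exits without sleeping the full delay.
                if stop.wait(delay):
                    return

    thread = Thread(target=_run, daemon=True)
    thread.start()

    return stop, thread


sent = []
stop, thread = start_worker(["a", "b", "c"], sent.append)

sleep(0.1)           # let the worker send a few items
stop.set()           # signal the worker to stop
thread.join(timeout=2)

print(f"Sent {len(sent)} items. Thread alive: {thread.is_alive()}")
```

Returning both the event and the thread mirrors what `generate_traffic` does: the caller sets the event and then joins the thread to make sure it finished.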

Let's wait for the endpoint to be in service and then start generating traffic.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [257]:
%%script false --no-raise-error

waiter = sagemaker_client.get_waiter("endpoint_in_service")
waiter.wait(
    EndpointName=ENDPOINT,
    WaiterConfig={
        "Delay": 10,
        "MaxAttempts": 30
    }
)

predictor = Predictor(
    endpoint_name=ENDPOINT, 
    serializer=CSVSerializer(),
    sagemaker_session=sagemaker_session
)

stop_traffic_thread, traffic_thread = generate_traffic(predictor)

## Fake Labels

To test the performance of the model, we need to label the samples captured by the endpoint. We can simulate the labeling process by generating a random label for every sample. Check [Ingest Ground Truth Labels and Merge Them With Predictions](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-merge.html) for more information about this.

In [258]:
import random
from datetime import datetime


def generate_ground_truth_data(ground_truth_location):
    
    def _generate_ground_truth_record(inference_id):
        random.seed(inference_id)

        return {
            "groundTruthData": {
                "data": random.choice(["Adelie", "Chinstrap", "Gentoo"]),
                "encoding": "CSV",
            },
            "eventMetadata": {
                "eventId": str(inference_id),
            },
            "eventVersion": "0",
        }


    def _upload_ground_truth(records, upload_time):
        records = [json.dumps(r) for r in records]
        data = "\n".join(records)
        uri = f"{ground_truth_location}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"

        print(f"Uploading ground truth data to {uri}...")

        S3Uploader.upload_string_as_file_body(data, uri)    

                
    def _generate_ground_truth_data(max_records, stop_ground_truth_thread):
        while True:
            records = [_generate_ground_truth_record(i) for i in range(max_records)]
            _upload_ground_truth(records, datetime.utcnow())

            if stop_ground_truth_thread.is_set():
                break

            sleep(30)


    stop_ground_truth_thread = Event()
    data = pd.read_csv(DATA_FILEPATH).dropna()
    
    groundtruth_thread = Thread(
        target=_generate_ground_truth_data,
        args=(len(data), stop_ground_truth_thread,)
    )
    
    groundtruth_thread.start()
    
    return stop_ground_truth_thread, groundtruth_thread
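To make the output of this function concrete, the sketch below builds a single ground truth record and the upload URI for a fixed timestamp, using the same record layout and path format as the code above. The bucket name and the two helper names are hypothetical.

```python
import json
import random
from datetime import datetime

SPECIES = ["Adelie", "Chinstrap", "Gentoo"]


def ground_truth_record(inference_id):
    # Seeding with the inference id makes the fake label reproducible:
    # the same id always gets the same label.
    random.seed(inference_id)

    return {
        "groundTruthData": {
            "data": random.choice(SPECIES),
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def ground_truth_uri(location, upload_time):
    # Files are grouped by year/month/day/hour of the upload time.
    return f"{location}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"


record = ground_truth_record(7)
uri = ground_truth_uri(
    "s3://example-bucket/monitoring/groundtruth",  # hypothetical location
    datetime(2023, 9, 25, 13, 15, 40),
)

print(json.dumps(record))
print(uri)  # s3://example-bucket/monitoring/groundtruth/2023/09/25/13/1540.jsonl
```

The `eventId` of each record matches the `inference_id` sent with the corresponding prediction request, which is what lets the monitoring job pair labels with predictions.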

We can now start generating fake labels.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [259]:
%%script false --no-raise-error

stop_ground_truth_thread, groundtruth_thread = generate_ground_truth_data(
    GROUND_TRUTH_LOCATION
)

## Monitoring Jobs

We can now schedule the Monitoring Jobs to continuously monitor the data going into the endpoint and the model performance. We will use the baseline we generated in the pipeline to determine when there's drift. Check [Schedule Data Quality Monitoring Jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-schedule-data-monitor.html) and [Schedule Model Quality Monitoring Jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-schedule.html) for more information.

The following functions will help us work with monitoring schedules later on.

In [260]:
def describe_monitoring_schedules(endpoint_name):
    schedules = []
    response = sagemaker_client.list_monitoring_schedules(EndpointName=endpoint_name)["MonitoringScheduleSummaries"]
    for item in response:
        name = item["MonitoringScheduleName"]
        schedule = {
            "MonitoringScheduleName": name,
            "MonitoringType": item["MonitoringType"]
        }
        
        description = sagemaker_client.describe_monitoring_schedule(
            MonitoringScheduleName=name
        )
        
        # A schedule that hasn't run yet has no execution summary, so we
        # fall back to a placeholder status of our own ("NotExecuted").
        summary = description.get("LastMonitoringExecutionSummary", {})
        schedule["Status"] = summary.get("MonitoringExecutionStatus", "NotExecuted")
        
        if schedule["Status"] == "Failed":
            schedule["FailureReason"] = summary.get("FailureReason")
        elif schedule["Status"] == "CompletedWithViolations":
            processing_job_arn = summary["ProcessingJobArn"]
            execution = MonitoringExecution.from_processing_arn(
                sagemaker_session=sagemaker_session, 
                processing_job_arn=processing_job_arn
            )
            execution_destination = execution.output.destination

            violations_filepath = os.path.join(execution_destination, "constraint_violations.json")
            violations = json.loads(S3Downloader.read_file(violations_filepath))["violations"]
            
            schedule["Violations"] = violations

        schedules.append(schedule)
        
    return schedules

def describe_monitoring_schedule(endpoint_name, monitoring_type):
    found = False
    
    schedules = describe_monitoring_schedules(endpoint_name)
    for schedule in schedules:
        if schedule["MonitoringType"] == monitoring_type:
            found = True
            print(json.dumps(schedule, indent=2))

    if not found:            
        print(f"There's no {monitoring_type} Monitoring Schedule.")


def describe_data_monitoring_schedule(endpoint_name):
    describe_monitoring_schedule(endpoint_name, "DataQuality")

    
def describe_model_monitoring_schedule(endpoint_name):
    describe_monitoring_schedule(endpoint_name, "ModelQuality")

    
def delete_monitoring_schedule(endpoint_name, monitoring_type):
    attempts = 30
    found = False
    
    response = sagemaker_client.list_monitoring_schedules(EndpointName=endpoint_name)["MonitoringScheduleSummaries"]
    for item in response:
        if item["MonitoringType"] == monitoring_type:
            found = True
            status = sagemaker_client.describe_monitoring_schedule(
                MonitoringScheduleName=item["MonitoringScheduleName"]
            )["MonitoringScheduleStatus"]
            while status in ("Pending", "InProgress") and attempts > 0:
                attempts -= 1
                print(f"Monitoring schedule status: {status}. Waiting for it to finish.")
                sleep(30)
                
                status = sagemaker_client.describe_monitoring_schedule(
                    MonitoringScheduleName=item["MonitoringScheduleName"]
                )["MonitoringScheduleStatus"]

            if status not in ("Pending", "InProgress"):
                sagemaker_client.delete_monitoring_schedule(
                    MonitoringScheduleName=item["MonitoringScheduleName"]
                )
                print("Monitoring schedule deleted.")
            else:
                print("Waiting for monitoring schedule timed out")
                
    if not found:            
        print(f"There's no {monitoring_type} Monitoring Schedule.")

        
def delete_data_monitoring_schedule(endpoint_name):
    delete_monitoring_schedule(endpoint_name, "DataQuality")

    
def delete_model_monitoring_schedule(endpoint_name):
    delete_monitoring_schedule(endpoint_name, "ModelQuality")

### Data Monitoring

SageMaker looks for violations in the data captured by the endpoint. By default, it combines the input data with the endpoint output and compares the result with the baseline we generated. If we let SageMaker do this, we will get a few violations, for example, an "extra column check" violation because the field `confidence` doesn't exist in the baseline data.

We can fix these violations by creating a preprocessing script configuring the data we want the monitoring job to use. Check [Preprocessing and Postprocessing](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-pre-and-post-processing.html) for more information about how to configure these scripts.

In [261]:
DATA_QUALITY_PREPROCESSOR = "data_quality_preprocessor.py"

In [262]:
%%writefile {CODE_FOLDER}/{DATA_QUALITY_PREPROCESSOR}
import json

def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = json.loads(inference_record.endpoint_output.data)
    
    response = json.loads(input_data)
    response["species"] = output_data["prediction"]

    # The `response` variable contains the data that we want the
    # monitoring job to use to compare with the baseline.
    return response

Overwriting code/data_quality_preprocessor.py
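Before scheduling a job, it can be useful to exercise the handler locally. The sketch below is a rough approximation: it uses `SimpleNamespace` as a stand-in for the record object the monitoring container passes to the handler, and it assumes the captured input is a JSON object and the output is a single JSON object with a `prediction` key. A CSV payload or a list of predictions would need different parsing. The field names in the sample are hypothetical.

```python
import json
from types import SimpleNamespace


def preprocess_handler(inference_record):
    # Same logic as the script above, repeated to keep this sketch
    # self-contained.
    input_data = inference_record.endpoint_input.data
    output_data = json.loads(inference_record.endpoint_output.data)

    response = json.loads(input_data)
    response["species"] = output_data["prediction"]

    return response


# Stand-in for the record object. Only the two attributes the handler
# reads are modeled here.
record = SimpleNamespace(
    endpoint_input=SimpleNamespace(
        data=json.dumps({"island": "Torgersen", "body_mass_g": 3750.0})
    ),
    endpoint_output=SimpleNamespace(
        data=json.dumps({"prediction": "Adelie", "confidence": 0.78})
    ),
)

result = preprocess_handler(record)
print(result)
```

Notice that `confidence` doesn't appear in the result, which is how the handler avoids the "extra column check" violation.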


The monitoring schedule expects an S3 location pointing to the preprocessing script. Let's upload the script to the default bucket.

In [263]:
import os

if not LOCAL_MODE:
    bucket = boto3.Session().resource("s3").Bucket(pipeline_session.default_bucket())
    prefix = "penguins-monitoring"
    bucket.Object(os.path.join(prefix, DATA_QUALITY_PREPROCESSOR)).upload_file(str(CODE_FOLDER / DATA_QUALITY_PREPROCESSOR))
    data_quality_preprocessor = f"s3://{os.path.join(bucket.name, prefix, DATA_QUALITY_PREPROCESSOR)}"
    data_quality_preprocessor

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


We can now set up the Data Quality Monitoring Job using the [DefaultModelMonitor](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.DefaultModelMonitor) class. Notice how we specify the `record_preprocessor_script` using the S3 location where we uploaded our script.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [264]:
%%script false --no-raise-error

from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

data_monitor = DefaultModelMonitor(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=3600,
    role=role,
)

data_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-data-monitoring-schedule",
    endpoint_input=ENDPOINT,
    record_preprocessor_script=data_quality_preprocessor,
    statistics=f"{DATA_QUALITY_LOCATION}/statistics.json",
    constraints=f"{DATA_QUALITY_LOCATION}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

We can check the results of the monitoring job by looking at whether it generated any violations.

In [265]:
describe_data_monitoring_schedule(ENDPOINT)

There's no DataQuality Monitoring Schedule.


### Model Monitoring

To set up a Model Quality Monitoring Job, we can use the [ModelQualityMonitor](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.ModelQualityMonitor) class. The [EndpointInput](https://sagemaker.readthedocs.io/en/v2.24.2/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.EndpointInput) instance configures the attribute the monitoring job should use to determine the prediction from the model.

Check [Amazon SageMaker Model Quality Monitor](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_model_monitor/model_quality/model_quality_churn_sdk.html) for a complete tutorial on how to run a Model Monitoring Job in SageMaker.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [266]:
%%script false --no-raise-error

from sagemaker.model_monitor import ModelQualityMonitor, EndpointInput


model_monitor = ModelQualityMonitor(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=1800,
    role=role
)

model_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-model-monitoring-schedule",
    
    endpoint_input = EndpointInput(
        endpoint_name=ENDPOINT,
        inference_attribute="prediction",
        destination="/opt/ml/processing/input_data",
    ),
    
    problem_type="MulticlassClassification",
    ground_truth_input=GROUND_TRUTH_LOCATION,
    
    constraints=f"{MODEL_QUALITY_LOCATION}/constraints.json",
    
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=f"{S3_LOCATION}/monitoring/model-quality",
    enable_cloudwatch_metrics=True,
)
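Conceptually, a model quality job joins each captured prediction with the ground truth label that shares its identifier and then computes metrics over the joined pairs. The following sketch illustrates that idea with plain dictionaries. It is not SageMaker's implementation, and the simplified prediction layout (`inferenceId`, `prediction`) is an assumption made for this example.

```python
def merge_and_score(predictions, ground_truth):
    """Join predictions with labels by id and compute the accuracy."""
    labels = {
        record["eventMetadata"]["eventId"]: record["groundTruthData"]["data"]
        for record in ground_truth
    }

    # Predictions without a matching label can't be evaluated, so we
    # leave them out of the merged result.
    merged = [
        (p["inferenceId"], p["prediction"], labels[p["inferenceId"]])
        for p in predictions
        if p["inferenceId"] in labels
    ]

    if not merged:
        return merged, None

    correct = sum(1 for _, predicted, label in merged if predicted == label)
    return merged, correct / len(merged)


predictions = [
    {"inferenceId": "0", "prediction": "Adelie"},
    {"inferenceId": "1", "prediction": "Adelie"},
    {"inferenceId": "2", "prediction": "Chinstrap"},
    {"inferenceId": "3", "prediction": "Gentoo"},  # no label available
]

ground_truth = [
    {"groundTruthData": {"data": "Adelie", "encoding": "CSV"},
     "eventMetadata": {"eventId": "0"}, "eventVersion": "0"},
    {"groundTruthData": {"data": "Gentoo", "encoding": "CSV"},
     "eventMetadata": {"eventId": "1"}, "eventVersion": "0"},
    {"groundTruthData": {"data": "Chinstrap", "encoding": "CSV"},
     "eventMetadata": {"eventId": "2"}, "eventVersion": "0"},
]

merged, accuracy = merge_and_score(predictions, ground_truth)
print(f"Merged {len(merged)} records. Accuracy: {accuracy:.2f}")
```

Since our labels are random, we should expect the real monitoring job to report a poor model performance compared to the baseline.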

We can check the results of the monitoring job by looking at whether it generated any violations.

In [267]:
describe_model_monitoring_schedule(ENDPOINT)

There's no ModelQuality Monitoring Schedule.


### Clean up

The following code will stop the generation of traffic and labels, delete the monitoring jobs, and delete the endpoint.

<div class="alert" style="background-color:#0066cc;">Comment out the <code style="background-color:#0066cc;">%%script</code> cell magic line to execute this cell.</div>

In [268]:
%%script false --no-raise-error

stop_traffic_thread.set()
traffic_thread.join()

stop_ground_truth_thread.set()
groundtruth_thread.join()

delete_data_monitoring_schedule(ENDPOINT)
delete_model_monitoring_schedule(ENDPOINT)