# Penguins in Production

This notebook aims to create a [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) to build an end-to-end Machine Learning system to solve the problem of classifying penguin species.

This example uses the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data).

<img src='https://imgur.com/orZWHly.png' alt='Penguins dataset' width="900">

Amazon SageMaker is free to try. Your free tier starts from the first month you create your first SageMaker resource and lasts two months. Check out the  [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/) for more information. Also, we'll be working extensively with [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) and the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/). Keep their documentation handy.

This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's ensure we are running the latest version of the SakeMaker SDK. **Restart the Kernel** after you run the following cell.

In [2]:
!pip install -q --upgrade pip
!pip install -q --upgrade awscli boto3
!pip install -q --upgrade sagemaker==2.146.0
!pip show sagemaker

[0mName: sagemaker
Version: 2.146.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /usr/local/lib/python3.8/site-packages
Requires: attrs, boto3, google-pasta, importlib-metadata, jsonschema, numpy, packaging, pandas, pathos, platformdirs, protobuf, protobuf3-to-dict, PyYAML, schema, smdebug-rulesconfig
Required-by: 


In [2]:
%load_ext autoreload
%autoreload 2

# Initial Setup

Let's start by preparing the S3 bucket where we will organize every resource we are going to use during the program. Make sure you set `BUCKET` to the bucket name you want to use. This name has to be unique. The [command line interface](https://docs.aws.amazon.com/cli/latest/index.html) is a simple way to interact with the AWS services. You can combine Python code with bash commands in the same notebook cell, which makes notebooks a very flexible tool.

If you want to create a bucket in a region other than `us-east-1`, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.

After we have a bucket, we can download the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data) and store it in a folder inside the bucket. Our SageMaker Pipeline will use this dataset.

In [3]:
BUCKET = "mlschool"

!aws s3api create-bucket --bucket $BUCKET

{
    "Location": "/mlschool"
}


In [4]:
import pandas as pd
import sagemaker
import urllib.request

from pathlib import Path


PENGUINS_FOLDER = Path("penguins")
S3_FILEPATH = f"s3://{BUCKET}/{PENGUINS_FOLDER}"
LOCAL_FILEPATH = Path(PENGUINS_FOLDER)/ "data.csv"

# Create the local folder if it doesn't exist.
PENGUINS_FOLDER.mkdir(parents=True, exist_ok=True)

# Download the official Penguins dataset and store it locally.
urllib.request.urlretrieve(
    "https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv", 
    LOCAL_FILEPATH
)

# Upload the dataset to S3. We need to do this to make it available to 
# the preprocessing step.
INPUT_DATA_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(LOCAL_FILEPATH), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Dataset S3 location: {INPUT_DATA_URI}")

Dataset S3 location: s3://mlschool/penguins/data.csv


We can now load and display the dataset.

In [5]:
df = pd.read_csv(LOCAL_FILEPATH)
df

Unnamed: 0,species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE
2,Adelie,Torgersen,40.3,18.0,195.0,3250.0,FEMALE
3,Adelie,Torgersen,,,,,
4,Adelie,Torgersen,36.7,19.3,193.0,3450.0,FEMALE
...,...,...,...,...,...,...,...
339,Gentoo,Biscoe,,,,,
340,Gentoo,Biscoe,46.8,14.3,215.0,4850.0,FEMALE
341,Gentoo,Biscoe,50.4,15.7,222.0,5750.0,MALE
342,Gentoo,Biscoe,45.2,14.8,212.0,5200.0,FEMALE


# Session 1 - Data Preprocessing

This session aims to build a simple [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with one step to preprocess the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data). We'll use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) with a [SKLearnProcessor](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) to execute a preprocessing script. Check the [SageMaker Pipelines Overview](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) for an introduction to the fundamental components of a SageMaker Pipeline.

Here is what the Pipeline will look like at the end of this session:

<img src='penguins/images/session1-pipeline.png' alt='Session 1 Pipeline' width="600">


In [6]:
import os
import numpy as np
import boto3
import json
import numpy as np
import argparse
import tempfile

from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

Let's start by defining a few variables we'll use throughout this notebook:

* `sagemaker_client`: We'll use a [boto3 SageMaker Client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) instance to access SageMaker.
* `iam_client`: We'll use a [boto3 IAM Client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html) instance to access IAM.
* `role`: This is the execution role attached to this notebook. We can use this role with any of the SageMaker services that need it to ensure they run with the appropriate permissions.
* `region`: The current region attached to our session. 
* `sagemaker_session`: The current SageMaker session.

In [7]:
iam_client = boto3.client("iam")
sagemaker_client = boto3.client("sagemaker")
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Step 1 - Preprocessing the Dataset

Let's create a script to do feature engineering on the original dataset. We will run this script using a SageMaker Processing Job later in this session.

The script should split the data into train, validation, and test sets so we can later train and evaluate a model. We will save the Scikit-Learn pipeline that we use to preprocess the data to use it during inference time.

The script uses the [np.split()](https://numpy.org/doc/stable/reference/generated/numpy.split.html) function to split the dataset into three sets in the following way:

1. The train set will use the top 70% of the data.
2. The validation set will use 15% of the data, starting with the sample after the 70% used for the train set.
3. Finally, the test set will use the remaining 15% of the data.

Pay special attention to the way the Scikit-Learn pipeline `preprocessor` is used to process the three sets:

* First, we use the `fit_transform()` to fit the pipeline on the train set.
* Then, we consecutively transform the validation and test sets using `transform()`.

Always use `fit_transform()` on the training data to fit the scaling parameters we need to transform the data. For example, `fit_transform()` will learn the mean and variance of the features of the training set. It can then use these same parameters to scale the validation and test sets.

That's why we want to save this Scikit-Learn pipeline to use later to scale production data using the same parameters we learned on the train set.

In [90]:
%%writefile {PENGUINS_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "data.csv"


def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    
    train_path = Path(base_directory) / "train" 
    validation_path = Path(base_directory) / "validation" 
    test_path = Path(base_directory) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    

def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", 'wb'))
    

def _save_classes(base_directory, classes):
    """
    Saves the list of classes from the dataset.
    """
    path = Path(base_directory) / "classes"
    path.mkdir(parents=True, exist_ok=True)
    
    print("CLASSES", np.asarray(classes))

    np.asarray(classes).tofile(path / "classes.csv", sep = ",") 
    

def _generate_baseline_dataset(split_name, base_directory, X, y):
    """
    To monitor the data and the quality of our model we need to compare the 
    production quality and results against a baseline. To create those baselines, 
    we need to use a dataset to compute statistics and constraints. That dataset
    should contain information in the same format as expected by the production
    endpoint. This function will generate a baseline dataset and save it to 
    disk so we can later use it.
    
    """
    baseline_path = Path(base_directory) / f"{split_name}-baseline" 
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X.copy()
    
    # The baseline dataset needs a column containing the groundtruth.
    df["groundtruth"] = y
    df["groundtruth"] = df["groundtruth"].values.astype(str)
    
    # We will use the baseline dataset to generate baselines
    # for monitoring data and model quality. To simplify the process, 
    # we don't want to include any NaN rows.
    df = df.dropna()

    df.to_json(baseline_path / f"{split_name}-baseline.json", orient='records', lines=True)
    
    
def preprocess(base_directory, data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """
    
    df = pd.read_csv(data_filepath)
    
    numerical_columns = [column for column in df.columns if df[column].dtype in ["int64", "float64"]]
    
    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])

    categorical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore"))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_columns),
            ("categorical", categorical_preprocessor, ["island"]),
        ]
    )
    

    X = df.drop(["sex"], axis=1)
    columns = list(X.columns)
    
    X = X.to_numpy()
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])
    
    X_train = pd.DataFrame(train, columns=columns)
    X_validation = pd.DataFrame(validation, columns=columns)
    X_test = pd.DataFrame(test, columns=columns)
    
    y_train = X_train.species
    y_validation = X_validation.species
    y_test = X_test.species
    
    label_encoder = LabelEncoder()
    
    y_train = label_encoder.fit_transform(y_train)
    y_validation = label_encoder.transform(y_validation)
    y_test = label_encoder.transform(y_test)
    
    X_train.drop(["species"], axis=1, inplace=True)
    X_validation.drop(["species"], axis=1, inplace=True)
    X_test.drop(["species"], axis=1, inplace=True)

    # Let's generate a dataset that we can later use to compute
    # baseline statistics and constraints about the data that we
    # used to train our model.
    _generate_baseline_dataset("train", base_directory, X_train, y_train)
    
    # To generate baseline constraints about the quality of the
    # model's predictions, we will use the test set.
    _generate_baseline_dataset("test", base_directory, X_test, y_test)
    
    # Transform the data using the Scikit-Learn pipeline.
    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)
    
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)
    
    _save_splits(base_directory, train, validation, test)
    _save_pipeline(base_directory, pipeline=preprocessor)
    _save_classes(base_directory, label_encoder.classes_)
        

if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, DATA_FILEPATH)


Overwriting penguins/preprocessor.py


## Step 2 - Testing the Preprocessing Script

We can now load the script we just created and run it locally to ensure it outputs every file we need.

We will set up a SageMaker Processing Job to run this script, but we always want to test the code locally. In this case, we can call the `preprocess()` function with the local directory and the local copy of the dataset.

In [91]:
from penguins.preprocessor import preprocess


def print_baseline(split_name):
    print()
    print(f"Baseline {split_name}:")
    with open(Path(directory) / f"{split_name}-baseline" / f"{split_name}-baseline.json") as baseline:
        lines = [next(baseline) for _ in range(5)]
        
    for l in lines:
        print(l[:-1])
    

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )
    
    print(f"Folders: {os.listdir(directory)}")

CLASSES ['Adelie' 'Chinstrap' 'Gentoo']
Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline', 'classes']


## Step 3 - Pipeline Configuration

When creating a SageMaker Pipeline, we can specify a list of parameters we can use on individual pipeline steps. To read more about these parameters, check [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html).

These are the parameters that we need right now:

* `dataset_location`: This parameter represents the dataset's location in S3. We will use this parameter to indicate the SageMaker Processing Job where to find the dataset. The Processing Job will download the dataset from S3 and make it available on the instance running the script.
* `preprocessor_destination`: We need to define the location where the SageMaker Processing Job will store the output. When it finishes, the Processing Job will copy the script's output to the S3 location specified by this parameter. By default, SageMaker uploads the output of a job to a custom location in S3, but unfortunately, if we rely on that functionality, we can't cache the Processing Step in the Pipeline.
* `train_dataset_baseline_destination`: This parameter represents the location where we will store the train dataset to compute constraints and statistic baselines in Session 6.
* `test_dataset_baseline_destination`: This parameter represents the location where we will store the test dataset to compute constraints and statistic baselines in Session 6.
* `timestamp_signature`: We'll use this parameter to automatically generate resources using a unique suffix to avoid collisions.

In [10]:
dataset_location = ParameterString(
    name="dataset_location",
    default_value=INPUT_DATA_URI,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

train_dataset_baseline_destination = ParameterString(
    name="train_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/train",
)

test_dataset_baseline_destination = ParameterString(
    name="test_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/test",
)

timestamp_signature = ParameterString(
    name="timestamp_signature",
    default_value="",
)

## Step 4 - Caching Pipeline Steps

While building a pipeline, you only want to rerun every step if you expect a different result. To accomplish this, you can instruct SageMaker to reuse the result of a previous successful run of a pipeline step. You can find more information about this topic in [Caching Pipeline Steps](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html).

In [11]:
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

## Step 5 - Setting up a Processing Step

The first step we need in the pipeline is a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) to run the preprocessing script. This Processing Step will create a SageMaker Processing Job in the background, run the script, and upload the output to S3. You can use Processing Jobs to perform data preprocessing, post-processing, feature engineering, data validation, and model evaluation. Check the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) SageMaker's SDK documentation for more information.

A processor gives the Processing Step information about the hardware and software that SageMaker should use to launch the Processing Job. To run the script, we need access to Scikit-Learn, so we can use the [SKLearnProcessor](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) processor that comes out-of-the-box with the SageMaker's Python SDK. The [Data Processing with Framework Processors](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks.html) page discusses other built-in processors you can use. The [Docker Registry Paths and Example Code](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html) page contains information about the available framework versions for each region.

The [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) requires a list of inputs that we need on the preprocessing script. In this case, the input is the dataset we stored in S3. We also have a few outputs that we want SageMaker to capture when the Processing Job finishes. SageMaker will upload every one of these outputs to the location specified by the `preprocessor_destination` parameter except the baseline data, which we will upload to the location specified by the `baseline_destination` parameter.

In [12]:
sklearn_processor = SKLearnProcessor(
    base_job_name="penguins-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role,
)

preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=preprocessor_destination),
        ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline", destination=preprocessor_destination),
        ProcessingOutput(output_name="classes", source="/opt/ml/processing/classes", destination=preprocessor_destination),
        ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline", destination=train_dataset_baseline_destination),
        ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline", destination=test_dataset_baseline_destination),
    ],
    code=f"{PENGUINS_FOLDER}/preprocessor.py",
    cache_config=cache_config
)

## Step 6 - Running the Pipeline

Let's define and run the SageMaker Pipeline. Check [Pipeline Structure and Execution](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-pipeline.html) for more information about how to define a pipeline and [Run a Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/run-pipeline.html) for information about how to run it.

The pipeline uses the parameters we defined before and the Preprocess Step.

In [13]:
session1_pipeline = Pipeline(
    name="penguins-session1-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination
    ],
    steps=[
        preprocess_data_step, 
    ]
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [199]:
session1_pipeline.upsert(role_arn=role)
execution = session1_pipeline.start()

# Session 2 - Model Training and Tuning

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) we built in the previous session with a step to train a model. We'll explore the [Training](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) and the [Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning) steps. 

Here is what the Pipeline will look like at the end of this session:

<img src='penguins/images/session2-pipeline.png' alt='Session 2 Pipeline' width="600">


In [14]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep
from sagemaker.parameter import IntegerParameter
from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession

## Step 1 - Training the Model

This script is responsible for training a simple neural network on the train data, validating the model, and saving it so we can later use it.

In [15]:
%%writefile {PENGUINS_FOLDER}/train.py

import os
import argparse

import numpy as np
import pandas as pd
import tensorflow as tf

from pathlib import Path
from sklearn.metrics import accuracy_score

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD


def train(base_directory, train_path, validation_path, epochs=50, batch_size=32):
    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train.drop(X_train.columns[-1], axis=1, inplace=True)
    
    X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation.drop(X_validation.columns[-1], axis=1, inplace=True)
    
    model = Sequential([
        Dense(10, input_shape=(X_train.shape[1],), activation="relu"),
        Dense(8, activation="relu"),
        Dense(3, activation="softmax"),
    ])

    model.compile(
        optimizer=SGD(learning_rate=0.01),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    model.fit(
        X_train, 
        y_train, 
        validation_data=(X_validation, y_validation),
        epochs=epochs, 
        batch_size=batch_size,
        verbose=2,
    )

    predictions = np.argmax(model.predict(X_validation), axis=-1)
    print(f"Validation accuracy: {accuracy_score(y_validation, predictions)}")
    
    model_filepath = Path(base_directory) / "model" / "001"
    model.save(model_filepath)
    
if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list: 
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)
    args, _ = parser.parse_known_args()
    
    train(
        base_directory=args.base_directory,
        train_path=args.train_path,
        validation_path=args.validation_path,
        epochs=args.epochs,
        batch_size=args.batch_size
    )

Overwriting penguins/train.py


## Step 2 - Testing the Training Script

Let's test the script we just created by running it locally.

In [16]:
from penguins.preprocessor import preprocess
from penguins.train import train


with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=10
    )

Epoch 1/10
Extension horovod.torch has not been built: /usr/local/lib/python3.8/site-packages/horovod/torch/mpi_lib/_mpi_lib.cpython-38-x86_64-linux-gnu.so not found
If this is not expected, reinstall Horovod with HOROVOD_WITH_PYTORCH=1 to debug the build error.
[2023-06-12 13:31:38.740 tensorflow-2-6-cpu-py-ml-t3-medium-9169b2e75617c45c79c40579f6a8:66 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[2023-06-12 13:31:38.800 tensorflow-2-6-cpu-py-ml-t3-medium-9169b2e75617c45c79c40579f6a8:66 INFO profiler_config_parser.py:111] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
8/8 - 1s - loss: 1.2221 - accuracy: 0.4937 - val_loss: 1.2504 - val_accuracy: 0.3333
Epoch 2/10
8/8 - 0s - loss: 1.1652 - accuracy: 0.5439 - val_loss: 1.1918 - val_accuracy: 0.4118
Epoch 3/10
8/8 - 0s - loss: 1.1212 - accuracy: 0.5439 - val_loss: 1.1484 - val_accuracy: 0.4314
Epoch 4/10
8/8 - 0s - loss: 1.0865 - accuracy: 0.5690 - val_loss: 1.1156 - val_accuracy: 0.4314
E

INFO:tensorflow:Assets written to: /tmp/tmpjezhq4p0/model/001/assets


## Step 3 - Setting up a Training Step

We can now create a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) that we can add to the pipeline. This Training Step will create a SageMaker Training Job in the background, run the training script, and upload the output to S3. Check the [TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) SageMaker's SDK documentation for more information. 

SageMaker uses the concept of an [Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) to handle end-to-end training and deployment tasks. For this example, we will use the built-in [TensorFlow Estimator](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-estimator) to run the training script we wrote before. The [Docker Registry Paths and Example Code](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html) page contains information about the available framework versions for each region. Here, you can also check the available SageMaker [Deep Learning Container images](https://github.com/aws/deep-learning-containers/blob/master/available_images.md).

Notice the list of hyperparameters defined below. SageMaker will pass these hyperparameters as arguments to the entry point of the training script.

In [17]:
estimator = TensorFlow(
    entry_point=f"{PENGUINS_FOLDER}/train.py",
    
    hyperparameters={
        "epochs": 50,
        "batch_size": 32
    },
    
    framework_version="2.6",
    py_version="py38",
    instance_type="ml.m5.large",
    instance_count=1,
    script_mode=True,
    
    disable_profiler=True,
    role=role,
)

We can now create the [TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) using the estimator we defined before.

This step will receive the train and validation split from the preprocessing step as inputs. Notice how we reference both splits using the `preprocess_data_step` variable. This creates a dependency between the Training and Processing Step we defined in Session 1. When we build a new Pipeline, we'll see that the Training Step will run once the Processing Step finishes.

In [18]:
train_model_step = TrainingStep(
    name="train-model",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Step 4 - Setting up a Tuning Step

Let's now create a [Tuning Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning) to add it to our pipeline. This Tuning Step will create a SageMaker Hyperparameter Tuning Job in the background and use the training script to train different model variants and choose the best one. Check the [TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) SageMaker's SDK documentation for more information.

The Tuning Step requires a [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html) reference to configure the Hyperparameter Tuning Job. In this example, the tuner will use the same `Estimator` we defined to train the model.

Here is the configuration that we'll use to find the best model:

1. `objective_metric_name`: This is the name of the metric the tuner will use to determine the best model.
2. `objective_type`: This is the objective of the tuner. Should it "Minimize" the metric or "Maximize" it? In this example, since we are using the validation accuracy of the model, we want the objective to be "Maximize." If we were using the loss of the model, we would set the objective to "Minimize."
3. `metric_definitions`: Defines how the tuner will determine the metric's value by looking at the output logs of the training process.

The tuner expects the list of the hyperparameters you want to explore. You can use subclasses of the [Parameter](https://sagemaker.readthedocs.io/en/stable/api/training/parameter.html#sagemaker.parameter.ParameterRange) class to specify different types of hyperparameters. This example explores different values for the `epochs` hyperparameter.

Finally, you can control the number of jobs and how many of them will run in parallel using the following two arguments:

* `max_jobs`: Defines the maximum total number of training jobs to start for the hyperparameter tuning job.
* `max_parallel_jobs`: Defines the maximum number of parallel training jobs to start.

In [19]:
objective_metric_name = "val_accuracy"
objective_type = "Maximize"
metric_definitions = [{"Name": objective_metric_name, "Regex": "val_accuracy: ([0-9\\.]+)"}]
    
hyperparameter_ranges = {
    "epochs": IntegerParameter(10, 50),
}

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions,
    objective_type=objective_type,
    max_jobs=3,
    max_parallel_jobs=3,
)

We can now create the [TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep). 

This step will use the tuner we configured before and will receive the train and validation split from the preprocessing step as inputs. Notice how we reference both splits using the `preprocess_data_step` variable. This creates a dependency between the Tuning and Processing Steps we defined in Session 1. When we build a new Pipeline, we'll see that the Tuning Step will run once the Processing Step finishes.

In [20]:
tune_model_step = TuningStep(
    name = "tune-model",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Step 5 - Switching Between Training and Tuning

We could use a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) or use a [Tuning Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning) to create the model.

In this notebook, we will alternate between both methods and use the `USE_TUNING_STEP` flag to indicate which approach we want to run.

In [21]:
USE_TUNING_STEP = False

## Step 6 - Running the Pipeline

We can now define and run the SageMaker Pipeline using the Training or Tuning Step.

In [22]:
session2_pipeline = Pipeline(
    name="penguins-session2-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
    ],
    steps=[
        preprocess_data_step, 
        tune_model_step if USE_TUNING_STEP else train_model_step
    ]
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [44]:
session2_pipeline.upsert(role_arn=role)
execution = session2_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


# Session 3 - Model Registration

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with a step to evaluate the model and a step to register a new model if it reaches a predefined accuracy threshold. We'll use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) with a [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) running TensorFlow to execute an evaluation script. We'll use a [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) to determine whether the model's accuracy is above a threshold and a [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model) to register the model. After we register the model, we'll deploy it manually. To learn more about the Model Registry, check [Register and Deploy Models with Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html).

Here is what the Pipeline will look like at the end of this session:

<img src='penguins/images/session3-pipeline.png' alt='Session 3 Pipeline' width="600">

In [23]:
import time
import tarfile

from sagemaker import ModelPackage
from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.model import Model
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.predictor import Predictor
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join
from sagemaker.workflow.properties import PropertyFile

## Step 1 - Evaluating the Model

This script is responsible for loading the model we created and evaluating it on the test set. Before finishing, this script will generate an evaluation report of the model.

In [24]:
%%writefile {PENGUINS_FOLDER}/evaluation.py

import os
import json
import tarfile
import numpy as np
import pandas as pd

from pathlib import Path
from tensorflow import keras
from sklearn.metrics import accuracy_score


MODEL_PATH = "/opt/ml/processing/model/"
TEST_PATH = "/opt/ml/processing/test/"
OUTPUT_PATH = "/opt/ml/processing/evaluation/"


def evaluate(model_path, test_path, output_path):
    # The first step is to extract the model package provided
    # by SageMaker.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
        
    # We can now load the model from disk.
    model = keras.models.load_model(Path(model_path) / "001")
    
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test[X_test.columns[-1]]
    X_test.drop(X_test.columns[-1], axis=1, inplace=True)
    
    predictions = np.argmax(model.predict(X_test), axis=-1)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Test accuracy: {accuracy}")

    # Let's add the accuracy of the model to our evaluation report.
    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            },
        },
    }
    
    # We need to save the evaluation report to the output path.
    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path=MODEL_PATH, 
        test_path=TEST_PATH,
        output_path=OUTPUT_PATH
    )

Overwriting penguins/evaluation.py


## Step 2 - Testing the Evaluation Script

Let's test the script we just created by running it locally.

In [25]:
from penguins.preprocessor import preprocess
from penguins.train import train
from penguins.evaluation import evaluate


with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=10
    )
    
    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(Path(directory) / "model.tar.gz", "w:gz") as tar:
        tar.add(Path(directory) / "model" / "001", arcname="001")
        
    
    # We can now call the evaluation script.
    evaluate(
        model_path=directory, 
        test_path=Path(directory) / "test",
        output_path=Path(directory) / "evaluation",
    )

Epoch 1/10
8/8 - 1s - loss: 1.2669 - accuracy: 0.3975 - val_loss: 1.3113 - val_accuracy: 0.2941
Epoch 2/10
8/8 - 0s - loss: 1.1508 - accuracy: 0.4017 - val_loss: 1.1886 - val_accuracy: 0.2941
Epoch 3/10
8/8 - 0s - loss: 1.0648 - accuracy: 0.4059 - val_loss: 1.0958 - val_accuracy: 0.2941
Epoch 4/10
8/8 - 0s - loss: 0.9951 - accuracy: 0.4142 - val_loss: 1.0179 - val_accuracy: 0.2941
Epoch 5/10
8/8 - 0s - loss: 0.9346 - accuracy: 0.4393 - val_loss: 0.9484 - val_accuracy: 0.3529
Epoch 6/10
8/8 - 0s - loss: 0.8804 - accuracy: 0.5523 - val_loss: 0.8878 - val_accuracy: 0.5294
Epoch 7/10
8/8 - 0s - loss: 0.8327 - accuracy: 0.6611 - val_loss: 0.8367 - val_accuracy: 0.7059
Epoch 8/10
8/8 - 0s - loss: 0.7904 - accuracy: 0.7406 - val_loss: 0.7896 - val_accuracy: 0.7451
Epoch 9/10
8/8 - 0s - loss: 0.7515 - accuracy: 0.7741 - val_loss: 0.7472 - val_accuracy: 0.8235
Epoch 10/10
8/8 - 0s - loss: 0.7161 - accuracy: 0.7866 - val_loss: 0.7085 - val_accuracy: 0.8431
Validation accuracy: 0.8431372549019608

INFO:tensorflow:Assets written to: /tmp/tmpx6p5enxu/model/001/assets


Test accuracy: 0.7647058823529411


## Step 3 - Setting up a Processing Step

To run the evaluation script, we can use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing). Check the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) SageMaker's SDK documentation for more information.

Whenever you want to run a Processing Job using a machine learning framework, you can use an instance of the [FrameworkProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.FrameworkProcessor) class. For example, the [TensorFlowProcessor](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks-tensorflow.html) subclass will give you access to TensorFlow. You can also configure a Processing Job from scratch using a [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) instance combined with the [sagemaker.image_uris.retrieve()](https://sagemaker.readthedocs.io/en/stable/api/utility/image_uris.html) function for generating the URI of one of the SageMaker pre-built docker images. This time, we will use a [TensorFlowProcessor](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks-tensorflow.html) because we need our script to have access to TensorFlow and Scikit-Learn.

The inputs of this Processing Step will be the model we created and the test set we generated during the preprocessing phase. The output will be the evaluation report file.

At this point, we create a model using either a Training Step or a Tuning Step, so we can use the `USE_TUNING_STEP` flag to configure the input to the Processing Step. In case we are using the Tuning Step, we can use the [TuningStep.get_top_model_s3_uri()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) function to get the model artifacts from the top performing training job of the Hyperparameter Tuning Job.

The [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) lets us specify a list of [PropertyFile](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.properties.PropertyFile) instances from the output of the job. We can use this to map the evaluation report generated in the evaluation script. Check [How to Build and Manage Property Files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html) for more information.

We also need to define a new Pipeline parameter with the location where the Processing Step will store the report.

In [26]:
tensorflow_processor = TensorFlowProcessor(
    framework_version="2.6",
    py_version="py38",
    base_job_name="penguins-evaluation-processor",
    instance_type="ml.m5.large",
    instance_count=1,
    role=role
)

# By default, the TensorFlowProcessor runs the script using
# /bin/bash as its entrypoint. We want to ensure we run it 
# using python3.
tensorflow_processor.framework_entrypoint_command = ["python3"]


# We want to map the evaluation report that we generate inside
# the evaluation script so we can later reference it.
evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json"
)


# Notice how this step uses the model generated by the tuning or training
# step, and the test set generated by the preprocessing step.
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    processor=tensorflow_processor,
    inputs=[
        ProcessingInput(
            source=(
                tune_model_step.get_top_model_s3_uri(top_k=0, s3_bucket=sagemaker_session.default_bucket()) 
                if USE_TUNING_STEP 
                else train_model_step.properties.ModelArtifacts.S3ModelArtifacts
            ),
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=preprocess_data_step.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=f"{S3_FILEPATH}/evaluation"),
    ],
    code=f"{PENGUINS_FOLDER}/evaluation.py",
    property_files=[evaluation_report],
    cache_config=cache_config
)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 4 - Configuring the Model Metrics

When we register a model, we can specify a set of [ModelMetrics](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_metrics.ModelMetrics). We can use the evaluation report we generated during the Evaluation step to populate these statistics.

In [183]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[
            evaluate_model_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'],
            "evaluation.json"]
        ),
        content_type="application/json",
    )
)

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3610.jsonl...


## Step 5 - Registering the Model

We can now create a [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model) to register the model. Check the [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) SageMaker's SDK documentation for more information. We aim to create a new version of the model and register it in the Model Registry. Check [Register a Model Version](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-version.html) for more information about model registration.

The model we trained uses TensorFlow, so we can use the built-in [TensorFlowModel](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-serving-model) class to create an instance of the model.

Notice that we use an instance of the [PipelineSession](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline_context.PipelineSession) class to create the model. This special session does not register the model immediately when you call `model.register()`, instead, it captures the arguments required to register a model, and delegate it to the `ModelStep` to register the model later during pipeline execution.

In [28]:
model_package_group_name = "penguins"

model = TensorFlowModel(
    model_data=(
        tune_model_step.get_top_model_s3_uri(top_kabs=0, s3_bucket=sagemaker_session.default_bucket())
        if USE_TUNING_STEP
        else train_model_step.properties.ModelArtifacts.S3ModelArtifacts
    ),
    framework_version="2.6",
    sagemaker_session=PipelineSession(),
    role=role,
)

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status="Approved",
        
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version="2.6",
    ),
)

INFO:sagemaker.tensorflow.model:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 6 - Setting up a Condition Step

We only want to register a new model if its accuracy exceeds a predefined threshold. We can use a [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) together with the evaluation report we generated in the Evaluation step to accomplish this. Check the [ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditionstep) SageMaker's SDK documentation for more information.

In this example, we will use a [ConditionGreaterThanOrEqualTo](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.conditions.ConditionGreaterThanOrEqualTo) condition to compare the model's accuracy with the threshold. Look at the [Conditions](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions) section in the documentation for more information about the types of supported conditions.

If the model's accuracy is not greater than or equal our threshold, we will send the pipeline to a [Fail Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-fail) with the appropriate error message. Check the [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) SageMaker's SDK documentation for more information.

We are going to use a new [Pipeline Parameter](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html) in our pipeline to specify the minimum accuracy that the model should reach for it to be registered.

In [29]:
accuracy_threshold = ParameterFloat(
    name="accuracy_threshold", 
    default_value=0.70
)

condition_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value"
    ),
    right=accuracy_threshold
)

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ", 
        values=[
            "Execution failed because the model's accuracy was lower than", 
            accuracy_threshold
        ]
    ),
)

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[register_model_step],
    else_steps=[fail_step], 
)

## Step 7 - Running the Pipeline

We can now add the registration of the model to the pipeline. Notice how we add the Condition Step, which will call the Model Step if the condition passes.

In [30]:
session3_pipeline = Pipeline(
    name="penguins-session3-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        accuracy_threshold,
    ],
    steps=[
        preprocess_data_step, 
        tune_model_step if USE_TUNING_STEP else train_model_step, 
        evaluate_model_step,
        condition_step
    ],
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [55]:
session3_pipeline.upsert(role_arn=role)
execution = session3_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


# Session 4 - Model Deployment

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with a step to deploy the model to an endpoint. We'll use a [Lambda Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-lambda) to create an endpoint and deploy the model. To control the endpoint's inputs and outputs, we'll modify the model's assets to include code that customizes the processing of a request. 

At the end of this session, our Pipeline will look like this:

<img src='penguins/images/session4-pipeline.png' alt='Session 4 Pipeline' width="600">


In [162]:
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.tensorflow.model import TensorFlowPredictor
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.lambda_helper import Lambda
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.s3 import S3Downloader

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3440.jsonl...


## Step 1 - Deploy Latest Model From Registry

Let's get the latest approved model from the Model Registry and deploy it to an endpoint.

We can use `boto3` to query the list of approved models and get the latest one. Check the [boto3 SageMaker Client API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) for a list of every available method.

In [126]:
response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime",
    MaxResults=1,
)

package = response["ModelPackageSummaryList"][0]
package

{'ModelPackageGroupName': 'penguins',
 'ModelPackageVersion': 15,
 'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/15',
 'CreationTime': datetime.datetime(2023, 6, 12, 15, 33, 34, 494000, tzinfo=tzlocal()),
 'ModelPackageStatus': 'Completed',
 'ModelApprovalStatus': 'Approved'}

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3749.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3819.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3849.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3919.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3950.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/4020.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/4050.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/4120.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/4150.jsonl...
Generating 334 predictions...
Uploading ground truth da

Let's define the name of the endpoint where we'll deploy the model.

In [163]:
endpoint_name = "penguins-endpoint"

Using the ARN of the model package from the Model Registry, we can deploy the model by creating a [ModelPackage](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.ModelPackage) instance and calling its `deploy()` function. The model information lives in the Model Registry, so we don't need to specify anything else.

In [90]:
model_package = ModelPackage(
    model_package_arn=package["ModelPackageArn"], 
    sagemaker_session=sagemaker_session,
    role=role, 
)

model_package.deploy(
    endpoint_name=endpoint_name,
    initial_instance_count=1, 
    instance_type="ml.m5.large",
)

INFO:sagemaker:Creating model with name: 9-2023-06-06-13-33-18-267
INFO:sagemaker:Creating endpoint-config with name penguins-endpoint
INFO:sagemaker:Creating endpoint with name penguins-endpoint


----!

Using a [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html#sagemaker.predictor.Predictor) from the endpoint name, we can test our model.

The payload we need to provide the model is in CSV format. Notice how the model expects data that's already transformed. We can't provide the original data from our dataset because the model will not work with it.

In [None]:
predictor = Predictor(endpoint_name=endpoint_name)

payload = """
0.6569590202313976, -1.0813829646495108, 1.2097102831892812, 0.9226343641317372, 1.0, 0.0, 0.0
-0.7751048801481084, 0.8822689351285553,  -1.2168066120762704, 0.9226343641317372, 0.0, 1.0, 0.0
-0.837387834894918, 0.3386660813829646, -0.26237731892812, -1.92351941317372, 0.0, 0.0, 1.0
"""

response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})

# We can decode the output of the endpoint and print the "predictions" key.
predictions = json.loads(response.decode("utf-8"))["predictions"]
print(f"Prediction: {np.argmax(predictions, axis=1)}")
predictions

Let's now delete the endpoint.predictor.delete_endpoint()

In [None]:
predictor.delete_endpoint()

## Step 2 - Preparing the Inference Code

Deploying the model we trained directly to an endpoint doesn't lets us control the data that goes in and comes out of the endpoint. Fortunately, SageMaker allows us to include an `inference.py` file with the model assets from where we can control how the endpoint works. You can see more information about how this works by checking the [SageMaker TensorFlow Serving Container](https://github.com/aws/sagemaker-tensorflow-serving-container) documentation.

We want our endpoint to handle unprocessed data in JSON format. Here is an example of the payload we want the endpoint to support:

```
{
    "island": "Biscoe",
    "culmen_length_mm": 48.6,
    "culmen_depth_mm": 16.0,
    "flipper_length_mm": 230.0,
    "body_mass_g": 5800.0,
}
```


Let's start by setting up a local folder where we will create the `inference.py` script.

In [164]:
CODE_FOLDER = PENGUINS_FOLDER / "code"

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-109-f71db0bcdb29>", line 22, in _generate_prediction_data
  File "<ipython-input-109-f71db0bcdb29>", line 13, in _predict
  File "/usr/local/lib/python3.8/site-packages/sagemaker/predictor.py", line 161, in predict
    response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 964, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ValidationError: An error occurred (ValidationError) when calling the InvokeEnd

We will include the inference code as part of the model assets to control the inference process on the SageMaker endpoint. SageMaker will automatically call the `handler()` function for every request to the endpoint.

In [165]:
%%writefile $CODE_FOLDER/inference.py

import os
import json
import boto3
import requests
import numpy as np
import pandas as pd

from pickle import load
from pathlib import Path


PIPELINE_FILE = Path("/tmp") / "pipeline.pkl"
CLASSES_FILE = Path("/tmp") / "classes.csv"

s3 = boto3.resource("s3")


def handler(data, context):
    """
    This is the entrypoint that will be called by SageMaker when the endpoint
    receives a request. You can see more information at 
    https://github.com/aws/sagemaker-tensorflow-serving-container.
    """
    print("Handling endpoint request")
    
    instance = _process_input(data, context)
    output = _predict(instance, context)
    return _process_output(output, context)


def _process_input(data, context):
    print("Processing input data...")
    
    if context is None:
        # The context will be None when we are testing the code
        # directly from a notebook. In that case, we can use the
        # data directly.
        endpoint_input = data
    elif context.request_content_type in ("application/json", "application/octet-stream"):
        # When the endpoint is running, we will receive a context
        # object. We need to parse the input and turn it into 
        # JSON in that case.
        endpoint_input = json.loads(data.read().decode("utf-8"))

        if endpoint_input is None:
            raise ValueError("There was an error parsing the input request.")
    else:
        raise ValueError(f"Unsupported content type: {context.request_content_type or 'unknown'}")
        
    return _transform(endpoint_input)


def _predict(instance, context):
    print("Sending input data to model to make a prediction...")
    
    model_input = json.dumps({"instances": [instance]})
    
    if context is None:
        # The context will be None when we are testing the code
        # directly from a notebook. In that case, we want to return
        # a fake prediction back.
        result = {
            "predictions": [
                [0.2, 0.5, 0.3]
            ]
        }
    else:
        # When the endpoint is running, we will receive a context
        # object. In that case we need to send the instance to the
        # model to get a prediction back.
        response = requests.post(context.rest_uri, data=model_input)
        
        if response.status_code != 200:
            raise ValueError(response.content.decode('utf-8'))
            
        result = json.loads(response.content)
    
    print(f"Response: {result}")
    return result


def _process_output(output, context):
    print("Processing prediction received from the model...")
    
    response_content_type = "application/json" if context is None else context.accept_header
    
    prediction = np.argmax(output["predictions"][0])
    confidence = output["predictions"][0][prediction]
    
    print(f"Prediction: {prediction}. Confidence: {confidence}")
    
    result = json.dumps({
        "species": _get_class(prediction),
        "prediction": int(prediction),
        "confidence": confidence
    }), response_content_type
    
    return result


def _get_pipeline():
    """
    This function will download the Scikit-Learn pipeline from S3 if it doesn't
    already exist. The function will use the `S3_LOCATION` environment
    variable to determine the location of the pipeline.
    """
    
    if not PIPELINE_FILE.exists():
        s3_uri = os.environ.get("S3_LOCATION", None)
        
        s3_parts = s3_uri.split('/', 3)
        bucket = s3_parts[2]
        key = s3_parts[3]

        s3.Bucket(bucket).download_file(f"{key}/pipeline.pkl", str(PIPELINE_FILE))
        
    return load(open(PIPELINE_FILE, 'rb'))


def _get_class(prediction):
    """
    This function returns the class name of a given prediction. 
    
    The function downloads the file with the list of classes from S3 if it doesn't
    already exist. The function will use the `S3_LOCATION` environment
    variable to determine the location of the file.
    """
    
    if not CLASSES_FILE.exists():
        s3_uri = os.environ.get("S3_LOCATION", None)
        
        s3_parts = s3_uri.split('/', 3)
        bucket = s3_parts[2]
        key = s3_parts[3]

        s3.Bucket(bucket).download_file(f"{key}/classes.csv", str(CLASSES_FILE))
            
    with open(CLASSES_FILE) as f:
        file = f.readlines()
        
    classes = list(map(lambda x: x.replace("'", ""), file[0].split(',')))
    return classes[prediction]


def _transform(payload):
    """
    This function transforms the payload in the request using the
    Scikit-Learn pipeline that we created during the preprocessing step.
    """
    
    print("Transforming input data...")

    island = payload.get("island", "")
    culmen_length_mm = payload.get("culmen_length_mm", 0)
    culmen_depth_mm = payload.get("culmen_depth_mm", 0)
    flipper_length_mm = payload.get("flipper_length_mm", 0)
    body_mass_g = payload.get("body_mass_g", 0)
    
    data = pd.DataFrame(
        columns=["island", "culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "body_mass_g"], 
        data=[[
            island, 
            culmen_length_mm, 
            culmen_depth_mm, 
            flipper_length_mm, 
            body_mass_g
        ]]
    )
    
    result = _get_pipeline().transform(data)
    return result[0].tolist()


Overwriting penguins/code/inference.py


SageMaker's default TensorFlow inference container doesn't come with Scikit-Learn installed, so we need to provide a `requirements.txt` file with the libraries we want SageMaker to install in our endpoint.

In [166]:
%%writefile $CODE_FOLDER/requirements.txt

numpy==1.19.5
pandas==1.2.5
scikit-learn==0.23.2

Overwriting penguins/code/requirements.txt


## Step 3 - Testing the Inference Code

Let's test the inference code locally to ensure it works before deploying it. The `handler()` function is the entry point that will be called by SageMaker whenever the endpoint receives a request.

When testing the inference code, we want to set the `context` to `None` so the function recognizes we are calling it locally. We also want to set the `S3_LOCATION` environment variable to the S3 location of the Scikit-Learn pipeline and the list of supported classes.

In [167]:
%env S3_LOCATION=$preprocessor_destination.default_value

env: S3_LOCATION=s3://mlschool/penguins/preprocessing


In [168]:
from penguins.code.inference import handler

handler(
    data={
        "island": "Biscoe",
        "culmen_length_mm": 48.6,
        "culmen_depth_mm": 16.0,
        "flipper_length_mm": 230.0,
        "body_mass_g": 5800.0,
    }, 
    context=None
)

Handling endpoint request
Processing input data...
Transforming input data...
Sending input data to model to make a prediction...
Response: {'predictions': [[0.2, 0.5, 0.3]]}
Processing prediction received from the model...
Prediction: 1. Confidence: 0.5




('{"species": "Chinstrap", "prediction": 1, "confidence": 0.5}',
 'application/json')

## Step 4 - Registering the Model

We can now register a new [TensorFlowModel](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-serving-model). We must also ensure SageMaker repackages the model assets to include the `inference.py` file.

SageMaker triggers a repack whenever we specify the `source_dir` attribute. We want that attribute to point to the local folder containing the `inference.py` file. SageMaker will automatically modify the original `model.tar.gz` package to include a `/code` folder containing the file. Since we need access to Scikit-Learn in our script, we can include a `requirements.txt` file in the same `/code` folder, and SageMaker will install everything in it. To repack the model assets, SageMaker will automatically include a new step in the pipeline right before registering the model.

Here is what the new `model.tar.gz` package will look like:

```
model/
    |--[model_version_number]
        |--assets/
        |--variables/
        |--saved_model.pb
code/
    |--inference.py
    |--requirements.txt
```

Let's use a [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) to register the model. Notice the following:

* `model_data`: We use the model assets we generated during the Training or Tuning Step. We determined which assets to use back in Session 4 and stored them in the `model_data` variable.
* `source_dir`: This points to the local folder containing the `inference.py` file. SageMaker will trigger a repack to include the `/code` folder in the model assets.
* `env`: Our custom inference code expects an environment variable `S3_LOCATION` to point to the location of the Scikit-Learn pipeline.

In [184]:
model = TensorFlowModel(
    model_data=train_model_step.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",
    source_dir=str(CODE_FOLDER),
    env={
        "S3_LOCATION": preprocessor_destination,
    },
    framework_version="2.6",
    sagemaker_session=PipelineSession(),
    role=role,
)

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status="Approved",
        
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version="2.6",
    )
)

INFO:sagemaker.tensorflow.model:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 5 - Deploying the Model

Let's use a [Lambda Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-lambda) to deploy the model automatically.

Let's start by writing the Lambda function to take the model information and create a new hosting endpoint.

In [185]:
%%writefile $PENGUINS_FOLDER/lambda.py

import os
import json
import boto3
import time

sagemaker = boto3.client("sagemaker")

def lambda_handler(event, context):
    model_package_arn = event["model_package_arn"]
    endpoint_name = event["endpoint_name"]
    
    data_capture_percentage = event["data_capture_percentage"]
    data_capture_destination = event["data_capture_destination"]
    
    role = event["role"]
    
    timestamp = time.strftime("%m%d%H%M%S", time.localtime())
    model_name = f"penguins-model-{timestamp}"
    endpoint_config_name = f"penguins-endpoint-config-{timestamp}"

    sagemaker.create_model(
        ModelName=model_name, 
        ExecutionRoleArn=role, 
        Containers=[{
            "ModelPackageName": model_package_arn
        }] 
    )

    sagemaker.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "ModelName": model_name,
                "InstanceType": "ml.m5.large",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "VariantName": "AllTraffic",
            }
        ],
        DataCaptureConfig={
            "EnableCapture": True,
            "InitialSamplingPercentage": data_capture_percentage,
            "DestinationS3Uri": data_capture_destination,
            "CaptureOptions": [
                {
                    'CaptureMode': "Input"
                },
                {
                    'CaptureMode': "Output"
                },
            ],
            "CaptureContentTypeHeader": {
                "JsonContentTypes": [
                    "application/json",
                    "application/octect-stream"
                ]
            }
        },
    )

    sagemaker.create_endpoint(
        EndpointName=endpoint_name, 
        EndpointConfigName=endpoint_config_name,
    )
    
    return {
        "statusCode": 200,
        "body": json.dumps("Endpoint deployed successfully")
    }

Overwriting penguins/lambda.py


We need to ensure our Lambda function has permission to interact with SageMaker, so let's create a new role to run the function.

In [186]:
def create_lambda_role(role_name):
    try:
        response = iam_client.create_role(
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "lambda.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description="Lambda Pipeline Role"
        )

        role_arn = response['Role']['Arn']

        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        )

        iam_client.attach_role_policy(
            PolicyArn='arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
            RoleName=role_name
        )

        return role_arn

    except iam_client.exceptions.EntityAlreadyExistsException:
        response = iam_client.get_role(RoleName=role_name)
        return response['Role']['Arn']


lambda_role = create_lambda_role("lambda-pipeline-role")

## Step 6 - Setting up the Lambda Step

Let's define the [LambdaStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.lambda_step.LambdaStep) that will run the function to deploy the model.

We can use [Data Capture](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-capture.html) to record the inputs and outputs of the endpoint to use them later for monitoring the model. We'll enable Data Capture using the following settings:

* `data_capture_percentage`: Represents the percentage of information that flows through the endpoint that we want to capture. For this example, we'll set that to 100%.
* `data_capture_destination`: Specifies the S3 location where we want to store the captured data.


In [187]:
data_capture_percentage = ParameterInteger(
    name="data_capture_percentage",
    default_value=100,
)

data_capture_destination = ParameterString(
    name="data_capture_destination",
    default_value=f"{S3_FILEPATH}/monitoring/data-capture",
)

deploy_fn = Lambda(
    function_name="deploy_fn",
    execution_role_arn=lambda_role,
    script=str(PENGUINS_FOLDER / "lambda.py"),
    handler="lambda.lambda_handler",
    timeout=600
)

deploy_fn.upsert()

deploy_step = LambdaStep(
    name="deploy",
    lambda_func=deploy_fn,
    inputs={
        # We use the ARN of the model we registered to
        # deploy it to the endpoint.
        "model_package_arn": register_model_step.properties.ModelPackageArn,

        "endpoint_name": endpoint_name,
        
        "data_capture_percentage": data_capture_percentage,
        "data_capture_destination": data_capture_destination,
        
        "role": role,
    }
)

## Step 7 - Modifying the Condition Step

We need to modify the [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) to include the new Deploy Step we just created. If the condition succeeds, we will register and deploy the custom model.

In [188]:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[
        register_model_step, deploy_step
    ],
    else_steps=[fail_step], 
)

## Step 8 - Running the Pipeline

We can now run the pipeline. If the pipeline succeeds, there will be a new running endpoint.

In [189]:
session4_pipeline = Pipeline(
    name="penguins-session4-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        timestamp_signature,
        accuracy_threshold,
        data_capture_percentage,
        data_capture_destination,
    ],
    steps=[
        preprocess_data_step, 
        train_model_step, 
        evaluate_model_step,
        condition_step
    ],
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [190]:
session4_pipeline.upsert(role_arn=role)
execution = session4_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3641.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3711.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3741.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3811.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3842.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3912.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3942.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/4012.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/4042.jsonl...
Uploading ground truth data to s3://mlschool/penguins/m

## Step 9 - Testing the Endpoint

We can now create a [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html) to test the endpoint with a few examples.

First, let's wait for the endpoint to be ready to service traffic.

In [191]:
waiter = sagemaker_client.get_waiter("endpoint_in_service")
waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={
        "Delay": 10,
        "MaxAttempts": 30
    }
)

Now that the endpoint is in service, we can create a [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html) using a JSON serializer and a deserializer to have it automatically serialize and deserialize the information to and from the endpoint. Check [Serializers](https://sagemaker.readthedocs.io/en/stable/api/inference/serializers.html) and [Deserializers](https://sagemaker.readthedocs.io/en/stable/api/inference/deserializers.html) for a list of supported serializers and deserializers.

In [192]:
predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)

Running one example through the endpoint.

In [193]:
predictor.predict({
    "island": "Dream",
    "culmen_length_mm": 46.4,
    "culmen_depth_mm": 18.6,
    "flipper_length_mm": 190.0,
    "body_mass_g": 3450.0,
})

{'species': 'Chinstrap', 'prediction': 1, 'confidence': 0.456264049}

Running another example.

In [194]:
predictor.predict({
    "island": "Biscoe",
    "culmen_length_mm": 48.6,
    "culmen_depth_mm": 16.0,
    "flipper_length_mm": 230.0,
    "body_mass_g": 5800.0,
})

{'species': 'Gentoo', 'prediction': 2, 'confidence': 0.974682}

## Step 10 - Cleaning up

Before you finish, don't forget to clean up after yourself.

In [195]:
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: penguins-endpoint-config-0612175110
INFO:sagemaker:Deleting endpoint with name: penguins-endpoint


# Session 5 - Data Monitoring

This session aims to set up a monitoring process to analyze the quality of the data our endpoint receives in production. For this, we will have SageMaker capture and evaluate the data observed by the endpoint.

To enable this functionality, we need a couple of steps:

1. Create a baseline to compare the real-time traffic.
2. Set up a schedule to continuously evaluate and compare against the baseline.

Notice that the Data Quality process uses the baseline dataset we generated during preprocessing. This baseline dataset is the same unprocessed train set in JSON format. We do this because we transformed the train data during the preprocessing step, but we need raw data because that's what the endpoint expects.

Check [Amazon SageMaker Model Monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) for a brief explanation of how to use SageMaker's Model Monitoring functionality. [Monitor models for data and model quality, bias, and explainability](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) is a much more extensive guide to monitoring in Amazon SageMaker.

Here is what the Pipeline will look like at the end of this session:

<img src='penguins/images/session5-pipeline.png' alt='Session 5 Pipeline' width="600">


In [196]:
import random

from time import sleep
from datetime import datetime
from threading import Thread, Event

from IPython.display import JSON

from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables

from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.model import Model
from sagemaker.model_monitor import CronExpressionGenerator, EndpointInput, DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.s3 import S3Uploader

## Step 1 - Generating a Baseline

Let's now configure the [Quality Check Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-quality-check) and feed it the train set we generated in the preprocessing step.

We can configure the instance that will run the quality check using the [CheckJobConfig](https://sagemaker.readthedocs.io/en/v2.73.0/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.check_job_config.CheckJobConfig) class, and we can use the `DataQualityCheckConfig` class to configure the job.

In [197]:
data_quality_location = f"{S3_FILEPATH}/monitoring/data-quality/"

data_quality_baseline_step = QualityCheckStep(
    name="generate-data-quality-baseline",
    
    check_job_config = CheckJobConfig(
        instance_type="ml.t3.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=sagemaker_session,
        role=role,
    ),
    
    quality_check_config = DataQualityCheckConfig(
        # We will use the train dataset we generated during the preprocessing 
        # step to generate the data quality baseline.
        baseline_dataset=preprocess_data_step.properties.ProcessingOutputConfig.Outputs["train-baseline"].S3Output.S3Uri,

        dataset_format=DatasetFormat.json(lines=True),
        # output_s3_uri=Join(on='/', values=[S3_FILEPATH, "monitoring", "data-quality"]),
        output_s3_uri=data_quality_location
    ),
    
    skip_check=True,
    register_new_baseline=True,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


## Step 2 - Running the Pipeline

We can now run the pipeline.

In [198]:
session5_pipeline = Pipeline(
    name="penguins-session5-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        timestamp_signature,
        data_capture_percentage,
        data_capture_destination,       
        accuracy_threshold,
    ],
    steps=[
        preprocess_data_step, 
        data_quality_baseline_step,
        train_model_step, 
        evaluate_model_step,
        condition_step
    ],
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [199]:
session5_pipeline.upsert(role_arn=role)
execution = session5_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/18/1055.jsonl...


## Step 3 - Setting Up a Predictor

We can now create a [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html) from the endpoint.

In [201]:
waiter = sagemaker_client.get_waiter("endpoint_in_service")
waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={
        "Delay": 10,
        "MaxAttempts": 30
    }
)

predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)

## Step 4 - Generating Endpoint Traffic

Let's generate some traffic for our endpoint so we can test the monitoring functionality. We will repeatedly send every sample from the dataset to the endpoint to simulate real prediction requests.

In [202]:
def generate_traffic():
    
    def _predict(data, predictor, stop_traffic_thread):
        for index in data.index:
            payload = {
                "island": data["island"][index],
                "culmen_length_mm": data["culmen_length_mm"][index],
                "culmen_depth_mm": data["culmen_depth_mm"][index],
                "flipper_length_mm": data["flipper_length_mm"][index],
                "body_mass_g": data["body_mass_g"][index],
            }

            predictor.predict(payload, inference_id=str(index))
            sleep(1)

            if stop_traffic_thread.is_set():
                break

    def _generate_prediction_data(data, predictor, stop_traffic_thread):
        while True:
            print(f"Generating {data.shape[0]} predictions...")
            _predict(data, predictor, stop_traffic_thread)
            
            if stop_traffic_thread.is_set():
                break

                
    stop_traffic_thread = Event()
    data = pd.read_csv(LOCAL_FILEPATH).dropna()
    
    traffic_thread = Thread(
        target=_generate_prediction_data,
        args=(data, predictor, stop_traffic_thread,)
    )
    
    traffic_thread.start()
    
    return stop_traffic_thread, traffic_thread


Let's start generating the traffic.

In [203]:
stop_traffic_thread, traffic_thread = generate_traffic()

Generating 334 predictions...


## Step 5 - Checking the Captured Data

Let's check the S3 location where the endpoint stores the requests and responses that it receives.

Notice that it make take a few minutes for the first few files to show up in S3. Keep running the following line until you get some.

In [204]:
files = S3Downloader.list(data_capture_destination.default_value)[:3]
files

['s3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/04/28/19/16-06-992-abf52eaa-40cd-4fb9-916a-96bfe20252c0.jsonl',
 's3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/04/28/19/17-07-857-a731227c-df7d-425c-8df5-89206a5d8875.jsonl',
 's3://mlschool/penguins/monitoring/data-capture/penguins-endpoint/AllTraffic/2023/04/28/19/18-08-423-d0343cfa-b491-4099-a816-ebe4f3a149dc.jsonl']

These files contain the data captured by the endpoint in a SageMaker-specific JSON-line format. Each inference request is captured in a single line in the `jsonl` file. The line contains both the input and output merged together.

Let's read the first line from the first file:

In [205]:
if len(files):
    lines = S3Downloader.read_file(files[0])
    print(json.dumps(json.loads(lines.split("\n")[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "application/json",
      "mode": "INPUT",
      "data": "{\"island\": \"Dream\", \"culmen_length_mm\": 46.4, \"culmen_depth_mm\": 18.6, \"flipper_length_mm\": 190.0, \"body_mass_g\": 3450.0}",
      "encoding": "JSON"
    },
    "endpointOutput": {
      "observedContentType": "application/json",
      "mode": "OUTPUT",
      "data": "{\"prediction\": \"0\", \"confidence\": 0.497521222}",
      "encoding": "JSON"
    }
  },
  "eventMetadata": {
    "eventId": "33532b83-7f7c-4335-9c8a-a8023c3799c6",
    "inferenceTime": "2023-04-28T19:16:06Z"
  },
  "eventVersion": "0"
}


## Step 6 - Statistics and Constraints

Our pipeline generated baseline statistics and constraints using our train set. We can take a look at what these values look like by downloading them from S3.

In [206]:
statistics = f"{data_quality_location}statistics.json"
JSON(json.loads(S3Downloader.read_file(statistics)))

<IPython.core.display.JSON object>

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/18/5916.jsonl...


In [207]:
constraints = f"{data_quality_location}constraints.json"
JSON(json.loads(S3Downloader.read_file(constraints)))

<IPython.core.display.JSON object>

## Step 7 - Scheduling the Monitoring Job

We can now set up a schedule to continuously monitor data going into the endpoint and compare it to the baseline we generated before. This monitoring job will use the baseline statistics and constraints we generated during the Data Quality Check Step. Check [Schedule Data Quality Monitoring Jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-schedule-data-monitor.html) for more information.

SageMaker looks for violations in the data captured by the endpoint. By default, they combine the input data with the endpoint output and compare the result with the previous baseline we generated. If we let SageMaker do this, we will get three violations:

1. An "extra column check" violation because the field `confidence` doesn't exist in the baseline.
2. An "extra column check" violation because the field `prediction` doesn't exist in the baseline.
3. A "missing column check" violation because the field `groundtruth` doesn't appear in the data captured from the endpoint.

We can fix these violations by creating a preprocessing script configuring the data we want the monitoring job to use. This script will create a `groundtruth` column, and exclude `confidence` and `prediction`. By doing this, we will not receive any of these three violations.


In [208]:
DATA_QUALITY_PREPROCESSOR = "data_quality_preprocessor.py"

Here is the preprocessing script for the Data Quality Monitoring Job. Check [Preprocessing and Postprocessing](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-pre-and-post-processing.html) for more information about how to configure these scripts.

In [209]:
%%writefile {PENGUINS_FOLDER}/{DATA_QUALITY_PREPROCESSOR}
import json

def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = json.loads(inference_record.endpoint_output.data)
    
    response = json.loads(input_data)
    response["groundtruth"] = output_data["prediction"]
    return response

Overwriting penguins/data_quality_preprocessor.py


The monitoring schedule expects an S3 location pointing to the preprocessing script. Let's upload the script to the default bucket.

In [210]:
bucket = boto3.Session().resource("s3").Bucket(sagemaker_session.default_bucket())
prefix = "penguins-monitoring"
bucket.Object(os.path.join(prefix, DATA_QUALITY_PREPROCESSOR)).upload_file(str(PENGUINS_FOLDER / DATA_QUALITY_PREPROCESSOR))
data_quality_preprocessor = f"s3://{os.path.join(bucket.name, prefix, DATA_QUALITY_PREPROCESSOR)}"
data_quality_preprocessor

's3://sagemaker-us-east-1-325223348818/penguins-monitoring/data_quality_preprocessor.py'

We can now set up the Data Quality Monitoring Job using the [DefaultModelMonitor](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.DefaultModelMonitor) class. Notice how we specify the `record_preprocessor_script` using the S3 location where we uploaded our script.

In [211]:
data_monitor = DefaultModelMonitor(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=3600,
    role=role,
)

data_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-data-monitoring-schedule",
    endpoint_input=predictor.endpoint_name,
    record_preprocessor_script=data_quality_preprocessor,
    statistics=f"{data_quality_location}statistics.json",
    constraints=f"{data_quality_location}constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: penguins-data-monitoring-schedule


You can describe the schedule to see more information about the Data Quality Monitoring Job.

In [212]:
data_monitor.describe_schedule()

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:325223348818:monitoring-schedule/penguins-data-monitoring-schedule',
 'MonitoringScheduleName': 'penguins-data-monitoring-schedule',
 'MonitoringScheduleStatus': 'Pending',
 'MonitoringType': 'DataQuality',
 'CreationTime': datetime.datetime(2023, 6, 12, 18, 59, 38, 440000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 6, 12, 18, 59, 38, 576000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'data-quality-job-definition-2023-06-12-18-59-37-505',
  'MonitoringType': 'DataQuality'},
 'EndpointName': 'penguins-endpoint',
 'LastMonitoringExecutionSummary': {'MonitoringScheduleName': 'penguins-data-monitoring-schedule',
  'ScheduledTime': datetime.datetime(2023, 6, 6, 16, 0, tzinfo=tzlocal()),
  'CreationTime': datetime.datetime(2023, 6, 6, 16, 7, 31, 229000, tzinfo=tzlocal()),
  'LastModifiedTime': datetime.datetime(2023,

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/18/5946.jsonl...


## Step 8 - Introducing a Violation

Let's make a prediction for a penguin and include extra fields in the request. This should be flagged by the monitoring job.

In [213]:
predictor.predict({
    "island": "Dream",
    "culmen_length_mm": 46.4,
    "culmen_depth_mm": 18.6,
    "flipper_length_mm": 190.0,
    "body_mass_g": 5608.0,
    "name": "Johnny",
    "height": 28.0
})

{'species': 'Adelie', 'prediction': 0, 'confidence': 0.585679114}

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0016.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0046.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0117.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0147.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0217.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0247.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0317.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0348.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/19/0418.jsonl...
Uploading ground truth data to s3://mlschool/penguins/m

## Step 9 - Cleaning up

Let's stop the monitoring jobs by deleting the monitoring schedule we created before. The following function waits for the job to finish before deleting it.

In [214]:
def delete_monitoring_schedule(schedule):
    attempts = 30
    
    try:
        status = schedule.describe_schedule()["MonitoringScheduleStatus"]
    except Exception:
        print("Monitoring schedule deleted.")
        return
        
    while status in ("Pending", "InProgress") and attempts > 0:
        attempts -= 1
        print(f"Monitoring schedule status: {status}. Waiting for it to finish.")
        time.sleep(30)
        status = schedule.describe_schedule()["MonitoringScheduleStatus"]

    if status not in ("Pending", "InProgress"):
        schedule.delete_monitoring_schedule()
        print("Monitoring schedule deleted.")
    else:
        print("Waiting for monitoring schedule timed out")


delete_monitoring_schedule(data_monitor)


Deleting Monitoring Schedule with name: penguins-data-monitoring-schedule


INFO:sagemaker.model_monitor.model_monitoring:Deleting Data Quality Job Definition with name: data-quality-job-definition-2023-06-12-18-59-37-505


Monitoring schedule deleted.


Let's now stop the thread generating traffic.

In [215]:
stop_traffic_thread.set()
traffic_thread.join()

Finally, we can delete the endpoint.

In [216]:
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: penguins-endpoint-config-0612182637
INFO:sagemaker:Deleting endpoint with name: penguins-endpoint


Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/20/2322.jsonl...


# Session 6 - Model Monitoring

This session aims to set up a monitoring process to analyze the quality of the model predictions. For this, we need to generate ground truth for the data captured by the endpoint and compare it with a baseline performance.

Check [Amazon SageMaker Model Monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) for a brief explanation of how to use SageMaker's Model Monitoring functionality. [Monitor models for data and model quality, bias, and explainability](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) is a much more extensive guide to Model Monitoring in Amazon SageMaker.

Here is what the Pipeline will look like at the end of this session:

<img src='penguins/images/session6-pipeline.png' alt='Session 6 Pipeline' width="600">


In [149]:
from sagemaker.workflow.quality_check_step import ModelQualityCheckConfig

from sagemaker.inputs import CreateModelInput, TransformInput
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import CreateModelStep, TransformStep
from sagemaker.model_monitor import ModelQualityMonitor

## Step 1 - Creating Test Predictions

To create a baseline to compare the model performance, we must create predictions for the test set and compare them with the predictions from the model. We can do this by running a Batch Transform Job to predict every sample from the test dataset. We can use a [Transform Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform) as part of the pipeline to run this job. You can check [Batch Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html) for more information about Batch Transform Jobs.

The Transform Step requires a model to generate predictions, so we need a Model Step that creates a model.

We also need to configure the [Batch Transform Job](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html) using a [Transform Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform). This Batch Transform Job will run every sample from the training dataset through the model so we can compute the baseline metrics. We can use an instance of the [Transformer](https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html) class to configure the job.

In [150]:
create_model_step = ModelStep(
    name="create-model",
    step_args=model.create(instance_type="ml.m5.large"),
)

transformer = Transformer(
    # The Batch Transform Job will use the model we created using the
    # Model Step.
    model_name=create_model_step.properties.ModelName,
    
    instance_type="ml.c5.xlarge",
    instance_count=1,
    
    # The baseline set that we generated in the preprocessing step
    # is in JSON format, where every line is a JSON sample.
    accept="application/json",
    strategy="SingleRecord",
    assemble_with="Line",
    
    output_path=f"{S3_FILEPATH}/transform",
)

generate_test_predictions_step = TransformStep(
    name="generate_test_predictions",
    transformer=transformer,
    inputs=TransformInput(
        
        # We will use the test dataset we generated during the preprocessing 
        # step to run it through the model and generate predictions.
        data=preprocess_data_step.properties.ProcessingOutputConfig.Outputs["test-baseline"].S3Output.S3Uri,

        join_source="Input",
        content_type="application/json",
        split_type="Line",
    ),
    cache_config=cache_config
)

## Step 2 - Generating a Baseline

Let's now configure the [Quality Check Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-quality-check) and feed it the data we generated in the Transform Step.

In [151]:
model_quality_location = f"{S3_FILEPATH}/monitoring/model-quality/"

model_quality_baseline_step = QualityCheckStep(
    name="generate-model-quality-baseline",
    
    check_job_config = CheckJobConfig(
        instance_type="ml.t3.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=sagemaker_session,
        role=role,
    ),
    
    quality_check_config = ModelQualityCheckConfig(
        # We are going to use the output of the Transform Step to generate
        # the model quality baseline.
        baseline_dataset=generate_test_predictions_step.properties.TransformOutput.S3OutputPath,

        dataset_format=DatasetFormat.json(lines=True),
        output_s3_uri=model_quality_location,

        # We need to specify the problem type and the fields where the prediction
        # and groundtruth are so the process knows how to interpret the results.
        problem_type="MulticlassClassification",
        inference_attribute="$.SageMakerOutput.prediction",
        ground_truth_attribute="groundtruth",
    ),
    
    skip_check=True,
    register_new_baseline=True,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/4921.jsonl...


## Step 3 - Setting up Model Metrics

We can configure a new set of [ModelMetrics](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_metrics.ModelMetrics) using the results of the Data and Model Quality Steps.

In [152]:
model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

# drift_check_baselines = DriftCheckBaselines(
#     model_data_statistics=MetricsSource(
#         s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
#         content_type="application/json",
#     ),
#     model_data_constraints=MetricsSource(
#         s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
#         content_type="application/json",
#     ),
#     model_statistics=MetricsSource(
#         s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
#         content_type="application/json",
#     ),
#     model_constraints=MetricsSource(
#         s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
#         content_type="application/json",
#     )
# )

## Step 4 - Registering the Model

We need to redefine the Model Step to register the [TensorFlowModel](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-serving-model) so it takes into account the new metrics.

In [153]:
register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        # drift_check_baselines=drift_check_baselines,
        approval_status="Approved",

        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="TENSORFLOW",
        framework_version="2.6",
    )
)

## Step 5 - Setting up the Condition Step

We only want to compute the model quality baseline if the model's performance is above the predefined threshold. The [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) will gate all necessary steps to compute the baseline. 

In [154]:
condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[
        create_model_step, 
        generate_test_predictions_step, 
        model_quality_baseline_step, 
        register_model_step,
        deploy_step
    ],
    else_steps=[fail_step], 
)

## Step 6 - Running the Pipeline

We can now run the pipeline.

In [155]:
session6_pipeline = Pipeline(
    name="penguins-session6-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination,
        timestamp_signature,
        data_capture_percentage,
        data_capture_destination,
       
        accuracy_threshold,
    ],
    steps=[
        preprocess_data_step, 
        data_quality_baseline_step,
        train_model_step, 
        evaluate_model_step,
        condition_step
    ],
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [156]:
session6_pipeline.upsert(role_arn=role)
execution = session6_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/4951.jsonl...
Generating 334 predictions...


## Step 7 - Setting Up a Predictor

We can now create a [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html) from the endpoint.

In [119]:
waiter = sagemaker_client.get_waiter("endpoint_in_service")
waiter.wait(
    EndpointName=endpoint_name,
    WaiterConfig={
        "Delay": 10,
        "MaxAttempts": 30
    }
)

predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)

## Step 8 - Generating Ground Truth Data

To monitor our model, we need to generate ground truth data for the samples captured by the endpoint. We can simulate this by generating a random ground truth for every sample. Check [Ingest Ground Truth Labels and Merge Them With Predictions](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-merge.html) for more information about this.

In [122]:
ground_truth_path = f"{S3_FILEPATH}/monitoring/groundtruth" 

def generate_ground_truth_data(ground_truth_path):
    
    def _generate_ground_truth_record(inference_id):
        random.seed(inference_id)

        return {
            "groundTruthData": {
                "data": str(random.choice([0, 1, 2])),
                "encoding": "CSV",
            },
            "eventMetadata": {
                "eventId": str(inference_id),
            },
            "eventVersion": "0",
        }


    def _upload_ground_truth(records, upload_time):
        records = [json.dumps(r) for r in records]
        data = "\n".join(records)
        uri = f"{ground_truth_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"

        print(f"Uploading ground truth data to {uri}...")

        S3Uploader.upload_string_as_file_body(data, uri)    

                
    def _generate_ground_truth_data(max_records, stop_ground_truth_thread):
        while True:
            records = [_generate_ground_truth_record(i) for i in range(max_records)]
            _upload_ground_truth(records, datetime.utcnow())

            if stop_ground_truth_thread.is_set():
                break

            sleep(30)

                
    stop_ground_truth_thread = Event()
    data = pd.read_csv(LOCAL_FILEPATH).dropna()
    
    groundtruth_thread = Thread(
        target=_generate_ground_truth_data,
        args=(len(data), stop_ground_truth_thread,)
    )
    
    groundtruth_thread.start()
    
    return stop_ground_truth_thread, traffic_thread


Let's start generating traffic to the endpoint and create random ground truth data.

In [123]:
stop_traffic_thread, traffic_thread = generate_traffic()
stop_ground_truth_thread, groundtruth_thread = generate_ground_truth_data(ground_truth_path)

Generating 334 predictions...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3618.jsonl...


## Step 9 - Scheduling the Monitoring Job

Let's set up a schedule to continuously monitor the quality of the model and compare it to the baseline we generated before. This monitoring job will use the baseline constraints we generated during the Model Quality Check Step. Check [Schedule Model Quality Monitoring Jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-schedule.html) for more information.

To set up a Model Quality Monitoring Job, we can use the [ModelQualityMonitor](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.ModelQualityMonitor) class. The [EndpointInput](https://sagemaker.readthedocs.io/en/v2.24.2/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.EndpointInput) instance configures the attribute the monitoring job should use to determine the prediction from the model.

Check [Amazon SageMaker Model Quality Monitor](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_model_monitor/model_quality/model_quality_churn_sdk.html) for a complete tutorial on how to run a Model Monitoring Job in SageMaker.

In [124]:
model_monitor = ModelQualityMonitor(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=1800,
    role=role
)

endpoint_input = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    
    # The endpoint returns an attribute `prediction` with the
    # prediction from the model. That's the attribute we want to
    # use to compare with the ground truth.
    inference_attribute="prediction",
    
    destination="/opt/ml/processing/input_data",
)

model_monitor.create_monitoring_schedule(
    monitor_schedule_name="penguins-model-monitoring-schedule",
    endpoint_input=endpoint_input,
    problem_type="MulticlassClassification",
    
    ground_truth_input=ground_truth_path,
    
    constraints=f"{model_quality_location}constraints.json",
    
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=f"{S3_FILEPATH}/monitoring/model-quality",
    enable_cloudwatch_metrics=True,
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: penguins-model-monitoring-schedule


You can describe the schedule to see more information about the Model Quality Monitoring Job.

In [125]:
model_monitor.describe_schedule()

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:325223348818:monitoring-schedule/penguins-model-monitoring-schedule',
 'MonitoringScheduleName': 'penguins-model-monitoring-schedule',
 'MonitoringScheduleStatus': 'Pending',
 'MonitoringType': 'ModelQuality',
 'CreationTime': datetime.datetime(2023, 6, 12, 15, 36, 29, 90000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 6, 12, 15, 36, 29, 202000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'model-quality-job-definition-2023-06-12-15-36-28-309',
  'MonitoringType': 'ModelQuality'},
 'EndpointName': 'penguins-endpoint',
 'LastMonitoringExecutionSummary': {'MonitoringScheduleName': 'penguins-model-monitoring-schedule',
  'ScheduledTime': datetime.datetime(2023, 5, 26, 21, 0, tzinfo=tzlocal()),
  'CreationTime': datetime.datetime(2023, 5, 26, 21, 0, 56, 245000, tzinfo=tzlocal()),
  'LastModifiedTime': datetime.datetim

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3648.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/15/3719.jsonl...


## Step 10 - Cleaning up

Let's stop the monitoring job by deleting the monitoring schedule we created before.

In [160]:
delete_monitoring_schedule(model_monitor)


Deleting Monitoring Schedule with name: penguins-model-monitoring-schedule


INFO:sagemaker.model_monitor.model_monitoring:Deleting Model Quality Job Definition with name: model-quality-job-definition-2023-06-12-15-36-28-309


Monitoring schedule deleted.
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5121.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5152.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5222.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5252.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5322.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5352.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5423.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5453.jsonl...
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5523.jsonl...
Generating 334 predictions

We also need to stop the threads generating predictions and ground truth data.

In [None]:
stop_traffic_thread.set()
stop_ground_truth_thread.set()

traffic_thread.join()
groundtruth_thread.join()

Finally, let's delete the endpoint.

In [161]:
predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: penguins-endpoint-config-0612153337
INFO:sagemaker:Deleting endpoint with name: penguins-endpoint


Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/17/3410.jsonl...


# Final Clean Up

Here we can do a more deep clean up.

In [85]:
def delete_pipeline(pipeline):
    if pipeline:
        pipeline.delete()

In [None]:
delete_pipeline(session1_pipeline)
delete_pipeline(session2_pipeline)
delete_pipeline(session3_pipeline)
delete_pipeline(session4_pipeline)
delete_pipeline(session5_pipeline)
delete_pipeline(session6_pipeline)

In [157]:
# Let's delete every model we registered under our model package group
for mp in sagemaker_client.list_model_packages(ModelPackageGroupName=model_package_group_name)["ModelPackageSummaryList"]:
    print(f"Deleting {mp['ModelPackageArn']}")
    sagemaker_client.delete_model_package(ModelPackageName=mp["ModelPackageArn"])

# We can now delete the model package group.    
sagemaker_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)

Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/15
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/14
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/13
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/12
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/11
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/10
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/9
Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5021.jsonl...
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/8
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/7
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/6
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/penguins/5
Deleting arn:aws:sagemaker:us-east-1:325223348818:model-package/

{'ResponseMetadata': {'RequestId': '1bcee64a-fbec-45d0-a996-5f07c99f0c15',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1bcee64a-fbec-45d0-a996-5f07c99f0c15',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Mon, 12 Jun 2023 16:50:33 GMT'},
  'RetryAttempts': 0}}

Uploading ground truth data to s3://mlschool/penguins/monitoring/groundtruth/2023/06/12/16/5051.jsonl...
