# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Sushant Gautam](https://www.linkedin.com/in/susan-gautam/) as part of the [Machine Learning School Assignment](https://www.ml.school) program.

Let's make sure we are running the latest version of the SakeMaker's SDK. **Restart the notebook** after you upgrade the library.

In [3]:
!pip install -q --upgrade awscli
!pip install -q --upgrade pip
!pip install -q --upgrade sagemaker
!pip show sagemaker

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0[0m[39;49m -> [0m[32;49m23.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0mName: sagemaker
Version: 2.150.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /opt/conda/lib/python3.8/site-packages
Requires: attrs, boto3, cloudpickle, google-pasta, importlib-metadata, jsonschema, numpy, packaging, pandas, pathos, platformdirs, protobuf, protobuf3-to-dict, PyYAML, schema, smdebug-rulesconfig, tblib
Required-by: 


In [4]:
%load_ext autoreload
%autoreload 2

In [63]:
import boto3
import sagemaker
import pandas as pd

from pathlib import Path

boto_session = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=boto_session)
sm_client = boto3.client("sagemaker")
region = boto_session.region_name
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
account = sagemaker_session.boto_session.client("sts").get_caller_identity()["Account"]


print(f"account: {account}")
print(f"bucket: {bucket}")
print(f"region: {region}")
print(f"role: {role}")

iam_client = boto3.client("iam")
sagemaker_client = boto3.client("sagemaker")

# role = sagemaker.get_execution_role()
# region = boto3.Session().region_name
# sagemaker_session = sagemaker.session.Session()

account: 629171115321
bucket: sagemaker-us-east-1-629171115321
region: us-east-1
role: arn:aws:iam::629171115321:role/my_sagemaker_execution_role


## Creating the S3 Bucket

Let's create an S3 bucket where you will upload all the information generated by the pipeline. Make sure you set `BUCKET` to the name of the bucket you want to use. This name has to be unique.

If you want to create a bucket in a region other than `us-east-1`, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.

In [6]:
BUCKET = "mlschooldata"

!aws s3api create-bucket --bucket $BUCKET

{
    "Location": "/mlschooldata"
}


## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (a number from 0 to 255).

Let's extract the `dataset.tar.gz` file.

In [7]:
MNIST_FOLDER = "mnist"
DATASET_FOLDER = Path(MNIST_FOLDER) / "dataset"

!tar -xvzf $MNIST_FOLDER/dataset.tar.gz -C $MNIST_FOLDER --no-same-owner

dataset/
dataset/mnist_test.csv
dataset/mnist_train.csv


Let's load the first 10 rows of the test set.

In [7]:
# train_df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv")
# test_df = pd.read_csv(DATASET_FOLDER / "mnist_test.csv")
# train_df.head(10)


Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [8]:
# print(train_df.shape)
# print(test_df.shape)

(60000, 785)
(10000, 785)


## Step 1: Preprocessing Step

In [8]:
%%writefile preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIR = "/opt/ml/processing"
DATA_FILEPATH_TRAIN = Path(BASE_DIR) / "input" / "mnist_train.csv"
DATA_FILEPATH_TEST = Path(BASE_DIR) / "input" / "mnist_test.csv"


def save_splits(base_dir, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    
    train_path = Path(base_dir) / "train" 
    validation_path = Path(base_dir) / "validation" 
    test_path = Path(base_dir) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    
    
def preprocess(base_dir, train_data_filepath, test_data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """
    
    train_df = pd.read_csv(train_data_filepath)
    test_df = pd.read_csv(test_data_filepath)
    
    X_train = train_df.drop(['label'], axis=1)
    X_train = X_train.values/255.
    X_test = test_df.drop(['label'], axis=1)
    X_test = X_test.values/255.
    y_train = train_df['label'].values.astype('int')
    y_test = test_df['label'].values.astype('int')
    
    # validation set
    train, validation = np.split(X_train, [int(.8 * len(X_train))])

    validation_split = StratifiedShuffleSplit(n_splits=1, test_size=0.25, random_state=46)
    validation_split.split(X_train, y_train)
    training_idx, validation_idx = list(validation_split.split(X_train, y_train))[0]
    
    x_training = X_train[training_idx]
    y_training = y_train[training_idx]

    x_validation = X_train[validation_idx]
    y_validation = y_train[validation_idx]
    
    
    train = np.concatenate((np.expand_dims(y_training, axis=1), x_training), axis=1)
    validation = np.concatenate((np.expand_dims(y_validation, axis=1), x_validation), axis=1)
    test = np.concatenate((np.expand_dims(y_test, axis=1), X_test), axis=1)
    
    
    save_splits(base_dir, train, validation, test)
        

if __name__ == "__main__":
    preprocess(BASE_DIR, DATA_FILEPATH_TRAIN, DATA_FILEPATH_TEST)


Overwriting preprocessor.py


## Step 4 - Testing the Preprocessing Script

We can now load the script we just created and run it locally to ensure it outputs every file we need.

We will set up a SageMaker Processing Job to run this script, but we always want to test the code locally. In this case, we can call the `preprocess()` function with the local directory and the local copy of the dataset.

In [9]:
from preprocessor import preprocess
import tempfile

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_dir=directory, 
        train_data_filepath = 'mnist/dataset/mnist_train.csv',
        test_data_filepath = 'mnist/dataset/mnist_test.csv'
    )
    
    print(f"Folders: {os.listdir(directory)}")


Folders: ['train', 'validation', 'test']


## Uploading dataset to S3

In [9]:
S3_FILEPATH = f"s3://{BUCKET}/{MNIST_FOLDER}"


TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_train.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Dataset location: {S3_FILEPATH}")
print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

Dataset location: s3://mlschooldata/mnist
Train set S3 location: s3://mlschooldata/mnist/mnist_train.csv
Test set S3 location: s3://mlschooldata/mnist/mnist_test.csv


## Step 5 - Pipeline Configuration

When we create a SageMaker Pipeline we can specify a list of paramaters that we can use on individual pipeline steps. To read more about these parameters, check [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html).

These are the parameters that we need right now:

* `dataset_location`: This parameter represents the location of the dataset in S3. We will use this parameter to indicate the SageMaker Processing Job where the dataset is located. The Processing Job will download the dataset from S3 and make it available on the instance running the script.
* `preprocessor_destination`: We need to define the location where the SageMaker Processing Job will store the output. When it finishes, the Processing Job will copy the script's output to the S3 location specified by this parameter. By default, SageMaker uploads the output of a job to a custom location in S3, but unfortunately, if we relay on that functionality, we can't cache the Processing Step in the Pipeline.
* `baseline_destination`: This parameter represents the location where we will store the baseline data. We will use this baseline data in Session 6 to compute general statistics about the model. This will be helpful to monitor the quality of the model results.

In [10]:
import os
import sagemaker
import numpy as np
import boto3
import json
import pandas as pd
import numpy as np
import urllib.request
import argparse
import tempfile
from pathlib import Path

from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
from sagemaker.pytorch import PyTorch


In [11]:
# train_dataset_location = ParameterString(
#     name="dataset_location",
#     default_value=TRAIN_SET_S3_URI,
# )

# test_dataset_location = ParameterString(
#     name="dataset_location",
#     default_value=TEST_SET_S3_URI,
# )
dataset_location = ParameterString(
    name="dataset_location",
    default_value=S3_FILEPATH,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

### Setting Github username and email

In [12]:
! git config --global user.email "sushant@gmail.com"
! git config --global user.name "sushant"

! git config --global credential.helper '!a"ws codecommit credential-helper $@'
! git config --global credential.UseHttpPath true

### Caching

In [13]:
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

## Setting up a Processing Step

The first step we need in the pipeline is a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) to run the preprocessing script. Check the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) SageMaker's SDK documentation for more information. This Processing Step will create a SageMaker Processing Job in the background, run the script, and upload the output to S3. You can use Processing Jobs to perform data pre-processing, post-processing, feature engineering, data validation, and model evaluation.

In [14]:
sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.large",
    instance_count=1,
    role=role,
)

In [15]:
preprocess_step = ProcessingStep(
    name="preprocessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=preprocessor_destination),
    ],
    code="preprocessor.py",
    cache_config=cache_config
)

##  Running the Pipeline

Let's define and run the SageMaker Pipeline. Check [Pipeline Structure and Execution](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-pipeline.html) for more information about how to define a pipeline and [Run a Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/run-pipeline.html) for information about how to run it.

The pipeline uses the parameters we defined before and a single step: the Preprocess Step that will preprocess the dataset.

In [16]:
session1_pipeline = Pipeline(
    name="mnist-session1-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
    ],
    steps=[
        preprocess_step, 
    ]
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does.

In [30]:
# session1_pipeline.upsert(role_arn=role)
# execution = session1_pipeline.start()

##  Cleaning up

Before you finish, don't forget to clean up after you.

In [None]:
# session1_pipeline.delete()

### Import

### Train Step

Model Creation Reference: https://www.kaggle.com/code/mercedeszkistoth/digit-classifier-vanilla-nn-pytorch

In [17]:
%%writefile train.py

import os
import argparse

import numpy as np
import pandas as pd
import torch

from pathlib import Path
from sklearn.metrics import accuracy_score


import numpy as np
from sklearn import metrics
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader


# Expects a .csv file where the first column is the label
# and returns a tensor dataset, the input_size and the number of classes
def csv_to_tensor(file):
    X_train = pd.read_csv(Path(file), header=None)
    y_train = X_train.iloc[:,0].values
    X_train = X_train.iloc[:, 1:].values
    tensor_labels = torch.from_numpy(y_train).long()
    tensor_data = torch.from_numpy(X_train).float()
    return TensorDataset(tensor_data, tensor_labels)

# DataLoader used for mini-batch training
# DataLoader used for mini-batch training
def make_data_loader(train_file, val_file, batch_size):
    train_dataset = csv_to_tensor(train_file)
    val_dataset = csv_to_tensor(val_file)
    
    input_size = len(val_dataset.tensors[0][0])
    print(" input_size ",input_size)
    tensor_labels = val_dataset.tensors[1]
    print(" tensor_labels ", tensor_labels)
    labels = set(label.item() for label in tensor_labels)
    num_classes = len(labels)
    print("num_classes ", num_classes)
    
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(dataset=val_dataset, shuffle=True)
    return train_loader, val_loader, input_size, num_classes

# Our model, the heart
class DigitClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.hidden_layer_1 = nn.Linear(input_size, hidden_size) 
        self.activation_1 = nn.ReLU()
        self.hidden_layer_2 = nn.Linear(hidden_size, hidden_size) 
        self.activation_2 = nn.ReLU()
        self.output_layer = nn.Linear(hidden_size, num_classes)
        self.probabilities = nn.Softmax(dim=0)
    
    def forward(self, x):
        hidden_1 = self.hidden_layer_1(x)
        hidden_activated_1 = self.activation_1(hidden_1)
        hidden_2 = self.hidden_layer_2(hidden_activated_1)
        hidden_activated_2 = self.activation_2(hidden_2)
        out_layer = self.output_layer(hidden_activated_2)
        return out_layer
    

def accuracy(predictions, labels):
    classes = torch.argmax(predictions, dim=1)
    return accuracy_score(labels.cpu(), classes.cpu())

def train_model(model, train_dataset, validation_dataset, loss_fn, optimizer, batch_size=100, n_epochs=1):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)
    training_losses = []
    
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.00
        running_accuracy = 0.00
        for i, (x_batch, y_batch) in enumerate(train_dataset):
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            # zero gradients
            optimizer.zero_grad()
            # predictions
            outputs = model(x_batch)
            loss = loss_fn(outputs, y_batch)
            loss.backward()
            optimizer.step()
            training_losses.append(loss.item())
            running_loss += loss.item()
            running_accuracy += accuracy(outputs, y_batch)
        
        running_loss /= len(train_dataset)
        running_accuracy /= len(train_dataset)

        # validation accuracy
        model.eval()
        correct, total = 0.0, 0.0
        for images, labels in validation_dataset:
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum() 
            
        val_accuracy =  correct/total
        print(f'Epoch: {epoch+1}/{n_epochs} - '+
                  f'loss: {loss:.4f} - '+ 
                  f'accuracy: {running_accuracy:.4f} - '+
                  f'val_accuracy: {val_accuracy:.4f}')
            
    
    return model, training_losses

            
# def predict(model, input_tensor):
#     device = 'cuda' if torch.cuda.is_available() else 'cpu'
#     model.to(device)
#     input_tensor.to(device)
    
#     with torch.no_grad():
#         raw_output = model(input_tensor)
#         output_probabilities = model.probabilities(raw_output)
#         pred_category = torch.argmax(output_probabilities).item()
#         pred_probability = torch.max(output_probabilities).item()
#     return pred_category, round(pred_probability, 4)

# def evaluate(model, test_dataset):
#     actual = []
#     predicted = []
#     n_correct = 0
#     incorrect = []
    
#     for i, (x, y) in enumerate(test_dataset):
#         actual.append(y.item())
#         label, prob = predict(model, x)
#         predicted.append(label)
        
#         if (label == y.item()):
#             n_correct+=1
#         else:
#             incorrect.append([i, y.item(), label])
    
#     print(f'Accuracy on test set: {n_correct/len(test_dataset)*100:.2f}%')

#     confusion_matrix = metrics.confusion_matrix(actual, predicted)
#     print(f"Confusion Matrix: {confusion_matrix}")
    

# Hyperparameters
hidden_size = 10
learning_rate = 0.001


def train(base_directory, train_path, validation_path, epochs=50, batch_size=32):
    train_loader, val_loader, input_size, num_classes = make_data_loader(Path(train_path) / "train.csv", 
                                                                      Path(validation_path) / "validation.csv", 
                                                                      batch_size)
   
    model = DigitClassifier(input_size, hidden_size, num_classes)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    model, training_loss = train_model(model, 
                                    train_loader,
                                    val_loader,
                                    loss_fn, 
                                    optimizer,
                                    batch_size,
                                    epochs)
        
    model_filepath = Path(base_directory) / "model" / "001" 
    
    model_filepath.mkdir(parents=True, exist_ok=True)

    torch.save(model.state_dict(), model_filepath / "model.pt")
    
    # the_model = TheModelClass(*args, **kwargs)
    # the_model.load_state_dict(torch.load(PATH))
    # model.eval()
    
if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list: 
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch_size", type=int, default=1)
    args, _ = parser.parse_known_args()
    
    train(
        base_directory=args.base_directory,
        train_path=args.train_path,
        validation_path=args.validation_path,
        epochs=args.epochs,
        batch_size=args.batch_size
    )

Overwriting train.py


In [18]:
LOCAL_FILEPATH = Path(MNIST_FOLDER) / "dataset"

In [19]:
from preprocessor import preprocess
from train import train


with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
       directory,
       LOCAL_FILEPATH / "mnist_train.csv",
       LOCAL_FILEPATH / "mnist_test.csv",
    )
    
    print(f"Folders: {os.listdir(directory)}")
    
    
    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=10
    )

  from .autonotebook import tqdm as notebook_tqdm


Folders: ['train', 'validation', 'test']
 input_size  784
 tensor_labels  tensor([8, 6, 4,  ..., 3, 1, 9])
num_classes  10
[2023-04-26 14:35:47.917 pytorch-1-12-cpu-py38-ml-m5-large-99f7772120c38cc808dd72abae27:27 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None




[2023-04-26 14:35:48.213 pytorch-1-12-cpu-py38-ml-m5-large-99f7772120c38cc808dd72abae27:27 INFO profiler_config_parser.py:111] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
Epoch: 1/10 - loss: 0.0479 - accuracy: 0.8180 - val_accuracy: 0.8959
Epoch: 2/10 - loss: 0.2017 - accuracy: 0.9067 - val_accuracy: 0.9153
Epoch: 3/10 - loss: 0.1583 - accuracy: 0.9204 - val_accuracy: 0.9193
Epoch: 4/10 - loss: 0.0996 - accuracy: 0.9265 - val_accuracy: 0.9233
Epoch: 5/10 - loss: 0.3529 - accuracy: 0.9306 - val_accuracy: 0.9264
Epoch: 6/10 - loss: 0.6274 - accuracy: 0.9329 - val_accuracy: 0.9229
Epoch: 7/10 - loss: 0.0387 - accuracy: 0.9354 - val_accuracy: 0.9309
Epoch: 8/10 - loss: 0.0242 - accuracy: 0.9387 - val_accuracy: 0.9323
Epoch: 9/10 - loss: 0.2415 - accuracy: 0.9397 - val_accuracy: 0.9315
Epoch: 10/10 - loss: 0.2362 - accuracy: 0.9416 - val_accuracy: 0.9334


# Session 2 - Training and Tuning

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) we built in the previous session with a step to train a model. We'll explore the [Training](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) and the [Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning) steps.


In [20]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep
from sagemaker.parameter import IntegerParameter
from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession

## Step 3 - Switching Between Training and Tuning

There are two ways we can create a model: Using a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) or using a [Tuning Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning).

In this notebook we are going to alternate between both methods, and we'll use the `USE_TUNING_STEP` flag to indicate which method we want to run.

In [21]:
USE_TUNING_STEP = True

## Step 4 - Setting up a Training Step

We can now create a [Training Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) that we can add to the pipeline. Check the [TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) SageMaker's SDK documentation for more information. This Training Step will create a SageMaker Training Job in the background, run the training script, and upload the output to S3. 

In [22]:
hyperparameters = {
    "epochs": 10,
    "batch_size": 32,
}

estimator = PyTorch(
    entry_point="train.py",
    hyperparameters=hyperparameters,
    framework_version="1.11.0",
    py_version="py38",
    instance_type="ml.m5.xlarge",#"ml.m5.xlarge",
    use_spot_instances=True,
    max_wait=2000,
    max_run=60 * 15, # 15 minutes
    instance_count=1,
    script_mode=True,
    disable_profiler=True,
    role=role,
)

We can now create the [TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) using the estimator we defined before.

This step will receive the train and validation splits from the preprocessing step as inputs. Notice how we reference both splits using the `preprocess_step` variable. This creates a dependency between the Training Step and the Processing Step that we defined in Session 1. When we build a new Pipeline, we'll see that the Training Step won't run until the Processing Step finishes.

In [23]:
training_step = TrainingStep(
    name="training",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Step 5 - Setting up a Tuning Step

Let's now create a [Tuning Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-tuning) to add it to our pipeline. Check the [TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) SageMaker's SDK documentation for more information. This Tuning Step will create a SageMaker Hyperparameter Tuning Job in the background and use the training script to train different variants of the model and choose the best one.

The Tuning Step requires a [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html) reference to configure the Hyperparameter Tuning Job. In this example, the tuner will use the same `Estimator` we defined to train the model.

Here is the configuration that we'll use to find the best model:

1. `objective_metric_name`: This is the name of the metric the tuner will use to determine the best model.
2. `objective_type`: This is the objective of the tuner. Should it "Minimize" the metric or "Maximize" it? In this example, since we are using the validation accuracy of the model, we want the objetive to be "Maximize." If we were using the loss of the model, we would set the objective to "Minimize."
3. `metric_definitions`: Defines how the tuner will determine the value of the metric by looking at the output logs of the training process.

The tuner expects the list of the hyperparameters you want to explore. You can use subclasses of the [Parameter](https://sagemaker.readthedocs.io/en/stable/api/training/parameter.html#sagemaker.parameter.ParameterRange) class to specify different types of hyperparameters. In this example, we are exploring different values for the `epochs` hyperparameter.

Finally, you can control the number of jobs and how many of them will run in parallel using the following two arguments:

* `max_jobs`: Defines the maximum total number of training jobs to start for the hyperparameter tuning job.
* `max_parallel_jobs`: Defines the maximum number of parallel training jobs to start.

In [24]:
from sagemaker.tuner import (
    IntegerParameter,
    ContinuousParameter,
    HyperparameterTuner,
)

In [25]:
hyperparameter_ranges = {
    "learning_rate": ContinuousParameter(0.01, 0.03),
}

objective_metric_name = "val_accuracy"
objective_type = "Maximize"
metric_definitions = [{"Name": objective_metric_name, "Regex": "val_accuracy: ([0-9\\.]+)"}]
    
tuner = HyperparameterTuner(
    estimator,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions,
    objective_type=objective_type,
    max_jobs=3,
    max_parallel_jobs=3,
    early_stopping_type="Auto",
)

We can now create the [TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep). 

This step will use the tuner we configured before and will receive the train and validation splits from the preprocessing step as inputs. Notice how we reference both splits using the `preprocess_step` variable. This creates a dependency between the Tuning Step and the Processing Step that we defined in Session 1. When we build a new Pipeline, we'll see that the Tuning Step won't run until the Processing Step finishes.

In [26]:
tuning_step = TuningStep(
    name = "tuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

## Step 6 - Running the Pipeline

We can now define and run the SageMaker Pipeline, this time using the Training Step or the Tuning Step.

In [27]:
session2_pipeline = Pipeline(
    name="mnist-tuning-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step
    ]
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does.

In [65]:
# session2_pipeline.upsert(role_arn=role)
# execution = session2_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


## Step 9 - Cleaning up

Before you finish, don't forget to clean up after you.

In [39]:
session2_pipeline.delete()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:629171115321:pipeline/mnist-tuning-pipeline',
 'ResponseMetadata': {'RequestId': '412d8fc9-6c50-4b8b-8254-c5f752d6e6f3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '412d8fc9-6c50-4b8b-8254-c5f752d6e6f3',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Tue, 25 Apr 2023 15:10:00 GMT'},
  'RetryAttempts': 0}}

# Session 3 - Evaluating the Model

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with a step to evaluate the model. We'll use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) with a [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) running TensorFlow to execute an evaluation script. 

Here is what the Pipeline will look like at the end of this session:
![image](https://user-images.githubusercontent.com/30827903/234051902-004d6609-6b0b-435e-89eb-3f0920840af1.png)


### Assignments

1. The evaluation script produces an evaluation report containing the accuracy of the model. Extend the evaluation report by adding other metrics. For example, add the support of the test set (the number of samples in the test set.)

2. One of the assignments from the previous session was to replace the TensorFlow Estimator with a PyTorch Estimator. You can now modify the evaluation step to load a script that uses PyTorch to evaluate the model.

3. If you are runing the Training and Tuning Steps simultaneously, create two different Evaluation Steps to evaluate both models independently.

4. Instead of runing the Training and Tuning Steps simultaneously, run the Tuning Step but create two evaluation steps to evaluate the two best models produced by the Tuning Step. Check the [TuningStep.get_top_model_s3_uri()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) function to retrieve the two best models.

5. Modify the SageMaker Pipeline you created for the "Pipeline of Digits" project and add an evaluation step that receives the test data from the preprocessing step.



In [28]:
import tarfile

from sagemaker.workflow.properties import PropertyFile

## Step 1 - Evaluating the Model

This script is reponsible from loading the model we created and evaluating it on the test set. Before finishing, this script will create an evaluation report of the model.

In [29]:
%%writefile evaluation.py

import os
import json
import tarfile
import numpy as np
import pandas as pd

from pathlib import Path
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score


MODEL_PATH = "/opt/ml/processing/model/"
TEST_PATH = "/opt/ml/processing/test/"
OUTPUT_PATH = "/opt/ml/processing/evaluation/"


# Our model, the heart
class DigitClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.hidden_layer_1 = nn.Linear(input_size, hidden_size) 
        self.activation_1 = nn.ReLU()
        self.hidden_layer_2 = nn.Linear(hidden_size, hidden_size) 
        self.activation_2 = nn.ReLU()
        self.output_layer = nn.Linear(hidden_size, num_classes)
        self.probabilities = nn.Softmax(dim=0)
    
    def forward(self, x):
        hidden_1 = self.hidden_layer_1(x)
        hidden_activated_1 = self.activation_1(hidden_1)
        hidden_2 = self.hidden_layer_2(hidden_activated_1)
        hidden_activated_2 = self.activation_2(hidden_2)
        out_layer = self.output_layer(hidden_activated_2)
        return out_layer
    
def evaluate(model_path, test_path, output_path):
    # The first step is to extract the model package provided
    # by SageMaker.
    with tarfile.open(Path(model_path) / "model.tar.gz") as tar:
        tar.extractall(path=Path(model_path))
        
    # We can now load the model from disk.
    # model = keras.models.load_model(Path(model_path) / "001")
    model = DigitClassifier(input_size=784, hidden_size=10, num_classes=10)
    model.load_state_dict(torch.load(Path(model_path) / "001" / "model.pt"))
    model.eval()
    
    X_test = pd.read_csv(Path(test_path) / "test.csv")
    y_test = X_test.iloc[:, 0].values
    X_test = X_test.iloc[:, 1:].values
    
    X_test = torch.from_numpy(X_test).float() # tensors
    predictions = model(X_test).detach().cpu().numpy()
    predictions = np.argmax(predictions, axis=-1)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Test accuracy: {accuracy}")

    # Let's add the accuracy of the model to our evaluation report.
    evaluation_report = {
        "metrics": {
            "accuracy": {
                "value": accuracy
            },
        },
    }
    
    # We need to save the evaluation report to the output path.
    Path(output_path).mkdir(parents=True, exist_ok=True)
    with open(Path(output_path) / "evaluation.json", "w") as f:
        f.write(json.dumps(evaluation_report))


if __name__ == "__main__":
    evaluate(
        model_path=MODEL_PATH, 
        test_path=TEST_PATH,
        output_path=OUTPUT_PATH
    )

Overwriting evaluation.py


## Step 2 - Testing the Evaluation Script

Let's test the script we just created by running it locally.

In [32]:
from preprocessor import preprocess
from train import train
from evaluation import evaluate
import tarfile

with tempfile.TemporaryDirectory() as directory:
    # First, we preprocess the data and create the 
    # dataset splits.
    preprocess(
        base_dir=directory, 
        train_data_filepath = 'mnist/dataset/mnist_train.csv',
        test_data_filepath = 'mnist/dataset/mnist_test.csv'
    )

    # Then, we train a model using the train and 
    # validation splits.
    train(
        base_directory=directory, 
        train_path=Path(directory) / "train", 
        validation_path=Path(directory) / "validation",
        epochs=5,
        batch_size=4
    )
    
    # After training a model, we need to prepare a package just like
    # SageMaker would. This package is what the evaluation script is
    # expecting as an input.
    with tarfile.open(Path(directory) / "model.tar.gz", "w:gz") as tar:
        tar.add(Path(directory) / "model" / "001", arcname="001")
        
    
    # We can now call the evaluation script.
    evaluate(
        model_path=directory, 
        test_path=Path(directory) / "test",
        output_path=Path(directory) / "evaluation",
    )

 input_size  784
 tensor_labels  tensor([8, 6, 4,  ..., 3, 1, 9])
num_classes  10
Epoch: 1/5 - loss: 1.1587 - accuracy: 0.8651 - val_accuracy: 0.9134
Epoch: 2/5 - loss: 0.0253 - accuracy: 0.9190 - val_accuracy: 0.9180
Epoch: 3/5 - loss: 0.0038 - accuracy: 0.9281 - val_accuracy: 0.9285
Epoch: 4/5 - loss: 0.0351 - accuracy: 0.9322 - val_accuracy: 0.9291
Epoch: 5/5 - loss: 0.0368 - accuracy: 0.9348 - val_accuracy: 0.9309
Test accuracy: 0.9301930193019302


## Step 3 - Pipeline Configuration

We need to define a new Pipeline paramater with the location where the Processing Step running the evaluation script will store the evaluation report. Just like we did with the Processing Step runing the preprocessing script, we want to prevent SageMaker from appending a timestamp to their auto-generated location. If we let SageMaker use a timestamp, we can't cache that step. That's the goal of the `evaluation_destination` parameter.

In [30]:
evaluation_destination = ParameterString(
    name="evaluation_destination",
    default_value=f'{S3_FILEPATH}/evaluation',
)

## Step 4 - Setting up a Processor

To run the evaluation script we can use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing). Check the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) SageMaker's SDK documentation for more information.

This time, we will use a [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) running a training TensorFlow image. This will give us access to every library we need to execute the evaluation script because the training TensorFlow image also includes Scikit-Learn. An alternative could be creating a custom image that includes the libraries we need.

You can use the [sagemaker.image_uris.retrieve()](https://sagemaker.readthedocs.io/en/stable/api/utility/image_uris.html) function for generating the URI of pre-built docker images.

In [31]:
from sagemaker.processing import ScriptProcessor

# Let's retrieve the image we want to use to run the
# processing job.
image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.11.0",
    py_version="py38",
    image_scope="training",
    instance_type="ml.m5.large"
)

# We can now setup the processor using the URI of
# the pre-built docker image.
evaluation_script_processor = ScriptProcessor(
    base_job_name="mnist-evaluation-processor",
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role,
)

## Step 5 - Configuring the Model Input

One of the inputs we need to provide to the Processing Step that runs the evaluation script is the model we created. Currently, we create a model using either a Training Step or a Tuning Step, so we can use the `USE_TUNING_STEP` flag to configure the input to the Processing Step.

In case we are using the Tuning Step, we can use the [TuningStep.get_top_model_s3_uri()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) function to get the model artifacts from the top performing training job of the Hyperparameter Tuning Job.

In [32]:
# This is the input in case we want to use the best model generated
# by the Tuning Step.
tuning_model_input = ProcessingInput(
    source=tuning_step.get_top_model_s3_uri(
        top_k=0, 
        s3_bucket=sagemaker_session.default_bucket()
    ),
    destination="/opt/ml/processing/model",
)

# This is the input in case we want to use the trained model
# from the Training Step.
training_model_input = ProcessingInput(
    source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model"
)

# We can now select the appropriate input depending on which step
# we are using.
model_input = tuning_model_input if USE_TUNING_STEP else training_model_input

## Step 6 - Setting up a Processing Step

We can now create a [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) to run the evaluation script. We'll use the [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) we defined before. 

The inputs of this step will be the model we created and the test set that we generated during the preprocessing step. The output will be the evaluation report file.

The [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) lets us specify a list of [PropertyFile](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.properties.PropertyFile) instances from the output of the job. We can use this to map the evaluation report that we generate in the evaluations script. Check [How to Build and Manage Property Files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html) for more information.

In [33]:
# We want to map the evaluation report that we generate inside
# the evaluation script so we can later reference it.
evaluation_report = PropertyFile(
    name="evaluation-report",
    output_name="evaluation",
    path="evaluation.json"
)


# Notice how this step uses the model generated by the tuning or training
# step, and the test set generated by the preprocessing step.
evaluation_step = ProcessingStep(
    name="evaluation",
    processor=evaluation_script_processor,
    inputs=[
        model_input,
        ProcessingInput(
            source=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=evaluation_destination),
    ],
    code="evaluation.py",
    property_files=[evaluation_report],
    cache_config=cache_config
)

## Step 7 - Running the Pipeline

We can now add the model evaluation step to the pipeline.

We are going to configure the pipeline to run the Tuning Step or the Training Step depending on the value of the `USE_TUNING_STEP` flag.

In [34]:
session3_pipeline = Pipeline(
    name="mnist-session3-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        evaluation_destination,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step,
        evaluation_step
    ]
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does.

In [46]:
session3_pipeline.upsert(role_arn=role)
execution = session3_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


## Step 7 - Cleaning up

Before you finish, don't forget to clean up after you.

In [63]:
# session3_pipeline.delete()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:629171115321:pipeline/mnist-session3-pipeline',
 'ResponseMetadata': {'RequestId': 'ec57203f-ff3c-4407-8937-1591f4b88a69',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ec57203f-ff3c-4407-8937-1591f4b88a69',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Tue, 25 Apr 2023 15:59:06 GMT'},
  'RetryAttempts': 0}}

## Resources

1. Check the [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) SageMaker's SDK documentation for more information about how to run a processing job using a machine learning framework.

2. SageMaker offers a list of pre-built docker images. You can use the [sagemaker.image_uris.retrieve()](https://sagemaker.readthedocs.io/en/stable/api/utility/image_uris.html) function for generating the URI of these images.

3. You can use the [TuningJob.get_top_model_s3_uri()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) function to get the model artifacts from the top performing training jobs of the hyperparameter tuning job.

4. Check [How to Build and Manage Property Files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html) for more information about mapping the output of a [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) to a [PropertyFile](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.properties.PropertyFile).

# Session 4 - Model Registration

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with a step to register a new model if it reaches a predefined accuracy threshold. We'll use a [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) to determine whether the model's accuracy is above a threshold and a [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model) to register the model. After we register the model, we'll deploy it manually. To learn more about the Model Registry, check [Register and Deploy Models with Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html).

Here is what the Pipeline will look like at the end of this session:
![image](https://user-images.githubusercontent.com/30827903/234328144-77acc0e0-75d5-4240-9327-cb9791c6a23b.png)


In [35]:
import time

from sagemaker import ModelPackage
from sagemaker.model import Model
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.predictor import Predictor
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join

## Step 1 - Approval and Threshold Configuration

We are going to use two new [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html) in our pipeline:

* `model_approval_status`: This parameter represents the default approval status we will use when registering a new model. Check [Update the Approval Status of a Model](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-approve.html) for more information about the different approval status of a model and how you can update them.
* `accuracy_threshold`: This parameter represents the minimum accuracy that the model should reach for it to be registered.

In [36]:
model_approval_status = ParameterString(
    name="model_approval_status", 
    default_value="Approved"
)

accuracy_threshold = ParameterFloat(
    name="accuracy_threshold", 
    default_value=0.70
)

## Step 2 - Configuring the Model Assets

We need to specify the location of the model assets to register a model. Currently, we create a model using either a Training Step or a Tuning Step, so we can use the `USE_TUNING_STEP` flag to configure the model.

In case we are using the Tuning Step, we can use the [TuningStep.get_top_model_s3_uri()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) function to get the model artifacts from the top performing training job of the Hyperparameter Tuning Job.

In [37]:
# This is the model data in case we want to use the best model generated
# by the Tuning Step.
tuning_model_data = tuning_step.get_top_model_s3_uri(
    top_k=0, 
    s3_bucket=sagemaker_session.default_bucket()
)

# This is the model data in case we want to use the trained model
# from the Training Step.
training_model_data = training_step.properties.ModelArtifacts.S3ModelArtifacts

# We can now select the appropriate model data depending on which step
# we are using.
model_data = tuning_model_data if USE_TUNING_STEP else training_model_data

## Step 3 - Configuring the Model

The model we trained uses TensorFlow, so we can use the built-in [TensorFlowModel](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/sagemaker.tensorflow.html#tensorflow-serving-model) class to create an instance of the model.

In [38]:
model = PyTorchModel(
    model_data=model_data,
    framework_version="1.11.0",
    py_version="py38",
    sagemaker_session=PipelineSession(),
    role=role,
)

## Step 4 - Setting up the Model Metrics

When we register a model, we can specify a set of [ModelMetrics](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_metrics.ModelMetrics). We can use the evaluation report we generated during the 
Evaluation step to populate these statistics.

In [39]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(on="/", values=[
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'],
            "evaluation.json"]
        ),
        content_type="application/json",
    )
)

## Step 5 - Setting up a Model Step

We can now create a [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model) to register the model. Check the [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) SageMaker's SDK documentation for more information. We aim to create a new version of the model and register it in the Model Registry. Check [Register a Model Version](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-version.html) for more information about model registration.

This step will use the `Model` instance we configured before.

In [40]:
model_package_group_name = "mnist-model-package-group"

register_model_step = ModelStep(
    name="register-model",
    step_args=model.register(
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
        approval_status=model_approval_status,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="PYTORCH",
        framework_version="1.11.0",
    ),
)



## Step 6 - Setting up a Condition Step

We only want to register a new model if its accuracy exceeds a predefined threshold. We can use a [Condition Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) together with the evaluation report we generated in the Evaluation step to accomplish this. Check the [ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditionstep) SageMaker's SDK documentation for more information.

In this example, we will use a [ConditionGreaterThanOrEqualTo](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.conditions.ConditionGreaterThanOrEqualTo) condition to compare the model's accuracy with the threshold. Look at the [Conditions](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions) section in the documentation for more information about the types of supported conditions.

If the model's accuracy is not greater than or equal our threshold, we will send the pipeline to a [Fail Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-fail) with the appropriate error message. Check the [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) SageMaker's SDK documentation for more information.

In [41]:
condition_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value"
    ),
    right=accuracy_threshold
)

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ", 
        values=[
            "Execution failed because the model's accuracy was lower than", 
            accuracy_threshold
        ]
    ),
)

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition_gte],
    if_steps=[register_model_step],
    else_steps=[fail_step], 
)

## Step 7 - Running the Pipeline

We can now add the registration of the model to the pipeline. Notice how we add the Condition Step, which will call the Model Step if the condition passes.

In [42]:
session4_pipeline = Pipeline(
    name="mnist-session4-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        evaluation_destination,
        model_approval_status,
        accuracy_threshold,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step, 
        evaluation_step,
        condition_step
    ],
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [62]:
# session4_pipeline.upsert(role_arn=role)
# execution = session4_pipeline.start()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource




In [65]:
# session4_pipeline.delete()

### Step 8 - Loading the Latest Approved Model

Now that we registered the model, we can load the latest approved model from the Model Registry to deploy it to an endpoint.

We can use `boto3` to query the list of approved models and get the latest one. Check the [boto3 SageMaker Client API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) for a list of every available method.

In [43]:
def get_latest_approved_model_package(model_package_group_name):
    """
    Returns the latest approved model package registered under the 
    specified model package group.
    """
    try:
        # We can use the boto3 SageMaker's API to list the existing
        # model packages with the specified name. We only care about
        # approved models.
        response = sagemaker_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            MaxResults=100,
        )
        approved_packages = response["ModelPackageSummaryList"]

        # If we get a NextToken back, we need to deal with pagination.
        while len(approved_packages) == 0 and "NextToken" in response:
            response = sagemaker_client.list_model_packages(
                ModelPackageGroupName=model_package_group_name,
                ModelApprovalStatus="Approved",
                SortBy="CreationTime",
                MaxResults=100,
                NextToken=response["NextToken"],
            )
            approved_packages.extend(response["ModelPackageSummaryList"])

        if len(approved_packages) == 0:
            print(f"No approved model pacakages for \"{model_package_group_name}\"")
            return None

        # At this point we identified the latest approved model,
        # so we can return it.
        print(f"Latest approved model package: {approved_packages[0]['ModelPackageArn']}")
        return approved_packages[0]

    except ClientError as e:
        print(e.response["Error"]["Message"])
        raise Exception(e.response["Error"]["Message"])


We can now use the `get_latest_approved_model_package()` function to get the latest approved model from the Model Registry.

In [44]:
approved_model_package = get_latest_approved_model_package(model_package_group_name)
model_description = None

if approved_model_package:
    approved_model_package_arn = approved_model_package["ModelPackageArn"]

    model_description = sagemaker_client.describe_model_package(
        ModelPackageName=approved_model_package_arn
    )

model_description

Latest approved model package: arn:aws:sagemaker:us-east-1:629171115321:model-package/mnist-model-package-group/1


{'ModelPackageGroupName': 'mnist-model-package-group',
 'ModelPackageVersion': 1,
 'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:629171115321:model-package/mnist-model-package-group/1',
 'CreationTime': datetime.datetime(2023, 4, 25, 16, 32, 10, 739000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.11.0-cpu-py38',
    'ImageDigest': 'sha256:60beb640d60a3b59a8304aa622a07d606a7f89e0714484112e774094cccbcbc1',
    'ModelDataUrl': 's3://sagemaker-us-east-1-629171115321/2lgi4hp8zdbx-tuning-S1HfDUSlGr-003-6bc1b6c9/output/model.tar.gz',
    'Environment': {'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
     'SAGEMAKER_REGION': 'us-east-1'},
    'Framework': 'PYTORCH',
    'FrameworkVersion': '1.11.0'}],
  'SupportedTransformInstanceTypes': ['ml.m5.large'],
  'SupportedRealtimeInferenceInstanceTypes': ['ml.m5.large'],
  'SupportedContentTypes': ['text/csv'],
  'SupportedResponseMIMETypes': ['text/csv']},
 'Mod

## Step 9 - Deploying the Model

We can now deploy the latest approved model to an endpoint.

Using the ARN of the model package from the Model Registry, we can deploy the model by creating a [ModelPackage](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.ModelPackage) instance and calling its `deploy()` method. The model information lives in the Model Registry, so we don't need to specify anything else.

In [45]:
# model_package = ModelPackage(
#     model_package_arn=approved_model_package_arn, 
#     sagemaker_session=sagemaker_session,
#     role=role, 
# )

# # We can use the current time to generate a unique signature
# # to generate new assets every time we deploy the model.
# signature = time.strftime("%m%d%H%M%S", time.localtime())
# endpoint_name = "mnist-endpoint-" + signature

# model_package.deploy(
#     endpoint_name=endpoint_name,
#     initial_instance_count=1, 
#     instance_type="ml.m5.large", 
# )

INFO:sagemaker:Creating model with name: 1-2023-04-26-14-37-00-173
INFO:sagemaker:Creating endpoint-config with name mnist-endpoint-0426143700
INFO:sagemaker:Creating endpoint with name mnist-endpoint-0426143700


-------!

In [58]:
from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorchModel
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

In [72]:
%%writefile infer.py

import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as T
import torch.nn.functional as F
import json

device = "cuda" if torch.cuda.is_available() else "cpu"

class DigitClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.hidden_layer_1 = nn.Linear(input_size, hidden_size) 
        self.activation_1 = nn.ReLU()
        self.hidden_layer_2 = nn.Linear(hidden_size, hidden_size) 
        self.activation_2 = nn.ReLU()
        self.output_layer = nn.Linear(hidden_size, num_classes)
        self.probabilities = nn.Softmax(dim=0)
    
    def forward(self, x):
        hidden_1 = self.hidden_layer_1(x)
        hidden_activated_1 = self.activation_1(hidden_1)
        hidden_2 = self.hidden_layer_2(hidden_activated_1)
        hidden_activated_2 = self.activation_2(hidden_2)
        out_layer = self.output_layer(hidden_activated_2)
        return out_layer
    
    
# load model
def model_fn(model_dir):
    model = DigitClassifier(input_size=784, hidden_size=10, num_classes=10)
    model.load_state_dict(torch.load(Path(model_dir) / "001" / "model.pt"))
    model.to(device).eval()
    
    return model

# data preprocessing
def input_fn(request_body, request_content_type):
    assert request_content_type == "application/json"
    data = json.loads(request_body)["inputs"]
    data = np.array(data).reshape(1, -1)
    print("Data shape: ", data.shape)
    return data


# inference
def predict_fn(input_object, model):
    input_object = torch.from_numpy(input_object).float() # tensors
    with torch.no_grad():
        prediction = model(input_object).detach().cpu().numpy()
        prediction = np.argmax(predictions, axis=-1)
    
    print("Prediction:", prediction)
    outputs = {"Predictions":prediction}
    
    return prediction


# postprocess
def output_fn(outputs, content_type):
    assert content_type == "application/json"
    # Print top categories per image
    return outputs


Overwriting infer.py


In [73]:
model = PyTorchModel(
    entry_point="infer.py",
    # source_dir="",
    role=get_execution_role(),
    model_data="s3://sagemaker-us-east-1-629171115321/2lgi4hp8zdbx-tuning-S1HfDUSlGr-003-6bc1b6c9/output/model.tar.gz",
    framework_version="1.11.0",
    py_version="py38",
)

In [74]:
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

INFO:sagemaker:Creating model with name: pytorch-inference-2023-04-26-15-40-59-182
INFO:sagemaker:Creating endpoint-config with name pytorch-inference-2023-04-26-15-40-59-972
INFO:sagemaker:Creating endpoint with name pytorch-inference-2023-04-26-15-40-59-972


------!

## Step 10 - Testing the Endpoint

Using a [Predictor](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html#sagemaker.predictor.Predictor) from the endpoint name, we can test our model.

In [75]:
import numpy as np
from PIL import Image
import requests
from io import BytesIO

from sagemaker.pytorch import PyTorchPredictor

In [79]:
predictor_new = PyTorchPredictor(
    endpoint_name="pytorch-inference-2023-04-26-15-40-59-972",
    sagemaker_session=sagemaker_session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

In [80]:
def predict(source):
    if "http" in source:
        # !wget -O image.jpg $source
        response = requests.get(source)
        im = Image.open(BytesIO(response.content))

    else:
        im = Image.open(source).convert("RGB")

    inputs = {"inputs": np.array(im)}

    out = predictor_new.predict(inputs)
    print(out)
    im.show()

In [81]:
predict("mnist/img_1.jpg")

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (0) from primary with message "Your invocation timed out while waiting for a response from container primary. Review the latency metrics for each container in Amazon CloudWatch, resolve the issue, and try again.". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/pytorch-inference-2023-04-26-15-40-59-972 in account 629171115321 for more information.

In [46]:
# read the test image csv
test_df = pd.read_csv("test/test.csv", header=None)

In [47]:
label_x = test_df.iloc[0,0].astype('int')

In [48]:
label_x

0

In [49]:
x = test_df.iloc[0,1:].values

In [50]:
x.shape

(784,)

In [55]:
# read the test image csv
test_df = pd.read_csv("test/test.csv", header=None)
label_x = test_df.iloc[0,0].astype('int')
x = test_df.iloc[0,1:].values

predictor = Predictor(endpoint_name=endpoint_name)

# The payload we need to provide the model is in CSV format. Notice how the model expects data that's
# already transformed. We can't provide the original data from our dataset because the model will not
# work with it.
# payload = "0.6569590202313976, -1.0813829646495108, 1.2097102831892812, 0.9226343641317372, 1.0, 0.0, 0.0"
payload = ','.join(['%.2f' % num for num in x]) # read as: np.fromstring(VIstring, sep=',')
payload = str(label_x) + ', ' + str(payload)
response = predictor.predict(payload, initial_args={"ContentType": "text/csv"})

# We can decode the output of the endpoint and print the "predictions" key.
predictions = json.loads(response.decode("utf-8"))["predictions"]
print(f"Prediction: {np.argmax(predictions, axis=1)[0]}")

KeyboardInterrupt: 

## Step 11 - Cleaning up

Before you finish, don't forget to clean up after yourself.

In [56]:
# predictor.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: mnist-endpoint-0426143700
INFO:sagemaker:Deleting endpoint with name: mnist-endpoint-0426143700


### Session 5 - Model Deployment

This session extends the [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with a step to deploy the model to an endpoint automatically. We'll use a [Lambda Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-lambda) to create an endpoint and deploy the model. To control the endpoint's inputs and outputs, we'll modify the model's assets to include code that customizes the processing of a request. 

In the previous session, we deployed the model to an endpoint that expects the input to be in CSV format. Instead, we want our endpoint to handle unprocessed data in JSON format. Here is an example of the payload we want the endpoint to support:

```
{
    "island": "Biscoe",
    "culmen_length_mm": 48.6,
    "culmen_depth_mm": 16.0,
    "flipper_length_mm": 230.0,
    "body_mass_g": 5800.0,
}
```

At the end of this session, our Pipeline will look like this:

![image](https://user-images.githubusercontent.com/30827903/234338997-0afd4c84-9939-4992-903a-0fb1c9c5e4d6.png)