# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment...


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's make sure we are running the latest version of the SageMaker SDK. **Restart the notebook** after you upgrade the library.

In [2]:
!pip install -q --upgrade awscli
!pip install -q --upgrade pip
!pip install -q --upgrade sagemaker
!pip show sagemaker

Name: sagemaker
Version: 2.151.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /opt/conda/lib/python3.9/site-packages
Requires: attrs, boto3, cloudpickle, google-pasta, importlib-metadata, jsonschema, numpy, packaging, pandas, pathos, platformdirs, protobuf, protobuf3-to-dict, PyYAML, schema, smdebug-rulesconfig, tblib
Required-by: 


In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
import boto3
import sagemaker
import pandas as pd

from pathlib import Path

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Creating the S3 Bucket

Let's create an S3 bucket where you will upload all the information generated by the pipeline. Make sure you set `BUCKET` to the name of the bucket you want to use. Bucket names must be globally unique.

To create the bucket in a region other than `us-east-1`, use this command:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.
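
The same region rule can be sketched in Python for anyone who prefers boto3 over the CLI. This is a minimal illustration, not part of the original notebook: `create_bucket_kwargs` is a hypothetical helper that builds the arguments for the S3 client's `create_bucket` call and leaves out `CreateBucketConfiguration` when the region is `us-east-1`, where no location constraint should be passed.

```python
def create_bucket_kwargs(bucket, region):
    """Build keyword arguments for the boto3 S3 client's create_bucket call.

    Hypothetical helper for illustration only.
    """
    kwargs = {"Bucket": bucket}

    # us-east-1 is the default location, so the constraint is only
    # added for every other region.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

    return kwargs


print(create_bucket_kwargs("my-example-bucket", "eu-west-2"))

# Possible usage inside the notebook (requires AWS credentials):
# import boto3
# s3 = boto3.client("s3", region_name=region)
# s3.create_bucket(**create_bucket_kwargs(BUCKET, region))
```

Keeping the argument construction separate from the API call makes the region logic easy to check without touching AWS.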

In [5]:
BUCKET = "mlschool-raluca"

#!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region

In [6]:
region

'eu-west-2'

## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (a number from 0 to 255).
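
To make the row layout concrete, here is a small sketch using only the standard library. It assumes the pixel columns are named `1x1` through `28x28` in row-major order, as the column header of the loaded data suggests; `pixel_column_names` and `split_row` are hypothetical helpers, and the sample row is a hand-made blank image, not real data.

```python
IMAGE_SIDE = 28  # MNIST images are 28x28 pixels


def pixel_column_names(side=IMAGE_SIDE):
    """Return pixel column names such as '1x1', '1x2', ..., '28x28'."""
    return [f"{row}x{col}" for row in range(1, side + 1) for col in range(1, side + 1)]


def split_row(values, side=IMAGE_SIDE):
    """Split one CSV row into its label and a side x side grid of pixels."""
    if len(values) != side * side + 1:
        raise ValueError(f"expected {side * side + 1} values, got {len(values)}")

    label, pixels = values[0], values[1:]
    image = [pixels[r * side:(r + 1) * side] for r in range(side)]
    return label, image


columns = ["label"] + pixel_column_names()

# Hand-made example row: label 5 followed by 784 black pixels.
row = [5] + [0] * (IMAGE_SIDE * IMAGE_SIDE)
label, image = split_row(row)

print(len(columns), label, len(image), len(image[0]))  # 785 5 28 28
```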

Let's extract the `dataset.tar.gz` file.

In [7]:
MNIST_FOLDER = "mnist"
DATASET_FOLDER = Path(MNIST_FOLDER) / "dataset"

In [8]:
#!tar -xvzf dataset.tar.gz -C $MNIST_FOLDER --no-same-owner

Let's load the first 10 rows of the train set.

In [9]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv", nrows=10)
df

Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Uploading dataset to S3

In [10]:
# S3_FILEPATH = f"s3://{BUCKET}/{MNIST_FOLDER}"


# TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
#     local_path=str(DATASET_FOLDER / "mnist_train.csv"), 
#     desired_s3_uri=S3_FILEPATH,
# )

# TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
#     local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
#     desired_s3_uri=S3_FILEPATH,
# )

# print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
# print(f"Test set S3 location: {TEST_SET_S3_URI}")

In [11]:
role = sagemaker.get_execution_role()
role

'arn:aws:iam::952844543179:role/service-role/AmazonSageMakerServiceCatalogProductsExecutionRole'

## Assignment 1: Preprocessing the Dataset

Set up a SageMaker pipeline for the "Pipeline of Digits" project.

Create a Processing Step where you split 20% off the MNIST train set to use as a validation set.
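
As a quick sanity check on the numbers, the sketch below works out how many examples each split should contain. It assumes the validation size is rounded up and the train split receives the remainder, which is my understanding of how Scikit-Learn's `train_test_split` treats a fractional `test_size`; `split_sizes` is a hypothetical helper, not part of the assignment.

```python
import math


def split_sizes(n_samples, test_size=0.2):
    """Return (n_train, n_validation) for a fractional hold-out size."""
    n_validation = math.ceil(n_samples * test_size)
    n_train = n_samples - n_validation
    return n_train, n_validation


# The MNIST train set has 60,000 examples.
print(split_sizes(60_000))  # (48000, 12000)
```

With the 10,000-example test set left untouched, the pipeline would therefore work with 48,000 train, 12,000 validation, and 10,000 test examples.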

In [12]:
%%writefile {MNIST_FOLDER}/preprocessor.py


import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job will save the input dataset.
BASE_DIR = "/opt/ml/processing"
TRAIN_FILEPATH = Path(BASE_DIR) / "input" / "mnist_train.csv"
TEST_FILEPATH = Path(BASE_DIR) / "input" / "mnist_test.csv"


def save_splits(base_dir, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    train_path = Path(base_dir) / "train" 
    validation_path = Path(base_dir) / "validation" 
    test_path = Path(base_dir) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    
    
def save_pipeline(base_dir, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to preprocess the data.
    """
    pipeline_path = Path(base_dir) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
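    # Use a context manager so the file is closed once the pipeline is written.
    with open(pipeline_path / "pipeline.pkl", "wb") as pipeline_file:
        dump(pipeline, pipeline_file)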

    
def generate_baseline_dataset(split_name, base_dir, X, y):
    """
    To monitor the data and the quality of our model we need to compare the 
    production quality and results against a baseline. To create those baselines, 
    we need to use a dataset to compute statistics and constraints. That dataset
    should contain information in the same format as expected by the production
    endpoint. This function will generate a baseline dataset and save it to 
    disk so we can later use it.
    
    """
    baseline_path = Path(base_dir) / f"{split_name}-baseline" 
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X.copy()
    df = pd.DataFrame(df)
    df = df.reset_index(drop=True)
    
    # The baseline dataset needs a column containing the groundtruth.
    df["groundtruth"] = y.ravel()
    df["groundtruth"] = df["groundtruth"].values.astype(str)
    
    # We will use the baseline dataset to generate baselines
    # for monitoring data and model quality. To simplify the process, 
    # we don't want to include any NaN rows.
    df = df.dropna()

    df.to_json(baseline_path / f"{split_name}-baseline.json", orient='records', lines=True)    
    
    
    
def preprocess(base_dir, train_filepath, test_filepath):
    """
    Preprocesses the supplied train and test datasets and splits the train set into train and validation sets.
    """   
    
    # read the data
    train_df = pd.read_csv(train_filepath)
    test_df = pd.read_csv(test_filepath)
    
    # performs normalization on the input data by dividing by 255
    X, y = train_df.drop('label', axis=1).values / 255., train_df['label'].values.astype(int)
    X_test, y_test = test_df.drop('label', axis=1).values / 255., test_df['label'].values.astype(int)
    
    # split trainset into train and validation sets
    X_train, X_validation, y_train, y_validation = train_test_split(X, y, test_size=0.2, random_state=42)
    
    
    # Define the sk-learn pipeline to apply the preprocessor   
    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ('scaler', StandardScaler())
    ])
    
    numerical_features = np.arange(X_train.shape[1])
    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_features)
        ]
    )

    # label encoding
    label_encoder = LabelEncoder()
    y_train = label_encoder.fit_transform(y_train)
    y_validation = label_encoder.transform(y_validation)
    y_test = label_encoder.transform(y_test)
    
        
    # Let's generate a dataset that we can later use to compute
    # baseline statistics and constraints about the data that we
    # used to train our model.
    generate_baseline_dataset("train", base_dir, X_train, y_train)
    
    # To generate baseline constraints about the quality of the
    # model's predictions, we will use the test set.
    generate_baseline_dataset("test", base_dir, X_test, y_test)
    
    
    # Fit the preprocessor on the training set and transform the training, validation, and testing sets
    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)
    
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)    
    
    save_splits(base_dir, train, validation, test)
    save_pipeline(base_dir, pipeline=preprocessor)
    
    
if __name__ == "__main__":
    preprocess(BASE_DIR, TRAIN_FILEPATH, TEST_FILEPATH)

Overwriting mnist/preprocessor.py


## Testing the Preprocessing Script

We can now load the script we just created and run it locally to ensure it outputs every file we need.

We will set up a SageMaker Processing Job to run this script, but we always want to test the code locally. In this case, we can call the `preprocess()` function with the local directory and the local copy of the dataset.
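
Beyond listing the output folders, a local test could also confirm that the baseline files are valid JSON Lines and that every record carries a string `groundtruth` field, as the preprocessing script intends. The sketch below is one possible way to do that with the standard library. `read_baseline_records` and `has_groundtruth` are hypothetical helpers, and the two records written to the temporary file are hand-made stand-ins, not actual pipeline output.

```python
import json
import tempfile
from pathlib import Path


def read_baseline_records(path, limit=5):
    """Parse up to `limit` records from a JSON Lines baseline file."""
    records = []
    with open(path) as baseline:
        for line in baseline:
            if len(records) >= limit:
                break
            records.append(json.loads(line))
    return records


def has_groundtruth(records):
    """Check that every record stores its groundtruth as a string."""
    return all(isinstance(record.get("groundtruth"), str) for record in records)


with tempfile.TemporaryDirectory() as tmp:
    # Hand-made sample with two features per record, for illustration only.
    sample = Path(tmp) / "train-baseline.json"
    sample.write_text(
        '{"0":0.0,"1":0.5,"groundtruth":"5"}\n'
        '{"0":0.0,"1":0.0,"groundtruth":"0"}\n'
    )
    records = read_baseline_records(sample)

print(len(records), has_groundtruth(records))  # 2 True
```

Reading with a loop and a limit avoids an error when a baseline file holds fewer lines than requested.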

In [13]:
from mnist.preprocessor import preprocess
import tempfile
import os


LOCAL_TRAIN_FILEPATH = DATASET_FOLDER / "mnist_train.csv"
LOCAL_TEST_FILEPATH = DATASET_FOLDER / "mnist_test.csv"


def print_baseline(split_name):
    print()
    print(f"Baseline {split_name}:")
    with open(Path(directory) / f"{split_name}-baseline" / f"{split_name}-baseline.json") as baseline:
        lines = [next(baseline) for _ in range(5)]
        
    for line in lines:
        # Strip the trailing newline before printing.
        print(line.rstrip("\n"))
    

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_dir=directory, 
        train_filepath=LOCAL_TRAIN_FILEPATH,
        test_filepath=LOCAL_TEST_FILEPATH
    )
    print(f"Folders: {os.listdir(directory)}")
    
    print_baseline("train")
    print_baseline("test")

Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline']

Baseline train:
{"0":0.0,"1":0.0,"2":0.0,"3":0.0,"4":0.0,"5":0.0,"6":0.0,"7":0.0,"8":0.0,"9":0.0,"10":0.0,"11":0.0,"12":0.0,"13":0.0,"14":0.0,"15":0.0,"16":0.0,"17":0.0,"18":0.0,"19":0.0,"20":0.0,"21":0.0,"22":0.0,"23":0.0,"24":0.0,"25":0.0,"26":0.0,"27":0.0,"28":0.0,"29":0.0,"30":0.0,"31":0.0,"32":0.0,"33":0.0,"34":0.0,"35":0.0,"36":0.0,"37":0.0,"38":0.0,"39":0.0,"40":0.0,"41":0.0,"42":0.0,"43":0.0,"44":0.0,"45":0.0,"46":0.0,"47":0.0,"48":0.0,"49":0.0,"50":0.0,"51":0.0,"52":0.0,"53":0.0,"54":0.0,"55":0.0,"56":0.0,"57":0.0,"58":0.0,"59":0.0,"60":0.0,"61":0.0,"62":0.0,"63":0.0,"64":0.0,"65":0.0,"66":0.0,"67":0.0,"68":0.0,"69":0.0,"70":0.0,"71":0.0,"72":0.0,"73":0.0,"74":0.0,"75":0.0,"76":0.0,"77":0.0,"78":0.0,"79":0.0,"80":0.0,"81":0.0,"82":0.0,"83":0.0,"84":0.0,"85":0.0,"86":0.0,"87":0.0,"88":0.0,"89":0.0,"90":0.0,"91":0.0,"92":0.0,"93":0.0,"94":0.0,"95":0.0,"96":0.0,"97":0.0,"98":0.0,"99":0.0,"100