# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's make sure we are running the latest version of the SakeMaker's SDK. **Restart the notebook** after you upgrade the library.

In [2]:
!pwd
!pip install -q --upgrade pip
!pip install -q --upgrade awscli boto3
!pip install -q --upgrade PyYAML==6.0
!pip install -q --upgrade sagemakersagemaker==2.165.0
!pip show sagemaker

/root/ml.school/mnist
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sagemaker 2.165.0 requires PyYAML==6.0, but you have pyyaml 5.4.1 which is incompatible.[0m[31m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.27.159 requires PyYAML<5.5,>=3.10, but you have pyyaml 6.0 which is incompatible.[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement sagemakersagemaker==2.165.0 (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for sagemakersagemaker==2.165.0[0m[31m
[0mName: sagemaker
Version: 2.165.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon W

In [None]:
%load_ext autoreload
%autoreload 2

Let's extract the `dataset.tar.gz` file.

In [None]:
from pathlib import Path

MNIST_FOLDER = "mnist"
DATASET_FOLDER =  Path("dataset")

!tar -xvzf dataset.tar.gz  --no-same-owner

## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (a number from 0 to 255).

Let's load the first 10 rows of the test set.

In [None]:
import pandas as pd

LOCAL_FILEPATH = DATASET_FOLDER / "mnist_train.csv"

df = pd.read_csv(LOCAL_FILEPATH, nrows=10)
df

In [None]:
LOCAL_TEST_FILEPATH = DATASET_FOLDER / "mnist_test.csv"

dfTest = pd.read_csv(LOCAL_TEST_FILEPATH, nrows=10)
dfTest

## Creating the S3 Bucket

Let's create an S3 bucket where you will upload all the information generated by the pipeline. Make sure you set `BUCKET` to the name of the bucket you want to use. This name has to be unique.

If you want to create a bucket in a region other than `us-east-1`, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.

In [None]:
BUCKET = "julianj-mlschool"
region = "us-west-2"
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region

## Uploading dataset to S3

In [None]:
import boto3
import sagemaker

S3_FILEPATH = f"s3://{BUCKET}/{MNIST_FOLDER}"

TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(LOCAL_FILEPATH), 
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(LOCAL_TEST_FILEPATH), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

## Step 1 - Preprocessing the Dataset

Let's create a script to do feature engineering on the original dataset. We will run this script using a SageMaker Processing Job later in this session.

The script should:

1. Split the train set as follows:
    1. 80% into a new train set
    2. 20% into a validation test.
    3. How are we splitting the data? using [np.split()](https://numpy.org/doc/stable/reference/generated/numpy.split.html)?
2. Save the ?Scikit-Learn? pipeline that we use to preprocess the data to use it during inference time.
    1. Do we have a list of target classes or other generated data that needs to be saved?


**Notes from the penguins:
Pay special attention to the way the Scikit-Learn pipeline `preprocessor` is used to process the three sets:

* First, we use the `fit_transform()` to fit the pipeline on the train set.
* Then, we consecutively transform the validation and test sets using `transform()`.

Always use `fit_transform()` on the training data to fit the scaling parameters we need to transform the data. For example, `fit_transform()` will learn the mean and variance of the features of the training set. It can then use these same parameters to scale the validation and test sets.

That's why we want to save this Scikit-Learn pipeline to use later to scale production data using the same parameters we learned on the train set.

#### Setup the notebook environment

In [None]:
import os
import numpy as np
import boto3
import json
import numpy as np
import argparse
import tempfile

from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

sagemaker_client = boto3.client("sagemaker")
sagemaker_session = sagemaker.session.Session()
iam_client = boto3.client("iam")
role = sagemaker.get_execution_role()
region = boto3.Session().region_name


In [None]:
%%writefile preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "data.csv"
DATA_TEST_FILEPATH = Path(BASE_DIRECTORY) / "input" / "test.csv"

def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    
    train_path = Path(base_directory) / "train" 
    validation_path = Path(base_directory) / "validation" 
    test_path = Path(base_directory) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    

def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", 'wb'))
    
def _generate_baseline_dataset(split_name, base_directory, X, y):
    """
    To monitor the data and the quality of our model we need to compare the 
    production quality and results against a baseline. To create those baselines, 
    we need to use a dataset to compute statistics and constraints. That dataset
    should contain information in the same format as expected by the production
    endpoint. This function will generate a baseline dataset and save it to 
    disk so we can later use it.
    
    """
    print(f"base_directory:{base_directory}")
    baseline_path = Path(base_directory) / f"{split_name}-baseline" 
    print(f"baseline_path:{baseline_path}")
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X.copy()
    
    # The baseline dataset needs a column containing the groundtruth.
    df["groundtruth"] = y
    df["groundtruth"] = df["groundtruth"].values.astype(str)
    
    # We will use the baseline dataset to generate baselines
    # for monitoring data and model quality. To simplify the process, 
    # we don't want to include any NaN rows.
    df = df.dropna()

    df.to_json(baseline_path / f"{split_name}-baseline.json", orient='records', lines=True)
    

def preprocess(base_directory, data_filepath, data_test_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """
    
    df = pd.read_csv(data_filepath, nrows=10000)
    dfTest = pd.read_csv(data_test_filepath)
    
    numerical_columns = [column for column in df.columns if df[column].dtype in ["int64", "float64"]][1:]
    
    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_columns)
        ]
    )
       
    columns = list(df.columns)

    X = df.to_numpy()
    np.random.shuffle(X)
    train, validation = np.split(X, [int(.8 * len(X))])
    
    Y = dfTest.to_numpy()
    test = Y
    
    X_train = pd.DataFrame(train, columns=columns)
    X_validation = pd.DataFrame(validation, columns=columns)
    X_test = dfTest
    
    y_train = X_train.label
    y_validation = X_validation.label
    y_test = X_test.label
    
    X_train.drop(["label"], axis=1, inplace=True)
    X_validation.drop(["label"], axis=1, inplace=True)
    X_test.drop(["label"], axis=1, inplace=True)

    # Let's generate a dataset that we can later use to compute
    # baseline statistics and constraints about the data that we
    # used to train our model.
    _generate_baseline_dataset("train", base_directory, X_train, y_train)
    
    # To generate baseline constraints about the quality of the
    # model's predictions, we will use the test set.
    _generate_baseline_dataset("test", base_directory, X_test, y_test)
    
    # Transform the data using the Scikit-Learn pipeline.
    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)
    
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)
    
    _save_splits(base_directory, train, validation, test)
    _save_pipeline(base_directory, pipeline=preprocessor)
    
if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, DATA_FILEPATH, DATA_TEST_FILEPATH)

In [None]:
from preprocessor import preprocess
import tempfile


with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH,
        data_test_filepath=LOCAL_TEST_FILEPATH 
    )
    
    print(f"Folders: {os.listdir(directory)}")