# Pipeline of Digits

This is a starting notebook for solving the "Pipeline of Digits" assignment.


This notebook was created by [Santiago L. Valdarrama](https://twitter.com/svpino) as part of the [Machine Learning School](https://www.ml.school) program.

Let's make sure we are running the latest version of the SakeMaker's SDK. **Restart the notebook** after you upgrade the library.

In [2]:
!pip install -q --upgrade awscli
!pip install -q --upgrade pip
!pip install -q --upgrade sagemaker
!pip show sagemaker

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sagemaker 2.145.0 requires importlib-metadata<5.0,>=1.4.0, but you have importlib-metadata 6.3.0 which is incompatible.
aiobotocore 2.4.2 requires botocore<1.27.60,>=1.27.59, but you have botocore 1.29.118 which is incompatible.[0m[31m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
spyder 4.0.1 requires pyqt5<5.13; python_version >= "3", which is not installed.
spyder 4.0.1 requires pyqtwebengine<5.13; python_version >= "3

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import boto3
import sagemaker
import pandas as pd

from pathlib import Path

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Creating the S3 Bucket

Let's create an S3 bucket where you will upload all the information generated by the pipeline. Make sure you set `BUCKET` to the name of the bucket you want to use. This name has to be unique.

If you want to create a bucket in a region other than `us-east-1`, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.

In [4]:
# uncomment if you want create new bucket

BUCKET = "mlschool-mnist"
region = "eu-north-1"

In [5]:
# !aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region

In [6]:
import os
current_folder = os.getcwd()
print(current_folder)
%ls

/root/ml-school/mnist
[0m[01;34mdataset[0m/  dataset.tar.gz  mnist.ipynb  preprocessor.py  train.py


## Loading the dataset

We have two CSV files containing the MNIST dataset. These files come from the [MNIST in CSV](https://www.kaggle.com/datasets/oddrationale/mnist-in-csv) Kaggle dataset.

The `mnist_train.csv` file contains 60,000 training examples and labels. The `mnist_test.csv` contains 10,000 test examples and labels. Each row consists of 785 values: the first value is the label (a number from 0 to 9) and the remaining 784 values are the pixel values (a number from 0 to 255).

Let's extract the `dataset.tar.gz` file.

In [7]:
MNIST_FOLDER = "/root/ml-school/mnist"
DATASET_FOLDER = Path(MNIST_FOLDER) / "dataset"

!tar -xvzf $MNIST_FOLDER/dataset.tar.gz -C $MNIST_FOLDER --no-same-owner

dataset/
dataset/mnist_test.csv
dataset/mnist_train.csv


Let's load the first 10 rows of the test set.

In [8]:
df = pd.read_csv(DATASET_FOLDER / "mnist_train.csv", nrows=10)
df

Unnamed: 0,label,1x1,1x2,1x3,1x4,1x5,1x6,1x7,1x8,1x9,...,28x19,28x20,28x21,28x22,28x23,28x24,28x25,28x26,28x27,28x28
0,5,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,9,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
7,3,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
8,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
9,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Uploading dataset to S3

In [9]:
S3_FILEPATH = f"s3://mlschool-mnist/{MNIST_FOLDER}"


TRAIN_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_train.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

TEST_SET_S3_URI = sagemaker.s3.S3Uploader.upload(
    local_path=str(DATASET_FOLDER / "mnist_test.csv"), 
    desired_s3_uri=S3_FILEPATH,
)

print(f"Train set S3 location: {TRAIN_SET_S3_URI}")
print(f"Test set S3 location: {TEST_SET_S3_URI}")

Train set S3 location: s3://mlschool-mnist/root/ml-school/mnist/mnist_train.csv
Test set S3 location: s3://mlschool-mnist/root/ml-school/mnist/mnist_test.csv


In [10]:
%%writefile {MNIST_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from pickle import dump
from pathlib import Path


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIR = "/opt/ml/processing"
DATA_FILEPATH_TRAIN = Path(BASE_DIR) / "input" / "mnist_train" / "mnist_train.csv"
DATA_FILEPATH_TEST = Path(BASE_DIR) / "input" / "mnist_test" / "mnist_test.csv"


def save_splits(base_dir, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """

    train_path = Path(base_dir) / "train"
    validation_path = Path(base_dir) / "validation"
    test_path = Path(base_dir) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


def save_pipeline(base_dir, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_dir) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", 'wb'))


def generate_baseline(base_dir, X_train, y_train):
    """
    Generates a baseline for our model using the train set.
    It saves the baseline in a JSON file where every line is
    a JSON object.
    """
    baseline_path = Path(base_dir) / "baseline"
    baseline_path.mkdir(parents=True, exist_ok=True)

    df = X_train.copy()
    df["groundtruth"] = y_train

    df.to_json(baseline_path / "baseline.json", orient='records', lines=True)


def preprocess(base_dir, data_filepath_train, data_filepath_test):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """

    df_train = pd.read_csv(data_filepath_train, nrows=7200)
    df_test = pd.read_csv(data_filepath_test, nrows=2000)

    numerical_columns = df_train.select_dtypes(include=['number']).drop(['label'], axis=1).columns

    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_columns),
        ]
    )

    X_train = df_train.copy()
    y_train = df_train['label']
    columns = list(X_train.drop(['label'], axis=1).columns)

    X_train, X_validation, y_train, y_validation =  train_test_split(X_train, y_train, test_size=0.2, random_state=12)
    X_test = df_test.copy()

    y_train = X_train.label
    y_validation = X_validation.label
    y_test = X_test.label

    X_train.drop(["label"], axis=1, inplace=True)
    X_validation.drop(["label"], axis=1, inplace=True)
    X_test.drop(["label"], axis=1, inplace=True)

    X_train = pd.DataFrame(X_train, columns=columns)
    X_validation = pd.DataFrame(X_validation, columns=columns)
    X_test = pd.DataFrame(X_test, columns=columns)

    y_train = y_train.astype(int)
    y_validation = y_validation.astype(int)
    y_test = y_test.astype(int)

    # Let's use the train set to generate a baseline that we can
    # later use to measure the quality of our model. This baseline
    # will use the original data.
    generate_baseline(base_dir, X_train, y_train)

    # Transform the data using the Scikit-Learn pipeline.
    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)

    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)

    save_splits(base_dir, train, validation, test)
    save_pipeline(base_dir, pipeline=preprocessor)


if __name__ == "__main__":
    preprocess(BASE_DIR, DATA_FILEPATH_TRAIN, DATA_FILEPATH_TEST)

Overwriting /root/ml-school/mnist/preprocessor.py


In [11]:
from botocore.exceptions import ClientError
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

In [12]:
dataset_location_train = ParameterString(
    name="dataset_location_train",
    default_value=TRAIN_SET_S3_URI,
)

dataset_location_test = ParameterString(
    name="dataset_location_test",
    default_value=TEST_SET_S3_URI,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

baseline_destination = ParameterString(
    name="baseline_destination",
    default_value=f"{S3_FILEPATH}/baseline",
)

In [13]:
cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

In [14]:
sklearn_processor = SKLearnProcessor(
    base_job_name="mnist-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role
)

In [15]:
preprocess_step = ProcessingStep(
    name="preprocessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location_train, destination="/opt/ml/processing/input/mnist_train"),
        ProcessingInput(source=dataset_location_test, destination="/opt/ml/processing/input/mnist_test"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=preprocessor_destination),
        ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline", destination=preprocessor_destination),
        ProcessingOutput(output_name="baseline", source="/opt/ml/processing/baseline", destination=baseline_destination),
    ],
    code=f"{MNIST_FOLDER}/preprocessor.py",
    cache_config=cache_config
    
)

In [16]:
# session1_pipeline = Pipeline(
#     name="mnist-session1-pipeline",
#     parameters=[
#         dataset_location_train,
#         dataset_location_test,
#         preprocessor_destination,
#         baseline_destination,
#     ],
#     steps=[
#         preprocess_step, 
#     ]
# )

In [17]:
# session1_pipeline.upsert(role_arn=role)
# execution = session1_pipeline.start()

In [18]:
# session1_pipeline.delete()

In [41]:
%%writefile {MNIST_FOLDER}/train.py

import os
import argparse

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np
from pathlib import Path


def train(base_directory, train_path, validation_path, epochs=50, batch_size=32, learning_rate=0.01):
    X_train = pd.read_csv(Path(train_path) / "train.csv")
    y_train = X_train[X_train.columns[-1]]
    X_train.drop(X_train.columns[-1], axis=1, inplace=True)

    X_validation = pd.read_csv(Path(validation_path) / "validation.csv")
    y_validation = X_validation[X_validation.columns[-1]]
    X_validation.drop(X_validation.columns[-1], axis=1, inplace=True)

    train_dataset = TensorDataset(torch.tensor(X_train.values, dtype=torch.float32), torch.tensor(y_train.values, dtype=torch.long))
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    validation_dataset = TensorDataset(torch.tensor(X_validation.values, dtype=torch.float32), torch.tensor(y_validation.values, dtype=torch.long))
    validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)

    model = nn.Sequential(
        nn.Linear(X_train.shape[1], 10),
        nn.ReLU(),
        nn.Linear(10, 128),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(128, 10)
    )

    # Define the loss function
    loss_fn = nn.CrossEntropyLoss()

    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        train_loss = 0.0
        train_acc = 0.0

        model.train()
        for x_batch, y_batch in train_loader:
            optimizer.zero_grad()

            y_pred = model(x_batch)

            loss = loss_fn(y_pred, y_batch)
            loss.backward()

            optimizer.step()

            train_loss += loss.item() * x_batch.shape[0]
            train_acc += accuracy_score(y_batch.numpy(), np.argmax(y_pred.detach().numpy(), axis=1)) * x_batch.shape[0]

        train_loss /= len(train_dataset)
        train_acc /= len(train_dataset)

        validation_loss = 0.0
        validation_acc = 0.0

        model.eval()
        with torch.no_grad():
            for x_batch, y_batch in validation_loader:
                y_pred = model(x_batch)

                loss = loss_fn(y_pred, y_batch)

                validation_loss += loss.item() * x_batch.shape[0]
                validation_acc += accuracy_score(y_batch.numpy(), np.argmax(y_pred.numpy(), axis=1)) * x_batch.shape[0]

            validation_loss /= len(validation_dataset)
            validation_acc /= len(validation_dataset)

        print(f"Epoch {epoch+1}: Train loss: {train_loss:.4f}, Train accuracy: {train_acc:.4f}, Validation loss: {validation_loss:.4f}, Validation accuracy: {validation_acc:.4f}")

    model_filepath = Path(base_directory) / "model" / "001"
    torch.save(model.state_dict(), model_filepath)

if __name__ == "__main__":
    # Any hyperparameters provided by the training job are passed to the entry point
    # as script arguments. SageMaker will also provide a list of special parameters
    # that you can capture here. Here is the full list:
    # https://github.com/aws/sagemaker-training-toolkit/blob/master/src/sagemaker_training/params.py
    parser = argparse.ArgumentParser()
    parser.add_argument("--base_directory", type=str, default="/opt/ml/")
    parser.add_argument("--train_path", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", None))
    parser.add_argument("--validation_path", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", None))
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    args, _ = parser.parse_known_args()

    train(
        base_directory=args.base_directory,
        train_path=args.train_path,
        validation_path=args.validation_path,
        epochs=args.epochs,
        batch_size=args.batch_size,
        learning_rate=args.learning_rate
    )

Overwriting /root/ml-school/mnist/train.py


In [42]:
from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep
from sagemaker.parameter import IntegerParameter
from sagemaker.parameter import ContinuousParameter
from sagemaker.inputs import TrainingInput
from sagemaker.tensorflow import TensorFlow
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession

In [43]:
hyperparameters = {
    "epochs": 50,
    "batch_size": 32,
    "learning_rate": 0.1
}

estimator = TensorFlow(
    entry_point=f"{MNIST_FOLDER}/train.py",
    hyperparameters=hyperparameters,
    framework_version="2.4",
    py_version="py37",
    instance_type="ml.m5.large",
    instance_count=1,
    script_mode=True,
    disable_profiler=True,
    role=role,
)

pytorch_estimator = PyTorch(
    entry_point=f"{MNIST_FOLDER}/train.py",
    instance_type='ml.m5.large',
    instance_count=1,
    framework_version='1.8.0',
    py_version='py36',
    hyperparameters = hyperparameters,
    role=role,
    disable_profiler=True
)

In [44]:
training_step = TrainingStep(
    name="training",
    estimator=pytorch_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

In [49]:
hyperparameter_ranges = {
    "epochs": IntegerParameter(10, 50),
    "learning_rate": ContinuousParameter(0.001, 0.3)
}

objective_metric_name = "val_accuracy"
objective_type = "Maximize"
metric_definitions = [{"Name": objective_metric_name, "Regex": "Validation accuracy: ([0-9\\.]+)"}]
    
tuner = HyperparameterTuner(
    pytorch_estimator,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions,
    objective_type=objective_type,
    max_jobs=5,
    max_parallel_jobs=2,
)

In [50]:
tuning_step = TuningStep(
    name = "tuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config
)

In [51]:
USE_TUNING_STEP = True

session2_pipeline = Pipeline(
    name="mnist-session2-pipeline",
    parameters=[
        dataset_location_train,
        dataset_location_test,
        preprocessor_destination,
        baseline_destination,
    ],
    steps=[
        preprocess_step, 
        tuning_step if USE_TUNING_STEP else training_step
    ]
)

In [52]:
session2_pipeline.upsert(role_arn=role)
execution = session2_pipeline.start()

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


Using provided s3_resource


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


Using provided s3_resource


In [None]:
session2_pipeline.delete()