# Penguins in Production - Session 1

This program creates a [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) to build an end-to-end Machine Learning system to solve the problem of classifying penguin species. This example uses the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data).

<img src='https://imgur.com/orZWHly.png' alt='Penguins dataset' width="900">

This session builds a simple [SageMaker Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) with one step to preprocess the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data). We'll use a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) with a [SKLearnProcessor](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) to execute a preprocessing script. Check the [SageMaker Pipelines Overview](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) for an introduction to the fundamental components of a SageMaker Pipeline.

Here is what the Pipeline will look like at the end of this session:

<img src='images/session1-pipeline.png' alt='Session 1 Pipeline' width="600">

This notebook is part of the [Machine Learning School](https://www.ml.school) program.

Let's ensure we are running the latest version of the SakeMaker SDK. **Restart the Kernel** after you run the following cell.

In [2]:
!pip install -q --upgrade pip
!pip install -q --upgrade awscli boto3
!pip install -q --upgrade scikit-learn==0.23.2
!pip install -q --upgrade PyYAML==6.0
!pip install -q --upgrade sagemaker
!pip show sagemaker

[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sagemaker 2.171.0 requires PyYAML==6.0, but you have pyyaml 5.4.1 which is incompatible.[0m[31m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.29.2 requires PyYAML<5.5,>=3.10, but you have pyyaml 6.0 which is incompatible.[0m[31m
[0mName: sagemaker
Version: 2.171.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /usr/local/lib/python3.8/site-packages
Requires: attrs, boto3, cloudpickle, google-pasta, importlib-metadata, jsonschema, numpy, packaging, pandas, pathos, platformdirs, protobuf

In [2]:
%load_ext autoreload
%autoreload 2

import sys
from pathlib import Path

CODE_FOLDER = "code"
Path(CODE_FOLDER).mkdir(parents=True, exist_ok=True)

sys.path.append(f"./{CODE_FOLDER}")

In [3]:
import os
import pandas as pd
import sagemaker
import tempfile
import urllib.request

## Step 1 - Initial Setup

Let's start by preparing the S3 bucket where we will organize every resource we are going to use during the program. Make sure you set `BUCKET` to the bucket name you want to use. This name has to be unique. The [command line interface](https://docs.aws.amazon.com/cli/latest/index.html) is a simple way to interact with the AWS services. You can combine Python code with bash commands in the same notebook cell, which makes notebooks a very flexible tool.

If you want to create a bucket in a region other than `us-east-1`, use this command instead:

```
!aws s3api create-bucket --bucket $BUCKET --create-bucket-configuration LocationConstraint=$region
```

The `LocationConstraint` argument should specify the region where you want to create the bucket.

After we have a bucket, we can download the [Penguins dataset](https://www.kaggle.com/parulpandey/palmer-archipelago-antarctica-penguin-data) and store it in a folder inside the bucket. Our SageMaker Pipeline will use this dataset.

In [4]:
%%writefile {CODE_FOLDER}/session1.py

import boto3
import sagemaker

from pathlib import Path
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

BUCKET = "mlschool"
CODE_FOLDER = "code"

S3_FILEPATH = f"s3://{BUCKET}/penguins"
LOCAL_FILEPATH = Path().resolve() / "data.csv"
INPUT_DATA_URI = f"{S3_FILEPATH}/data.csv"

Overwriting code/session1.py


In [5]:
from session1 import *

In [6]:
!aws s3api create-bucket --bucket $BUCKET

{
    "Location": "/mlschool"
}


Download the official Penguins dataset and store it locally.

In [7]:
urllib.request.urlretrieve(
    "https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv", 
    LOCAL_FILEPATH
)

(PosixPath('/root/ml.school/penguins/data.csv'),
 <http.client.HTTPMessage at 0x7f347f63dc10>)

Upload the dataset to S3. We need to do this to make it available to the preprocessing step.

In [8]:
sagemaker.s3.S3Uploader.upload(
    local_path=str(LOCAL_FILEPATH), 
    desired_s3_uri=S3_FILEPATH,
)

's3://mlschool/penguins/data.csv'

We can now load and display the dataset.

In [9]:
df = pd.read_csv(LOCAL_FILEPATH)
df

Unnamed: 0,species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE
2,Adelie,Torgersen,40.3,18.0,195.0,3250.0,FEMALE
3,Adelie,Torgersen,,,,,
4,Adelie,Torgersen,36.7,19.3,193.0,3450.0,FEMALE
...,...,...,...,...,...,...,...
339,Gentoo,Biscoe,,,,,
340,Gentoo,Biscoe,46.8,14.3,215.0,4850.0,FEMALE
341,Gentoo,Biscoe,50.4,15.7,222.0,5750.0,MALE
342,Gentoo,Biscoe,45.2,14.8,212.0,5200.0,FEMALE


## Step 2 - Setting up Permissions

To run this notebook you need to update the Execution Policy assigned to SageMaker's Execution Role with the following permissions:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "IAM0",
            "Effect": "Allow",
            "Action": [
                "iam:CreateServiceLinkedRole"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "iam:AWSServiceName": [
                        "autoscaling.amazonaws.com",
                        "ec2scheduled.amazonaws.com",
                        "elasticloadbalancing.amazonaws.com",
                        "spot.amazonaws.com",
                        "spotfleet.amazonaws.com",
                        "transitgateway.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Sid": "IAM1",
            "Effect": "Allow",
            "Action": [
                "iam:CreateRole",
                "iam:PassRole",
                "iam:AttachRolePolicy"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Lambda",
            "Effect": "Allow",
            "Action": [
                "lambda:CreateFunction",
                "lambda:DeleteFunction",
                "lambda:InvokeFunctionUrl",
                "lambda:InvokeFunction",
                "lambda:UpdateFunctionCode",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        },
        {
            "Sid": "SageMaker",
            "Effect": "Allow",
            "Action": [
                "sagemaker:UpdateDomain",
                "sagemaker:UpdateUserProfile"
            ],
            "Resource": "*"
        },
        {
            "Sid": "CloudWatch",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData",
                "cloudwatch:GetMetricData",
                "cloudwatch:DescribeAlarmsForMetric",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "logs:CreateLogGroup",
                "logs:DescribeLogStreams"
            ],
            "Resource": "*"
        },
        {
            "Sid": "ECR",
            "Effect": "Allow",
            "Action": [
                "ecr:GetAuthorizationToken",
                "ecr:BatchCheckLayerAvailability",
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:CreateBucket",
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::*"
        }
    ]
}
```

Let's start by defining a few variables we'll use throughout this notebook:

* `sagemaker_client`: We'll use a [boto3 SageMaker Client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) instance to access SageMaker.
* `iam_client`: We'll use a [boto3 IAM Client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html) instance to access IAM.
* `role`: This is the execution role attached to this notebook. We can use this role with any of the SageMaker services that need it to ensure they run with the appropriate permissions.
* `region`: The current region attached to our session. 
* `sagemaker_session`: The current SageMaker session.

In [10]:
%%writefile -a {CODE_FOLDER}/session1.py

sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

Appending to code/session1.py


## Step 3 - Preprocessing the Dataset

Let's create a script to do feature engineering on the original dataset. We will run this script using a SageMaker Processing Job later in this session.

The script should split the data into train, validation, and test sets so we can later train and evaluate a model. We will also save the Scikit-Learn pipeline that we use to preprocess the data to use it during inference time and the list of target classes.

The script uses the [np.split()](https://numpy.org/doc/stable/reference/generated/numpy.split.html) function to split the dataset into three sets in the following way:

1. The train set will use the top 70% of the data.
2. The validation set will use 15% of the data, starting with the sample after the 70% used for the train set.
3. Finally, the test set will use the remaining 15% of the data.

Pay special attention to the way the Scikit-Learn pipeline `preprocessor` is used to process the three sets:

* First, we use the `fit_transform()` to fit the pipeline on the train set.
* Then, we consecutively transform the validation and test sets using `transform()`.

Always use `fit_transform()` on the training data to fit the scaling parameters we need to transform the data. For example, `fit_transform()` will learn the mean and variance of the features of the training set. It can then use these same parameters to scale the validation and test sets.

That's why we want to save this Scikit-Learn pipeline to use later to scale production data using the same parameters we learned on the train set.

In [11]:
%%writefile {CODE_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd
import tempfile

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# This is the location where the SageMaker Processing job
# will save the input dataset.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY) / "input" / "data.csv"


def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """
    
    train_path = Path(base_directory) / "train" 
    validation_path = Path(base_directory) / "validation" 
    test_path = Path(base_directory) / "test"
    
    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)
    
    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(validation_path / "validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)
    

def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", 'wb'))
    

def _save_classes(base_directory, classes):
    """
    Saves the list of classes from the dataset.
    """
    path = Path(base_directory) / "classes"
    path.mkdir(parents=True, exist_ok=True)
    
    print("CLASSES", np.asarray(classes))

    np.asarray(classes).tofile(path / "classes.csv", sep = ",") 
    
    
def _save_baseline(base_directory, X_train, X_test):
    """
    During the data and quality monitoring steps, we will need a baseline
    to compute constraints and statistics. This function will save that
    baseline to the disk.
    """
    
    for split, data in [("train", X_train), ("test", X_test)]:
        baseline_path = Path(base_directory) / f"{split}-baseline" 
        baseline_path.mkdir(parents=True, exist_ok=True)

        df = data.copy().dropna()
        df.to_json(baseline_path / f"{split}-baseline.json", orient='records', lines=True)
    
    
def preprocess(base_directory, data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train, validation,
    and a test set.
    """

    df = pd.read_csv(data_filepath)
    
    numerical_columns = [column for column in df.columns if df[column].dtype in ["int64", "float64"]]
    
    numerical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler())
    ])

    categorical_preprocessor = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore"))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("numerical", numerical_preprocessor, numerical_columns),
            ("categorical", categorical_preprocessor, ["island"]),
        ]
    )
    

    X = df.drop(["sex"], axis=1)
    columns = list(X.columns)
    
    X = X.to_numpy()
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7 * len(X)), int(.85 * len(X))])
    
    X_train = pd.DataFrame(train, columns=columns)
    X_validation = pd.DataFrame(validation, columns=columns)
    X_test = pd.DataFrame(test, columns=columns)
    
    _save_baseline(base_directory, X_train, X_test)
    
    y_train = X_train.species
    y_validation = X_validation.species
    y_test = X_test.species
    
    label_encoder = LabelEncoder()
    
    y_train = label_encoder.fit_transform(y_train)
    y_validation = label_encoder.transform(y_validation)
    y_test = label_encoder.transform(y_test)
    
    X_train.drop(["species"], axis=1, inplace=True)
    X_validation.drop(["species"], axis=1, inplace=True)
    X_test.drop(["species"], axis=1, inplace=True)

    X_train = preprocessor.fit_transform(X_train)
    X_validation = preprocessor.transform(X_validation)
    X_test = preprocessor.transform(X_test)
    
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)
    
    _save_splits(base_directory, train, validation, test)
    _save_pipeline(base_directory, pipeline=preprocessor)
    _save_classes(base_directory, label_encoder.classes_)

if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, DATA_FILEPATH)


Overwriting code/preprocessor.py


## Step 4 - Testing the Preprocessing Script

We can now load the script we just created and run it locally to ensure it outputs every file we need.

We will set up a SageMaker Processing Job to run this script, but we always want to test the code locally. In this case, we can call the `preprocess()` function with the local directory and the local copy of the dataset.

In [12]:
from preprocessor import preprocess

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory, 
        data_filepath=LOCAL_FILEPATH
    )
    
    print(f"Folders: {os.listdir(directory)}")

CLASSES ['Adelie' 'Chinstrap' 'Gentoo']
Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline', 'classes']


## Step 5 - Pipeline Configuration

When creating a SageMaker Pipeline, we can specify a list of parameters we can use on individual pipeline steps. To read more about these parameters, check [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html).

These are the parameters that we need right now:

* `dataset_location`: This parameter represents the dataset's location in S3. We will use this parameter to indicate the SageMaker Processing Job where to find the dataset. The Processing Job will download the dataset from S3 and make it available on the instance running the script.
* `preprocessor_destination`: We need to define the location where the SageMaker Processing Job will store the output. When it finishes, the Processing Job will copy the script's output to the S3 location specified by this parameter. By default, SageMaker uploads the output of a job to a custom location in S3, but unfortunately, if we rely on that functionality, we can't cache the Processing Step in the Pipeline.
* `train_dataset_baseline_destination`: This parameter represents the location where we will store the train dataset to compute constraints and statistic baselines.
* `test_dataset_baseline_destination`: This parameter represents the location where we will store the test dataset to compute constraints and statistic baselines.

In [13]:
%%writefile -a {CODE_FOLDER}/session1.py

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

dataset_location = ParameterString(
    name="dataset_location",
    default_value=INPUT_DATA_URI,
)

preprocessor_destination = ParameterString(
    name="preprocessor_destination",
    default_value=f"{S3_FILEPATH}/preprocessing",
)

train_dataset_baseline_destination = ParameterString(
    name="train_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/train",
)

test_dataset_baseline_destination = ParameterString(
    name="test_dataset_baseline_destination",
    default_value=f"{S3_FILEPATH}/preprocessing/baselines/test",
)



Appending to code/session1.py


## Step 6 - Caching Pipeline Steps

While building a pipeline, you only want to rerun every step if you expect a different result. To accomplish this, you can instruct SageMaker to reuse the result of a previous successful run of a pipeline step. You can find more information about this topic in [Caching Pipeline Steps](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html).

In [14]:
%%writefile -a {CODE_FOLDER}/session1.py

cache_config = CacheConfig(
    enable_caching=True, 
    expire_after="15d"
)

Appending to code/session1.py


## Step 7 - Setting up a Processing Step

The first step we need in the pipeline is a [Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) to run the preprocessing script. This Processing Step will create a SageMaker Processing Job in the background, run the script, and upload the output to S3. You can use Processing Jobs to perform data preprocessing, post-processing, feature engineering, data validation, and model evaluation. Check the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) SageMaker's SDK documentation for more information.

A processor gives the Processing Step information about the hardware and software that SageMaker should use to launch the Processing Job. To run the script, we need access to Scikit-Learn, so we can use the [SKLearnProcessor](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor) processor that comes out-of-the-box with the SageMaker's Python SDK. The [Data Processing with Framework Processors](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks.html) page discusses other built-in processors you can use. The [Docker Registry Paths and Example Code](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html) page contains information about the available framework versions for each region.

The [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) requires a list of inputs that we need on the preprocessing script. In this case, the input is the dataset we stored in S3. We also have a few outputs that we want SageMaker to capture when the Processing Job finishes. SageMaker will upload every one of these outputs to the location specified by the `preprocessor_destination` parameter except the baseline data, which we will upload to the location specified by the `baseline_destination` parameter.

In [15]:
%%writefile -a {CODE_FOLDER}/session1.py

sklearn_processor = SKLearnProcessor(
    base_job_name="penguins-preprocessing",
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role,
)

preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=preprocessor_destination),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=preprocessor_destination),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=preprocessor_destination),
        ProcessingOutput(output_name="pipeline", source="/opt/ml/processing/pipeline", destination=preprocessor_destination),
        ProcessingOutput(output_name="classes", source="/opt/ml/processing/classes", destination=preprocessor_destination),
        ProcessingOutput(output_name="train-baseline", source="/opt/ml/processing/train-baseline", destination=train_dataset_baseline_destination),
        ProcessingOutput(output_name="test-baseline", source="/opt/ml/processing/test-baseline", destination=test_dataset_baseline_destination),
    ],
    code=f"{CODE_FOLDER}/preprocessor.py",
    cache_config=cache_config
)

Appending to code/session1.py


## Step 8 - Running the Pipeline

Let's define and run the SageMaker Pipeline. Check [Pipeline Structure and Execution](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-pipeline.html) for more information about how to define a pipeline and [Run a Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/run-pipeline.html) for information about how to run it.

The pipeline uses the parameters we defined before and the Preprocess Step.

In [16]:
from session1 import *
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="penguins-session1-pipeline",
    parameters=[
        dataset_location, 
        preprocessor_destination,
        train_dataset_baseline_destination,
        test_dataset_baseline_destination
    ],
    steps=[
        preprocess_data_step, 
    ],
    pipeline_definition_config=pipeline_definition_config
)

Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist or update it if it does.

In [17]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()