#### This notebook creates a SageMaker Pipeline to build an end-to-end Machine Learning system to solve the problem of classifying penguin species. With a SageMaker Pipeline, you can create, automate, and manage end-to-end Machine Learning workflows at scale.

In [2]:
%load_ext autoreload
%autoreload 2

import sys
from pathlib import Path

# Folder that holds the scripts this notebook generates with %%writefile.
CODE_FOLDER = Path("code")

# %%writefile does not create missing parent folders, so make sure the
# folder exists before any script is written into it.
CODE_FOLDER.mkdir(parents=True, exist_ok=True)

# Make the generated scripts importable from the notebook. The guard keeps
# sys.path free of duplicate entries when this cell is re-run.
if f"./{CODE_FOLDER}" not in sys.path:
    sys.path.append(f"./{CODE_FOLDER}")

#### Session 1 - Building a Pipeline
This session builds a simple SageMaker Pipeline with one step to preprocess the Penguins dataset. We'll use a Processing Step with a SKLearnProcessor to execute a preprocessing script.

In [3]:
import os
import numpy as np
import json
import numpy as np
import tempfile

from constants import *
from sagemaker.inputs import FileSystemInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig



#### Step 1 - Preprocessing the Dataset
Let's create a script to do feature engineering on the original dataset. We will run this script using a SageMaker Processing Job. The script should split the data into train, validation, and test sets so we can later train and evaluate a model. We will also save the Scikit-Learn pipeline that we use to preprocess the data and the list of target classes.

Pay special attention to the way the Scikit-Learn pipeline preprocessor is used to process the three sets:

1. First, we use `fit_transform()` to fit the pipeline on the train set and transform it.
2. Then, we transform the validation and test sets using `transform()`.

Always use `fit_transform()` on the training data to fit the scaling parameters we need to transform the data. For example, `fit_transform()` will learn the mean and variance of the features of the training set. The pipeline can then use these same parameters to scale the validation and test sets. That's why we want to save this Scikit-Learn pipeline: we can use it later to scale production data with the same parameters we learned on the train set.

In [4]:
%%writefile {CODE_FOLDER}/preprocessor.py

import os
import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from pickle import dump


# Root folder this script works with when it runs as a SageMaker
# Processing job. The raw dataset is read from the "input" subfolder,
# and every output is written to its own subfolder under this root.
BASE_DIRECTORY = "/opt/ml/processing"
DATA_FILEPATH = Path(BASE_DIRECTORY, "input", "data.csv")


def _save_splits(base_directory, train, validation, test):
    """
    One of the goals of this script is to output the three
    dataset splits. This function will save each of these
    splits to disk.
    """

    train_path = Path(base_directory) / "train"
    validation_path = Path(base_directory) / "validation"
    test_path = Path(base_directory) / "test"

    train_path.mkdir(parents=True, exist_ok=True)
    validation_path.mkdir(parents=True, exist_ok=True)
    test_path.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(train).to_csv(train_path / "train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        validation_path / "validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(test_path / "test.csv", header=False, index=False)


def _save_pipeline(base_directory, pipeline):
    """
    Saves the Scikit-Learn pipeline that we used to
    preprocess the data.
    """
    pipeline_path = Path(base_directory) / "pipeline"
    pipeline_path.mkdir(parents=True, exist_ok=True)
    dump(pipeline, open(pipeline_path / "pipeline.pkl", "wb"))


def _save_classes(base_directory, classes):
    """
    Saves the list of classes from the dataset.
    """
    path = Path(base_directory) / "classes"
    path.mkdir(parents=True, exist_ok=True)

    np.asarray(classes).tofile(path / "classes.csv", sep=",")


def _save_baseline(base_directory, df_train, df_test):
    """
    During the data and quality monitoring steps, we will need a baseline
    to compute constraints and statistics. This function will save that
    baseline to the disk.
    """

    for split, data in [("train", df_train), ("test", df_test)]:
        baseline_path = Path(base_directory) / f"{split}-baseline"
        baseline_path.mkdir(parents=True, exist_ok=True)

        df = data.copy().dropna()
        df.to_json(
            baseline_path / f"{split}-baseline.json", orient="records", lines=True
        )


def preprocess(base_directory, data_filepath):
    """
    Preprocesses the supplied raw dataset and splits it into a train,
    validation, and a test set.

    The function writes the three transformed splits, the fitted
    Scikit-Learn pipeline, the list of target classes, and the train and
    test baselines under `base_directory`.

    Parameters:
        base_directory: Folder where every output is written.
        data_filepath: Location of the raw CSV dataset.
    """

    # A single seed drives the shuffle and both splits, so running the
    # script twice on the same data produces the same outputs.
    random_state = 42

    df = pd.read_csv(data_filepath)

    # Every float64 column of the raw dataset is treated as a numeric feature.
    numeric_features = df.select_dtypes(include=['float64']).columns.tolist()
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="mean")),
            ("scaler", StandardScaler()),
        ]
    )

    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder()),
        ]
    )

    preprocessor = ColumnTransformer(
        transformers=[
            ("numeric", numeric_transformer, numeric_features),
            ("categorical", categorical_transformer, ["island"]),
        ]
    )

    pipeline = Pipeline(
        steps=[
            ("preprocessing", preprocessor)
        ]
    )

    # The "sex" column is not used as a feature.
    df = df.drop(["sex"], axis=1)
    df = df.sample(frac=1, random_state=random_state)

    # 70% train, 15% validation, 15% test. Without a seed,
    # train_test_split reshuffles the data differently on every run.
    df_train, temp = train_test_split(
        df, test_size=0.3, random_state=random_state
    )
    df_validation, df_test = train_test_split(
        temp, test_size=0.5, random_state=random_state
    )

    # The label encoder is fitted on the train set only and then reused
    # for the other two splits.
    label_encoder = LabelEncoder()
    y_train = label_encoder.fit_transform(df_train.species)
    y_validation = label_encoder.transform(df_validation.species)
    y_test = label_encoder.transform(df_test.species)

    # The baselines are saved before dropping the target column, so they
    # contain the untransformed features together with the species.
    _save_baseline(base_directory, df_train, df_test)

    df_train = df_train.drop(["species"], axis=1)
    df_validation = df_validation.drop(["species"], axis=1)
    df_test = df_test.drop(["species"], axis=1)

    # Fit the preprocessing pipeline on the train set only, then apply the
    # learned parameters to the validation and test sets.
    X_train = pipeline.fit_transform(df_train)
    X_validation = pipeline.transform(df_validation)
    X_test = pipeline.transform(df_test)

    # Append the encoded target as the last column of every split.
    train = np.concatenate((X_train, np.expand_dims(y_train, axis=1)), axis=1)
    validation = np.concatenate((X_validation, np.expand_dims(y_validation, axis=1)), axis=1)
    test = np.concatenate((X_test, np.expand_dims(y_test, axis=1)), axis=1)

    _save_splits(base_directory, train, validation, test)
    _save_pipeline(base_directory, pipeline=pipeline)
    _save_classes(base_directory, label_encoder.classes_)


# Entry point used when this file is executed as a script: preprocess the
# dataset found at DATA_FILEPATH and write every output under
# BASE_DIRECTORY. Nothing runs when the module is only imported.
if __name__ == "__main__":
    preprocess(BASE_DIRECTORY, DATA_FILEPATH)

Writing code/preprocessor.py


#### Step 2 - Testing the Preprocessing Script
We can now load the script we just created and run it locally to ensure it outputs every file we need. In this case, we can call the preprocess() function with the local directory and the local copy of the dataset.

In [5]:
from preprocessor import preprocess

# Folders the preprocessing script is expected to produce. They match the
# outputs captured by the Processing Step defined later in this notebook.
expected_folders = {
    "train",
    "validation",
    "test",
    "pipeline",
    "classes",
    "train-baseline",
    "test-baseline",
}

with tempfile.TemporaryDirectory() as directory:
    preprocess(
        base_directory=directory,
        data_filepath=DATA_FILEPATH
    )

    generated_folders = os.listdir(directory)
    print(f"Folders: {generated_folders}")

    # Fail loudly if the script stops producing any of the expected
    # outputs, instead of relying on a visual check of the printed list.
    missing_folders = expected_folders - set(generated_folders)
    assert not missing_folders, f"Missing outputs: {sorted(missing_folders)}"


Folders: ['train-baseline', 'test-baseline', 'train', 'validation', 'test', 'pipeline', 'classes']


In [6]:
# Path of the temporary folder used by the test run above. The folder
# itself was deleted when the `with` block exited; only the path remains.
directory

'/tmp/tmpe0t6xxb5'

In [7]:
# Location of the local copy of the dataset used by the test run above.
# NOTE(review): this name is not defined in the notebook itself; presumably
# it comes from the star-import of `constants` — confirm.
DATA_FILEPATH

PosixPath('/root/Penguin-Detection/data.csv')

#### Step 3 - Pipeline Configuration
When creating a SageMaker Pipeline, we can specify a list of parameters we can use on individual pipeline steps. To read more about these parameters, check Pipeline Parameters. The `dataset_location` parameter represents the dataset's location in S3. We will use this parameter to tell the SageMaker Processing Job where to find the dataset. The Processing Job will download the dataset from S3 and make it available on the instance running the script.

We can also define a caching policy to reuse the result of a previous successful run of a pipeline step. You can find more information about this topic in Caching Pipeline Steps.

In [8]:
# Pipeline parameter holding the S3 location of the dataset.
dataset_location = ParameterString(
    default_value=f"{S3_LOCATION}/data.csv",
    name="dataset_location",
)

# Caching policy for the pipeline steps: cached results expire after 15 days.
cache_config = CacheConfig(expire_after="15d", enable_caching=True)

# Turn on custom job prefixes for the pipeline definition.
pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

#### Step 4 - Setting up a Processing Step
The first step we need in the pipeline is a Processing Step to run the preprocessing script. This Processing Step will create a SageMaker Processing Job in the background, run the script, and upload the output to S3. You can use Processing Jobs to perform data preprocessing, post-processing, feature engineering, data validation, and model evaluation. Check the ProcessingStep documentation in the SageMaker Python SDK for more information.

A processor gives the Processing Step information about the hardware and software that SageMaker should use to launch the Processing Job. To run the script, we need access to Scikit-Learn, so we can use the SKLearnProcessor processor that comes out-of-the-box with the SageMaker's Python SDK. The Data Processing with Framework Processors page discusses other built-in processors you can use. The Docker Registry Paths and Example Code page contains information about the available framework versions for each region.

The ProcessingStep requires a list of inputs that we need on the preprocessing script. In this case, the input is the dataset we stored in S3. We also have a few outputs that we want SageMaker to capture when the Processing Job finishes.

In [9]:
# Hardware and software configuration of the job that runs the script.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="penguins-preprocessing",
    role=role,
    sagemaker_session=pipeline_session,
)

# The dataset is made available to the script in this container folder.
processing_inputs = [
    ProcessingInput(source=dataset_location, destination="/opt/ml/processing/input"),
]

# One output per folder the preprocessing script writes.
processing_outputs = [
    ProcessingOutput(output_name=folder, source=f"/opt/ml/processing/{folder}")
    for folder in (
        "train",
        "validation",
        "test",
        "pipeline",
        "classes",
        "train-baseline",
        "test-baseline",
    )
]

preprocess_data_step = ProcessingStep(
    name="preprocess-data",
    step_args=sklearn_processor.run(
        code=f"{CODE_FOLDER}/preprocessor.py",
        inputs=processing_inputs,
        outputs=processing_outputs,
    ),
    cache_config=cache_config,
)



#### Step 5 - Setting up the Pipeline
Let's define the SageMaker Pipeline. Check Pipeline Structure and Execution for more information about how to define a pipeline.

In [10]:
# The first pipeline has a single parameter and a single step.
session1_pipeline = Pipeline(
    name="penguins-session1-pipeline",
    parameters=[dataset_location],
    steps=[preprocess_data_step],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=pipeline_session,
)

# Create the pipeline, or update it if it already exists. The response is
# left as the last expression so the notebook displays it.
session1_pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:411039631567:pipeline/penguins-session1-pipeline',
 'ResponseMetadata': {'RequestId': '919e77fa-ee56-4b5e-a0f3-d66e52f74c65',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '919e77fa-ee56-4b5e-a0f3-d66e52f74c65',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Fri, 04 Aug 2023 19:06:51 GMT'},
  'RetryAttempts': 0}}

#### Running the first pipeline 

In [None]:
# Start an execution of the pipeline. No parameter overrides are passed here.
session1_pipeline.start()