# AWS Sagemaker pipeline 

1. Imports 
2. Configuration 
3. Data <br>&emsp;3.1 Train data <br>
&emsp;3.2 Batch inference data <br>
4. Pipeline components <br>
4.1 Define Parameters of pipeline <br>
4.2 Feature Engineering Processing step <br>
4.2.1 Code for preprocessing <br>
4.2.2 SKLearn Processor instance <br>
4.2.3 Sagemaker pipeline - Processing Step <br>
4.3 Training step <br>
4.3.1 Sagemaker XGboost <br>
4.3.2 Sagemaker pipeline - Training step <br>
4.4 Model evaluation Processing step <br>
4.4.1 Code for model validation<br>
4.4.2 Script processor instance<br>
4.4.3 Sagemaker pipeline - Processing Step<br>
4.5 Batch inference<br>
4.5.1 Success - Conditional step<br>
4.5.2 Fail - Conditional step<br>

<br>
<br>




# 1. Imports

In [48]:
import sys
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput, CreateModelInput, TransformInput
from sagemaker.model import Model
from sagemaker.transformer import Transformer
from sagemaker.model_metrics import MetricsSource, ModelMetrics


In [3]:
#Sagemaker pipeline workflow imports
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline import Pipeline

In [3]:
!{sys.executable} -m pip install "sagemaker>=2.99.0"

[0m

# 2. Configuration settings

In [4]:
# Regular SDK session: used here to look up the region and the default bucket.
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()

# Execution role passed to every processor, estimator and model below.
role = sagemaker.get_execution_role()

# Pipeline-aware session passed to the pipeline components defined later.
pipeline_session = PipelineSession()

# Model registry (model package group) name used when registering the model.
model_package_group_name = "AbaloneModelPackageGroupName"

# 3. Data

## 3.1 Train Data

In [5]:
!mkdir -p data

In [6]:
# Copy the Abalone CSV from the sagemaker-sample-files bucket to local disk
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
sample_files_bucket = s3.Bucket("sagemaker-sample-files")
sample_files_bucket.download_file("datasets/tabular/uci_abalone/abalone.csv", local_path)


In [7]:
# Stage the local training CSV under the "abalone" prefix of the default bucket
base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(input_data_uri)

s3://sagemaker-ap-northeast-1-604458403141/abalone/abalone-dataset.csv


## 3.2 Batch inference data

In [9]:
# Copy the batch-inference dataset from the region-specific seed-code bucket to local disk
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
seedcode_bucket = s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}")
seedcode_bucket.download_file("dataset/abalone-dataset-batch", local_path)

In [10]:
# Stage the local batch-inference file under the "abalone" prefix of the default bucket
base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(batch_data_uri)

s3://sagemaker-ap-northeast-1-604458403141/abalone/abalone-dataset-batch


# 4. PIPELINE COMPONENTS

## 4.1 Define Parameters of pipeline

In [11]:
# --- Compute ---------------------------------------------------------------
# Instance count for the feature-engineering processing job
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
# Instance type for the training job
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
)

# --- Data locations --------------------------------------------------------
# S3 location of the training data (defaults to the file uploaded above)
input_data = ParameterString(name="InputData", default_value=input_data_uri)
# S3 location of the batch-prediction data (defaults to the file uploaded above)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

# --- Model governance ------------------------------------------------------
# Approval state a newly registered model starts in
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)
# MSE threshold the evaluation result is compared against
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

## 4.2 Feature Engineering Processing step

- Fill in missing sex category data and encode it so that it is suitable for training.
- Scale and normalize all numerical fields, aside from sex and rings numerical data.
- Split the data into training, validation, and test datasets

### 4.2.1 Code for preprocessing

In [14]:
!mkdir -p code

In [15]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# The input CSV carries no header row, so the column layout is declared here.
label_column = "rings"
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]

# "sex" is read as a string; every other feature column is read as float64.
feature_columns_dtype = {
    name: (str if name == "sex" else np.float64) for name in feature_columns_names
}
label_column_dtype = {label_column: np.float64}


def merge_two_dicts(x, y):
    """Return a new mapping holding the entries of ``x`` updated with ``y``.

    Neither argument is modified; where both define a key, the value
    from ``y`` wins.
    """
    merged = x.copy()
    merged.update(y)
    return merged


if __name__ == "__main__":
    # Root directory holding this processing job's input and output folders.
    base_dir = "/opt/ml/processing"

    # Read the headerless CSV from the container's local input folder,
    # naming and typing every column explicitly.
    all_columns = feature_columns_names + [label_column]
    all_dtypes = merge_two_dicts(feature_columns_dtype, label_column_dtype)
    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=all_columns,
        dtype=all_dtypes,
    )

    # Numeric features: median imputation followed by standard scaling.
    categorical_features = ["sex"]
    numeric_features = [
        name for name in feature_columns_names if name not in categorical_features
    ]
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    # Categorical features: fill gaps with the constant "missing", then one-hot encode.
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    # Apply each sub-pipeline to its own column subset and concatenate the results.
    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    # Split off the label, transform the features, then put the label back as column 0.
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle the rows in place, then cut at 70% and 85% into train/validation/test.
    np.random.shuffle(X)
    n_rows = len(X)
    train, validation, test = np.split(X, [int(0.7 * n_rows), int(0.85 * n_rows)])

    # Write each split, without header or index, to the local disk of the
    # container running this script.
    for split_name, split_rows in (
        ("train", train),
        ("validation", validation),
        ("test", test),
    ):
        pd.DataFrame(split_rows).to_csv(
            f"{base_dir}/{split_name}/{split_name}.csv", header=False, index=False
        )


Writing code/preprocessing.py


### 4.2.2 SKLearn Processor instance

In [21]:
# Specification of the scikit-learn container that runs the preprocessing script
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name="sklearn-abalone-process",
    framework_version=framework_version,
    instance_count=processing_instance_count,
    instance_type="ml.m5.xlarge",
)

### 4.2.3 Sagemaker pipeline - Processing Step

In [22]:
# The processor was built with the pipeline session, so run() does not launch a
# processing job here; it returns the arguments the pipeline step needs to run it.
processing_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
# One output channel per dataset split written by preprocessing.py
processing_outputs = [
    ProcessingOutput(output_name=split, source=f"/opt/ml/processing/{split}")
    for split in ("train", "validation", "test")
]

processor_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="AbaloneProcess", step_args=processor_args)



## 4.3 Training step

### 4.3.1 Sagemaker XGboost

In [25]:
# S3 prefix under which the training job writes its model artifact
model_path = f"s3://{default_bucket}/AbaloneTrain"

# Look up the XGBoost container image for this region
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# Estimator that runs the XGBoost image; the instance type is a pipeline parameter
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    # "reg:squarederror" replaces the deprecated alias "reg:linear";
    # both select squared-error regression.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

# Both channels read the CSV outputs of the preprocessing step, which makes the
# training step depend on step_process.
processing_outputs = step_process.properties.ProcessingOutputConfig.Outputs
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=processing_outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=processing_outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)


### 4.3.2 Sagemaker pipeline - Training step

In [27]:
# Wrap the estimator's fit() arguments as the pipeline's training step
step_train = TrainingStep(name="AbaloneTrain", step_args=train_args)

## 4.4 Model evaluation Processing step

- Load the model.
- Read the test data.
- Issue predictions against the test data.
- Build a regression report containing the mean squared error (MSE) and the standard deviation of the prediction errors.
- Save the evaluation report to the evaluation directory.

### 4.4.1 Code for model validation

In [30]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    # Unpack the model archive into the current working directory.
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # NOTE: pickle can execute arbitrary code while loading. This is acceptable
    # only because the artifact is expected to come from this pipeline's own
    # training step; never point this script at an untrusted archive.
    # The context manager closes the file handle once the model is loaded.
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    # The test CSV has no header; column 0 is the label, the rest are features.
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    # Mean squared error plus the standard deviation of the residuals.
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    # Write the report to the evaluation folder, creating the folder if needed.
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Writing code/evaluation.py


### 4.4.2 Script processor instance

In [29]:
# Processor that runs the evaluation script inside the XGBoost image defined earlier
script_eval = ScriptProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name="script-abalone-eval",
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# Inputs: the trained model artifact and the test split from preprocessing
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]
# Output: the folder evaluation.py writes evaluation.json into
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

# Collect the arguments for running the script processor as a pipeline step
eval_args = script_eval.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="code/evaluation.py",
)


### 4.4.3 Sagemaker pipeline - Processing Step

In [34]:
# Property file pointing at evaluation.json within the "evaluation" output;
# the condition step reads the MSE from it.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

In [35]:
# Evaluation step; it exposes the evaluation report as a property file
step_eval = ProcessingStep(name="AbaloneEval", step_args=eval_args, property_files=[evaluation_report])

## 4.5 Batch inference 
The trained model is used for batch inference only if it satisfies the threshold metric value. This is the first branch of the conditional step: the model's MSE is less than or equal to the threshold, so the model can be used for inference.

### 4.5.1 Success - Conditional step

#### Sagemaker pipeline - Create model step

In [38]:
# Model built from the XGBoost image and the artifact produced by the training step
model = Model(
    role=role,
    sagemaker_session=pipeline_session,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
)

# NOTE(review): accelerator_type requests an Elastic Inference accelerator;
# confirm it is still needed and available for this model.
create_model_args = model.create(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)
step_create_model = ModelStep(name="AbaloneCreateModel", step_args=create_model_args)

#### Sagemaker pipeline - Transform Step

In [41]:
# Batch transformer for the model created by the previous step; results are
# written under the AbaloneTransform prefix of the default bucket
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    output_path=f"s3://{default_bucket}/AbaloneTransform",
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# The transform input is the BatchData pipeline parameter
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data),
)

#### Sagemaker pipeline - Register Model Step

In [44]:
# S3 prefix of the evaluation step's first (and only) output
evaluation_s3_prefix = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
    "S3Uri"
]

# Attach the evaluation metrics JSON to the registered model
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{evaluation_s3_prefix}/evaluation.json",
        content_type="application/json",
    )
)

# Register the model in the model package group with the parameterised approval status
register_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)
step_register = ModelStep(name="AbaloneRegisterModel", step_args=register_args)


Job Name:  script-abalone-eval-2022-12-11-10-11-10-286
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fee09206d90>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7fee091f02d0>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-1-604458403141/script-abalone-eval-2022-12-11-10-11-10-286/input/code/evaluation.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3Compressi

### 4.5.2 Fail - Conditional step
Fail Step to Terminate the Pipeline Execution and Mark it as Failed

- Define a FailStep with customized error message, which indicates the cause of the execution failure.
- Enter the FailStep error message with a Join function, which appends a static text string with the dynamic mse_threshold parameter to build a more informative error message.

In [46]:
# Error message: static text joined with the MSE threshold pipeline parameter
mse_fail_message = Join(on=" ", values=["Execution failed due to MSE >", mse_threshold])

step_fail = FailStep(name="AbaloneMSEFail", error_message=mse_fail_message)

## 4.6 Define a Condition Step
Define a Condition Step to check the MSE and conditionally create a model, run a batch transformation and register the model in the Model Registry, or terminate the execution in a Failed state.
A ConditionStep enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties

- Define a ConditionLessThanOrEqualTo on the MSE value found in the output of the evaluation step, step_eval.
- Use the condition in the list of conditions in a ConditionStep.
- Pass the CreateModelStep and TransformStep steps, and the RegisterModel step collection into the if_steps of the ConditionStep, which are only executed if the condition evaluates to True.
- Pass the FailStep step into the else_steps of the ConditionStep, which is only executed if the condition evaluates to False

In [None]:
# MSE value read from the evaluation report produced by step_eval
evaluated_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)

# True when the evaluated MSE is less than or equal to the threshold
cond_lte = ConditionLessThanOrEqualTo(left=evaluated_mse, right=mse_threshold)

# True: register the model, create it and run the batch transform.
# False: run the fail step.
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

# 5. BUILD THE PIPELINE
Combine the steps into a Pipeline so it can be executed

In [None]:
pipeline_name = "AbalonePipeline"

# Parameters that the pipeline definition exposes
pipeline_parameters = [
    processing_instance_count,
    instance_type,
    model_approval_status,
    input_data,
    batch_data,
    mse_threshold,
]
# Top-level steps; the condition step carries its own if/else steps
pipeline_steps = [step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)


# 6. Submit the pipeline to SageMaker and start execution

In [None]:
# Submit the pipeline definition to SageMaker under the execution role
pipeline.upsert(role_arn=role)
# Start an execution of the pipeline and keep the handle in `execution`
execution = pipeline.start()