## ML pipeline for scammers detection

The steps in this pipeline include:
1. Preprocessing the dataset
2. Train an XGBoost Model
3. Evaluate the model performance
4. Create a model
5. Register the model version into the Model Registry
5. Deploy the model version to a SageMaker Hosted Endpoint using a Lambda Function

In [1]:
# Make sure the SageMaker SDK is updated
import sys
!{sys.executable} -m pip install --upgrade sagemaker

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes


In [33]:
# Import all the libraries used in this notebook
import os, time, json
import boto3
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    Processor,
    ScriptProcessor,
)
from sagemaker import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.lambda_helper import Lambda

In [3]:
# Create the SageMaker Session and define variables and parameters needed for the Pipeline steps
region = sagemaker.Session().boto_region_name
sm_client = boto3.client("sagemaker")
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client)
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "scam-detect"
s3_prefix = "scam-detect-pipeline"
model_package_group_name = "scam-detect-package-group"
input_dataset_s3_uri="s3://rodzanto2021ml/scam-detect/features_small.csv" ### <--- Replace this with your S3 location for the input dataset...

In [4]:
# Define a series of parameters for the pipeline
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv",
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

# Cache Pipeline steps to reduce execution time on subsequent executions
#cache_config = CacheConfig(enable_caching=True, expire_after="30d") ### Uncomment if you want to enable caching in the pipeline
cache_config = CacheConfig(enable_caching=False)

#### Data Preparation

An SKLearn processor is used to prepare the dataset for the Hyperparameter Tuning job. Using the script preprocess.py, the dataset is featurized and split into train, test, and validation datasets.

The output of this step is used as the input to the TrainingStep

In [5]:
%%writefile preprocess.py

"""Feature engineers the dataset."""
import argparse
import logging
import os
import pathlib
import requests
import tempfile
import boto3
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/features_small.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(
        fn
    )
    os.unlink(fn)
    feature_columns_names = list(df.columns[df.columns != 'scam'])
    label_column = "scam"

    logger.debug("Defining transformers.")
    numeric_features = list(feature_columns_names)
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
        ]
    )

    logger.info("Applying transforms.")
    y = df.pop(label_column)
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting preprocess.py


In [6]:
# Process the training data step using a python script.
# Split the training data set into train, test, and validation datasets

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/preprocess",
    sagemaker_session=sagemaker_session,
    role=role,
)
step_process = ProcessingStep(
    name="PreprocessDataset",
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="preprocess.py",
    job_arguments=["--input-data", input_data],
    cache_config=cache_config,
)

#### Model Training

Train an XGBoost model with the output of the ProcessingStep.

In [21]:
# Define the output path for the model artifacts from the Hyperparameter Tuning Job
model_path = f"s3://{default_bucket}/{base_job_prefix}/scam-detect-train"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/train",
    sagemaker_session=sagemaker_session,
    role=role,
)

xgb_train.set_hyperparameters(
    objective="binary:logistic",
    num_round=500,
    gamma=0,
    max_depth=3
)

step_train = TrainingStep(
    name="TrainModel",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config=cache_config,
)

#### Evaluate the model

Use a processing job to evaluate the model from the TrainingStep. If the output of the evaluation is True, a model will be created and a Lambda will be invoked to deploy the model to a SageMaker Endpoint. 

In [22]:
%%writefile evaluate.py

"""Evaluation script for measuring mean squared error."""
import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    model = pickle.load(open("xgboost-model", "rb"))

    logger.debug("Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    logger.debug("Reading test data.")
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)

    logger.debug("Calculating mean squared error.")
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing out evaluation report with mse: %f", mse)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting evaluate.py


In [23]:
# A ProcessingStep is used to evaluate the performance of the trained model. Based on the results of the evaluation, the model is created and deployed.

script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/evaluate",
    sagemaker_session=sagemaker_session,
    role=role,
)

evaluation_report = PropertyFile(
    name="ScamDetectEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="EvaluateModel",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{default_bucket}/{s3_prefix}/evaluation_report",
        ),
    ],
    code="evaluate.py",
    property_files=[evaluation_report],
    cache_config=cache_config,
)

#### Create the model

The model is created and the name of the model is provided to the Lambda function for deployment. The `CreateModelStep` dynamically assigns a name to the model.

In [24]:
# Create Model
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
    predictor_cls=XGBoostPredictor,
)

step_create_model = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(instance_type="ml.m5.large"),
)

#### Register the model

Register the model version in the SageMaker Model Registry for version control.

In [25]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
step_register = RegisterModel(
    name="RegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge", "ml.m5.2xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

#### Create the Lambda Step

When defining the LambdaStep, the SageMaker Lambda helper class provides helper functions for creating the Lambda function. Users can either use the `lambda_func` argument to provide the function ARN to an already deployed Lambda function OR use the `Lambda` class to create a Lambda function by providing a script, function name and role for the Lambda function. 

When passing inputs to the Lambda, the `inputs` argument can be used and within the Lambda function's handler, the `event` argument can be used to retrieve the inputs.

The dictionary response from the Lambda function is parsed through the `LambdaOutput` objects provided to the `outputs` argument. The `output_name` in `LambdaOutput` corresponds to the dictionary key in the Lambda's return dictionary. 

#### Define the Lambda function

Users can choose the leverage the Lambda helper class to create a Lambda function and provide that function object to the LambdaStep. Alternatively, users can use a pre-deployed Lambda function and provide the function ARN to the `Lambda` helper class in the lambda step. 

In [26]:
%%writefile lambda_deploy.py

"""
This Lambda function creates an Endpoint Configuration and deploys a model to an Endpoint. 
The name of the model to deploy is provided via the `event` argument
"""

import json
import boto3


def lambda_handler(event, context):
    """ """
    sm_client = boto3.client("sagemaker")

    # The name of the model created in the Pipeline CreateModelStep
    model_name = event["model_name"]

    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]

    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": "ml.m5.xlarge",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )

    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
    )

    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "other_key": "example_value",
    }

Overwriting lambda_deploy.py


In [27]:
# Deploy Lambda Step
lambda_role = role
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
model_name = "scam-detect-model" + current_time
endpoint_config_name = "scam-detect-endpoint-config-" + current_time
endpoint_name = "scam-detect-endpoint-" + current_time
function_name = "sagemaker-lambda-step-endpoint-deploy-" + current_time

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="lambda_deploy.py",
    handler="lambda_deploy.lambda_handler",
)

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="other_key", output_type=LambdaOutputTypeEnum.String)

step_deploy_lambda = LambdaStep(
    name="DeployEndpoint",
    lambda_func=func,
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)

In [28]:
# condition step for evaluating model quality and branching execution.
# The `json_path` value is based on the `report_dict` variable in `evaluate.py`
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=6.0,
)

step_cond = ConditionStep(
    name="CheckEvaluation",
    conditions=[cond_lte],
    if_steps=[step_create_model, step_deploy_lambda, step_register],
    else_steps=[],
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [29]:
# Use the same pipeline name across execution for cache usage.
pipeline_name = "scam-detect-pipeline" + current_time

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        input_data,
        model_approval_status,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session,
)

#### Execute the Pipeline

In [30]:
definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-servicecatalog-seedcode-eu-west-1/dataset/abalone-dataset.csv'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessDataset',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
   

In [31]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:889960878219:pipeline/scam-detect-pipeline08-31-12-54-08',
 'ResponseMetadata': {'RequestId': '40b1a859-cb34-4449-b798-56f4f0ade421',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '40b1a859-cb34-4449-b798-56f4f0ade421',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Tue, 31 Aug 2021 12:54:12 GMT'},
  'RetryAttempts': 0}}

In [32]:
execution = pipeline.start(
    parameters=dict(
        InputDataUrl=input_dataset_s3_uri
    )
)

In [None]:
execution.wait()

#### Cleaning up resources

Running the following cell will delete the following resources created in this notebook -
* SageMaker Model
* SageMaker Endpoint Configuration
* SageMaker Endpoint
* SageMaker Pipeline
* Lambda Function

In [None]:
# Create a SageMaker client
sm_client = boto3.client("sagemaker")

# Get the model name from the EndpointCofig. The CreateModelStep properties are not available outside the Pipeline execution context
# so `step_create_model.properties.ModelName` can not be used while deleting the model.
model_name = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)[
    "ProductionVariants"
][0]["ModelName"]

# Delete the Model
sm_client.delete_model(ModelName=model_name)

# Delete the EndpointConfig
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

# Delete the endpoint
sm_client.delete_endpoint(EndpointName=endpoint_name)

# Delete the Lambda function
func.delete()

# Delete the Pipeline
sm_client.delete_pipeline(PipelineName=pipeline_name)