## SageMaker Model Building and Deployment using SageMaker Workflow Pipeline


In this notebook we show how to use Amazon SageMaker to develop, train, tune and deploy an XGBoost model. Synthetic customer churn data is used.

The data is in AWS public S3 bucket: s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt

Sklearn Processor is used to process the raw data.

* XGBoost https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html?highlight=xgboost
* Doc https://sagemaker.readthedocs.io/en/stable/using_sklearn.html
* SDK https://sagemaker.readthedocs.io/en/stable/sagemaker.sklearn.html
* boto3 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#client
 
**This sample is provided for demonstration purposes; make sure to conduct appropriate testing if you adapt this code for your own use cases!**

In [None]:
%matplotlib inline
import os
import time
import logging
import pandas as pd
import numpy as np
import sagemaker
import json
import boto3
from sagemaker import get_execution_role

# Low-level SageMaker API client; later cells reuse it for tagging, polling,
# model-registry calls and endpoint creation.
sm_client = boto3.client('sagemaker')

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

In [None]:
# Session-level settings shared by the rest of the notebook.
sagemaker_session = sagemaker.Session()

# The session's default bucket; a hard-coded bucket name would work here too.
bucket = sagemaker_session.default_bucket()

region = sagemaker_session.boto_region_name
print(region)

# Execution role passed to the processors, the estimator and the pipeline below.
role = get_execution_role()
print(f"sagemaker role arn <{role}>")

# Values attached to the pipeline as "sagemaker:project-*" tags.
project_id = "test_id"
project_name = "test_pro"

# The project name should not have more than 15 characters.
assert len(project_name) <= 15

In [None]:
print(bucket)

## Load Raw Data to S3

Load raw data from the public S3 bucket to your own S3 bucket.

In [None]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt s3://{bucket}/sagemaker/DEMO-xgboost-churn/data/RawData.csv 

## Prepare script to be used by preprocessing job and model evaluation

Create the preprocessing script. This script will be used by the SageMaker Processing job instance to process the raw data.

In [None]:
%%writefile preprocess.py

"""Preprocess the customer churn dataset."""

import argparse
import logging
import pathlib

import boto3
import pandas as pd

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

if __name__ == "__main__":
    logger.info("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    # Create the scratch directory and every output directory up front so the
    # download and the to_csv calls below never depend on them pre-existing.
    for sub_dir in ("data", "train", "validation", "test"):
        pathlib.Path(f"{base_dir}/{sub_dir}").mkdir(parents=True, exist_ok=True)

    # Split "s3://<bucket>/<key>" into its parts and fail early, with a clear
    # message, on anything that does not have that shape.
    input_data = args.input_data
    logger.info("Input data: %s", input_data)
    scheme, _, remainder = input_data.partition("://")
    bucket, _, key = remainder.partition("/")
    if scheme.lower() != "s3" or not bucket or not key:
        raise ValueError(
            f"--input-data must look like s3://<bucket>/<key>, got: {input_data!r}"
        )

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/raw-data.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.info("Reading downloaded data.")

    # read in csv
    df = pd.read_csv(fn)

    # drop the "Phone" feature column
    df = df.drop(["Phone"], axis=1)

    # Treat "Area Code" as categorical so get_dummies one-hot encodes it
    df["Area Code"] = df["Area Code"].astype(object)

    # Drop several other columns
    df = df.drop(["Day Charge", "Eve Charge", "Night Charge", "Intl Charge"], axis=1)

    # Convert categorical variables into dummy/indicator variables.
    # dtype=int keeps the indicators as 0/1 in the CSV output regardless of the
    # pandas version (newer releases default to bool, which serialises as True/False).
    model_data = pd.get_dummies(df, dtype=int)

    # Create one binary classification target column and move it to the front;
    # the files are written without headers, so column position is all that
    # identifies the label.
    model_data = pd.concat(
        [
            model_data["Churn?_True."],
            model_data.drop(["Churn?_False.", "Churn?_True."], axis=1),
        ],
        axis=1,
    )

    # Shuffle deterministically, then cut 70% / 20% / 10% by row position.
    shuffled = model_data.sample(frac=1, random_state=1729)
    train_end = int(0.7 * len(shuffled))
    validation_end = int(0.9 * len(shuffled))
    splits = {
        "train": shuffled.iloc[:train_end],
        "validation": shuffled.iloc[train_end:validation_end],
        "test": shuffled.iloc[validation_end:],
    }

    for split_name, split_frame in splits.items():
        logger.info("Writing %d rows to the %s split.", len(split_frame), split_name)
        split_frame.to_csv(
            f"{base_dir}/{split_name}/{split_name}.csv", header=False, index=False
        )


### Prepare evaluation script

In [None]:
%%writefile evaluate.py

"""Evaluation script for measuring model accuracy."""

import json
import logging
import os
import pickle
import tarfile
import tempfile

import pandas as pd
import xgboost

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# May need to import additional metrics depending on what you are measuring.
# See https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
from sklearn.metrics import accuracy_score, roc_auc_score


if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"

    # Extract and load from the SAME directory. The previous version extracted
    # to ".." and loaded from ".", which only works when the working directory
    # happens to be "/".
    logger.info("Loading xgboost model.")
    with tempfile.TemporaryDirectory() as extract_dir:
        with tarfile.open(model_path) as tar:
            tar.extractall(path=extract_dir)
        # NOTE: pickle executes code on load; this is only acceptable because
        # the artifact is produced by this pipeline's own training step.
        with open(os.path.join(extract_dir, "xgboost-model"), "rb") as model_file:
            model = pickle.load(model_file)

    logger.info("Loading test input data")
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    # Column 0 is the label; everything else is a feature.
    logger.info("Reading test data.")
    y_test = df.iloc[:, 0].to_numpy()
    X_test = xgboost.DMatrix(df.iloc[:, 1:].values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)

    logger.info("Creating classification evaluation report")
    # Accuracy needs hard labels, so threshold the scores by rounding.
    acc = accuracy_score(y_test, predictions.round())
    # AUC is a ranking metric and must be computed on the raw scores; rounding
    # first collapses the ROC curve to a single operating point.
    auc = roc_auc_score(y_test, predictions)

    # The metrics reported can change based on the model used, but it must be a specific name per (https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)
    report_dict = {
        "binary_classification_metrics": {
            "accuracy": {
                "value": acc,
                "standard_deviation": "NaN"
            },
            "auc": {
                "value": auc,
                "standard_deviation": "NaN"
            },
        },
    }

    evaluation_output_path = '/opt/ml/processing/evaluation/evaluation.json'
    # Make sure the output directory exists before writing the report.
    os.makedirs(os.path.dirname(evaluation_output_path), exist_ok=True)
    with open(evaluation_output_path, 'w') as f:
        json.dump(report_dict, f)
    


## Define Model Building Pipeline

 Pipeline input parameters are listed below. 

In [None]:
# Pipeline parameters, each declared with a default value.

# Compute for the processing jobs.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Compute for the training job.
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

# Approval status given to the registered model package.
# A default of "Approved" means no manual approval is needed.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# S3 location of the raw input data; change the default to point at your own file.
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://{sagemaker_session.default_bucket()}/sagemaker/DEMO-xgboost-churn/data/RawData.csv",
)

In [None]:
# Both names are arbitrary; choose whatever suits your project.
base_job_prefix = "CustomerChurn"
model_package_group_name = "CustomerChurnPackageGroup"

In [None]:
from time import strftime,gmtime
# Suffix the pipeline name with the current UTC minute and second (MMSS).
# NOTE(review): the suffix repeats every hour, so a name can collide with a
# pipeline created by an earlier run.
pipeline_name = 'CustomerChurn-Pipe-' + strftime("%M%S", gmtime())

The cell below defines a SageMaker model building pipeline using the `sagemaker.workflow` module.

In [None]:
# Feature-engineering step: runs preprocess.py with the scikit-learn processor.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    sagemaker_session=sagemaker_session,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-CustomerChurn-preprocess",  # choose any name
)

# One named output per split written by preprocess.py; the training and
# evaluation steps look these outputs up by name.
step_process = ProcessingStep(
    name="CustomerChurnProcess",  # choose any name
    processor=sklearn_processor,
    code="preprocess.py",
    job_arguments=["--input-data", input_data],
    outputs=[
        ProcessingOutput(output_name=split_name, source=f"/opt/ml/processing/{split_name}")
        for split_name in ("train", "validation", "test")
    ],
)
# Training step: produces the model artifacts with the SageMaker built-in XGBoost algorithm.
model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/CustomerChurnTrain"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    region=region,
    instance_type=training_instance_type,
)

xgb_train = Estimator(
    image_uri=image_uri,
    role=role,
    sagemaker_session=sagemaker_session,
    instance_type=training_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/CustomerChurn-train",
    output_path=model_path,
)

xgb_hyperparameters = {
    "objective": "binary:logistic",
    "num_round": 50,
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.7,
    "silent": 0,
}
xgb_train.set_hyperparameters(**xgb_hyperparameters)

# The "train" and "validation" channels read the matching outputs of the processing step.
step_train = TrainingStep(
    name="CustomerChurnTrain",
    estimator=xgb_train,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)
# Evaluation step: runs evaluate.py against the trained model and the test split.

# Exposes evaluation.json (from the "evaluation" output) so the condition step can read values out of it.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Reuses the XGBoost image retrieved for training.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    role=role,
    sagemaker_session=sagemaker_session,
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-CustomerChurn-eval",
)

step_eval = ProcessingStep(
    name="CustomerChurnEval",
    processor=script_eval,
    code="evaluate.py",
    inputs=[
        # Model artifacts produced by the training step.
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # Test split produced by the processing step.
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    property_files=[evaluation_report],
)
# Model metrics: point the model package at the evaluation.json written by the evaluation step.
evaluation_s3_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{evaluation_s3_uri}/evaluation.json",
        content_type="application/json",
    )
)

# Register-model step; it only runs when the condition step selects it.
step_register = RegisterModel(
    name="CustomerChurnRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
)
# Condition step: register the model only when accuracy reaches the threshold.

# Accuracy read from the evaluation report; the JSON path must follow the
# structure of report_dict in evaluate.py.
reported_accuracy = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="binary_classification_metrics.accuracy.value",
)

# Despite its name, this is a "greater than or equal to" condition.
# Both the condition type and the threshold can be changed here.
cond_lte = ConditionGreaterThanOrEqualTo(left=reported_accuracy, right=0.8)

step_cond = ConditionStep(
    name="CustomerChurnAccuracyCond",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

# Assemble the parameters and steps into the pipeline object.
pipeline_parameters = [
    processing_instance_type,
    processing_instance_count,
    training_instance_type,
    model_approval_status,
    input_data,
]
pipeline_steps = [step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

## Create Pipeline

In [None]:
# Create the pipeline, or update it in place when a pipeline with this name
# already exists. The previous create()/describe() fallback left an existing
# pipeline's old definition untouched, so a re-run could silently execute
# stale steps.
response = pipeline.upsert(role_arn=role)

pipeline_arn = response["PipelineArn"]

# Tag the pipeline with the project identifiers defined earlier in the notebook.
sm_client.add_tags(
    ResourceArn=pipeline_arn,
    Tags=[
        {'Key': 'sagemaker:project-name', 'Value': project_name },
        {'Key': 'sagemaker:project-id', 'Value': project_id }
    ]
)
print(pipeline_arn)

## Run Pipeline

In [None]:
start_response = pipeline.start()

pipeline_execution_arn = start_response.arn
print(pipeline_execution_arn)

# Poll until the execution leaves its in-flight states. "Stopping" is still
# in flight, so keep waiting through it as well as through "Executing".
while True:
    resp = sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)
    execution_status = resp['PipelineExecutionStatus']
    if execution_status not in ('Executing', 'Stopping'):
        break
    print('Running...')
    time.sleep(15)

print(execution_status, pipeline_execution_arn)

# Fail loudly here: the cells below assume a model package was just registered.
if execution_status != 'Succeeded':
    raise RuntimeError(
        'Pipeline execution ended with status {}: {}'.format(
            execution_status, resp.get('FailureReason', 'no failure reason reported')
        )
    )

## Approve the model to kick off the deployment process

In [None]:
# list all packages and select the latest one
packages = sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)['ModelPackageSummaryList']
packages = sorted(packages, key=lambda x: x['CreationTime'], reverse=True)

latest_model_package_arn = packages[0]['ModelPackageArn']

In [None]:
sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)['ModelPackageSummaryList']

In [None]:
print(latest_model_package_arn)

## Approve model
Approval permission is controlled by the IAM role.

In [None]:
# Set the approval status of the latest model package to "Approved".
# NOTE(review): the pipeline's ModelApprovalStatus parameter already defaults
# to "Approved", so this call only changes something when the pipeline was run
# with a different approval status.
model_package_update_response = sm_client.update_model_package(
    ModelPackageArn=latest_model_package_arn,
    ModelApprovalStatus="Approved",
)

## Get Approved model

In [None]:
model_details=sm_client.describe_model_package(ModelPackageName=latest_model_package_arn)

In [None]:
model_data=model_details['InferenceSpecification']['Containers'][0]['ModelDataUrl']

In [None]:
image_path=model_details['InferenceSpecification']['Containers'][0]['Image']

In [None]:
model_data

In [None]:
image_path

In [None]:
# Create the local download directory; exist_ok keeps the cell re-runnable
# (a plain `mkdir` errors once the directory exists).
os.makedirs('pipeline_model', exist_ok=True)

In [None]:
!aws s3 cp {model_data} ./pipeline_model

In [None]:
import tarfile

# Unpack the downloaded model archive next to it. The context manager closes
# the archive even if extraction fails, and avoids the builtin-like name `file`.
with tarfile.open('./pipeline_model/model.tar.gz') as model_archive:
    model_archive.extractall('./pipeline_model')

## Make prediction using Local Model

A model is created in the SageMaker notebook instance instead of using SageMaker endpoint instance. The local model in the notebook instance is used to check predictions within the notebook. 

Steps:
1. Install the XGBoost library on the notebook instance if it is not already installed.
2. Load the trained model.
3. Use test data to make predictions.

In [None]:
!pip install xgboost

In [None]:
import xgboost as xgb

In [None]:
import joblib
loaded_model = joblib.load("./pipeline_model/xgboost-model")

In [None]:
# test.csv is written by the processing step and only exists in S3, so fetch it
# from that step's "test" output before reading it. Without this the cell fails
# on a fresh kernel because nothing else downloads the file.
if not os.path.exists('test.csv'):
    execution_steps = sm_client.list_pipeline_execution_steps(
        PipelineExecutionArn=pipeline_execution_arn
    )['PipelineExecutionSteps']
    processing_job_arn = next(
        step['Metadata']['ProcessingJob']['Arn']
        for step in execution_steps
        if step['StepName'] == step_process.name
    )
    # The job name is the last path segment of the processing job ARN.
    processing_outputs = sm_client.describe_processing_job(
        ProcessingJobName=processing_job_arn.split('/')[-1]
    )['ProcessingOutputConfig']['Outputs']
    test_s3_uri = next(
        output['S3Output']['S3Uri']
        for output in processing_outputs
        if output['OutputName'] == 'test'
    )
    test_bucket, test_prefix = test_s3_uri[len('s3://'):].split('/', 1)
    boto3.client('s3').download_file(
        test_bucket, test_prefix.rstrip('/') + '/test.csv', 'test.csv'
    )

test_data=pd.read_csv('test.csv',header=None)

In [None]:
X=test_data.iloc[0:1,1:]

In [None]:
X.shape

In [None]:
xgtest = xgb.DMatrix(X.values)

In [None]:
loaded_model.predict(xgtest)

## Create Endpoint

Create Model and deploy an endpoint

In [None]:
from time import gmtime, strftime

# Build the model from the latest registered model package version.
model_version_arn = latest_model_package_arn
model_name = f"CustomerChurn-model-{strftime('%M%S', gmtime())}"

print(f"Model name : {model_name}")

create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={"ModelPackageName": model_version_arn},
)
print(f"Model arn : {create_model_response['ModelArn']}")

In [None]:
#Create endpointconfig

In [None]:
# Endpoint configuration: a single variant that serves the model created above.
endpoint_config_name = f"Test-EndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
print(endpoint_config_name)

all_traffic_variant = {
    'VariantName': 'AllTraffic',
    'ModelName': model_name,
    'InstanceType': 'ml.t2.medium',
    'InitialInstanceCount': 1,
    'InitialVariantWeight': 1,
}
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[all_traffic_variant],
)


In [None]:
#Deploy endpoint

In [None]:
# Start creating the endpoint from the configuration above; the name carries a UTC timestamp.
endpoint_name = f"Test-endpoint-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
print(f"EndpointName={endpoint_name}")

create_endpoint_response = sm_client.create_endpoint(
    EndpointConfigName=endpoint_config_name,
    EndpointName=endpoint_name,
)
print(create_endpoint_response['EndpointArn'])

In [None]:
sm_client.describe_endpoint(EndpointName=endpoint_name)

In [None]:
def wait_for_response(client, endpoint_name, poll_interval=30):
    """Block until the endpoint leaves the 'Creating' status.

    Args:
        client: Object exposing ``describe_endpoint(EndpointName=...)`` that
            returns a mapping with an 'EndpointStatus' key (and, on failure,
            optionally 'FailureReason').
        endpoint_name: Name of the endpoint to poll.
        poll_interval: Seconds to sleep between polls while the endpoint is
            still being created.

    Returns:
        None, once the endpoint reports 'InService'.

    Raises:
        RuntimeError: If the endpoint reports 'Failed', or leaves 'Creating'
            in any status other than 'InService'.
    """
    while True:
        response = client.describe_endpoint(EndpointName=endpoint_name)
        status = response['EndpointStatus']
        # Check before logging or sleeping, so a finished endpoint is never
        # reported as "still creating" and we never sleep after it is done.
        if status != 'Creating':
            break
        print('Creating job is still in status: {} ...'.format(status))
        logging.info("Creating job is still in status: " + status)
        time.sleep(poll_interval)

    if status == 'InService':
        logging.info("Creating job ended with status: " + status)
        print('Creating job ended with status: {}'.format(status))
        return

    if status == 'Failed':
        # FailureReason may be absent, so do not let a KeyError mask the failure.
        message = response.get('FailureReason', 'no failure reason reported')
        logging.error('Endpoint Creation failed with the following error: {}'.format(message))
        print('Endpoint failed with the following error: {}'.format(message))
        raise RuntimeError('Creating Endpoint failed: {}'.format(message))

    raise RuntimeError('Creating job stopped with status: {}'.format(status))

In [None]:
wait_for_response(sm_client, endpoint_name, poll_interval=30)

## Invoke Endpoint

Invoke the endpoint to make predictions.

In [None]:
test_data=pd.read_csv('test.csv',header=None)
testdata1=test_data.iloc[0:1,1:]

In [None]:
runtime = boto3.client("sagemaker-runtime")
Endpoint_name=endpoint_name

In [None]:
%time
# csv serialization

prediction = runtime.invoke_endpoint(
    EndpointName=Endpoint_name,
    Body=testdata1.to_csv(header=False, index=False).encode("utf-8"),
    ContentType="text/csv",
    Accept= "text/csv",
)

In [None]:
print(prediction["Body"].read())

In [11]:
import pandas as pd
import boto3

# Standalone example: invoke an existing endpoint with the first five test rows.
Endpoint_name= ''  #<your endpoint name> # update to your own endpoint name

# Fail fast with a clear message instead of sending an empty name to the API.
if not Endpoint_name:
    raise ValueError("Set Endpoint_name to the name of your deployed endpoint before running this cell.")

# Column 0 is the label, so only the feature columns are sent.
test_data=pd.read_csv('test.csv',header=None)
testdata1=test_data.iloc[0:5,1:]

runtime = boto3.client("sagemaker-runtime")

prediction = runtime.invoke_endpoint(
    EndpointName=Endpoint_name,
    Body=testdata1.to_csv(header=False, index=False).encode("utf-8"),
    ContentType="text/csv",
    Accept= "text/csv",
)

print(prediction["Body"].read())

b'0.18987475335597992,0.9903860092163086,0.003180732252076268,0.010586234740912914,0.5162394046783447'
