# SageMaker Pipeline Demo

This demo shows scheduled (recurring) XGBoost model training, deployment through a SageMaker Pipeline, and inference against the deployed endpoint.

In [None]:
# cell 02
import re

import boto3
import sagemaker
from sagemaker import get_execution_role

# Default S3 bucket of a fresh SageMaker session, plus the key prefix of the
# demo data set.
bucket = sagemaker.Session().default_bucket()
prefix = 'sagemaker/DEMO-xgboost-dm'

# IAM execution role of this notebook; passed to the estimator, the model and
# the pipeline further down.
role = get_execution_role()

In [None]:
# cell 03
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import sys                                        # For writing outputs to notebook
import math                                       # For ceiling function
import json                                       # For parsing hosting outputs
import os                                         # For manipulating filepath names
import sagemaker 
import zipfile     # Amazon SageMaker's Python SDK provides many helper functions

import time

In [None]:
# AWS / SageMaker handles shared by the cells below.
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = sess.region_name

# Names used by the pipeline and the model registry.
model_package_group_name = "xgboost-dir-mkt-bin-classification"  # Model name in model registry
prefix = "xgboost-demo-pipelines"
pipeline_name = "Xgboost-demo-pipelines"  # SageMaker Pipeline name

# Local-time stamp (month-day-hour-minute-second) used to make resource names
# unique per notebook run; time.strftime defaults to time.localtime().
current_time = time.strftime("%m-%d-%H-%M-%S")

In [None]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Pipeline parameters. Each one must also be listed in Pipeline(parameters=[...])
# so that it can be overridden when an execution is started.
# (No input-data, processing-instance or accuracy-threshold parameters are
# defined, because this pipeline has no processing or evaluation step.)

# --- training step ---
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.large",
)
training_epochs = ParameterString(
    name="TrainingEpochs",
    default_value="100",
)

# --- inference / endpoint step ---
endpoint_instance_type = ParameterString(
    name="EndpointInstanceType",
    default_value="ml.m5.large",
)

## Define model training step

In [None]:
# Built-in XGBoost algorithm container image URI for the current region.
container = sagemaker.image_uris.retrieve(
    framework='xgboost',
    region=boto3.Session().region_name,
    version='latest',
)

In [None]:
# S3 key prefix (under `bucket`) holding the train/validation CSV data.
data_prefix = "sagemaker/DEMO-xgboost-dm"

In [None]:

# CSV training and validation channels for the XGBoost container. Both URIs end
# in "/" so the S3 prefix only matches objects inside the train/ and
# validation/ "folders"; a bare ".../train" prefix would also pick up keys such
# as ".../training/..." or ".../train_old/...".
s3_input_train = sagemaker.inputs.TrainingInput(
    s3_data='s3://{}/{}/train/'.format(bucket, data_prefix), content_type='csv'
)
s3_input_validation = sagemaker.inputs.TrainingInput(
    s3_data='s3://{}/{}/validation/'.format(bucket, data_prefix), content_type='csv'
)

In [None]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
import time

# NOTE(review): model_path is not used by the visible cells; the estimator
# writes its artifacts under '<prefix>/output' instead.
model_path = f"s3://{bucket}/{prefix}/model/"


# XGBoost estimator built on the container image retrieved above.
# The instance type is the `TrainingInstanceType` pipeline parameter. As a
# result, the value passed when an execution starts (e.g. by the
# scheduled-execution Lambda) controls the training instance. A hard-coded
# type would silently ignore that parameter.
xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    instance_count=1,
                                    instance_type=training_instance_type,
                                    output_path='s3://{}/{}/output'.format(bucket, prefix),
                                    sagemaker_session=sagemaker_session)
# NOTE(review): num_round is still fixed at 100, so the `TrainingEpochs`
# pipeline parameter currently has no effect. Wiring it in
# (num_round=training_epochs) needs a SageMaker SDK version that accepts
# pipeline variables as hyperparameters. Confirm before changing.
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100,
                        _kfold=5,
                        _num_cv_round=3)

# Pipeline step that runs the training job on the train/validation channels.
step_train_model = TrainingStep(
    name="xgb-model-training",
    estimator=xgb,
    inputs={
        "train": s3_input_train,
        "validation": s3_input_validation,
    },
)



## Create the Model step

In [None]:
# CreateModelStep is defined in sagemaker.workflow.steps (not step_collections).
# Without this import the cell fails with a NameError.
from sagemaker.workflow.steps import CreateModelStep

# Model whose image and artifact location are resolved at pipeline run time
# from the training step's properties.
model = sagemaker.model.Model(
    image_uri=step_train_model.properties.AlgorithmSpecification.TrainingImage,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

# Pipeline step that creates a SageMaker Model from the trained artifact; the
# deploy Lambda step consumes its `properties.ModelName`.
step_create_model = CreateModelStep(
    name="Create-Xgboost-dir-mkt-Model",
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(instance_type=endpoint_instance_type),
)

## Deploy the model to a SageMaker endpoint with a Lambda step

When defining the `LambdaStep`, use the SageMaker `Lambda` helper class to describe the Lambda function. The `lambda_func` argument takes a `Lambda` object, which has two possible forms. It can reference an already-deployed function by its ARN. Alternatively, it can create a new function from a script, a function name, and an execution role; this is the approach used below.

Inputs for the Lambda are passed through the `inputs` argument of `LambdaStep`. Inside the Lambda function's handler, they are available as keys of the `event` argument.

The dictionary response from the Lambda function is parsed through the LambdaOutput objects provided to the outputs argument. The output_name in LambdaOutput corresponds to the dictionary key in the Lambda's return dictionary.

In [None]:
%%writefile deploy_model_lambda.py


"""
This Lambda function deploys the model to SageMaker Endpoint. 
If Endpoint exists, then Endpoint will be updated with new Endpoint Config.
"""

import json
import boto3
import time


sm_client = boto3.client("sagemaker")


def _endpoint_exists(endpoint_name):
    """Return True if a SageMaker endpoint named exactly ``endpoint_name`` exists.

    ``NameContains`` is only a substring filter, so each listed endpoint is
    compared by exact name. Otherwise an unrelated endpoint whose name merely
    contains ``endpoint_name`` would lead to an ``update_endpoint`` call on a
    name that does not exist. A paginator is used so that matches beyond the
    first page of results are not missed.
    """
    paginator = sm_client.get_paginator("list_endpoints")
    for page in paginator.paginate(NameContains=endpoint_name):
        print(f"list_endpoints page: {page['Endpoints']}")
        if any(ep["EndpointName"] == endpoint_name for ep in page["Endpoints"]):
            return True
    return False


def lambda_handler(event, context):
    """Deploy ``event["model_name"]`` to the endpoint ``event["endpoint_name"]``.

    Creates a new endpoint configuration whose name is
    ``event["endpoint_config_name"]`` plus a timestamp suffix. Then it updates
    the endpoint if one with that exact name exists, or creates it otherwise.

    Args:
        event: dict with keys ``model_name``, ``endpoint_config_name``,
            ``endpoint_name`` and ``endpoint_instance_type`` (supplied through
            the pipeline LambdaStep's ``inputs``).
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON-encoded status message. The
        handler does not wait for the endpoint to become InService.
    """
    print(f"Received Event: {event}")

    current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
    endpoint_instance_type = event["endpoint_instance_type"]
    model_name = event["model_name"]
    # Timestamp suffix gives every deployment a fresh, unique config name.
    endpoint_config_name = "{}-{}".format(event["endpoint_config_name"], current_time)
    endpoint_name = event["endpoint_name"]

    # Create Endpoint Configuration
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": endpoint_instance_type,
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )
    print(f"create_endpoint_config_response: {create_endpoint_config_response}")

    # Update the endpoint if it already exists (exact name match), else create it.
    if _endpoint_exists(endpoint_name):
        print("Updating Endpoint with new Endpoint Configuration")
        update_endpoint_response = sm_client.update_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f"update_endpoint_response: {update_endpoint_response}")
        return {"statusCode": 200, "body": json.dumps("Endpoint Updated Successfully")}

    print("Creating Endpoint")
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
    )
    print(f"create_endpoint_response: {create_endpoint_response}")

    return {"statusCode": 200, "body": json.dumps("Endpoint Created Successfully")}

In [None]:
## Run only one time

import boto3
import time
import json


iam = boto3.client('iam')

# Trust policy that lets the AWS Lambda service assume the role.
_LAMBDA_ASSUME_ROLE_POLICY = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
})


def _create_lambda_role(role_name, description, policy_arns):
    """Create an IAM role assumable by Lambda, or return the existing one.

    On creation, the basic Lambda execution policy and ``policy_arns`` are
    attached. The function then waits 30 seconds for the new role to
    propagate. If the role already exists, its ARN is returned unchanged
    (no policies are attached).

    Args:
        role_name: Name of the IAM role.
        description: Description stored on a newly created role.
        policy_arns: Extra managed policy ARNs to attach to a new role.

    Returns:
        The role ARN.
    """
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=_LAMBDA_ASSUME_ROLE_POLICY,
            Description=description,
        )
    except iam.exceptions.EntityAlreadyExistsException:
        print(f'Using ARN from existing role: {role_name}')
        response = iam.get_role(RoleName=role_name)
        return response['Role']['Arn']

    role_arn = response['Role']['Arn']

    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
    )
    for policy_arn in policy_arns:
        iam.attach_role_policy(PolicyArn=policy_arn, RoleName=role_name)

    # IAM is eventually consistent; a brand-new role may not be usable at once.
    print('Waiting 30 seconds for the IAM role to propagate')
    time.sleep(30)
    return role_arn


def create_s3_lambda_role(role_name):
    """Create (or reuse) a Lambda role with read-only S3 access; return its ARN."""
    return _create_lambda_role(
        role_name,
        'Role for Lambda to provide S3 read only access',
        ['arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'],
    )


def create_sagemaker_lambda_role(role_name):
    """Create (or reuse) a Lambda role with SageMaker full access; return its ARN."""
    return _create_lambda_role(
        role_name,
        'Role for Lambda to call SageMaker functions',
        ['arn:aws:iam::aws:policy/AmazonSageMakerFullAccess'],
    )

In [None]:
# Only create once; if the role already exists, the helper returns its ARN.
lambda_role = create_sagemaker_lambda_role(
    role_name="deploy-model-lambda-role-sagemaker",
)

In [None]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

# Base name of the endpoint configuration (the Lambda appends a timestamp), and
# the endpoint name, which is fixed for everything defined in this notebook run.
endpoint_config_name = "xgb-dir-mkt-endpoint-config"
endpoint_name = f"xgb-dir-endpoint-{current_time}"

# Lambda function built from deploy_model_lambda.py, run with the role above.
deploy_model_lambda_function_name = f"sagemaker-deploy-model-lambda-{current_time}"
deploy_model_lambda_function = Lambda(
    function_name=deploy_model_lambda_function_name,
    execution_role_arn=lambda_role,
    script="deploy_model_lambda.py",
    handler="deploy_model_lambda.lambda_handler",
)

# These keys arrive as the `event` dict in lambda_handler.
deploy_inputs = {
    "model_name": step_create_model.properties.ModelName,
    "endpoint_config_name": endpoint_config_name,
    "endpoint_name": endpoint_name,
    "endpoint_instance_type": endpoint_instance_type,
}

step_deploy_model_lambda = LambdaStep(
    name="Deploy-Xgboost-dir-mkt-Endpoint",
    lambda_func=deploy_model_lambda_function,
    inputs=deploy_inputs,
)

## Pipeline Creation: Orchestrate all steps

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Every pipeline parameter has to be declared explicitly when the pipeline is
# created, so that executions can override it.
pipeline_parameters = [
    training_instance_type,
    training_epochs,
    endpoint_instance_type,
]

# Execution order follows the dependencies between steps (the create-model step
# reads the training step's properties, the deploy step reads the model name),
# not the order of this list.
pipeline_steps = [step_train_model, step_create_model, step_deploy_model_lambda]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

## Inspect the pipeline definition

In [None]:
import json

# Parse the pipeline's JSON definition into a dict for inspection; the bare
# expression on the last line makes the notebook display it.
definition = json.loads(pipeline.definition())
definition

## Submit pipeline

In [None]:
# Submit the pipeline definition to SageMaker (upsert: create the pipeline or
# update it if it already exists), using `role` as the pipeline's IAM role.
pipeline.upsert(role_arn=role)

## Execute pipeline using the default parameters

In [None]:
# Start one execution; no parameters are passed, so the defaults are used.
execution = pipeline.start()

## Inference Test

In [None]:
# Load validation.csv (no header row) and drop column 0. The assumption is that
# column 0 is the label (TODO confirm), so only feature values are sent.
test_data = pd.read_csv('validation.csv', header=None).drop(columns=[0])

# Use the second row as the sample to score.
data = test_data.iloc[1]
payload = data.tolist()

# Endpoint input: the feature values joined into one comma-separated line.
payload_new = ','.join(map(str, payload))
payload_new

In [None]:
import boto3
import json
import numpy as np

client = boto3.client('sagemaker-runtime')

# Send the CSV-encoded feature row to the deployed endpoint.
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='text/csv',
    Body=payload_new,
)

# The response body is a stream; its content is parsed as JSON. This presumably
# yields a single score for a single-row payload; confirm for multi-row input.
result = json.loads(response['Body'].read())
print(result)

## Scheduled trigger

In [None]:
%%writefile pipeline_execution_lambda.py


"""
This Lambda function execute sagemaker pipelines, it is triggered by EventBridge rules. 
"""

import boto3
import time

client = boto3.client('sagemaker')

# Parameter values sent with every execution; the keys must match the
# parameter names declared by the pipeline.
_PIPELINE_PARAMETERS = {
    'TrainingInstanceType': 'ml.m5.large',
    'TrainingEpochs': '100',
    'EndpointInstanceType': 'ml.m5.large',
}


def lambda_handler(event, context):
    """Start one execution of the 'Xgboost-demo-pipelines' SageMaker pipeline.

    Args:
        event: Triggering event (e.g. from an EventBridge rule); only logged.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and the new execution's ARN as ``body``.
    """
    print(f"Received Event: {event}")

    stamp = time.strftime("%y-%m-%d-%H-%M-%S", time.localtime())
    response = client.start_pipeline_execution(
        PipelineName='Xgboost-demo-pipelines',
        PipelineExecutionDisplayName='xgb-dir-mkt-' + stamp,
        PipelineParameters=[
            {'Name': name, 'Value': value}
            for name, value in _PIPELINE_PARAMETERS.items()
        ],
    )
    print(response)

    return {"statusCode": 200, "body": response['PipelineExecutionArn']}

In [None]:
from sagemaker.lambda_helper import Lambda


# Fixed (un-timestamped) name, so the same function is reused on every run.
pipeline_exec_lambda_function_name = "sagemaker-pipeline-exec-lambda"

# Lambda that starts a pipeline execution; intended as the EventBridge target.
pipeline_exec_lambda_function = Lambda(
    function_name=pipeline_exec_lambda_function_name,
    execution_role_arn=lambda_role,
    script="pipeline_execution_lambda.py",
    handler="pipeline_execution_lambda.lambda_handler",
)

# upsert() creates the function on the first run and updates it afterwards.
# create() alone fails on every re-run because the fixed name already exists.
print(pipeline_exec_lambda_function.upsert())

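## Trigger the pipeline on a schedule with EventBridge

Create an Amazon EventBridge rule that runs on a schedule (for example `rate(1 day)`) and targets the `sagemaker-pipeline-exec-lambda` function created above. See: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule-schedule.html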