## Create and run an end-to-end ML pipeline using Amazon SageMaker and AWS Lambda

**This sample is provided for demonstration purposes only. Make sure to conduct appropriate testing before deriving code from it for your own use cases!**

### Step 0: Get Admin Setup Results
Load the resources created during the admin setup: bucket names, the CodeCommit repo, the Docker image, IAM roles, etc.

To keep things organized, we will save our source code (data processing and model training/serving scripts), datasets, trained model binaries, and their test-performance metrics on S3, **versioned with respect to the date/time of each update.**
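To make that versioning scheme concrete, here is a small sketch that derives every S3 location of one pipeline run from the run's timestamp. The prefixes mirror the paths used by the Lambda functions later in this notebook (`<date>/source-code`, `<date>/data/train/train.csv`, `<date>/data/validation/validation.csv`, and the `model-artifacts/<job name>/output/model.tar.gz` path that appears in the deployment error); `versioned_s3_layout` and `my-bucket` are hypothetical names used only for illustration.

```python
from time import gmtime, strftime


def versioned_s3_layout(bucket, workflow_name, workflow_date_time):
    """Return the S3 locations one pipeline run uses, all under s3://<bucket>/<date-time>/."""
    prefix = "s3://{}/{}".format(bucket, workflow_date_time)
    job_name = "{}-{}".format(workflow_name, workflow_date_time)
    return {
        "source_code": "{}/source-code".format(prefix),
        "train_data": "{}/data/train/train.csv".format(prefix),
        "validation_data": "{}/data/validation/validation.csv".format(prefix),
        # SageMaker appends <job name>/output/model.tar.gz to the training output path
        "model_artifact": "{}/model-artifacts/{}/output/model.tar.gz".format(prefix, job_name),
    }


# Every run gets its own timestamped "folder", so earlier runs are never overwritten
layout = versioned_s3_layout("my-bucket", "my-project-2", strftime("%Y-%m-%d-%H-%M-%S", gmtime()))
for key, uri in layout.items():
    print(key, uri)
```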

In [1]:
import sagemaker
import boto3
import zipfile
import json
from time import gmtime, strftime
from sagemaker.s3 import S3Uploader
session = boto3.session.Session()

# Grab admin resources (S3 Bucket name, IAM Roles and Docker Image for Training)
with open('admin_setup.txt', 'r') as filehandle:
    admin_setup = json.load(filehandle)

# MLOps Hygiene
WORKFLOW_NAME = "my-project-2"
BUCKET = admin_setup["project_bucket"]
SOURCE_DATA = admin_setup["raw_data_path"]
BRANCH = "master"
REPO = admin_setup["repo_name"]

REGION = session.region_name
TRAINING_IMAGE = admin_setup["docker_image"]
WORKFLOW_EXECUTION_ROLE = admin_setup["workflow_execution_role"]
WORKFLOW_DATE_TIME = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
TRAINING_JOB_NAME = "{}-{}".format(WORKFLOW_NAME, WORKFLOW_DATE_TIME)
SOURCE_CODE_PREFIX = "{}/{}".format(WORKFLOW_DATE_TIME, "source-code")

my_workflow_input = {
    #ADMIN
    "REGION":REGION,
    "ROLE_ARN":WORKFLOW_EXECUTION_ROLE,
    "BUCKET":BUCKET,
    "WORKFLOW_NAME":WORKFLOW_NAME,
    "WORKFLOW_DATE_TIME":WORKFLOW_DATE_TIME,
    "DATA_SOURCE":SOURCE_DATA,

    # CodeCommit
    "REPO":REPO,
    "BRANCH":BRANCH,
    "DATA_PROCESSING_DIR": "sagemaker-processing-src",
    "ML_DIR": "sagemaker-train-serve-src",
    
    # SM Processing
    "PROCESSING_SCRIPT":"processing.py",
    "PROCESSING_IMAGE":TRAINING_IMAGE,
    "PROCESSING_INSTANCE_TYPE":"ml.c5.xlarge",
    "PROCESSING_INSTANCE_COUNT":1,
    "PROCESSING_VOLUME_SIZE_GB":10,
    
    # SM TRAINING
    "TRAINING_SCRIPT":"train.py",
    "TRAINING_IMAGE":TRAINING_IMAGE,
    "TRAINING_INSTANCE_TYPE":"ml.c5.xlarge",
    "TRAINING_INSTANCE_COUNT":1,
    "TRAINING_VOLUME_SIZE_GB":10,
    
    # SM SERVING
    "SERVING_SCRIPT":"serve.py",
    "SERVING_IMAGE":TRAINING_IMAGE,
    "SERVING_INSTANCE_TYPE":"ml.c5.xlarge",
    "SERVING_INSTANCE_COUNT":1,
    "SERVING_VOLUME_SIZE_GB":10,
}

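# The following helper is used throughout this notebook to create Lambda functions without going to the console.
# It zips a single source file, uploads the zip to the versioned source-code prefix on S3,
# and creates a Lambda function whose handler is <module>.lambda_handler
# (this assumes the zip name matches the module name, e.g. codecommit_to_s3.zip -> codecommit_to_s3.py).
sm_session = sagemaker.Session()
lambda_client = boto3.client('lambda')

def create_lambda_function(zip_name, lambda_source_code, function_name, description):
    # Package the source file at the root of the zip archive
    with zipfile.ZipFile(zip_name, mode='w') as zf:
        zf.write(lambda_source_code, arcname=lambda_source_code.split('/')[-1])

    # Upload to s3://<BUCKET>/<WORKFLOW_DATE_TIME>/source-code/<zip_name>
    # (in SageMaker Python SDK v1 this parameter was named `session`)
    S3Uploader.upload(local_path=zip_name,
                      desired_s3_uri="s3://{}/{}".format(BUCKET, SOURCE_CODE_PREFIX),
                      sagemaker_session=sm_session
                     )

    response = lambda_client.create_function(
        FunctionName=function_name,
        Runtime='python3.12',  # python3.6 can no longer be used for new functions; pick a supported runtime
        Role=WORKFLOW_EXECUTION_ROLE,
        Handler=zip_name.split('.')[0] + '.lambda_handler',
        Code={
            'S3Bucket': BUCKET,
            'S3Key': '{}/{}'.format(SOURCE_CODE_PREFIX, zip_name)
        },
        Description=description,
        Timeout=180,
        MemorySize=256
    )
    return response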
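`create_function` fails with a `ResourceConflictException` when a function with the same name already exists, so re-running the cells below unchanged will error out. One way to make re-runs idempotent is sketched here; `create_or_update_lambda` is a hypothetical helper (not part of the original notebook), and on the update path it only refreshes the code, not settings such as the runtime or timeout.

```python
def create_or_update_lambda(lambda_client, function_name, s3_bucket, s3_key, **create_kwargs):
    """Create a Lambda function from a zip on S3, or refresh its code if it already exists.

    Hypothetical helper: re-running create_function with an existing name raises
    ResourceConflictException, so on re-runs we fall back to update_function_code.
    `create_kwargs` carries the remaining create_function arguments (Runtime, Role, ...).
    """
    try:
        return lambda_client.create_function(
            FunctionName=function_name,
            Code={"S3Bucket": s3_bucket, "S3Key": s3_key},
            **create_kwargs)
    except lambda_client.exceptions.ResourceConflictException:
        # The function exists: point it at the freshly uploaded zip instead
        return lambda_client.update_function_code(
            FunctionName=function_name, S3Bucket=s3_bucket, S3Key=s3_key)
```

In this notebook it could replace the direct `create_function` call, for example `create_or_update_lambda(lambda_client, function_name, BUCKET, '{}/{}'.format(SOURCE_CODE_PREFIX, zip_name), Runtime=..., Role=WORKFLOW_EXECUTION_ROLE, Handler=..., Description=description, Timeout=180, MemorySize=256)`.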

### Step 1: Move Code from CodeCommit to S3
The first step in training a model on SageMaker is to copy our source code to S3. The SageMaker Python SDK does this automatically when you use its estimators; since we call the lower-level APIs ourselves, we copy the code explicitly.

In [2]:
!pygmentize ./workflow-orchestration-src/codecommit_to_s3.py

import boto3
import os
import mimetypes
import tarfile
import io

def lambda_handler(event, context):
    """ Pulls AWS Glue and SageMaker source code from CodeCommit and writes it to S3.
    This function creates a tarball of the SageMaker scripts before sending to S3 since
    SageMaker training jobs expect code to be in a tarball on S3.
    """
    # target bucket
    bucket = boto3.resource('s3').Bucket(event['BUCKET'])
    output_prefix = "{}/{}".format(event["WORKFLOW_DATE_TIME
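The docstring above explains why the Lambda builds a tarball: SageMaker training jobs expect code to be in a tarball on S3. A minimal sketch of building such an archive entirely in memory (no writes to the Lambda's local disk) follows; `make_sourcedir_tarball` is a hypothetical helper, not the code of `codecommit_to_s3.py`, whose listing is truncated above.

```python
import io
import tarfile


def make_sourcedir_tarball(files):
    """Build an in-memory gzipped tarball from {archive_name: file_bytes}.

    Hypothetical helper illustrating how scripts can be packaged as
    sourcedir.tar.gz; it is not the original Lambda's implementation.
    """
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)  # tarfile needs the size up front
            tar.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


blob = make_sourcedir_tarball({"train.py": b"print('train')\n",
                               "serve.py": b"print('serve')\n"})
with tarfile.open(fileobj=io.BytesIO(blob), mode="r:gz") as tar:
    print(sorted(tar.getnames()))  # ['serve.py', 'train.py']
```

Inside a Lambda, the file bytes could come from CodeCommit's `get_file(repositoryName=..., commitSpecifier=..., filePath=...)["fileContent"]`, and the archive could be written with `bucket.put_object(Key=..., Body=blob)`; the exact keys the original function uses are not shown here.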

Let's run the above script as a Lambda function; this will let us automate the task later.

First create the Lambda function:

In [3]:
create_lambda_function(zip_name="codecommit_to_s3.zip",
                       lambda_source_code="./workflow-orchestration-src/codecommit_to_s3.py",
                       function_name=WORKFLOW_NAME + '-codecommit-to-s3',
                       description="Copy code files from CodeCommit to a tarball on S3"
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


Run it:

In [34]:
response = lambda_client.invoke(
    FunctionName=WORKFLOW_NAME + '-codecommit-to-s3',
    Payload=json.dumps(my_workflow_input).encode()
)

In [35]:
response["Payload"].read()

b'"SUCCESS"'
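`invoke` returns the function's result as a streaming `Payload`, which can be read only once. When the function raises, Lambda still answers with HTTP 200 but sets a `FunctionError` field in the response, as happens with the deployment step later on. A small hypothetical helper that decodes the payload and surfaces such errors:

```python
import io
import json


def read_lambda_payload(response):
    """Decode the JSON payload of a boto3 Lambda `invoke` response.

    Raises RuntimeError when the function itself raised; Lambda reports that
    through the `FunctionError` key rather than through the HTTP status code.
    """
    payload = json.loads(response["Payload"].read())
    if "FunctionError" in response:
        raise RuntimeError("{}: {}".format(payload.get("errorType"),
                                           payload.get("errorMessage")))
    return payload


# Offline check with a stand-in for the StreamingBody that boto3 returns
fake_ok = {"StatusCode": 200, "Payload": io.BytesIO(b'"SUCCESS"')}
print(read_lambda_payload(fake_ok))  # SUCCESS
```

With a real client this becomes `read_lambda_payload(lambda_client.invoke(FunctionName=..., Payload=...))`.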

In [36]:
!aws s3 ls {"s3://{}/{}/source-code/".format(BUCKET, WORKFLOW_DATE_TIME)}

2020-09-23 05:16:22       2894 codecommit_to_s3.zip
2020-09-23 05:16:34       3778 create_sagemaker_prcoessing_job.zip
2020-09-23 05:27:54       3211 create_sagemaker_training_job.zip
2020-09-23 06:04:02       1493 processing.py
2020-09-23 05:16:53       1010 query_data_processing_status.zip
2020-09-23 05:28:07       1046 query_training_status.zip
2020-09-23 06:04:01       1695 sourcedir.tar.gz


### Step 2: Run a SageMaker Processing Job with `boto3`

The `boto3` SageMaker client is more verbose than the SageMaker Python SDK, but it gives more visibility into the low-level details of Amazon SageMaker.

Let's look at the Python script for our data processing:

In [7]:
!pygmentize ./sagemaker-processing-src/processing.py

import argparse
import os

import numpy as np
import pandas as pd
#from sklearn.model_selection import train_test_split
#from sklearn.preprocessing import PowerTransformer, StandardScaler

from time import gmtime, strftime


LOCAL_DATA_PATH = "/opt/ml/processing"

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.3)
    args, _ = parser.parse_known_args()

To run the above script, we will call [boto3.client('sagemaker').create_processing_job()](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_processing_job) inside a Lambda function.

Here is the code for the lambda function:

In [8]:
!pygmentize ./workflow-orchestration-src/create_sagemaker_prcoessing_job.py

import boto3
sagemaker_boto3 = boto3.client('sagemaker')

def lambda_handler(event, context):
    BUCKET = event["BUCKET"]
    WORKFLOW_DATE_TIME = event["WORKFLOW_DATE_TIME"]
    JOB_NAME = "{}-{}".format(event["WORKFLOW_NAME"], WORKFLOW_DATE_TIME)

    DATA_SOURCE = event["DATA_SOURCE"]
    SOURCE_CODE_PREFIX = "{}/source-code".format(WORKFLOW_DATE_TIME)
    PROCESSING_SCRIPT = event["PROCESSING_SCRIPT"]
    
    # Output data paths
    TRAIN_PATH = 's3://
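The listing above is cut off before the actual API call. For orientation, here is a sketch of the request such a function might send to `create_processing_job`, built from the workflow input dictionary. The S3 prefixes follow this notebook and `--train-test-split-ratio` is the argument `processing.py` parses; the container paths, the `raw-data`/`code` input names, and the one-hour `MaxRuntimeInSeconds` are assumptions, and `build_processing_job_request` is a hypothetical name.

```python
def build_processing_job_request(event, split_ratio=0.3):
    """Assemble keyword arguments for boto3.client('sagemaker').create_processing_job.

    Sketch under assumptions: container paths, input names and the runtime cap
    are illustrative; only the S3 layout follows this notebook.
    """
    prefix = "s3://{}/{}".format(event["BUCKET"], event["WORKFLOW_DATE_TIME"])
    code_dir = "/opt/ml/processing/input/code"

    def s3_input(name, uri, local_path):
        return {"InputName": name,
                "S3Input": {"S3Uri": uri, "LocalPath": local_path,
                            "S3DataType": "S3Prefix", "S3InputMode": "File"}}

    def s3_output(name):
        # Assumes processing.py writes each split to /opt/ml/processing/<name>
        return {"OutputName": name,
                "S3Output": {"S3Uri": "{}/data/{}".format(prefix, name),
                             "LocalPath": "/opt/ml/processing/{}".format(name),
                             "S3UploadMode": "EndOfJob"}}

    return {
        "ProcessingJobName": "{}-{}".format(event["WORKFLOW_NAME"], event["WORKFLOW_DATE_TIME"]),
        "RoleArn": event["ROLE_ARN"],
        "AppSpecification": {
            "ImageUri": event["PROCESSING_IMAGE"],
            "ContainerEntrypoint": ["python3", "{}/{}".format(code_dir, event["PROCESSING_SCRIPT"])],
            "ContainerArguments": ["--train-test-split-ratio", str(split_ratio)],
        },
        "ProcessingInputs": [
            s3_input("raw-data", event["DATA_SOURCE"], "/opt/ml/processing/input/data"),
            s3_input("code", "{}/source-code/{}".format(prefix, event["PROCESSING_SCRIPT"]), code_dir),
        ],
        "ProcessingOutputConfig": {"Outputs": [s3_output("train"), s3_output("validation")]},
        "ProcessingResources": {"ClusterConfig": {
            "InstanceCount": event["PROCESSING_INSTANCE_COUNT"],
            "InstanceType": event["PROCESSING_INSTANCE_TYPE"],
            "VolumeSizeInGB": event["PROCESSING_VOLUME_SIZE_GB"],
        }},
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    }


# In the notebook (requires AWS credentials):
# boto3.client("sagemaker").create_processing_job(**build_processing_job_request(my_workflow_input))
```

Keeping the request construction in a pure function makes it easy to check without calling AWS.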

Let's build it:

In [9]:
create_lambda_function(zip_name="create_sagemaker_prcoessing_job.zip",
                       lambda_source_code="./workflow-orchestration-src/create_sagemaker_prcoessing_job.py",
                       function_name=WORKFLOW_NAME + '-create-sagemaker-prcoessing-job',
                       description="Creates SageMaker Processing Job"
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


Run it:

In [10]:
response = lambda_client.invoke(
    FunctionName=WORKFLOW_NAME + '-create-sagemaker-prcoessing-job',
    Payload=json.dumps(my_workflow_input).encode()
)

In [11]:
response["Payload"].read()

b'{"REGION": "us-east-1", "ROLE_ARN": "arn:aws:iam::227921966468:role/My-StepFunction-Workflow-Role", "BUCKET": "my-project-227921966468", "WORKFLOW_NAME": "my-project-2", "WORKFLOW_DATE_TIME": "2020-09-23-05-16-18", "DATA_SOURCE": "s3://my-datalake-227921966468/data/boston.csv", "REPO": "my-project", "BRANCH": "master", "DATA_PROCESSING_DIR": "sagemaker-processing-src", "ML_DIR": "sagemaker-train-serve-src", "PROCESSING_SCRIPT": "processing.py", "PROCESSING_IMAGE": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3", "PROCESSING_INSTANCE_TYPE": "ml.c5.xlarge", "PROCESSING_INSTANCE_COUNT": 1, "PROCESSING_VOLUME_SIZE_GB": 10, "TRAINING_SCRIPT": "train.py", "TRAINING_IMAGE": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3", "TRAINING_INSTANCE_TYPE": "ml.c5.xlarge", "TRAINING_INSTANCE_COUNT": 1, "TRAINING_VOLUME_SIZE_GB": 10, "SERVING_SCRIPT": "serve.py", "SERVING_IMAGE": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sag

Let's build a mechanism to check the processing job's status, again using a Lambda function:

In [12]:
create_lambda_function(zip_name='query_data_processing_status.zip',
                       lambda_source_code='./workflow-orchestration-src/query_data_processing_status.py',
                       function_name=WORKFLOW_NAME + '-query-data-processing-status',
                       description='Get Status of SageMaker Processing Job'
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


#### Make sure the SageMaker processing job is done (status = Completed) before launching the training step

In [22]:
%cd ./workflow-orchestration-src
import query_data_processing_status as qs
print(qs.lambda_handler(my_workflow_input, ""))
%cd ..

/home/ec2-user/SageMaker/my-github-repos/eventengine/serverless-mlops-with-aws-sagemaker-lambda-and-stepfunctions/workflow-orchestration-src
{'statusCode': 200, 'ProcessingJobStatus': 'Completed'}
/home/ec2-user/SageMaker/my-github-repos/eventengine/serverless-mlops-with-aws-sagemaker-lambda-and-stepfunctions
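Here the status was checked by hand. The Lambda functions in this notebook are created with a 180-second timeout, so a long wait belongs outside them (in the notebook, or in an orchestrator that calls the status Lambda repeatedly). A minimal polling sketch for the notebook side, assuming `Completed`, `Failed`, and `Stopped` are the terminal states; `wait_for_terminal_status` is a hypothetical helper:

```python
import time

# SageMaker processing/training jobs stop changing once they reach one of these
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_terminal_status(get_status, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll `get_status()` until it returns a terminal status, then return that status.

    `get_status` is any zero-argument callable. Raises TimeoutError if the job
    is still running after roughly `timeout_seconds`.
    """
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError("job still {} after {}s".format(status, waited))
        sleep(poll_seconds)
        waited += poll_seconds


# Example wiring (requires AWS credentials):
# sm = boto3.client("sagemaker")
# job_name = "{}-{}".format(WORKFLOW_NAME, WORKFLOW_DATE_TIME)
# status = wait_for_terminal_status(
#     lambda: sm.describe_processing_job(ProcessingJobName=job_name)["ProcessingJobStatus"])
```

Injecting `sleep` keeps the loop testable without actually waiting.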


In [23]:
!aws s3 ls {"s3://{}/{}/".format(BUCKET, WORKFLOW_DATE_TIME)}

                           PRE data/
                           PRE source-code/


### Step 3: Create a SageMaker Training Job Using `boto3`

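When we launch a training job with `boto3` instead of a SageMaker SDK estimator, nothing is packaged or resolved for us: we must explicitly point the job to our source code on S3 and to the Docker image, in addition to the data and instance settings that estimators also expect.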

Let's look at the code for the `create_sagemaker_training_job` lambda function.

In [24]:
!pygmentize ./workflow-orchestration-src/create_sagemaker_training_job.py

import boto3
import os
sagemaker_boto3 = boto3.client('sagemaker')

def lambda_handler(event, context):
    """ Creates a SageMaker training job
    """

    BUCKET = event["BUCKET"]
    WORKFLOW_DATE_TIME = event["WORKFLOW_DATE_TIME"]
    PREFIX = "s3://{}/{}".format(BUCKET, WORKFLOW_DATE_TIME)

    TRAINING_DATA = "{}/data/train/train.csv".format(PREFIX)
    VALIDATION_DATA = "{}/data/validation/validation.csv".format(PREFIX)
    SOURCE_CODE = "{}/
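As with the processing job, the listing stops before the API call. The sketch below shows one plausible `create_training_job` request that makes the two extra pieces explicit: the Docker image (`AlgorithmSpecification.TrainingImage`) and the source code (the `sagemaker_program` / `sagemaker_submit_directory` hyperparameters, which SageMaker framework containers such as the scikit-learn image read in script mode). The data paths match `TRAINING_DATA`/`VALIDATION_DATA` above, `sourcedir.tar.gz` is the tarball listed in the source-code prefix earlier, and the `model-artifacts` output path matches the S3 location in the deployment error further down; the channel content type and the one-hour runtime cap are assumptions, and `build_training_job_request` is a hypothetical name.

```python
def build_training_job_request(event):
    """Assemble keyword arguments for boto3.client('sagemaker').create_training_job.

    Sketch under assumptions: a script-mode framework container and CSV channels.
    """
    prefix = "s3://{}/{}".format(event["BUCKET"], event["WORKFLOW_DATE_TIME"])

    def csv_channel(name):
        return {
            "ChannelName": name,
            "ContentType": "text/csv",
            "DataSource": {"S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "{}/data/{}/{}.csv".format(prefix, name, name),
                "S3DataDistributionType": "FullyReplicated",
            }},
        }

    return {
        "TrainingJobName": "{}-{}".format(event["WORKFLOW_NAME"], event["WORKFLOW_DATE_TIME"]),
        "RoleArn": event["ROLE_ARN"],
        "AlgorithmSpecification": {"TrainingImage": event["TRAINING_IMAGE"],
                                   "TrainingInputMode": "File"},
        # Script mode: which script to run and where its code tarball lives
        "HyperParameters": {
            "sagemaker_program": event["TRAINING_SCRIPT"],
            "sagemaker_submit_directory": "{}/source-code/sourcedir.tar.gz".format(prefix),
        },
        "InputDataConfig": [csv_channel("train"), csv_channel("validation")],
        # SageMaker writes <S3OutputPath>/<TrainingJobName>/output/model.tar.gz
        "OutputDataConfig": {"S3OutputPath": "{}/model-artifacts".format(prefix)},
        "ResourceConfig": {"InstanceType": event["TRAINING_INSTANCE_TYPE"],
                           "InstanceCount": event["TRAINING_INSTANCE_COUNT"],
                           "VolumeSizeInGB": event["TRAINING_VOLUME_SIZE_GB"]},
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    }


# In the notebook (requires AWS credentials):
# boto3.client("sagemaker").create_training_job(**build_training_job_request(my_workflow_input))
```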

Let's create it:

In [25]:
create_lambda_function(zip_name='create_sagemaker_training_job.zip',
                       lambda_source_code='./workflow-orchestration-src/create_sagemaker_training_job.py',
                       function_name=WORKFLOW_NAME + '-create-sagemaker-training-job',
                       description='Creates SageMaker Training Job'
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


Run the training job once the processing job has completed:

In [26]:
response = lambda_client.invoke(
    FunctionName=WORKFLOW_NAME + '-create-sagemaker-training-job',
    Payload=json.dumps(my_workflow_input).encode()
)

In [27]:
response["Payload"].read()

b'{"TrainingJobArn": "arn:aws:sagemaker:us-east-1:227921966468:training-job/my-project-2-2020-09-23-05-16-18", "ResponseMetadata": {"RequestId": "d3bc01c5-791b-475b-aa54-1304914ceb0e", "HTTPStatusCode": 200, "HTTPHeaders": {"x-amzn-requestid": "d3bc01c5-791b-475b-aa54-1304914ceb0e", "content-type": "application/x-amz-json-1.1", "content-length": "107", "date": "Wed, 23 Sep 2020 05:27:57 GMT"}, "RetryAttempts": 0}}'

Let's check on its status, again with a Lambda function:

In [28]:
create_lambda_function(zip_name='query_training_status.zip',
                       lambda_source_code='./workflow-orchestration-src/query_training_status.py',
                       function_name=WORKFLOW_NAME + '-query-training-status',
                       description='Get Status of SageMaker Training Job'
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


#### Make sure the SageMaker training job is done (status = Completed) before deploying the model

In [30]:
%cd ./workflow-orchestration-src
import query_training_status as qs
print(qs.lambda_handler(my_workflow_input, ""))
%cd ..

/home/ec2-user/SageMaker/my-github-repos/eventengine/serverless-mlops-with-aws-sagemaker-lambda-and-stepfunctions/workflow-orchestration-src
{'statusCode': 200, 'TrainingJobStatus': 'Failed'}
/home/ec2-user/SageMaker/my-github-repos/eventengine/serverless-mlops-with-aws-sagemaker-lambda-and-stepfunctions
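The run captured above ended with `'TrainingJobStatus': 'Failed'`, which is why the S3 listing below shows no model artifacts. `describe_training_job` also returns a `FailureReason` for failed jobs (the full logs are in CloudWatch Logs under `/aws/sagemaker/TrainingJobs`). A hypothetical helper that extracts what the next step needs from that response:

```python
def summarize_training_job(description):
    """Pick the fields worth acting on from a describe_training_job response dict."""
    summary = {"status": description["TrainingJobStatus"]}
    if summary["status"] == "Failed":
        # Short reason from SageMaker; the traceback itself lives in CloudWatch Logs
        summary["failure_reason"] = description.get("FailureReason", "<not provided>")
    elif summary["status"] == "Completed":
        summary["model_data_url"] = description["ModelArtifacts"]["S3ModelArtifacts"]
        summary["metrics"] = {m["MetricName"]: m["Value"]
                              for m in description.get("FinalMetricDataList", [])}
    return summary


# In the notebook (requires AWS credentials):
# summarize_training_job(boto3.client("sagemaker").describe_training_job(TrainingJobName=TRAINING_JOB_NAME))
```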


In [31]:
!aws s3 ls {"s3://{}/{}/".format(BUCKET, WORKFLOW_DATE_TIME)}

                           PRE data/
                           PRE source-code/


### Step 4: Deploy the Model on SageMaker from the Model Artifacts on S3 Using `boto3`

#### Once training is done, check the model accuracy before deploying

In [37]:
create_lambda_function(zip_name='query_model_accuracy.zip',
                       lambda_source_code='./workflow-orchestration-src/query_model_accuracy.py',
                       function_name=WORKFLOW_NAME + '-query-model-accuracy',
                       description='Get Model Accuracy from SageMaker Training Job'
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


In [38]:
%cd ./workflow-orchestration-src
import query_model_accuracy as qs
print(qs.lambda_handler(my_workflow_input, ""))
%cd ..

/home/ec2-user/SageMaker/my-github-repos/eventengine/serverless-mlops-with-aws-sagemaker-lambda-and-stepfunctions/workflow-orchestration-src
{'statusCode': 200, 'trainingMetrics': []}
/home/ec2-user/SageMaker/my-github-repos/eventengine/serverless-mlops-with-aws-sagemaker-lambda-and-stepfunctions
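`trainingMetrics` is empty here because the training job failed. For a completed job, final metrics come from `FinalMetricDataList` in `describe_training_job`, and they only exist if the job declared `MetricDefinitions` (a name plus a regex matched against the training logs). A sketch of a deploy/no-deploy gate; `should_deploy` and the `validation:rmse` metric name are hypothetical:

```python
def should_deploy(final_metrics, metric_name, threshold, higher_is_better=False):
    """Decide whether a trained model is good enough to deploy.

    `final_metrics` is FinalMetricDataList from describe_training_job: a list of
    {"MetricName": ..., "Value": ...} dicts. An empty list (as when the job
    failed) or a missing metric means "do not deploy".
    """
    values = {m["MetricName"]: m["Value"] for m in final_metrics}
    if metric_name not in values:
        return False
    value = values[metric_name]
    # Error metrics (e.g. RMSE) should be low; scores (e.g. R^2) should be high
    return value >= threshold if higher_is_better else value <= threshold


print(should_deploy([], "validation:rmse", 5.0))  # False
```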


Let's look at the code for the `deploy_sagemaker_model` Lambda function. This function is in charge of creating a SageMaker endpoint for our trained model; if the endpoint already exists, it updates the endpoint with the newly retrained model.

In [41]:
!pygmentize ./workflow-orchestration-src/deploy_sagemaker_model.py

import boto3
import os

sagemaker_boto3 = boto3.client('sagemaker')

def lambda_handler(event, context):
    """ Creates a SageMaker model and either
    updates or creates an endpoint
    """
    BUCKET = event["BUCKET"]
    WORKFLOW_NAME = event["WORKFLOW_NAME"]
    WORKFLOW_DATE_TIME = event["WORKFLOW_DATE_TIME"]

    prefix = "s3://{}/{}".format(BUCKET, WORKFLOW_DATE_TIME)
    name = "{}-{}".format(WORKFLOW_NAME, WORKFLOW_DATE_TIME)
    endpoint = WORKFLO
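The listing is truncated, but the docstring and the paragraph above describe the logic: create a model, then create or update an endpoint. A sketch of that flow with the `boto3` SageMaker client follows. It assumes one endpoint config per run named after the model, checks for an existing endpoint with `list_endpoints(NameContains=...)` (first page of results only), and passes serving settings through the container `Environment`; `deploy_model` is a hypothetical name, and `SAGEMAKER_PROGRAM`/`SAGEMAKER_SUBMIT_DIRECTORY` are the variables SageMaker framework containers commonly read in script mode.

```python
def deploy_model(sm_client, name, endpoint_name, image, model_data_url, role_arn,
                 instance_type="ml.c5.xlarge", instance_count=1, environment=None):
    """Create a model and an endpoint config, then create or update the endpoint.

    `sm_client` is boto3.client("sagemaker"). Returns "created" or "updated".
    """
    sm_client.create_model(
        ModelName=name,
        PrimaryContainer={"Image": image, "ModelDataUrl": model_data_url,
                          "Environment": environment or {}},
        ExecutionRoleArn=role_arn,
    )
    # A new endpoint config per run leaves the previous one available for rollback
    sm_client.create_endpoint_config(
        EndpointConfigName=name,
        ProductionVariants=[{"VariantName": "AllTraffic", "ModelName": name,
                             "InitialInstanceCount": instance_count,
                             "InstanceType": instance_type,
                             "InitialVariantWeight": 1.0}],
    )
    # Only the first page of results is inspected; fine for a handful of endpoints
    existing = sm_client.list_endpoints(NameContains=endpoint_name)["Endpoints"]
    if any(e["EndpointName"] == endpoint_name for e in existing):
        sm_client.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=name)
        return "updated"
    sm_client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=name)
    return "created"
```

`create_model` fails when nothing exists at `ModelDataUrl`, exactly as in the run below, so it is worth calling this only after the training job reports `Completed`.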

Again, let's put this function in a Lambda:

In [42]:
create_lambda_function(zip_name='deploy_sagemaker_model.zip',
                       lambda_source_code='./workflow-orchestration-src/deploy_sagemaker_model.py',
                       function_name=WORKFLOW_NAME + '-deploy-sagemaker-model-job',
                       description='Creates and Deploys SageMaker Model From Training Artifacts'
                      )

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


And run it:

In [43]:
response = lambda_client.invoke(
    FunctionName=WORKFLOW_NAME + '-deploy-sagemaker-model-job',
    Payload=json.dumps(my_workflow_input).encode()
)

In [44]:
response["Payload"].read()

b'{"errorMessage": "An error occurred (ValidationException) when calling the CreateModel operation: Could not find model data at s3://my-project-227921966468/2020-09-23-05-16-18/model-artifacts/my-project-2-2020-09-23-05-16-18/output/model.tar.gz.", "errorType": "ClientError", "stackTrace": [["/var/task/deploy_sagemaker_model.py", 31, "lambda_handler", "create_model(name, container, model_data_url, event)"], ["/var/task/deploy_sagemaker_model.py", 78, "create_model", "raise(e)"], ["/var/task/deploy_sagemaker_model.py", 73, "create_model", "ExecutionRoleArn=env_params[\'ROLE_ARN\']"], ["/var/runtime/botocore/client.py", 316, "_api_call", "return self._make_api_call(operation_name, kwargs)"], ["/var/runtime/botocore/client.py", 635, "_make_api_call", "raise error_class(parsed_response, operation_name)"]]}'

### Test Endpoint

In [20]:
from io import BytesIO
import numpy as np
import boto3
import json

# Endpoint and feature-table names (hard-coded here)
ENDPOINT_NAME = "octank"
PROD_FEATURES_TABLE_NAME = "octank-prod-features"

# Set up DynamoDB and SageMaker clients
sagemaker_runtime = boto3.client('sagemaker-runtime')
dynamodb = boto3.resource('dynamodb')
prod_features_table = dynamodb.Table(PROD_FEATURES_TABLE_NAME)


# Fetch features from DynamoDB
payload = prod_features_table.get_item(
    Key={'tconst': "tt1064899",
         'season': 1
        }
)["Item"]

# JSON serialize the features vector
converted_payload = {key:str(value) for (key,value) in payload.items()}
serialized_payload = json.dumps(converted_payload).encode('utf-8')
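DynamoDB's `boto3` resource returns numbers as `decimal.Decimal`, which `json.dumps` rejects; the cell above works around this by casting every value to `str`, so numeric features reach the endpoint as strings. An alternative that keeps numeric types is a `default=` hook, sketched below with a hypothetical `rating` attribute. Whether `serve.py` accepts numeric JSON values is not shown in this notebook.

```python
import json
from decimal import Decimal


def decimal_default(value):
    """json.dumps fallback for the Decimal values returned by boto3's DynamoDB resource."""
    if isinstance(value, Decimal):
        # Keep whole numbers as ints, everything else as floats
        return int(value) if value == value.to_integral_value() else float(value)
    raise TypeError("{!r} is not JSON serializable".format(value))


item = {"tconst": "tt1064899", "season": Decimal("1"), "rating": Decimal("7.5")}
print(json.dumps(item, default=decimal_default))
# {"tconst": "tt1064899", "season": 1, "rating": 7.5}
```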

In [21]:
# Invoke SageMaker endpoint
response = sagemaker_runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                             Body=serialized_payload,
                                             ContentType='application/json'
                                            )


ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (500) from model with message "<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>500 Internal Server Error</title>
<h1>Internal Server Error</h1>
<p>The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.</p>
". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/octank in account 227921966468 for more information.

In [19]:
# Endpoint sends serialized numpy array, let's unpack
stream = BytesIO(response['Body'].read())
prediction = np.load(stream, allow_pickle=True)
print(prediction)

[0.00026849]
