## Setup

First, we'll need to install and load all the required modules. Then we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources that we will create. The IAM roles grant the services permissions within your AWS environment.

In [1]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

Requirement already up-to-date: stepfunctions in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.0.0.9)
Requirement not upgraded as not directly required: pyyaml in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (5.3.1)
Requirement not upgraded as not directly required: sagemaker>=1.42.8 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (1.64.0)
Requirement not upgraded as not directly required: boto3>=1.9.213 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (1.14.3)
Requirement not upgraded as not directly required: protobuf>=3.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=1.42.8->stepfunctions) (3.6.1)
Requirement not upgraded as not directly required: smdebug-rulesconfig==0.1.4 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=1.42.8->stepfunctions) (0.1.4)
Requireme

In [2]:
import uuid
import time
import logging
from IPython import display
import stepfunctions
import boto3
import sagemaker

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import get_execution_role
from sagemaker import s3_input
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

In [3]:
session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name
sm_arn = get_execution_role()
id = uuid.uuid4().hex

#Create a unique name for the AWS Glue job to be created. If you change the 
#default name, you may need to change the Step Functions execution role.
glue_job_name = 'glue-customer-churn-etl-{}'.format(id)

#Create a unique name for the AWS Lambda function to be created. If you change
#the default name, you may need to change the Step Functions execution role.
function_name = 'query-training-status-{}'.format(id)

In [4]:
print('Notebook instance Role ARN:', sm_arn)
stack_name='ml-pipeline-config'
print('Stack Name:', stack_name)

Notebook instance Role ARN: arn:aws:iam::725879053979:role/MLOps
Stack Name: ml-pipeline-config


In [5]:
!aws cloudformation create-stack --stack-name {stack_name} --template-body file://cfns/cfn-config-ml-pipeline.json --parameters ParameterKey=NotebookRoleArn,ParameterValue={sm_arn} --capabilities CAPABILITY_NAMED_IAM
!aws cloudformation wait stack-create-complete --stack-name {stack_name}

{
    "StackId": "arn:aws:cloudformation:us-east-1:725879053979:stack/ml-pipeline-config/14c9f080-b19a-11ea-985c-12da3ecd6186"
}


In [6]:
# Let's save Cfn output variables to make it easier
bucket = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='S3PipelineBucket'].OutputValue" --output text; 
bucket = bucket.s

step_functions_role_arn = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='StepFunctionsRoleArn'].OutputValue" --output text; 
step_functions_role_arn = step_functions_role_arn.s

glue_role_arn = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='GlueRoleArn'].OutputValue" --output text; 
glue_role_arn = glue_role_arn.s

lambda_role_arn = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='LambdaRoleArn'].OutputValue" --output text; 
lambda_role_arn = lambda_role_arn.s


In [7]:
print("Bucket Name:", bucket)
print("step_functions_role_arn:", step_functions_role_arn)
print("glue_role_arn:",glue_role_arn)
print("lambda_role_arn:",lambda_role_arn)

Bucket Name: ml-pipeline-725879053979
step_functions_role_arn: arn:aws:iam::725879053979:role/StepFunctions_DSSDK-725879053979
glue_role_arn: arn:aws:iam::725879053979:role/AWS-Glue-S3-Bucket-Access-725879053979
lambda_role_arn: arn:aws:iam::725879053979:role/query_training_status-role-725879053979
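
The four `describe-stacks` calls above can also be collapsed into a single lookup. The following is a minimal sketch, assuming a response shaped like the one boto3's `describe_stacks` returns; the helper name `stack_outputs` and the trimmed sample response are illustrative and not part of the original notebook.

```python
def stack_outputs(describe_stacks_response):
    """Map OutputKey -> OutputValue for the first stack in the response."""
    stack = describe_stacks_response["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


# Sample response trimmed to the fields used here (values copied from the output above).
sample_response = {
    "Stacks": [{
        "StackName": "ml-pipeline-config",
        "Outputs": [
            {"OutputKey": "S3PipelineBucket",
             "OutputValue": "ml-pipeline-725879053979"},
            {"OutputKey": "GlueRoleArn",
             "OutputValue": "arn:aws:iam::725879053979:role/AWS-Glue-S3-Bucket-Access-725879053979"},
        ],
    }]
}

outputs = stack_outputs(sample_response)
print(outputs["S3PipelineBucket"])
```

With live credentials, the same helper could be fed `boto3.client('cloudformation').describe_stacks(StackName=stack_name)`.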


### Prepare the Dataset
This notebook uses the XGBoost algorithm to automate the classification of unhappy customers for telecommunication service providers. The goal is to identify customers who may cancel their service soon so that you can entice them to stay. This is known as customer churn prediction.

The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets.

In [8]:
project_name = 'xgboost-churn' #same name of previous S3 path for our xgboost-churn project

#Remember that data was already uploaded in the following structure in the S3 bucket:
# /MY-BUCKET (bb-wksp in my case)
# │ 
# └── xgboost-churn (our project name)
#     ├── raw
#     ├── train 
#     └── validation

raw_prefix = 'raw'
train_prefix = 'train'
val_prefix = 'validation'

source_data = 's3://{}/{}/{}/'.format(bucket, project_name, raw_prefix) 
train_data = 's3://{}/{}/{}/'.format(bucket, project_name, train_prefix)
validation_data = 's3://{}/{}/{}/'.format(bucket, project_name, val_prefix)

print('source_data:',source_data) 
print('train_data:',train_data) 
print('validation_data:',validation_data)

source_data: s3://ml-pipeline-725879053979/xgboost-churn/raw/
train_data: s3://ml-pipeline-725879053979/xgboost-churn/train/
validation_data: s3://ml-pipeline-725879053979/xgboost-churn/validation/


In [9]:
s3 = boto3.client('s3')
file_name = 'data/cleaned-customer-churn.csv'
object_name = '{}/{}/churn.csv'.format(project_name, raw_prefix)
s3.upload_file(file_name, bucket, object_name)

## Create Resources
In the following steps we'll create the Glue job and Lambda function that are called from the Step Functions workflow.

### Create the AWS Glue Job

In [10]:
code_path = 's3://{}/{}/code'.format(bucket, project_name)
print('code_path:', code_path)

code_path: s3://ml-pipeline-725879053979/xgboost-churn/code


In [11]:
glue_script_location = S3Uploader.upload(local_path='./code/simple_glue_etl.py',
                               desired_s3_uri=code_path,
                               session=session)
glue_client = boto3.client('glue')

response = glue_client.create_job(
    Name=glue_job_name,
    Description='PySpark job to extract the data and split it into training and validation data sets',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
    GlueVersion='1.0',
    WorkerType='Standard',
    NumberOfWorkers=2,
    Timeout=60
)

In [12]:
response

{'Name': 'glue-customer-churn-etl-beb8008a875a479ab6842ba94f371abc',
 'ResponseMetadata': {'RequestId': 'f4ae4c2e-7a81-4543-832d-2f532088fb98',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 18 Jun 2020 19:30:56 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '67',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'f4ae4c2e-7a81-4543-832d-2f532088fb98'},
  'RetryAttempts': 0}}

### Create the AWS Lambda Function

In [13]:
import zipfile
zip_name = 'query_training_status.zip'
lambda_source_code = './code/query_training_status.py'

# Package the handler at the root of the archive; the context manager closes the file.
with zipfile.ZipFile(zip_name, mode='w') as zf:
    zf.write(lambda_source_code, arcname=lambda_source_code.split('/')[-1])

S3Uploader.upload(local_path=zip_name, 
                  desired_s3_uri=code_path,
                  session=session)

's3://ml-pipeline-725879053979/xgboost-churn/code/query_training_status.zip'

In [14]:
lambda_client = boto3.client('lambda')

response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime='python3.7',
    Role=lambda_role_arn,
    Handler='query_training_status.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/code/{}'.format(project_name, zip_name)
    },
    Description='Queries a SageMaker training job and return the results.',
    Timeout=15,
    MemorySize=128
)

In [15]:
response

{'ResponseMetadata': {'RequestId': '2a2e32ad-d539-4670-b123-869b74a71947',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Thu, 18 Jun 2020 19:30:56 GMT',
   'content-type': 'application/json',
   'content-length': '966',
   'connection': 'keep-alive',
   'x-amzn-requestid': '2a2e32ad-d539-4670-b123-869b74a71947'},
  'RetryAttempts': 0},
 'FunctionName': 'query-training-status-beb8008a875a479ab6842ba94f371abc',
 'FunctionArn': 'arn:aws:lambda:us-east-1:725879053979:function:query-training-status-beb8008a875a479ab6842ba94f371abc',
 'Runtime': 'python3.7',
 'Role': 'arn:aws:iam::725879053979:role/query_training_status-role-725879053979',
 'Handler': 'query_training_status.lambda_handler',
 'CodeSize': 1580,
 'Description': 'Queries a SageMaker training job and return the results.',
 'Timeout': 15,
 'MemorySize': 128,
 'LastModified': '2020-06-18T19:30:56.505+0000',
 'CodeSha256': 'pmTuDs1JWQtxpuHcnNzFV8bMaTYlC3/ng9ao1j6IKBo=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassTh

### Configure the Amazon SageMaker Estimator

In [44]:
container = get_image_uri(region, 'xgboost')

xgb = sagemaker.estimator.Estimator(container,
                                    sm_arn, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/training_output'.format(bucket, project_name))

xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        eval_metric='error',
                        num_round=100)


## Build a Machine Learning Workflow

You can use a state machine workflow to create a model retraining pipeline. The AWS Step Functions Data Science SDK provides several Amazon SageMaker workflow steps that you can use to construct an ML pipeline. In this tutorial you will create the following steps:

* [**ETLStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) - Starts an AWS Glue job to extract the latest data from our source database and prepare our data.
* [**TrainingStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) - Creates the training step and passes the defined estimator.
* [**ModelStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) - Creates a model in SageMaker using the artifacts created during the TrainingStep.
* [**LambdaStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) - Creates the task state step within our workflow that calls a Lambda function.
* [**ChoiceStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Choice) - Creates the choice state step within our workflow.
* [**EndpointConfigStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) - Creates the endpoint config step to define the new configuration for our endpoint.
* [**EndpointStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) - Creates the endpoint step to update our model endpoint.
* [**FailStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) - Creates the fail state step within our workflow.

In [45]:
# SageMaker expects unique names for each job, model and endpoint. 
# If these names are not unique the execution will fail.
execution_input = ExecutionInput(schema={
    'TrainingJobName': str,
    'GlueJobName': str,
    'ModelName': str,
    'EndpointName': str,
    'LambdaFunctionName': str,
    'UpdateEndpoint': bool
})
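
To make the role of the schema concrete, here is a plain-Python sketch of the kind of type check such a schema implies. `check_inputs` is a hypothetical helper written for illustration; it is not the SDK's own validation logic.

```python
def check_inputs(schema, inputs):
    """Return a list of problems found when comparing inputs against schema."""
    problems = []
    for key, expected_type in schema.items():
        if key not in inputs:
            problems.append("missing key: {}".format(key))
        elif not isinstance(inputs[key], expected_type):
            problems.append("{} should be {}, got {}".format(
                key, expected_type.__name__, type(inputs[key]).__name__))
    return problems


schema = {"TrainingJobName": str, "ModelName": str, "UpdateEndpoint": bool}

good = {"TrainingJobName": "job-1", "ModelName": "model-1", "UpdateEndpoint": False}
bad = {"TrainingJobName": "job-1", "UpdateEndpoint": "no"}

print(check_inputs(schema, good))
print(check_inputs(schema, bad))
```

Running a check like this before starting an execution surfaces a missing or mistyped input early, instead of partway through the pipeline.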

### Create an ETL step with AWS Glue
In the following cell, we create a Glue step that runs an AWS Glue job. The Glue job extracts the latest data from our source database, removes unnecessary columns, splits the data into training and validation sets, and saves the data in CSV format in S3. Glue performs this extraction, transformation, and load (ETL) in a serverless fashion, so there are no compute resources to configure and manage. See the [GlueStartJobRunStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) Compute step in the AWS Step Functions Data Science SDK documentation.

In [46]:
etl_step = steps.GlueStartJobRunStep(
    'Extract, Transform, Load',
    parameters={"JobName": execution_input['GlueJobName'],
                "Arguments":{
                    '--S3_SOURCE': source_data,
                    '--S3_DEST': 's3a://{}/{}/'.format(bucket, project_name),
                    '--TRAIN_KEY': train_prefix + '/',
                    '--VAL_KEY': val_prefix +'/'}
               }
)
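
The Glue script itself (`simple_glue_etl.py`) is not shown in this notebook. As a rough illustration of the split step only, the sketch below divides rows into training and validation sets in plain Python. The 80/20 ratio, the fixed seed, and the function name `split_rows` are assumptions; the real job runs in PySpark and may split differently.

```python
import random


def split_rows(rows, train_fraction=0.8, seed=42):
    """Shuffle rows reproducibly and split them into (train, validation)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


# Ten placeholder rows stand in for the customer records.
rows = [{"customer_id": i} for i in range(10)]
train, validation = split_rows(rows)
print(len(train), len(validation))
```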

### Create a SageMaker Training Step 

In the following cell, we create the training step and pass the estimator we defined above. See  [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [47]:
training_step = steps.TrainingStep(
    'Model Training', 
    estimator=xgb,
    data={
        'train': s3_input(train_data, content_type='csv'),
        'validation': s3_input(validation_data, content_type='csv')
    },
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)

### Create a Model Step 

In the following cell, we define a model step that will create a model in Amazon SageMaker using the artifacts created during the TrainingStep. See  [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the AWS Step Functions Data Science SDK documentation to learn more.

The model creation step typically follows the training step. The Step Functions SDK provides the [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [48]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    result_path='$.ModelStepResults'
)

### Create a Lambda Step
In the following cell, we define a Lambda step that will invoke the previously created Lambda function as part of our Step Functions workflow. See [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [49]:
lambda_step = steps.compute.LambdaStep(
    'Query Training Results',
    parameters={  
        "FunctionName": execution_input['LambdaFunctionName'],
        'Payload':{
            "TrainingJobName.$": '$.TrainingJobName'
        }
    }
)
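
The source of `query_training_status.py` is not included here. Judging from how the workflow later reads `['Payload']['trainingMetrics'][0]['Value']`, a handler along the following lines would fit; treat it as a hypothetical reconstruction rather than the actual function. The injectable `sm_client` argument, the `FakeSageMaker` stub, and the metric value 0.07 exist only so the sketch runs without AWS credentials.

```python
def lambda_handler(event, context, sm_client=None):
    """Return the final metrics of the training job named in the event."""
    if sm_client is None:
        import boto3  # imported lazily so the sketch runs without boto3 installed
        sm_client = boto3.client("sagemaker")
    job = sm_client.describe_training_job(TrainingJobName=event["TrainingJobName"])
    # Keep only JSON-serializable fields; real entries also carry a datetime Timestamp.
    metrics = [
        {"MetricName": m["MetricName"], "Value": m["Value"]}
        for m in job.get("FinalMetricDataList", [])
    ]
    return {"statusCode": 200, "trainingMetrics": metrics}


class FakeSageMaker:
    """Stand-in for the SageMaker client, returning a canned response."""

    def describe_training_job(self, TrainingJobName):
        return {
            "TrainingJobName": TrainingJobName,
            "FinalMetricDataList": [{"MetricName": "validation:error", "Value": 0.07}],
        }


result = lambda_handler({"TrainingJobName": "demo-job"}, None, sm_client=FakeSageMaker())
print(result["trainingMetrics"][0]["Value"])
```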

### Create a Choice State Step 
In the following cell, we create a choice step in order to build a dynamic workflow. This choice step branches based on the results of our SageMaker training step: did the model fall short of the accuracy threshold, or should the endpoint be updated with the new model? We will add specific rules to this choice step later, in the "Add Rules to Choice State" section of this notebook.

In [50]:
check_accuracy_step = steps.states.Choice(
    'Accuracy > 90%'
)

### Create an Endpoint Configuration Step
In the following cell we create an endpoint configuration step. See [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [51]:
data_capture_configuration = sagemaker.model_monitor.data_capture_config.DataCaptureConfig(
    enable_capture=True, 
    sampling_percentage=100, 
    destination_s3_uri='s3://{}/{}/endpoint_monitoring/'.format(bucket, project_name), 
    sagemaker_session=session
)

In [52]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    variant_name='xgbVariant',
    instance_type='ml.m4.xlarge',
    data_capture_config=data_capture_configuration
)

### Update the Model Endpoint Step
In the following cell, we create the Endpoint step to deploy the new model as a managed API endpoint, updating an existing SageMaker endpoint if our choice state is successful.

In [53]:
endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
#     update=execution_input['UpdateEndpoint']
)

### Create the Fail State Step
In addition, we create a Fail step which proceeds from our choice state if the validation accuracy of our model is lower than the threshold we define. See [FailStateStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation to learn more.

In [54]:
fail_step = steps.states.Fail(
    'Model Accuracy Too Low',
    comment='Validation accuracy lower than threshold'
)

### Add Rules to Choice State
In the cell below, we add a threshold rule to our choice state. If the validation error of our model is below 0.10 (that is, accuracy is above 90%), the workflow moves on to the endpoint configuration step and then updates the endpoint. Otherwise, it moves to the Fail state. See [here](https://github.com/dmlc/xgboost/blob/master/doc/parameter.rst) for more information on how XGBoost calculates classification error.

For binary classification problems the XGBoost algorithm defines the model error as: 

\begin{equation*}
\frac{incorrect\:predictions}{total\:number\:of\:predictions}
\end{equation*}

To achieve an accuracy of more than 90%, we need error < 0.10.
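
As a worked example of the relationship between error and accuracy, the sketch below computes the classification error for a handful of predictions and converts it to accuracy. The labels and predictions are invented for illustration.

```python
def classification_error(labels, predictions):
    """Fraction of predictions that differ from the true labels."""
    wrong = sum(1 for y, p in zip(labels, predictions) if y != p)
    return wrong / len(labels)


labels      = [1, 0, 0, 1, 0, 1, 0, 0, 1, 0]
predictions = [1, 0, 0, 1, 0, 0, 0, 0, 1, 0]  # one mistake out of ten

error = classification_error(labels, predictions)
accuracy = 1 - error
print(error, accuracy)
```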

In [55]:
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(variable=lambda_step.output()['Payload']['trainingMetrics'][0]['Value'], value=.1)

check_accuracy_step.add_choice(rule=threshold_rule, next_step=endpoint_config_step)
check_accuracy_step.default_choice(next_step=fail_step)
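
Conceptually, a Choice state checks its rules in order and falls back to the default when none match. The following plain-Python sketch mimics that routing for the single rule defined above. `route` is a hypothetical helper; the actual evaluation happens inside AWS Step Functions.

```python
def route(error, threshold=0.1):
    """Return the name of the next state for a given validation error."""
    if error < threshold:  # mirrors ChoiceRule.NumericLessThan(..., value=.1)
        return "Create Model Endpoint Config"
    return "Model Accuracy Too Low"  # the default_choice branch


for error in (0.07, 0.10, 0.25):
    print(error, "->", route(error))
```

Because the comparison is strict, an error of exactly 0.10 takes the default branch and the execution fails.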

### Link all the Steps Together
Finally, create your workflow definition by chaining together all of the steps we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more.

In [56]:
endpoint_config_step.next(endpoint_step)

Update Model Endpoint EndpointStep(resource='arn:aws:states:::sagemaker:createEndpoint', parameters={'EndpointConfigName': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f9866cebbe0>, 'EndpointName': <stepfunctions.inputs.placeholders.ExecutionInput object at 0x7f9866cebc88>}, type='Task')

In [57]:
workflow_definition = steps.Chain([
    etl_step,
    training_step,
    model_step,
    lambda_step,
    check_accuracy_step
])
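
Chaining boils down to pointing each state's `Next` field at the state that follows it, which is also visible in the exported definition later in this notebook. Here is a simplified sketch that uses only state names and ignores each state's real parameters; the helper `chain_states` is illustrative and not part of the SDK.

```python
import json


def chain_states(names):
    """Build a skeleton state machine in which each state points to the next."""
    states = {}
    for current, following in zip(names, names[1:]):
        states[current] = {"Type": "Task", "Next": following}
    states[names[-1]] = {"Type": "Task", "End": True}  # last state ends the run
    return {"StartAt": names[0], "States": states}


definition = chain_states([
    "Extract, Transform, Load",
    "Model Training",
    "Save Model",
    "Query Training Results",
])
print(json.dumps(definition, indent=2))
```

In the real workflow the chain ends in the choice state, which branches to the endpoint steps or the fail state instead of ending the run.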

## Run the Workflow
Create your workflow using the workflow definition above, and render the graph with [render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):

In [76]:
pipeline_name = f'Demo-ML-Pipeline-{id}'

In [77]:
workflow = Workflow(
    name=pipeline_name,
    definition=workflow_definition,
    role=step_functions_role_arn,
    execution_input=execution_input
)

# To attach to an existing workflow instead:
# workflows = Workflow.list_workflows()
# workflow = Workflow.attach(workflows[0]['stateMachineArn'])

In [78]:
workflow.render_graph(portrait=True)

### Export to CloudFormation (if desired)
You can also export the state machine above as a CloudFormation template, which makes it easy for teams to reuse pipelines:

In [79]:
print(workflow.get_cloudformation_template())

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation template for AWS Step Functions - State Machine
Resources:
  StateMachineComponent:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: Demo-ML-Pipeline-beb8008a875a479ab6842ba94f371abc3
      DefinitionString: |-
        {
          "StartAt": "Extract, Transform, Load",
          "States": {
            "Extract, Transform, Load": {
              "Parameters": {
                "JobName.$": "$$.Execution.Input['GlueJobName']",
                "Arguments": {
                  "--S3_SOURCE": "s3://ml-pipeline-725879053979/xgboost-churn/raw/",
                  "--S3_DEST": "s3a://ml-pipeline-725879053979/xgboost-churn/",
                  "--TRAIN_KEY": "train/",
                  "--VAL_KEY": "validation/"
                }
              },
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Type": "Task",
              "Next": "Model Training"
            }

### Create the workflow
See [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) in the AWS Step Functions Data Science SDK documentation for more details.

In [80]:
workflow.create()

[INFO] Workflow created successfully on AWS Step Functions.


'arn:aws:states:us-east-1:725879053979:stateMachine:Demo-ML-Pipeline-beb8008a875a479ab6842ba94f371abc3'

Let's see the state machines in the current region in this AWS account:

In [81]:
Workflow.list_workflows(html=True)

Name,Creation Date
Demo-ML-Pipeline-beb8008a875a479ab6842ba94f371abc,"Jun 18, 2020 07:30:57.115 PM"
Demo-ML-Pipeline-beb8008a875a479ab6842ba94f371abc3,"Jun 18, 2020 08:10:53.896 PM"
Demo-ML-Pipeline2-beb8008a875a479ab6842ba94f371abc,"Jun 18, 2020 07:52:19.425 PM"
DevelopingWithStepFunctionsDotNetCore,"Jan 10, 2019 02:08:54.714 AM"


### Let's trigger the ML Pipeline

In [82]:
training_job_name = f'xgb-churn-pipeline-{id}'
model_name = f'CustomerChurn-{id}'
endpoint_name = 'CustomerChurnMLPipeline'
update_endpoint = False
print('INPUT CONFIGURATIONS:\n')
print('TrainingJobName (each SageMaker job requires a unique name):\n', training_job_name)
print('\nGlueJobName:\n', glue_job_name)
print('\nModelName (each SageMaker model requires a unique name):\n', model_name)
print('\nEndpointName (each endpoint requires a unique name):\n', endpoint_name)
print('\nLambdaFunctionName:\n', function_name)
print('UpdateEndpoint:', update_endpoint)

INPUT CONFIGURATIONS:

TrainingJobName (each SageMaker job requires a unique name):
 xgb-churn-pipeline-beb8008a875a479ab6842ba94f371abc

GlueJobName:
 glue-customer-churn-etl-beb8008a875a479ab6842ba94f371abc

ModelName (each SageMaker model requires a unique name):
 CustomerChurn-beb8008a875a479ab6842ba94f371abc

EndpointName (each endpoint requires a unique name):
 CustomerChurnMLPipeline

LambdaFunctionName:
 query-training-status-beb8008a875a479ab6842ba94f371abc
UpdateEndpoint: False


In [83]:
execution = workflow.execute(
    inputs={
        'TrainingJobName': training_job_name,
        'GlueJobName': glue_job_name,
        'ModelName': model_name,
        'EndpointName': endpoint_name,
        'LambdaFunctionName': function_name,
        'UpdateEndpoint': update_endpoint
    }
)

[INFO] Workflow execution started successfully on AWS Step Functions.


Render workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress). This generates a snapshot of the current state of your workflow as it executes. Because the snapshot is a static image, you must run the cell again to check progress:

In [84]:
# If you don't want to wait for the whole pipeline to finish, set this to False
wait = True

In [85]:
# Render a single snapshot of the current status when we are not waiting for the whole flow
if not wait:
    execution.render_progress(portrait=True)

Running the pipeline for the first time can take a while (about 18 minutes in the execution listed below) because we are training a model and creating a SageMaker endpoint, which involves spinning up instances. After that, updating an existing endpoint with newer models should be faster.

In [88]:
status = execution.describe().get('status')
while status == 'RUNNING' and wait:
    display.clear_output(wait=True)
    display.display(execution.render_progress(portrait=True))
    time.sleep(10)
    status = execution.describe().get('status')
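
The loop above polls until the execution leaves the `RUNNING` state. If we want an upper bound on the wait, a variant with a timeout might look like the sketch below. `wait_until_done` and its parameters are hypothetical; `get_status` stands in for `lambda: execution.describe().get('status')`.

```python
import time


def wait_until_done(get_status, poll_seconds=10, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it stops returning 'RUNNING' or the timeout expires."""
    waited = 0
    status = get_status()
    while status == "RUNNING":
        if waited >= timeout_seconds:
            raise TimeoutError("execution still RUNNING after {}s".format(waited))
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Simulated execution that finishes on the third poll; no real waiting happens.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final = wait_until_done(lambda: next(statuses), sleep=lambda seconds: None)
print(final)
```

Passing `sleep` as a parameter keeps the helper easy to exercise without actually pausing.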

Use [list_executions](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_executions) to list all executions for a specific workflow:

In [89]:
workflow.list_executions(html=True)

Name,Status,Started,End Time
6cdeebed-bdac-4ff0-8b51-082b29b3aa54,SUCCEEDED,"Jun 18, 2020 08:11:16.492 PM","Jun 18, 2020 08:29:24.041 PM"


Use [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution:

In [96]:
execution.list_events() #html=True

[{'timestamp': datetime.datetime(2020, 6, 18, 20, 11, 16, 492000, tzinfo=tzlocal()),
  'type': 'ExecutionStarted',
  'id': 1,
  'previousEventId': 0,
  'executionStartedEventDetails': {'input': '{\n    "TrainingJobName": "xgb-churn-pipeline-beb8008a875a479ab6842ba94f371abc3",\n    "GlueJobName": "glue-customer-churn-etl-beb8008a875a479ab6842ba94f371abc",\n    "ModelName": "CustomerChurn-beb8008a875a479ab6842ba94f371abc3",\n    "EndpointName": "CustomerChurnMLPipeline",\n    "LambdaFunctionName": "query-training-status-beb8008a875a479ab6842ba94f371abc",\n    "UpdateEndpoint": false\n}',
   'roleArn': 'arn:aws:iam::725879053979:role/StepFunctions_DSSDK-725879053979'}},
 {'timestamp': datetime.datetime(2020, 6, 18, 20, 11, 16, 548000, tzinfo=tzlocal()),
  'type': 'TaskStateEntered',
  'id': 2,
  'previousEventId': 0,
  'stateEnteredEventDetails': {'name': 'Extract, Transform, Load',
   'input': '{\n    "TrainingJobName": "xgb-churn-pipeline-beb8008a875a479ab6842ba94f371abc3",\n    "GlueJo
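
The raw event list is long. One way to get an overview is to count events by type. The sketch below does this for a trimmed sample shaped like the output above: only `id` and `type` are kept, and the last two entries are assumed examples of later events rather than copied output.

```python
from collections import Counter


def count_event_types(events):
    """Count how many times each event type occurs."""
    return Counter(event["type"] for event in events)


sample_events = [
    {"id": 1, "type": "ExecutionStarted"},
    {"id": 2, "type": "TaskStateEntered"},
    {"id": 3, "type": "TaskStateExited"},   # assumed later event
    {"id": 4, "type": "TaskStateEntered"},  # assumed later event
]

for event_type, count in count_event_types(sample_events).most_common():
    print(event_type, count)
```

Against a real execution, the same helper could be called as `count_event_types(execution.list_events())`.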

### Let's generate some artificial traffic to our endpoint

In [1]:
from stress import stress_button

In [2]:
stress_button

HBox(children=(Button(button_style='success', description='Run stress test', icon='check', style=ButtonStyle()…

Sending some artificial traffic...
Go check endpoint metrics and S3 bucket!


Note that you need to stop the kernel to stop the invocations.

### See traffic metrics and data captured

In your `CustomerChurnMLPipeline` endpoint in SageMaker, select the S3 bucket where the data captured from requests and predictions is stored:

![sm_ep_data_capture](./imgs/sm_ep_data_capture.png)

We should see objects being saved in the S3 bucket:
![s3_data_capture](./imgs/s3_data_capture.png)

In your `CustomerChurnMLPipeline` endpoint in SageMaker, click on the `View invocation metrics` link. We will be redirected to the CloudWatch console:
![sm_cw_metrics](./imgs/sm_ep_metrics.png)

For CloudWatch, <a href="https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#metricsV2:graph=~(metrics~(~(~'AWS*2fSageMaker~'Invocations~'EndpointName~'CustomerChurnMLPipeline~'VariantName~'xgbVariant))~view~'timeSeries~stacked~false~region~'us-east-1~start~'-PT5M~end~'P0D~stat~'Sum~period~60);query=~'*7bAWS*2fSageMaker*2cEndpointName*2cVariantName*7d*20CustomerChurnMLPipeline">just click here.</a>

OR

Configure the dashboard manually: select the `Invocations` metric and, in the `Graphed metrics` tab, set `Statistic` to `Sum` and `Period` to 1 Minute. Then change the time range at the top of the graph to show a 5-minute window.

![sm_ep_metrics](./imgs/sm_cw_metrics.png)
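
The same graph settings can be expressed as a CloudWatch query. The sketch below only builds the keyword arguments for boto3's `get_metric_statistics` and does not call AWS. The helper name `invocation_query` and the fixed end time are made up for illustration; the endpoint and variant names are the ones used earlier in this notebook.

```python
from datetime import datetime, timedelta, timezone


def invocation_query(endpoint_name, variant_name, end_time, minutes=5):
    """Build get_metric_statistics arguments for per-minute invocation sums."""
    return {
        "Namespace": "AWS/SageMaker",
        "MetricName": "Invocations",
        "Dimensions": [
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": variant_name},
        ],
        "StartTime": end_time - timedelta(minutes=minutes),
        "EndTime": end_time,
        "Period": 60,  # one data point per minute
        "Statistics": ["Sum"],
    }


query = invocation_query(
    "CustomerChurnMLPipeline",
    "xgbVariant",
    end_time=datetime(2020, 6, 18, 20, 35, tzinfo=timezone.utc),
)
print(query["StartTime"].isoformat())
```

With credentials in place, the dictionary could be passed on as `boto3.client('cloudwatch').get_metric_statistics(**query)`.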