# Creating an automated Machine Learning pipeline

**For this lab, remember to use the Jupyter Notebook UI (not JupyterLab).**

>We'll be using the [Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/), which currently [doesn't support visualizations in JupyterLab](https://github.com/aws/aws-step-functions-data-science-sdk-python/issues/29). If you want to run it in the JupyterLab environment, no problem: you just won't be able to visualize the Step Functions state machines inside the notebook (you'll have to go to the AWS console).


Now that we have successfully developed the LightGBM model, we can proceed and create our ML pipeline. At a high level, we want to create the following flow:

>**Raw data** &rarr; **ETL** &rarr; **Train/Test datasets** &rarr; **Train our LightGBM model** &rarr; **Evaluate the F1 score automatically** &rarr; **Deploy model**

This part of the workshop is composed of 4 parts:

1. <a href="#setup">Configuring the <strong>AWS Step Functions Data Science SDK</strong> and creating other necessary resources</a>
2. <a href="#create_resources">Creating an <strong>AWS Glue ETL Job</strong> and an <strong>AWS Lambda function</strong> for the Extract/Transform/Load step and model evaluation, respectively, and configuring <strong>Amazon SageMaker</strong></a>
3. <a href="#step_functions">Building our Machine Learning pipeline with <strong>AWS Step Functions and the Data Science SDK</strong></a>
4. <a href="#running_the_workflow"><strong>Creating, running and testing the ML Workflow</strong></a>

---

<div id="setup">
<h2>1. Setup</h2>
</div>

First, we'll need to **install and load all the required modules**. Then we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources that we will create. The IAM roles grant the services permissions within your AWS environment.

In [None]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

In [None]:
import uuid
import time
import logging
from IPython import display
import stepfunctions
import boto3
import sagemaker

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import get_execution_role
from sagemaker import s3_input
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

In [None]:
session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name
sm_arn = get_execution_role()
id = uuid.uuid4().hex

#Create a unique name for the AWS Glue job to be created. If you change the 
#default name, you may need to change the Step Functions execution role.
glue_job_name = 'glue-iris-etl-{}'.format(id)

#Create a unique name for the AWS Lambda function to be created. If you change
#the default name, you may need to change the Step Functions execution role.
function_name = 'query-training-status-{}'.format(id)

In [None]:
print('Notebook instance Role ARN:', sm_arn)
stack_name='ml-pipeline-config'
print('Stack Name:', stack_name)

**To facilitate the creation of resources, we use the provided CloudFormation templates to create IAM roles, policies, and an Amazon S3 bucket:**

In [None]:
!aws cloudformation create-stack --stack-name {stack_name} --template-body file://cfns/cfn-config-ml-pipeline.json --parameters ParameterKey=NotebookRoleArn,ParameterValue={sm_arn} --capabilities CAPABILITY_NAMED_IAM
!aws cloudformation wait stack-create-complete --stack-name {stack_name}

In [None]:
# Let's save Cfn output variables to make it easier
bucket = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='S3PipelineBucket'].OutputValue" --output text; 
bucket = bucket.s

step_functions_role_arn = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='StepFunctionsRoleArn'].OutputValue" --output text; 
step_functions_role_arn = step_functions_role_arn.s

glue_role_arn = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='GlueRoleArn'].OutputValue" --output text; 
glue_role_arn = glue_role_arn.s

lambda_role_arn = !aws cloudformation describe-stacks --stack-name {stack_name} --query "Stacks[0].Outputs[?OutputKey=='LambdaRoleArn'].OutputValue" --output text; 
lambda_role_arn = lambda_role_arn.s
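
The four `describe-stacks` calls above each pull one value out of the same `Outputs` list. As a minimal sketch (not part of the original lab), that list can be converted to a dictionary once and then indexed by key. The helper name `outputs_to_dict` and the sample values are hypothetical; in the notebook you would presumably pass `boto3.client('cloudformation').describe_stacks(StackName=stack_name)['Stacks'][0]['Outputs']` instead of the sample list.

```python
def outputs_to_dict(outputs):
    """Map CloudFormation OutputKey -> OutputValue for one stack."""
    return {item['OutputKey']: item['OutputValue'] for item in outputs}


# Hypothetical sample shaped like Stacks[0]['Outputs'] from describe_stacks.
sample_outputs = [
    {'OutputKey': 'S3PipelineBucket', 'OutputValue': 'example-bucket'},
    {'OutputKey': 'GlueRoleArn', 'OutputValue': 'arn:aws:iam::123456789012:role/example-glue'},
]

stack_outputs = outputs_to_dict(sample_outputs)
print(stack_outputs['S3PipelineBucket'])
```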


In [None]:
ecr_repository_name = 'iris-model'
role = get_execution_role()
account_id = role.split(':')[4]
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

print('ecr_repository_name:', ecr_repository_name)
print('account_id:',account_id)
print('region:',region)
print('SageMaker notebook instance role:',role)
print("Bucket Name:", bucket)
print("step_functions_role_arn:", step_functions_role_arn)
print("glue_role_arn:",glue_role_arn)
print("lambda_role_arn:",lambda_role_arn)

**Prepare the Dataset**

Now that we are past development, we want to save the data in our data lake, in a controlled S3 bucket ([AWS Lake Formation can help us with this](https://docs.aws.amazon.com/lake-formation/latest/dg/what-is-lake-formation.html)). We'll use the bucket we created with CloudFormation above for that.

We'll save the data in the S3 bucket and we will organize it following this pattern:

```
/MY-BUCKET
│ 
└── iris-classification (our project name)
    ├── raw
    ├── train 
    └── test (validation data)
    
```

In [None]:
project_name = 'iris-classification' #same name of previous S3 path for our iris-classification project

raw_prefix = 'raw'
train_prefix = 'train'
test_prefix = 'test'

source_data = 's3://{}/{}/{}/'.format(bucket, project_name, raw_prefix) 
train_data = 's3://{}/{}/{}/'.format(bucket, project_name, train_prefix)
validation_data = 's3://{}/{}/{}/'.format(bucket, project_name, test_prefix)

print('source_data:',source_data) 
print('train_data:',train_data) 
print('validation_data:',validation_data)

Let's upload the raw data to our S3 bucket:

In [None]:
s3 = boto3.client('s3')
file_name = '../0_custom_train/lab/data/raw/iris.csv'
object_name = '{}/{}/iris-raw.csv'.format(project_name, raw_prefix)
s3.upload_file(file_name, bucket, object_name)

![s3-ml-pipe](./media/s3-ml-pipe.png)

<div id="create_resources">
<h2>2. Creating the AWS Glue ETL job, AWS Lambda function and configuring Amazon SageMaker</h2>
</div>
 
In the following steps we'll create the Glue job and Lambda function that are called from the Step Functions workflow.

#### Create the AWS Glue Job

In [None]:
code_path = 's3://{}/{}/code'.format(bucket, project_name)
print('code_path:', code_path)

Let's take a look at this simple PySpark code:

In [None]:
!pygmentize ./code/simple_glue_etl.py

In this example we use the **Glue ETL Job with PySpark** just to split the data into training and validation datasets.

This is overkill for a dataset as small as Iris, but the same approach scales to much larger datasets and lets us perform feature engineering at scale. Another option would be [Amazon SageMaker Processing Jobs](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#amazon-sagemaker-processing) (the idea is similar: a cluster of multiple nodes processing the data in parallel).
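
To make the split itself concrete, here is a minimal plain-Python sketch of a deterministic train/validation split. It is not the PySpark code from `simple_glue_etl.py` (which is only displayed in the cell above, not reproduced here); the helper name `split_rows`, the 80/20 ratio, and the seed are all assumptions made for illustration.

```python
import random


def split_rows(rows, train_fraction=0.8, seed=42):
    """Shuffle rows deterministically and split them into train/validation lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = round(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


# Placeholder rows; the classic Iris dataset has 150 samples.
rows = [f'row-{i}' for i in range(150)]
train_rows, validation_rows = split_rows(rows)
print(len(train_rows), len(validation_rows))
```

Fixing the seed keeps the split reproducible across pipeline runs, which makes F1 scores from different executions easier to compare.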

In [None]:
glue_script_location = S3Uploader.upload(local_path='./code/simple_glue_etl.py',
                               desired_s3_uri=code_path,
                               session=session)
glue_client = boto3.client('glue')

response = glue_client.create_job(
    Name=glue_job_name,
    Description='PySpark job to extract the data and split in to training and validation data sets',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
    GlueVersion='2.0',
    WorkerType='Standard',
    NumberOfWorkers=2,
    Timeout=60
)

In [None]:
response

Go to the AWS Glue console and see the created ETL Job:

[Click here!](https://console.aws.amazon.com/glue/home?region=us-east-1#etl:tab=jobs)

![glue-etl](./media/glue-etl.png)



#### Create the AWS Lambda Function

Let's take a look at this simple Lambda function:

In [None]:
!pygmentize ./code/query_training_status.py

The function queries the specified SageMaker training Job with `sm_client.describe_training_job(TrainingJobName=job_name)` and then filters the metrics with the name `validation:f1` (in `if metric['MetricName']=='validation:f1'`).

The function returns the metric to Step Functions. Later, we'll create a state that checks the value of this F1 score and automatically approves or rejects the deployment.
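
The filtering logic described above can be sketched without calling AWS at all. The function below is an illustration, not the contents of `query_training_status.py`: the name `extract_metric` and the sample response are hypothetical, while `FinalMetricDataList` is the field of the `describe_training_job` response that holds the final metric values, and the `trainingMetrics` key matches the path the choice rule reads later in this notebook.

```python
def extract_metric(training_job_description, metric_name='validation:f1'):
    """Keep only the final metrics whose name matches metric_name."""
    metrics = [
        metric
        for metric in training_job_description.get('FinalMetricDataList', [])
        if metric['MetricName'] == metric_name
    ]
    return {'trainingMetrics': metrics}


# Hypothetical, trimmed-down describe_training_job response.
sample_description = {
    'TrainingJobName': 'iris-pipeline-example',
    'FinalMetricDataList': [
        {'MetricName': 'validation:loss', 'Value': 0.08},
        {'MetricName': 'validation:f1', 'Value': 0.96},
    ],
}

result = extract_metric(sample_description)
print(result['trainingMetrics'][0]['Value'])
```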

In [None]:
import zipfile
zip_name = 'query_training_status.zip'
lambda_source_code = './code/query_training_status.py'

zf = zipfile.ZipFile(zip_name, mode='w')
zf.write(lambda_source_code, arcname=lambda_source_code.split('/')[-1])
zf.close()

S3Uploader.upload(local_path=zip_name, 
                  desired_s3_uri=code_path,
                  session=session)

In [None]:
lambda_client = boto3.client('lambda')

response = lambda_client.create_function(
    FunctionName=function_name,
    Runtime='python3.7',
    Role=lambda_role_arn,
    Handler='query_training_status.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/code/{}'.format(project_name, zip_name)
    },
    Description='Queries a SageMaker training job and return the results.',
    Timeout=15,
    MemorySize=128
)

In [None]:
response

Go to the AWS Lambda console and see the created Lambda function:

[Click here!](https://console.aws.amazon.com/lambda/home?region=us-east-1#/functions)

![lambda-model-eval](./media/lambda-model-eval.png)


<div id="sm-estimator">
    <h4>Configure the SageMaker Estimator</h4>
</div>

In [None]:
container_image_uri = f'{account_id}.dkr.ecr.us-east-1.amazonaws.com/iris-model:latest'
sources = f's3://sagemaker-us-east-1-{account_id}/sagemaker-custom/code/sourcedir.tar.gz'
entry_point = 'train.py'

print('SAGEMAKER TRAINING JOB CONFIGURATIONS:\n')
print('Container image URI:\n', container_image_uri)
print('\nSource tarball with training script:\n', sources)
print('\nPython source file to be executed as the entry point to training:\n', entry_point)

We create an estimator as in [the previous lab](../1_custom_inference/lab/2_inference-container.ipynb) (in the part `4. Testing the inference locally with our container using the SageMaker Python SDK`).

Now we [set some Regex so that SageMaker can capture the F1 Score metric](https://docs.aws.amazon.com/sagemaker/latest/dg/training-metrics.html#define-train-metrics-sdk) that our training container emits.

> Remember that our training emits a log indicating the F1 score metric in the form `[F1 score] 0.94`, for example. The following Regex will capture the metric `'\[F1 score\] (.*?)$'`
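
A quick way to check these regular expressions before launching a training job is to run them locally with Python's `re` module. This is only a sketch: the two log lines are hypothetical examples in the shape described above (the exact `multi_logloss` line the container prints is an assumption), and a local `re.search` is a stand-in for however SageMaker applies the patterns to the log stream.

```python
import re

# Patterns copied from the estimator's metric definitions in this lab.
patterns = {
    'validation:f1': re.compile(r'\[F1 score\] (.*?)$'),
    'validation:loss': re.compile(r'multi_logloss: (.*?)$'),
}

# Hypothetical log lines in the shape the training container is described as emitting.
log_lines = [
    "[100] valid_0's multi_logloss: 0.08",
    '[F1 score] 0.94',
]

captured = {}
for line in log_lines:
    for name, pattern in patterns.items():
        match = pattern.search(line)
        if match:
            captured[name] = float(match.group(1))

print(captured)
```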

Finally, note that **we didn't set the hyperparameters. We will set them dynamically when we trigger the ML pipeline with Step Functions**. We'll pass the configurations to the execution (so that we can trigger the pipeline multiple times with different hyperparameters if we want).

In [None]:
container_image_uri = '{0}.dkr.ecr.{1}.amazonaws.com/sagemaker-custom-lightgbm:latest'.format(account_id, region)
print(container_image_uri)

In [None]:
estimator = sagemaker.estimator.Estimator(container_image_uri,
                                    sm_arn, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/training_output'.format(bucket, project_name),
                                    enable_sagemaker_metrics=True,
                                    metric_definitions=[
                                        # Raw strings keep the regex backslashes from being read as string escapes
                                        {'Name':'validation:loss', 'Regex': r'multi_logloss: (.*?)$'},
                                        {'Name':'validation:f1', 'Regex': r'\[F1 score\] (.*?)$'}
                                    ]                                          
                                     )

<div id="step_functions">
<h2>3. Building our ML pipeline with AWS Step Functions and the Data Science SDK</h2>
</div>


### First of all, what is AWS Step Functions? Why are we using the AWS Step Functions Data Science SDK?

To create an ML pipeline, we can use [AWS Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html) to orchestrate the calls to multiple AWS services. [AWS Step Functions natively supports multiple AWS services like Amazon SageMaker, AWS Glue, Amazon EMR, AWS Lambda, and many others.](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-service-integrations.html)

Step Functions uses the [Amazon States Language](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-amazon-states-language.html), a JSON-based language, to define state machines.

A state machine is composed of **states** that can do work (**Task states**), determine which states to transition to next (**Choice states**), stop an execution with an error (**Fail states**), and so on. [More details here.](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html) 

The basic idea is that each state **receives information from the previous state** (or from the input that triggers the execution), performs some processing, and **generates an output** by [appending data to the input, modifying the input data, filtering the data, etc.](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html)
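
The "appending data to the input" case can be illustrated with a few lines of plain Python. This is a simplified emulation, not how Step Functions is implemented: it only handles a result path that names a single top-level key (such as the `$.ModelStepResults` path used later in this lab), and the helper name `apply_result_path` and the sample values are hypothetical.

```python
import copy


def apply_result_path(state_input, result, key):
    """Return a copy of state_input with result stored under a top-level key."""
    output = copy.deepcopy(state_input)
    output[key] = result
    return output


state_input = {'TrainingJobName': 'iris-pipeline-example'}
state_output = apply_result_path(state_input, {'ModelArn': 'example-arn'}, 'ModelStepResults')
print(state_output)
```

The next state then sees both the original input and the new result, which is why a later step can still read `TrainingJobName` after the model step has run.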

A simple example of a state machine would be:

```
{
  "Comment": "A Hello World example of the Amazon States Language using Pass states",
  "StartAt": "Hello",
  "States": {
    "Hello": {
      "Type": "Pass",
      "Result": "Hello",
      "Next": "World"
    },
    "World": {
      "Type": "Pass",
      "Result": "World",
      "End": true
    }
  }
}
```

**Yielding:**

<img class="center" src="./media/hello-world-state-machine.png" alt="hello-world-state-machine" width=150px>
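
To see how the `StartAt`, `Next`, and `End` fields drive the execution, the definition above can be walked with a tiny interpreter. This is a toy sketch that supports only `Pass` states; the function name `run_pass_states` is hypothetical, and the real service handles many more state types and fields.

```python
import json

definition = json.loads("""
{
  "StartAt": "Hello",
  "States": {
    "Hello": {"Type": "Pass", "Result": "Hello", "Next": "World"},
    "World": {"Type": "Pass", "Result": "World", "End": true}
  }
}
""")


def run_pass_states(definition, execution_input=None):
    """Follow StartAt/Next through Pass states and return (visited, output)."""
    current, data, visited = definition['StartAt'], execution_input, []
    while True:
        state = definition['States'][current]
        if state['Type'] != 'Pass':
            raise ValueError(f'only Pass states are supported, got {state["Type"]}')
        visited.append(current)
        data = state.get('Result', data)  # a Pass state with Result outputs that Result
        if state.get('End'):
            return visited, data
        current = state['Next']


visited, output = run_pass_states(definition)
print(visited, output)
```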

To make things easier, the **AWS Step Functions Data Science SDK** provides several abstractions for creating state machines with AWS Step Functions directly. This way, we don't have to write large JSON documents, know the specific details of the Amazon States Language, or know the specific syntax for each service integration (e.g. the request syntax for calling the [SageMaker CreateTrainingJob API](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTrainingJob.html)).


In this part of the lab you will create the following steps:

* [**ETLStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) - Starts an AWS Glue job to extract the latest data from our source database and prepare our data.
* [**TrainingStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) - Creates the training step and passes the defined estimator.
* [**ModelStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) - Creates a model in SageMaker using the artifacts created during the TrainingStep.
* [**LambdaStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) - Creates the task state step within our workflow that calls a Lambda function.
* [**ChoiceStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Choice) - Creates the choice state step within our workflow.
* [**EndpointConfigStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) - Creates the endpoint config step to define the new configuration for our endpoint.
* [**EndpointStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) - Creates the endpoint step to update our model endpoint.
* [**FailStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) - Creates the fail state step within our workflow.

In [None]:
# SageMaker expects unique names for each job, model and endpoint. 
# If these names are not unique the execution will fail.
execution_input = ExecutionInput(schema={
    'GlueJobName': str,
    'TrainingJobName': str,
    'Hyperparameters': {'sagemaker_program': str,
                        'sagemaker_submit_directory': str,
                        'num_leaves': str,
                        'max_depth': str,
                        'learning_rate': str,
                        'random_state': str},
    'ModelName': str,
    'EndpointName': str,
    'LambdaFunctionName': str,
    'UpdateEndpoint': bool
})
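
Because the schema is just a nested dictionary of Python types, it is easy to check an inputs dictionary against it before starting an execution. The checker below is a hypothetical helper written for illustration, independent of whatever validation the SDK itself performs; the schema in the example is a trimmed-down version of the one above.

```python
def validate_inputs(schema, inputs, path=''):
    """Return a list of problems found when checking inputs against a type schema."""
    problems = []
    for key, expected in schema.items():
        location = f'{path}{key}'
        if key not in inputs:
            problems.append(f'missing: {location}')
        elif isinstance(expected, dict):
            if isinstance(inputs[key], dict):
                problems.extend(validate_inputs(expected, inputs[key], f'{location}.'))
            else:
                problems.append(f'wrong type: {location}')
        elif not isinstance(inputs[key], expected):
            problems.append(f'wrong type: {location}')
    return problems


# Trimmed-down, hypothetical version of the schema above.
schema = {'TrainingJobName': str, 'Hyperparameters': {'max_depth': str}, 'UpdateEndpoint': bool}

problems = validate_inputs(schema, {'TrainingJobName': 'job-1',
                                    'Hyperparameters': {'max_depth': 10}})
print(problems)
```

In this example the integer `10` is flagged because the schema declares every hyperparameter as a string.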


#### Create an ETL step with AWS Glue
In the following cell, we create a Glue step that runs an AWS Glue job. The Glue job extracts the latest data from our source database, removes unnecessary columns, splits the data into training and validation sets, and saves the data to CSV format in S3. Glue performs this extraction, transformation, and load (ETL) in a serverless fashion, so there are no compute resources to configure and manage. See the [GlueStartJobRunStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) Compute step in the AWS Step Functions Data Science SDK documentation.

In [None]:
print('ETL CONFIGURATION:')
print('source_data:',source_data)
print('S3 destination URI:','s3a://{}/{}/'.format(bucket, project_name))
print('train_prefix:', train_prefix)
print('test_prefix:', test_prefix)

In [None]:
etl_step = steps.GlueStartJobRunStep(
    'Extract, Transform, Load',
    parameters={"JobName": execution_input['GlueJobName'],
                "Arguments":{
                    '--S3_SOURCE': source_data,
                    '--S3_DEST': 's3a://{}/{}/'.format(bucket, project_name),
                    '--TRAIN_KEY': train_prefix + '/',
                    '--TEST_KEY': test_prefix +'/'}
               }
)

#### Create a SageMaker Training Step 

In the following cell, we create the training step and pass the estimator we defined above. See  [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
print('TRAINING STEP CONFIGURATIONS:')
print('train_data:',train_data)
print('validation_data:', validation_data)

Here we pass the hyperparameters with the placeholder `Hyperparameters`:

In [None]:
training_step = steps.TrainingStep(
    'Model Training', 
    estimator=estimator,
    hyperparameters=execution_input['Hyperparameters'],
    data={
        'train': s3_input(train_data, content_type='csv'),
        'validation': s3_input(validation_data, content_type='csv')
    },
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)

#### Create a Model Step 

In the following cell, we define a model step that will create a model in Amazon SageMaker using the artifacts created during the TrainingStep. See  [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the AWS Step Functions Data Science SDK documentation to learn more.

The model creation step typically follows the training step. The Step Functions SDK provides the [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [None]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    result_path='$.ModelStepResults'
)

#### Create a Lambda Step
In the following cell, we define a Lambda step that will invoke the previously created Lambda function as part of our Step Functions workflow. See [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
lambda_step = steps.compute.LambdaStep(
    'Query Training Results',
    parameters={  
        "FunctionName": execution_input['LambdaFunctionName'],
        'Payload':{
            "TrainingJobName.$": '$.TrainingJobName'
        }
    }
)

#### Create a Choice State Step 
In the following cell, we create a choice step in order to build a dynamic workflow. This choice step branches based on the results of our SageMaker training step: did the training job fail, or should the model be saved and the endpoint be updated? We will add specific rules to this choice step later in this notebook, in the section "Add Rules to Choice State".

In [None]:
check_accuracy_step = steps.states.Choice(
    'F1 score > 90%'
)

#### Create an Endpoint Configuration Step
In the following cell we create an endpoint configuration step. See [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
data_capture_configuration = sagemaker.model_monitor.data_capture_config.DataCaptureConfig(
    enable_capture=True, 
    sampling_percentage=100, 
    destination_s3_uri='s3://{}/{}/endpoint_monitoring/'.format(bucket, project_name), 
    sagemaker_session=session
)

In [None]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    variant_name='lgbmVariant',
    instance_type='ml.m4.xlarge',
    data_capture_config=data_capture_configuration
)

#### Update the Model Endpoint Step
In the following cell, we create the Endpoint step to deploy the new model as a managed API endpoint, updating an existing SageMaker endpoint if our choice state is successful.

In [None]:
endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
#     update=execution_input['UpdateEndpoint']
)

#### Create the Fail State Step
In addition, we create a Fail step, which the choice state transitions to if the validation F1 score of our model does not exceed the threshold we define. See [FailStateStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
fail_step = steps.states.Fail(
    'Model F1 score Too Low',
    comment='Validation F1 score lower than threshold'
)

#### Add Rules to Choice State
In the cell below, we add a threshold rule to our choice state. If the **F1 score** of our model is above 0.90, we move on to the endpoint configuration step, followed by the endpoint update. Otherwise (the default choice), we move to the Fail state.

In [None]:
threshold_rule = steps.choice_rule.ChoiceRule.NumericGreaterThan(variable=lambda_step.output()['Payload']['trainingMetrics'][0]['Value'], value=.9)

check_accuracy_step.add_choice(rule=threshold_rule, next_step=endpoint_config_step)
check_accuracy_step.default_choice(next_step=fail_step)
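
The decision encoded by this rule can be written as an ordinary Python function, which is handy for reasoning about edge cases such as a score of exactly 0.9. This is a sketch of the logic only, not something the SDK generates; the function name `choose_next_state` is hypothetical, and the returned strings are the state names used in this notebook.

```python
def choose_next_state(lambda_output, threshold=0.9):
    """Mirror the choice rule: deploy only when the F1 score is strictly above threshold."""
    f1_score = lambda_output['Payload']['trainingMetrics'][0]['Value']
    if f1_score > threshold:
        return 'Create Model Endpoint Config'
    return 'Model F1 score Too Low'


# Hypothetical Lambda step output with an F1 score of 0.96.
sample_output = {'Payload': {'trainingMetrics': [{'MetricName': 'validation:f1', 'Value': 0.96}]}}
print(choose_next_state(sample_output))
```

Because the rule is `NumericGreaterThan`, a score equal to the threshold falls through to the default choice and fails the pipeline.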


#### Link all the Steps Together
Finally, create your workflow definition by chaining together all of the steps that we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
endpoint_config_step.next(endpoint_step)

In [None]:
workflow_definition = steps.Chain([
    etl_step,
    training_step,
    model_step,
    lambda_step,
    check_accuracy_step
])

<div id="running_the_workflow">
<h2>4. Creating, running and testing the ML Workflow</h2>
</div>

Create your workflow using the workflow definition above, and render the graph with [render_graph(...) method](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):

In [None]:
pipeline_name = f'Iris-ML-Pipeline-{id}'

In [None]:
workflow = Workflow(
    name=pipeline_name,
    definition=workflow_definition,
    role=step_functions_role_arn,
    execution_input=execution_input
)

# For using existing workflow
# workflow = Workflow.list_workflows(); workflow
# workflow = Workflow.attach(workflow[0]['stateMachineArn']); workflow

In [None]:
# # Update existing Step Functions State Machine
# workflow.update(definition=workflow_definition,
#                role=step_functions_role_arn)
#
# # View new definition
# workflow.definition.to_dict()

In [None]:
workflow.render_graph(portrait=True)

#### Export to CloudFormation (if desired)
It is possible to export the state machine above as a CloudFormation template, enabling teams to easily re-use and share pipelines (you can also create and publish your own products in [AWS Service Catalog](https://docs.aws.amazon.com/servicecatalog/latest/adminguide/introduction.html) so that each team can easily use them):

In [None]:
print(workflow.get_cloudformation_template())

#### View JSON of State Machine definition

To see the AWS Step Functions state machine definition that the AWS Step Functions Data Science SDK created for us, just use the `to_dict()` method (we can see how the SDK has made things easier for us, since we didn't have to write the JSON):

In [None]:
workflow.definition.to_dict()

#### Create the workflow
More details in documentation of AWS Step Functions: [create() method](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create)

In [None]:
workflow.create()

Let's see the state machines in the current region in this AWS account:

In [None]:
Workflow.list_workflows(html=True)

### Let's trigger the ML Pipeline

In [None]:
import json

training_job_name = f'iris-pipeline-{id}'
model_name = f'Iris-{id}'
endpoint_name = 'IrisMLPipeline'
update_endpoint = False
sources = f's3://sagemaker-us-east-1-{account_id}/sagemaker-custom/code/sourcedir.tar.gz'  # no trailing comma: this must be a string, not a tuple
hyperparameters = {
    "sagemaker_program": 'train.py',
    "sagemaker_submit_directory": f's3://sagemaker-us-east-1-{account_id}/sagemaker-custom/code/sourcedir.tar.gz',
    "num_leaves": '40',
    "max_depth": '10',
    "learning_rate": '0.11',
    "random_state": '42'}

print('INPUT CONFIGURATIONS:\n')
print('GlueJobName:\n', glue_job_name)
print('\nHyperparameters\n', hyperparameters)
print('\nTrainingJobName (each SageMaker Job requires a unique name):\n',training_job_name)
print('\nModelName (each SageMaker Model requires a unique name):\n', model_name)
print('\nEndpointName (each Endpoint requires a unique name):\n', endpoint_name)
print('\nLambdaFunctionName:\n', function_name)
print('\nUpdateEndpoint:', update_endpoint)


In [None]:
execution = workflow.execute(
    inputs={
        'GlueJobName': glue_job_name,
        'TrainingJobName': training_job_name,
        'Hyperparameters':  hyperparameters,
        'ModelName': model_name,
        'EndpointName': endpoint_name,
        'LambdaFunctionName': function_name,
        'UpdateEndpoint': update_endpoint
    }
)
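
Since SageMaker expects a unique name for each job and model, re-running the cell above with the same inputs would reuse names from the previous execution. One way around that, sketched below, is to build the inputs with a fresh identifier per run. The helper `make_execution_inputs` is hypothetical and not part of the lab; it also converts hyperparameter values to strings, matching the `str` types declared in the execution input schema.

```python
import uuid


def make_execution_inputs(glue_job_name, function_name, hyperparameters,
                          endpoint_name='IrisMLPipeline'):
    """Build an inputs dict with fresh job/model names and string hyperparameters."""
    run_id = uuid.uuid4().hex
    return {
        'GlueJobName': glue_job_name,
        'TrainingJobName': f'iris-pipeline-{run_id}',
        'Hyperparameters': {key: str(value) for key, value in hyperparameters.items()},
        'ModelName': f'Iris-{run_id}',
        'EndpointName': endpoint_name,
        'LambdaFunctionName': function_name,
        'UpdateEndpoint': False,
    }


# Placeholder names; in the notebook these would be glue_job_name and function_name.
first = make_execution_inputs('glue-job-example', 'lambda-example', {'num_leaves': 40})
second = make_execution_inputs('glue-job-example', 'lambda-example', {'num_leaves': 40})
print(first['TrainingJobName'] != second['TrainingJobName'])
```

The resulting dictionary has the same keys as the `inputs` argument passed to `workflow.execute(...)` above.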



Render the workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress). This generates a snapshot of the current state of your workflow as it executes. It is a static image, so you must run the cell again to check progress:

In [None]:
# If you don't want to wait for the whole pipeline to finish, set to False
wait = True

In [None]:
# Just check current status if we don't want to check all flow
if not wait:
    execution.render_progress(portrait=True)

Running the pipeline for the first time can take a few minutes because we are training and creating a SageMaker endpoint (spinning up instances). After that, updating an existing endpoint with newer models should be faster.

In [None]:
status = execution.describe().get('status')
while status == 'RUNNING' and wait:
    status = execution.describe().get('status')
    display.clear_output(wait=True)
    time.sleep(10)
    display.display(execution.render_progress(portrait=True))
    
time.sleep(10)
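
The loop above polls for as long as the execution reports `RUNNING`. A variation with an upper bound on the waiting time is sketched below; the helper name `wait_for_status`, its parameters, and the default timeout are assumptions made for illustration. The status source and the sleep function are passed in, so the example can run instantly with fake values.

```python
import time


def wait_for_status(get_status, done=lambda s: s != 'RUNNING',
                    interval=10, timeout=3600, sleep=time.sleep):
    """Poll get_status() until done(status) is true or timeout seconds have passed."""
    waited = 0
    status = get_status()
    while not done(status):
        if waited >= timeout:
            raise TimeoutError(f'still {status} after {waited} seconds')
        sleep(interval)
        waited += interval
        status = get_status()
    return status


# Simulated run: a fake status source and a no-op sleep keep the example instant.
fake_statuses = iter(['RUNNING', 'RUNNING', 'SUCCEEDED'])
final_status = wait_for_status(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

In the notebook, the status source could be `lambda: execution.describe().get('status')`, with the default `time.sleep` left in place.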

When we get to the `Model Training` state, go to the Amazon SageMaker console and see if everything went OK.

**1. Look for the most recent training job:**

[Click here](https://console.aws.amazon.com/sagemaker/home?region=us-east-1#/jobs)

![sm-pipe-training](./media/sm-pipe-training.png)

**2. Scrolling down we can see that our metrics (regex) and hyperparameters were correctly set:**

![sm-pipe-training-configs](./media/sm-pipe-training-configs.png)

**3. Scrolling down again, click on the `View logs` and `View algorithm metrics` links:**

![sm-pipe-training-monitor](./media/sm-pipe-training-monitor.png)

**4. We see in CloudWatch our `Log stream` for the training. Click on it:**

![sm-cw-logs](./media/sm-cw-logs.png)

**5. We see the logs emitted to the stdout of our training container. Go to the end of the logs.**

We see the log `[F1 score] 0.9565217391304348`. This log should be captured in the metrics and SageMaker will associate it to our training.

![sm-cw-logs-f1](./media/sm-cw-logs-f1.png)

**6. We'll check if the F1 score metric was captured. Go to the other tab you opened in step 3. when you clicked on `View algorithm metrics`.**

Select both metrics captured by SageMaker with the <a href="#sm-estimator">2 regular expressions that you configured in the estimator in part 2 of this lab:</a>

![sm-cw-metrics](./media/sm-cw-metrics.png)


We see that SageMaker captured the final validation F1 score of 0.96 and validation loss of 0.08:

![sm-cw-metrics-zoom](./media/sm-cw-metrics-zoom.png)

Since SageMaker captured the F1 score, the Lambda function that evaluates the model in the state `Query Training Results` was able to get it.

In our case, the validation F1 score was about 0.96, above the 0.90 threshold. Therefore, the [Choice state](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-choice-state.html) approves the deployment automatically.

We should see all states have passed and are green:

In [None]:
display.display(execution.render_progress(portrait=True))

Use [list_executions](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_executions) to list all executions for a specific workflow:

In [None]:
workflow.list_executions(html=True)

Use [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution:

In [None]:
# execution.list_events() #html=True

Let's wait until our endpoint is in service:

In [None]:
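import boto3
sm_client = boto3.client('sagemaker')

# Poll until the endpoint is ready; stop early if creation fails
# instead of looping forever.
endpoint_status = 'Creating'
while endpoint_status != 'InService':
    endpoint_status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    print(endpoint_status)
    if endpoint_status == 'Failed':
        raise RuntimeError('Endpoint {} failed to deploy'.format(endpoint_name))
    time.sleep(10)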

### Let's generate some artificial traffic to our endpoint

In [None]:
runtime_client = boto3.client('runtime.sagemaker')

with open('../0_custom_train/lab/data/test_no_label/iris_test_no_label.csv', 'r') as f:
    for row in f:

        payload = row.rstrip('\n')
        response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                      ContentType='text/csv', 
                                      Accept='text/csv',
                                      Body=payload)
        prediction = float(response['Body'].read().decode('utf-8').strip())
        print('input:', row)
        print('prediction:', prediction, '\n')
        time.sleep(1)

endpoint_name = 'IrisMLPipeline'

In [None]:
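# Keep `display` bound to the module (as in the setup cell) so that earlier
# cells calling display.display(...) still work if they are re-run.
from IPython import display
from stress import stress_button

display.display(stress_button)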

### See traffic metrics and data captured

**Select your `IrisMLPipeline` endpoint in SageMaker:**

![sm-endpoint.png](./media/sm-endpoint.png)

**Click on the S3 bucket where the data captured from requests and predictions are stored:**

![sm-endpoint-capture](./media/sm-endpoint-capture.png)

**We should see objects being saved in the S3 bucket:**
![sm-capture-s3](./media/sm-capture-s3.png)

**We see that SageMaker has partitioned the bucket by endpoint (`IrisMLPipeline`), variant (`lgbmVariant`), year, month, day, and hour:**

![sm-capture-s3-dirs](./media/sm-capture-s3-dirs.png)
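
Given that layout, the prefix for a particular hour of captured traffic can be computed ahead of time, for example to list only recent objects. The sketch below simply follows the partitioning described above (endpoint, variant, year, month, day, hour); the helper name `capture_prefix`, the example bucket, and the use of a UTC timestamp are assumptions.

```python
from datetime import datetime, timezone


def capture_prefix(destination_s3_uri, endpoint_name, variant_name, when):
    """Build the hour-level prefix under which captured requests are expected."""
    return '{}/{}/{}/{:%Y/%m/%d/%H}/'.format(
        destination_s3_uri.rstrip('/'), endpoint_name, variant_name, when)


# Hypothetical bucket name and timestamp.
example_prefix = capture_prefix(
    's3://example-bucket/iris-classification/endpoint_monitoring/',
    'IrisMLPipeline',
    'lgbmVariant',
    datetime(2020, 7, 1, 15, tzinfo=timezone.utc),
)
print(example_prefix)
```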

**In your `IrisMLPipeline` endpoint in SageMaker, click on the `View invocation metrics` link. We will be redirected to the CloudWatch console:**
![sm_ep_metrics](./media/sm_ep_metrics.png)

**For CloudWatch, <a href="https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#metricsV2:graph=~(metrics~(~(~'AWS*2fSageMaker~'Invocations~'EndpointName~'IrisMLPipeline~'VariantName~'lgbmVariant)~(~'.~'ModelLatency~'.~'.~'.~'.~(yAxis~'right~stat~'Average)))~view~'timeSeries~stacked~false~region~'us-east-1~stat~'Sum~period~60~start~'-PT5M~end~'P0D);query=~'*7bAWS*2fSageMaker*2cEndpointName*2cVariantName*7d*20IrisMLPipeline">just click here.</a>**

OR

**Configure the dashboard by selecting the `Invocations` metric. In the `Graphed metrics` tab, choose `Sum` as the `Statistic` and 1 Minute as the `Period`, then change the time range at the top of the graph to show a window of 5 minutes.**

![sm_cw_metrics](./media/sm_cw_metrics.png)

# Congratulations!! You have finished the workshop!
