# Introduction

Welcome to your Amazon SageMaker notebook instance!  

This is a fully managed AWS environment that provides a Jupyter notebook for working with data. To learn more about Amazon SageMaker notebook instances, check out our [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/nbi.html).

## Summary

We want to build a machine learning workflow that retrains our model whenever new data sets are added. Instead of executing Python code inside a notebook, we'd like to run these steps in a state machine. We will use the [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/) to create the workflow. The same steps are still performed:

1. Set up serverless querying of data in S3 via [Amazon Athena](https://aws.amazon.com/athena/).
2. Prepare dataframes using [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org).
3. Build and train a machine learning model via the [Amazon SageMaker Python SDK](https://docs.aws.amazon.com/sagemaker/latest/dg/frameworks.html).

However, they are now spread across these states:

1. Prep data in Athena (Lambda function)
2. Split data into training and testing sets (Lambda function)
3. Model training (Step Functions Data Science SDK)
4. Save the model (Step Functions Data Science SDK)
5. Check model accuracy (Lambda function)
6. Test: Accuracy above our threshold?
7. Yes? Publish
8. No? Do nothing

To get started, let's input the name of the S3 bucket you created earlier in this workshop:

In [None]:
# ACTION: provide the data bucket NAME you are using for this workshop
data_bucket = '' 
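
Every later cell builds S3 URIs from `data_bucket`, so it can help to fail fast if the name is still empty. The sketch below checks the value against the general S3 bucket naming rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit). The helper `check_bucket_name` is hypothetical, and the regex covers only these common rules, not every edge case (for example, consecutive dots or names formatted like IP addresses).

```python
import re

# Hypothetical helper: a simplified check of the general S3 bucket naming rules.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def check_bucket_name(name):
    """Raise ValueError if `name` is empty or clearly not a valid S3 bucket name."""
    if not name:
        raise ValueError("data_bucket is empty: set it to the bucket you created earlier")
    if not _BUCKET_RE.match(name):
        raise ValueError("'{}' does not look like a valid S3 bucket name".format(name))
    return name

# Usage in this notebook, once data_bucket is set:
# check_bucket_name(data_bucket)
```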

## Setup

First, we'll need to install and load all the required modules. Then we'll create an IAM role for the Step Functions resources that we will create.

In [None]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

### Import libraries and dependencies

In [None]:
import uuid
import logging
import stepfunctions
import boto3
import sagemaker
import time
import json

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name
bucket = session.default_bucket()
id = uuid.uuid4().hex

### Add permissions to your notebook role in IAM

The IAM role assumed by your notebook requires permission to create and run workflows in AWS Step Functions. If this notebook is running on a SageMaker notebook instance, do the following to provide IAM permissions to the notebook:

1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/).
1. Select **Notebook instances** and choose the name of your notebook instance.
1. Under **Permissions and encryption** select the role ARN to view the role on the IAM console.
1. Copy and save the IAM role ARN for later use.
1. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.
1. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**.
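
If you prefer to script the console steps above, the same attachment can be made with the IAM `AttachRolePolicy` API. This is a minimal sketch, assuming you run it with credentials that are allowed to modify IAM (the notebook's own role typically cannot). The helpers `role_name_from_arn` and `attach_step_functions_access` are hypothetical, and the IAM client is injectable so the logic can be exercised without AWS.

```python
# Managed policy named in the console steps above.
STEP_FUNCTIONS_FULL_ACCESS = "arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"


def role_name_from_arn(role_arn):
    """Return the role name (last path segment) from an IAM role ARN."""
    parts = role_arn.split(":", 5)   # arn, aws, iam, '', account, role/...
    if len(parts) != 6 or parts[2] != "iam" or not parts[5].startswith("role/"):
        raise ValueError("not an IAM role ARN: " + role_arn)
    return parts[5].rsplit("/", 1)[1]   # drop "role/" and any path such as "service-role/"


def attach_step_functions_access(role_arn, iam_client=None):
    """Attach AWSStepFunctionsFullAccess to the role identified by role_arn."""
    if iam_client is None:
        import boto3  # imported lazily so the helper can be tested offline
        iam_client = boto3.client("iam")
    role_name = role_name_from_arn(role_arn)
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=STEP_FUNCTIONS_FULL_ACCESS)
    return role_name
```

For example, `attach_step_functions_access(sagemaker.get_execution_role())` would target the notebook's role, provided the caller has IAM permissions.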

Next, let's create an execution role in IAM for Step Functions.

### Create an Execution Role for Step Functions

Your Step Functions workflow requires an IAM role to interact with other services in your AWS environment.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
1. Select **Roles** and then **Create role**.
1. Under **Choose the service that will use this role** select **Step Functions**.
1. Choose **Next** until you can enter a **Role name**.
1. Enter a name such as `StepFunctionsWorkflowExecutionRole` and then select **Create role**.
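
The console flow above creates a role whose trust policy lets the Step Functions service (`states.amazonaws.com`) assume it. A minimal boto3 sketch of the same thing through the IAM `CreateRole` API; the helper name is hypothetical and, as before, the client can be injected for testing.

```python
import json

# Trust policy that allows the Step Functions service to assume the role.
STATES_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "states.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def create_workflow_execution_role(role_name="StepFunctionsWorkflowExecutionRole", iam_client=None):
    """Create the Step Functions execution role and return its ARN."""
    if iam_client is None:
        import boto3
        iam_client = boto3.client("iam")
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(STATES_TRUST_POLICY),
    )
    return response["Role"]["Arn"]
```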

Next, create and attach a policy to the role you created. As a best practice, the following steps will attach a policy that only provides access to the specific resources and actions needed for this solution.

1. Under the **Permissions** tab, click **Attach policies** and then **Create policy**.
1. Enter the following in the **JSON** tab:
    ```json
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": "NOTEBOOK_ROLE_ARN",
                "Condition": {
                    "StringEquals": {
                        "iam:PassedToService": "sagemaker.amazonaws.com"
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "sagemaker:CreateModel",
                    "sagemaker:DeleteEndpointConfig",
                    "sagemaker:DescribeTrainingJob",
                    "sagemaker:CreateEndpoint",
                    "sagemaker:StopTrainingJob",
                    "sagemaker:CreateTrainingJob",
                    "sagemaker:UpdateEndpoint",
                    "sagemaker:CreateEndpointConfig",
                    "sagemaker:DeleteEndpoint"
                ],
                "Resource": [
                    "arn:aws:sagemaker:*:*:*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "events:DescribeRule",
                    "events:PutRule",
                    "events:PutTargets"
                ],
                "Resource": [
                    "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "lambda:InvokeFunction"
                ],
                "Resource": [
                    "arn:aws:lambda:*:*:function:*QueryTraining*"
                ]
            }
        ]
    }
    ```
1. Replace **NOTEBOOK_ROLE_ARN** with the notebook role ARN that you copied in the previous section.
1. Choose **Review policy** and give the policy a name such as `StepFunctionsWorkflowExecutionPolicy`.
1. Choose **Create policy**.
1. Select **Roles** and search for your `StepFunctionsWorkflowExecutionRole` role.
1. Under the **Permissions** tab, click **Attach policies**.
1. Search for your newly created `StepFunctionsWorkflowExecutionPolicy` policy and select the check box next to it.
1. Choose **Attach policy**. You will then be redirected to the details page for the role.
1. Copy the `StepFunctionsWorkflowExecutionRole` **Role ARN** at the top of the Summary.
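
Replacing `NOTEBOOK_ROLE_ARN` by hand is easy to get wrong. A sketch of doing it in code, assuming you keep the JSON policy above as a Python string (called `policy_template` here, a hypothetical name): substitute the notebook role ARN, confirm the result parses as JSON, then create and attach the policy with the IAM `CreatePolicy` and `AttachRolePolicy` APIs. The helper names are hypothetical.

```python
import json


def render_policy(policy_template, notebook_role_arn):
    """Substitute the notebook role ARN into the policy text and return it as a dict."""
    if "NOTEBOOK_ROLE_ARN" not in policy_template:
        raise ValueError("template has no NOTEBOOK_ROLE_ARN placeholder")
    # json.loads fails loudly if the edited policy is no longer valid JSON
    return json.loads(policy_template.replace("NOTEBOOK_ROLE_ARN", notebook_role_arn))


def create_and_attach_policy(policy_document, role_name,
                             policy_name="StepFunctionsWorkflowExecutionPolicy",
                             iam_client=None):
    """Create a customer managed policy, attach it to role_name, and return the policy ARN."""
    if iam_client is None:
        import boto3
        iam_client = boto3.client("iam")
    response = iam_client.create_policy(PolicyName=policy_name,
                                        PolicyDocument=json.dumps(policy_document))
    policy_arn = response["Policy"]["Arn"]
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return policy_arn
```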

### Build the permissions that the Step Function Workflow will use

In [None]:
# paste the StepFunctionsWorkflowExecutionRole ARN from above
workflow_execution_role = ''

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
sagemaker_execution_role = sagemaker.get_execution_role()
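
Instead of pasting the ARN, you could look it up by role name with the IAM `GetRole` API. A sketch, assuming the role name `StepFunctionsWorkflowExecutionRole` suggested earlier and credentials that allow `iam:GetRole` (the notebook's execution role may not have this permission). The helper name is hypothetical.

```python
def lookup_role_arn(role_name="StepFunctionsWorkflowExecutionRole", iam_client=None):
    """Return the ARN of an existing IAM role, looked up by name."""
    if iam_client is None:
        import boto3
        iam_client = boto3.client("iam")
    return iam_client.get_role(RoleName=role_name)["Role"]["Arn"]

# Usage (requires iam:GetRole):
# workflow_execution_role = lookup_role_arn()
```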

Let's retrieve the specific S3 bucket name that we will grant permissions to.

In [None]:
session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)

### Set up the locations where the training data will be placed

In [None]:
train_prefix = 'train'
test_prefix = 'test'
key = 'recordio-pb-data'

train_data = 's3://{}/{}/'.format(data_bucket, train_prefix)
test_data = 's3://{}/{}/'.format(data_bucket, test_prefix)
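
The training step later appends `key` to these prefixes. A small sketch of a helper that joins S3 URI parts without doubled or missing slashes; the name `s3_uri` is hypothetical.

```python
def s3_uri(bucket, *parts, trailing_slash=False):
    """Build an s3:// URI from a bucket name and path parts, normalising slashes."""
    # strip leading/trailing slashes from each part and skip empty parts
    path = "/".join(p.strip("/") for p in parts if p.strip("/"))
    uri = "s3://{}/{}".format(bucket, path)
    if path and trailing_slash:
        uri += "/"
    return uri
```

With this helper, `s3_uri(data_bucket, train_prefix, trailing_slash=True)` produces the same value as `train_data` above, and `s3_uri(data_bucket, train_prefix, key)` the same value as the training step's `s3_train_data`.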

### Define the API for your Step Function Workflow
This is the API that controls the inputs of your State Machine.

* **TrainingJobName** = The name of your SageMaker Training Job
* **LambdaDataPrep** = The name of the Lambda function responsible for data prep
* **LambdaDataSplit** = The name of the Lambda function responsible for splitting the data
* **LambdaDeployModel** = The name of the Lambda function responsible for deploying the model
* **ModelName** = The name of the model we'll be training
* **EndpointName** = The name of the Endpoint (unused)
* **LambdaQueryStatus** = The name of the Lambda function responsible for querying the model accuracy
* **data_bucket** = The name of the S3 bucket where all of the data for the workflow will be persisted

In [None]:
# SageMaker expects unique names for each job, model and endpoint. 
# If these names are not unique the execution will fail.
execution_input = ExecutionInput(schema={
    'TrainingJobName': str,
    'LambdaDataPrep': str,
    'LambdaDataSplit': str,
    'LambdaDeployModel': str,
    'ModelName': str,
    'EndpointName': str,
    'LambdaQueryStatus': str,
    'data_bucket': str
})
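
An execution fails part-way through if an input such as a Lambda function name is left empty, so it can help to check the inputs dictionary against the same schema before calling `workflow.execute`. A plain-Python sketch (it does not use `ExecutionInput` itself; `WORKFLOW_SCHEMA` and `check_inputs` are hypothetical names that mirror the schema above):

```python
WORKFLOW_SCHEMA = {
    'TrainingJobName': str,
    'LambdaDataPrep': str,
    'LambdaDataSplit': str,
    'LambdaDeployModel': str,
    'ModelName': str,
    'EndpointName': str,
    'LambdaQueryStatus': str,
    'data_bucket': str,
}


def check_inputs(inputs, schema=WORKFLOW_SCHEMA):
    """Return a list of problems: missing keys, wrong types, or empty strings."""
    problems = []
    for name, expected_type in schema.items():
        if name not in inputs:
            problems.append('missing: ' + name)
        elif not isinstance(inputs[name], expected_type):
            problems.append('wrong type: ' + name)
        elif expected_type is str and not inputs[name].strip():
            problems.append('empty: ' + name)
    return problems
```

Calling `assert not check_inputs(inputs)` on the dictionary you pass to `workflow.execute` would stop the run before any AWS resources are touched.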

### Create the SageMaker Estimator for training our model

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri


def estimator_from_hyperparams(s3_train_data, hyperparams, output_path, s3_test_data=None):
    """
    Create a Linear Learner Estimator configured with the given hyperparams.

    The estimator is returned untrained; the Step Functions TrainingStep runs the
    training job. s3_train_data and s3_test_data are currently unused.
    """
    # set up the estimator with the built-in Linear Learner image for this region
    linear = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, "linear-learner"),
        get_execution_role(),
        train_instance_count=1,
        train_instance_type='ml.m5.2xlarge',
        output_path=output_path,
        sagemaker_session=sagemaker.Session())
    linear.set_hyperparameters(**hyperparams)
    return linear

In [None]:
import math
# TODO: Update hard coded feature_dim and mini_batch_size params to be dynamic again (refer to old code)
hyperparams = {
    'feature_dim': 6,
    'mini_batch_size': 17995,
    'predictor_type': 'binary_classifier' 
}

output_path = 's3://' + data_bucket
linear_estimator = estimator_from_hyperparams(train_data, hyperparams, output_path,
                                              s3_test_data=test_data)
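
The TODO above asks for `feature_dim` and `mini_batch_size` to be computed from the data again. The original code it refers to is not shown here, so the sketch below is only one plausible approach under stated assumptions: each training row is laid out as `[label, feature_1, ..., feature_n]`, and `mini_batch_size` is the number of training rows capped at a hypothetical upper bound `max_batch`. The helper name is also hypothetical.

```python
def linear_learner_hyperparams(train_rows, max_batch=1000, predictor_type='binary_classifier'):
    """Derive feature_dim and mini_batch_size from training rows of the form [label, *features].

    Assumptions (not from the original notebook): the label is the first column,
    and mini_batch_size is the row count capped at max_batch.
    """
    if not train_rows:
        raise ValueError('no training rows')
    feature_dim = len(train_rows[0]) - 1               # every column except the label
    mini_batch_size = min(len(train_rows), max_batch)  # never larger than the data set
    return {
        'feature_dim': feature_dim,
        'mini_batch_size': mini_batch_size,
        'predictor_type': predictor_type,
    }
```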

### Define the Step Function Training Step

The next thing we'll do is define the training step and pass the estimator we defined above. See [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the documentation to learn more.

In [None]:
s3_train_data = 's3://{}/{}/{}'.format(data_bucket, train_prefix, key)
s3_test_data = 's3://{}/{}/{}'.format(data_bucket, test_prefix, key)

training_step = steps.TrainingStep(
    'Model Training', 
    estimator=linear_estimator,
    data={
        'train': s3_train_data,
        'test': s3_test_data
    },
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)
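
Under the hood, the `data` dictionary becomes the `InputDataConfig` of the SageMaker `CreateTrainingJob` request, with one channel per key. The sketch below builds that structure by hand to show the mapping; it approximates what the SDK emits (the real request can carry extra fields such as `S3DataDistributionType`), and `to_input_data_config` is a hypothetical name.

```python
def to_input_data_config(data):
    """Map {'channel': 's3://...'} to a CreateTrainingJob-style InputDataConfig list."""
    return [
        {
            'ChannelName': channel,
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',  # every object under the prefix is used
                    'S3Uri': uri,
                }
            },
        }
        for channel, uri in data.items()
    ]
```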

### Define the Step Function Model Save Step

Let's define a model step that will create a model in SageMaker using the artifacts created during the TrainingStep. See [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the documentation to learn more.

In [None]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    result_path='$.ModelStepResults'
)
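
`result_path='$.ModelStepResults'` tells Step Functions to keep the state's input and add the step's result under `ModelStepResults`, instead of replacing the input with it. That is why the query Lambda step can still read `$.TrainingJobName` from the training step's output. A simplified Python emulation of `ResultPath` for `$` and single top-level keys only (the real JSONPath handling in Step Functions is richer; `apply_result_path` is a hypothetical name):

```python
import copy


def apply_result_path(state_input, result, result_path='$'):
    """Emulate Step Functions ResultPath for '$' or a single top-level key like '$.Key'."""
    if result_path == '$':
        return result                       # default: the result replaces the input
    if not result_path.startswith('$.') or '.' in result_path[2:]:
        raise ValueError('this sketch only supports $ or $.Key')
    output = copy.deepcopy(state_input)     # leave the original input untouched
    output[result_path[2:]] = result
    return output
```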

### Define the Lambda Step Function Steps

Let's define a Lambda step for each of the Lambda functions that we will invoke as part of our Step Functions workflow. See [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) in the documentation to learn more.

In [None]:
# query lambda
lambda_step_query = steps.compute.LambdaStep(
    'Query Training Results',
    parameters={  
        "FunctionName": execution_input['LambdaQueryStatus'],
        'Payload':{
            "TrainingJobName.$": '$.TrainingJobName'
        }
    }
)

# data prep lambda
lambda_step_prep = steps.compute.LambdaStep(
    'Prep Data in Athena',
    parameters={  
        "FunctionName": execution_input['LambdaDataPrep'],
        'Payload':{
            "data_bucket": execution_input['data_bucket']
        }
    }
)

# data split lambda
lambda_step_split = steps.compute.LambdaStep(
    'Split Data',
    parameters={  
        "FunctionName": execution_input['LambdaDataSplit'],
        'Payload': lambda_step_prep.output()
    }
)
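
The query step expects its Lambda to return a payload with a `trainingMetrics` list (the choice rule later reads `trainingMetrics[4]['Value']`). The function itself is not part of this notebook; the sketch below shows what it could look like, assuming it simply returns the `FinalMetricDataList` from the SageMaker `DescribeTrainingJob` API. The handler name, the output fields other than `trainingMetrics`, and the injectable client are assumptions.

```python
def query_training_results_handler(event, context, sagemaker_client=None):
    """Hypothetical query Lambda: return the final metrics of a finished training job."""
    if sagemaker_client is None:
        import boto3
        sagemaker_client = boto3.client('sagemaker')
    job = sagemaker_client.describe_training_job(TrainingJobName=event['TrainingJobName'])
    metrics = [
        # keep name and value only; Timestamp is a datetime and is not JSON-serialisable
        {'MetricName': m['MetricName'], 'Value': m['Value']}
        for m in job.get('FinalMetricDataList', [])
    ]
    return {
        'TrainingJobName': job['TrainingJobName'],
        'TrainingJobStatus': job['TrainingJobStatus'],
        'trainingMetrics': metrics,
    }
```

Since this notebook does not control the order of `FinalMetricDataList`, looking a metric up by `MetricName` may be more robust than relying on its position.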

### Define a Choice State Step

To build a dynamic workflow, we need to create a choice step. This choice step branches on the result returned by the query Lambda: is the model accurate enough to deploy, or should the workflow fail?

In [None]:
check_accuracy_step = steps.states.Choice(
    'Accuracy > 90%'
)

### Define the Lambda Step for Model Deployment

Let's define a Lambda step for the Lambda function that we will invoke if the accuracy of our model is higher than the defined threshold.

In [None]:
# moves the trained model to the "deploy" area for lambda to pick up
lambda_model_deploy = steps.compute.LambdaStep(
    'Deploy Model',
    parameters={  
        "FunctionName": execution_input['LambdaDeployModel'],
        'Payload':{
            "data_bucket": execution_input['data_bucket'],
            "model_location": execution_input['TrainingJobName']
        }
    }
)
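
The deploy step passes `data_bucket` and the training job name as `model_location`. With `output_path = 's3://' + data_bucket`, SageMaker writes the model artifact to `<training job name>/output/model.tar.gz` in that bucket. A sketch of a deploy Lambda that copies the artifact to a `deploy/` prefix; the destination key, the handler name, and the injectable client are assumptions, since the real function is not shown here.

```python
DEPLOY_KEY = 'deploy/model.tar.gz'  # hypothetical location the downstream consumer watches


def deploy_model_handler(event, context, s3_client=None):
    """Hypothetical deploy Lambda: copy the trained model artifact into the deploy area."""
    if s3_client is None:
        import boto3
        s3_client = boto3.client('s3')
    bucket = event['data_bucket']
    source_key = '{}/output/model.tar.gz'.format(event['model_location'])
    # copy_object leaves the source in place; delete it afterwards for a true "move"
    s3_client.copy_object(
        Bucket=bucket,
        Key=DEPLOY_KEY,
        CopySource={'Bucket': bucket, 'Key': source_key},
    )
    return {'deployed': 's3://{}/{}'.format(bucket, DEPLOY_KEY), 'source_key': source_key}
```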

### Define the Fail State Step

Let's define a Fail step that the choice state moves to when the accuracy of our model is lower than the defined threshold. See [FailStateStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the documentation to learn more.

In [None]:
fail_step = steps.states.Fail(
    'Model Accuracy Too Low',
    comment='Validation accuracy lower than threshold'
)

end_step = steps.states.Pass('End')

### Add Rules to Choice State

The next thing we'll do is add a threshold rule to our choice state. If the validation accuracy of our model is above 0.90, we move to the Lambda model deployment step; otherwise, we move to the fail state step.

To achieve an accuracy above 90%, the error must be below 0.10.

In [None]:
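# trainingMetrics[4] from the query Lambda is expected to be an error metric:
# an error below 0.10 corresponds to an accuracy above 90%, so those models are deployed
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(
    variable=lambda_step_query.output()['Payload']['trainingMetrics'][4]['Value'],
    value=.10
)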

check_accuracy_step.add_choice(rule=threshold_rule, next_step=lambda_model_deploy)
check_accuracy_step.default_choice(next_step=fail_step)
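
The choice compares one metric from the query Lambda's payload against a threshold, following the rule stated above (an error below 0.10 means an accuracy above 0.90). The sketch below evaluates that decision in plain Python so the branching can be checked against a sample payload; the state names come from this notebook, while `next_state` and the sample payloads are hypothetical.

```python
def next_state(payload, metric_index=4, max_error=0.10):
    """Return the state name the choice would pick for a query-Lambda payload."""
    error = payload['trainingMetrics'][metric_index]['Value']
    # NumericLessThan is strict: an error of exactly max_error goes to the default (fail) branch
    return 'Deploy Model' if error < max_error else 'Model Accuracy Too Low'
```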

### Link all the Steps Together

Finally, let's create a workflow definition by chaining together all of the steps we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Chain) in the documentation to learn more.

In [None]:
workflow_definition = steps.Chain([
    lambda_step_prep,
    lambda_step_split,
    training_step,
    model_step,
    lambda_step_query,
    check_accuracy_step
])

# Run the workflow

Now that we have the workflow definition, let's create the workflow and render the graph with `render_graph`:

In [None]:
workflow_name = 'MLOpsRetrainingWorkflow_{}'.format(id)
workflow = Workflow(
    name=workflow_name,
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [None]:
workflow.render_graph()

### Create the workflow in AWS Step Functions with `create`

In [None]:
state_machines = Workflow.list_workflows()
workflow_exist = False
existing_workflow_obj = {}
for machine in state_machines:
    if machine["name"] == workflow_name:
        workflow_exist = True
        existing_workflow_obj = machine
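
The loop above can be written more compactly with `next()`, which stops at the first match. A sketch operating on the list of dictionaries returned by `Workflow.list_workflows()` (each with the `name` and `stateMachineArn` keys used above); `find_workflow` is a hypothetical helper.

```python
def find_workflow(state_machines, name):
    """Return the first state machine dict whose 'name' matches, or None."""
    return next((m for m in state_machines if m['name'] == name), None)

# Usage:
# existing_workflow_obj = find_workflow(Workflow.list_workflows(), workflow_name)
# workflow_exist = existing_workflow_obj is not None
```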

In [None]:

if workflow_exist:
    workflow = Workflow.attach(existing_workflow_obj["stateMachineArn"])
    print(workflow_name + " already exists. Updating the workflow definition.")
    workflow.update(definition=workflow_definition, role=workflow_execution_role)
    # small delay for Step Functions to pick up the new definition before execution
    time.sleep(2)
else:
    print(workflow_name + " does not exist. Creating it with the specified workflow definition.")
    workflow.create()
    

### Run the workflow with `execute`

In [None]:

execution = workflow.execute(
    inputs={
        'TrainingJobName': 'linear-learner-{}'.format(id), # each SageMaker job requires a unique name
        'ModelName': 'UnicornWeatherImpact-{}'.format(id), # each model requires a unique name
        'EndpointName': 'UnicornStatus', # each endpoint requires a unique name (unused by this workflow)
        # ACTION: provide the names of your Lambda functions
        'LambdaQueryStatus': '',
        'LambdaDataPrep': '',
        'LambdaDataSplit': '',
        'LambdaDeployModel': '',
        'data_bucket': data_bucket
    }
)
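
SageMaker rejects training job and model names that are too long or contain unsupported characters, so a quick local check can save a failed execution. A sketch, assuming the SageMaker constraint for these names: at most 63 characters of letters, digits, and hyphens, starting and ending with a letter or digit, as expressed by the training job name pattern `^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}`. Both helper names are hypothetical.

```python
import re
import uuid

# Letters and digits, optionally separated by hyphens; at most 63 characters in total.
_NAME_RE = re.compile(r'[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}')


def is_valid_sagemaker_name(name):
    """True if name fits the SageMaker job/model naming pattern."""
    return _NAME_RE.fullmatch(name) is not None


def unique_name(prefix):
    """Append a 32-character hex UUID, as the execution cell above does."""
    return '{}-{}'.format(prefix, uuid.uuid4().hex)
```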


### Render the workflow progress with `render_progress`

This generates a snapshot of the current state of your workflow as it executes. The snapshot is a static image, so you must run the cell again to check progress:

In [None]:
execution.render_progress()
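
Because `render_progress` only draws a snapshot, you may prefer to block until the execution finishes. A sketch of a polling loop, assuming a callable that returns the execution description as a dict with a `status` field (for example the SDK's `execution.describe`, which we assume wraps the Step Functions `DescribeExecution` API). The poll interval and timeout are arbitrary choices, and `wait_for_execution` is a hypothetical helper.

```python
import time

TERMINAL_STATUSES = {'SUCCEEDED', 'FAILED', 'TIMED_OUT', 'ABORTED'}


def wait_for_execution(describe, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Call describe() until the status is terminal; return the final description."""
    waited = 0
    while True:
        description = describe()
        if description['status'] in TERMINAL_STATUSES:
            return description
        if waited >= timeout_seconds:
            raise TimeoutError('execution still {} after {}s'.format(description['status'], waited))
        sleep(poll_seconds)
        waited += poll_seconds

# Usage:
# final = wait_for_execution(execution.describe)
# print(final['status'])
```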