## Automate Model Retraining & Deployment Using the AWS Step Functions Data Science SDK

**This sample is provided for demonstration purposes, make sure to conduct appropriate testing if derivating this code for your own use-cases!**

This notebook describes how to use the AWS Step Functions Data Science SDK to create a machine learning model retraining workflow. The Step Functions SDK is an open source library that allows data scientists to easily create and execute machine learning workflows using AWS Step Functions and Amazon SageMaker. For more information, please see the following resources:
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)


### Step 0: Get Admin Setup Results
Bucket names, codecommit repo, docker image, IAM roles, ...

In order to keep things orginized, we will save our `Source Code` (data processing, model training/serving scripts), `datasets`, as well as our trained `model(s) binaries` and their `test-performance metrics` all on S3, **versioned with respect to the date/time of each update.**

In [None]:
# Upgrade the stepfunctions library
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

In [None]:
import json
import boto3
import logging
import stepfunctions
from stepfunctions import steps
from time import gmtime, strftime
from stepfunctions.steps.choice_rule import ChoiceRule
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
stepfunctions.set_stream_logger(level=logging.INFO)
session = boto3.session.Session()


# Set project bucket, IAM Roles and Docker Image for Training
with open('admin_setup.txt', 'r') as filehandle:
    admin_setup = json.load(filehandle)

SOURCE_DATA = admin_setup["raw_data_path"]
BUCKET = admin_setup["project_bucket"]
REGION = session.region_name

TRAINING_IMAGE = admin_setup["docker_image"]
WORKFLOW_EXECUTION_ROLE = admin_setup["workflow_execution_role"]

REPO = admin_setup["repo_name"]
BRANCH = "master"

WORKFLOW_DATE_TIME = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
WORKFLOW_NAME = admin_setup["repo_name"]

### Define Wrokflow Schema

In [None]:
my_workflow_input_schema = {
    #ADMIN
    "REGION":str,
    "ROLE_ARN":str,
    "BUCKET":str,
    "WORKFLOW_NAME":str,
    "WORKFLOW_DATE_TIME":str,
    "DATA_SOURCE":str,
    
    # CodeCommit
    "REPO":str,
    "BRANCH":str,
    "DATA_PROCESSING_DIR":str,
    "ML_DIR":str,
    
    # SM Processing
    "PROCESSING_SCRIPT":str,
    "PROCESSING_IMAGE":str,
    "PROCESSING_INSTANCE_TYPE":str,
    "PROCESSING_INSTANCE_COUNT":int,
    "PROCESSING_VOLUME_SIZE_GB":int,
    
    # SM TRAINING
    "TRAINING_SCRIPT":str,
    "TRAINING_IMAGE":str,
    "TRAINING_INSTANCE_TYPE":str,
    "TRAINING_INSTANCE_COUNT":int,
    "TRAINING_VOLUME_SIZE_GB":int,
    
    # SM SERVING
    "SERVING_SCRIPT":str,
    "SERVING_IMAGE":str,
    "SERVING_INSTANCE_TYPE":str,
    "SERVING_INSTANCE_COUNT":int,
    "SERVING_VOLUME_SIZE_GB":int,
}
my_execution_input = ExecutionInput(schema=my_workflow_input_schema)

In [None]:
# StepN: Create Fail State
fail_step = steps.states.Fail(
    'Workflow Failed',
    comment='Either Validation accuracy is lower than threshold or one of processing, training, deployment jobs has faild.'
)

# Step1: Copy source code from CodeCommit to S3
codecommit_to_s3_step = steps.compute.LambdaStep(
    state_id = 'Put SourceCode on S3',
    parameters={ 
        "FunctionName": WORKFLOW_NAME + '-codecommit-to-s3',
        'Payload':{
            "REGION": my_execution_input["REGION"],
            "BUCKET": my_execution_input["BUCKET"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"],
            "REPO": my_execution_input["REPO"],
            "BRANCH": my_execution_input["BRANCH"],
            "ML_DIR": my_execution_input["ML_DIR"],
            "DATA_PROCESSING_DIR": my_execution_input["DATA_PROCESSING_DIR"]
        }
    }
)

# Step2: Run SageMaker Data Processing Job
data_processing_step = steps.compute.LambdaStep(
    state_id = 'Run SageMaker Processing',
    parameters={  
        "FunctionName": WORKFLOW_NAME + '-create-sagemaker-prcoessing-job',
        'Payload':{
            "DATA_SOURCE":SOURCE_DATA,
            "BUCKET": my_execution_input["BUCKET"],
            "WORKFLOW_NAME": my_execution_input["WORKFLOW_NAME"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"],
            "PROCESSING_INSTANCE_TYPE": my_execution_input["PROCESSING_INSTANCE_TYPE"],
            "PROCESSING_INSTANCE_COUNT": my_execution_input["PROCESSING_INSTANCE_COUNT"],
            "PROCESSING_VOLUME_SIZE_GB": my_execution_input["PROCESSING_VOLUME_SIZE_GB"],
            "PROCESSING_IMAGE": my_execution_input["PROCESSING_IMAGE"],
            "PROCESSING_SCRIPT": my_execution_input["PROCESSING_SCRIPT"],
            "ROLE_ARN": my_execution_input["ROLE_ARN"]
        }
    }
)

# Step3: Wait a little bit
wait_for_data_processing = steps.states.Wait(
    state_id = "Wait 30 Seconds",
    seconds = 30
)

# Step4: Check if processing job has finished
get_processing_status = steps.compute.LambdaStep(
    state_id = "Get SageMaker Processing Status",
    parameters={  
        "FunctionName": WORKFLOW_NAME + '-query-data-processing-status',
        'Payload':{
            "WORKFLOW_NAME": my_execution_input["WORKFLOW_NAME"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"]
        }
    }
)

# Step5: If processing job is not done, go back to waiting (Step3), if done go to Step6, else go to failure
# We will author this step later
# ...

# Step6: Start SageMaker Training Job
model_training_step = steps.compute.LambdaStep(
    'Run Model Training Job',
    parameters={  
        "FunctionName": WORKFLOW_NAME + '-create-sagemaker-training-job',
        'Payload':{
            "BUCKET": my_execution_input["BUCKET"],
            "WORKFLOW_NAME": my_execution_input["WORKFLOW_NAME"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"],
            "TRAINING_INSTANCE_TYPE": my_execution_input["TRAINING_INSTANCE_TYPE"],
            "TRAINING_INSTANCE_COUNT": my_execution_input["TRAINING_INSTANCE_COUNT"],
            "TRAINING_VOLUME_SIZE_GB": my_execution_input["TRAINING_VOLUME_SIZE_GB"],
            "TRAINING_IMAGE": my_execution_input["TRAINING_IMAGE"],
            "TRAINING_SCRIPT": my_execution_input["TRAINING_SCRIPT"],
            "ROLE_ARN": my_execution_input["ROLE_ARN"]
        }
    }
)

# Step5: If processing job is not done, go back to waiting (Step3), if done go to Step6, else go to failure
check_pocessing_status = steps.states.Choice(
    state_id = "Processing Job Complete?",
)

processing_job_output = get_processing_status.output()['Payload']['ProcessingJobStatus']

completed_rule = ChoiceRule.StringEquals(variable=processing_job_output, value="Completed")
in_progress_rule = ChoiceRule.StringEquals(variable=processing_job_output, value="InProgress")

check_pocessing_status.add_choice(rule=completed_rule, next_step=model_training_step)
check_pocessing_status.add_choice(rule=in_progress_rule, next_step=wait_for_data_processing)
check_pocessing_status.default_choice(fail_step)



# Step7: Wait a little bit
wait_for_training = steps.states.Wait(
    state_id = "Wait 60 Seconds",
    seconds = 60
)

# Step8: Check if training job has finished
get_training_status = steps.compute.LambdaStep(
    state_id = "Get Training Job Status",
    parameters={  
        "FunctionName": WORKFLOW_NAME + '-query-training-status',
        'Payload':{
            "WORKFLOW_NAME": my_execution_input["WORKFLOW_NAME"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"]
        }
    }
)


# Step9: If training job is not done, go back to waiting (Step7), if done go to Step10, else go to failure
# We will author this step later
# ...

# Step10: Get model accuracy (custom print to logs during training)
get_model_accuracy = steps.compute.LambdaStep(
    state_id = "Get Model Median Abs. Err.",
    parameters={  
        "FunctionName": WORKFLOW_NAME + '-query-model-accuracy',
        'Payload':{
            "WORKFLOW_NAME": my_execution_input["WORKFLOW_NAME"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"]
        }
    }
)

# Step9: If training job is not done, go back to waiting (Step7), if done go to Step10, else go to failure
check_training_status = steps.states.Choice(
    state_id = "Training Job Complete?",
)

training_job_output = get_training_status.output()['Payload']['TrainingJobStatus']

completed_rule = ChoiceRule.StringEquals(variable=training_job_output, value="Completed")
in_progress_rule = ChoiceRule.StringEquals(variable=training_job_output,value="InProgress")

check_training_status.add_choice(rule=completed_rule, next_step=get_model_accuracy)
check_training_status.add_choice(rule=in_progress_rule, next_step=wait_for_training)
check_training_status.default_choice(fail_step)


# Step11: If model's Median Abs. Err. is less than 2, go back to next step (deployment), else go to failure
# We will author this step later
# ...

# Step12: Create Endpoint (or update it if it exists)
deploy_model_step = steps.compute.LambdaStep(
    'Deploy Model',
    parameters={  
        "FunctionName": WORKFLOW_NAME + '-deploy-sagemaker-model-job',
        'Payload':{
            "REGION": my_execution_input["REGION"],
            "BUCKET": my_execution_input["BUCKET"],
            "WORKFLOW_NAME": my_execution_input["WORKFLOW_NAME"],
            "WORKFLOW_DATE_TIME": my_execution_input["WORKFLOW_DATE_TIME"],
            "SERVING_INSTANCE_TYPE": my_execution_input["SERVING_INSTANCE_TYPE"],
            "SERVING_INSTANCE_COUNT": my_execution_input["SERVING_INSTANCE_COUNT"],
            "SERVING_IMAGE": my_execution_input["SERVING_IMAGE"],
            "SERVING_SCRIPT": my_execution_input["SERVING_SCRIPT"],
            "ROLE_ARN": my_execution_input["ROLE_ARN"]
        }
    }
)


# Step11: If model's Median Abs. Err. is less than 3, go back to next step (deployment), else go to failure
check_accuracy_step = steps.states.Choice(
    'Median-AE < 3'
)
mae = get_model_accuracy.output()['Payload']['trainingMetrics'][0]['Value']
threshold_rule = ChoiceRule.NumericLessThan(variable=mae, value=3)
check_accuracy_step.add_choice(rule=threshold_rule, next_step=deploy_model_step)
check_accuracy_step.default_choice(next_step=fail_step)

### Link all the Steps Together
We create a workflow definition by chaining all of the steps together that we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
# Chain Steps 5-16
codecommit_to_s3_step.next(data_processing_step)
data_processing_step.next(wait_for_data_processing)
wait_for_data_processing.next(get_processing_status)
get_processing_status.next(check_pocessing_status)
model_training_step.next(wait_for_training)
wait_for_training.next(get_training_status)
get_training_status.next(check_training_status)
get_model_accuracy.next(check_accuracy_step)

# Chain the whole workflow
workflow_definition = steps.Chain([
    codecommit_to_s3_step
    #wait_for_etl_step,
    #get_etl_status,
    #check_etl_status
])

In [None]:
workflow = Workflow(
    name=WORKFLOW_NAME,
    definition=workflow_definition,
    role=WORKFLOW_EXECUTION_ROLE,
    execution_input=my_execution_input
)

Create your workflow using the workflow definition above, and render the graph with [render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):

In [None]:
workflow.render_graph()

Create the workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create):

In [None]:
workflow.create()

Run the workflow with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute):

In [None]:
my_execution_input_values = {
    #ADMIN
    "REGION":REGION,
    "ROLE_ARN":WORKFLOW_EXECUTION_ROLE,
    "BUCKET":BUCKET,
    "WORKFLOW_NAME": WORKFLOW_NAME,
    "WORKFLOW_DATE_TIME":WORKFLOW_DATE_TIME,
    "DATA_SOURCE":SOURCE_DATA,

    # CodeCommit
    "REPO":REPO,
    "BRANCH":BRANCH,
    "DATA_PROCESSING_DIR": "sagemaker-processing-src",
    "ML_DIR": "sagemaker-train-serve-src",
    
    # SM Processing
    "PROCESSING_SCRIPT":"processing.py",
    "PROCESSING_IMAGE":TRAINING_IMAGE,
    "PROCESSING_INSTANCE_TYPE":"ml.c5.xlarge",
    "PROCESSING_INSTANCE_COUNT":1,
    "PROCESSING_VOLUME_SIZE_GB":10,
    
    # SM TRAINING
    "TRAINING_SCRIPT":"train.py",
    "TRAINING_IMAGE":TRAINING_IMAGE,
    "TRAINING_INSTANCE_TYPE":"ml.c5.xlarge",
    "TRAINING_INSTANCE_COUNT":1,
    "TRAINING_VOLUME_SIZE_GB":10,
    
    # SM SERVING
    "SERVING_SCRIPT":"train.py",
    "SERVING_IMAGE":TRAINING_IMAGE,
    "SERVING_INSTANCE_TYPE":"ml.c5.xlarge",
    "SERVING_INSTANCE_COUNT":1,
    "SERVING_VOLUME_SIZE_GB":10,
}

execution = workflow.execute(inputs=my_execution_input_values)

Render workflow progress with the [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress). This generates a snapshot of the current state of your workflow as it executes. This is a static image therefore you must run the cell again to check progress:

In [None]:
execution.render_progress()

In [None]:
#execution.list_events(html=True)