## Introduction

This notebook describes how to use the AWS Step Functions Data Science SDK to create a machine learning model retraining workflow. The Step Functions SDK is an open source library that allows data scientists to easily create and execute machine learning workflows using AWS Step Functions and Amazon SageMaker. For more information, please see the following resources:
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

In this notebook, we use the SDK to create steps that train a machine learning model (an Amazon SageMaker Random Cut Forest) on captured data, save the trained model, and deploy it to a SageMaker endpoint. We then link these steps together into a workflow and execute it in AWS Step Functions.

In [None]:
!pip install stepfunctions

In [6]:
import uuid

import boto3
import sagemaker
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

session = sagemaker.Session()

region = boto3.Session().region_name
bucket = session.default_bucket()
# Random suffix that makes the workflow, training job and model names unique.
# (Note: this shadows Python's built-in id() for the rest of the notebook.)
id = uuid.uuid4().hex

In [17]:
from sagemaker import RandomCutForest

role = sagemaker.get_execution_role()
prefix = 'taxidata'

# Specify general training job information.
# Parameter names follow SageMaker Python SDK v1; in v2 they are instance_count,
# instance_type, use_spot_instances, max_wait and max_run.
rcf = RandomCutForest(role=role,
                      train_instance_count=1,
                      train_use_spot_instances=True,  # managed spot training
                      train_max_wait=60*5,            # must be >= train_max_run with spot
                      train_max_run=60*5,
                      train_instance_type='ml.m4.xlarge',
                      data_location='s3://{}/{}/'.format(bucket, prefix),
                      output_path='s3://{}/{}/output'.format(bucket, prefix),
                      num_samples_per_tree=512,
                      num_trees=50)
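
With managed spot training, SageMaker requires the maximum wait time to be at least the maximum run time; the cell above sets both to 300 seconds. The stdlib-only sketch below builds the corresponding `StoppingCondition` block of a `CreateTrainingJob` request and checks that constraint. The helper `stopping_condition` is hypothetical; the SDK builds this block for you from `train_max_run` and `train_max_wait`.

```python
def stopping_condition(max_run, max_wait=None):
    """Build a CreateTrainingJob-style StoppingCondition dict (hypothetical helper).

    max_run and max_wait are in seconds; max_wait only applies to spot training.
    """
    if max_run <= 0:
        raise ValueError("max_run must be positive")
    condition = {"MaxRuntimeInSeconds": max_run}
    if max_wait is not None:
        # Spot training may wait for capacity, so the total wait budget
        # cannot be shorter than the run budget.
        if max_wait < max_run:
            raise ValueError("max_wait must be >= max_run for spot training")
        condition["MaxWaitTimeInSeconds"] = max_wait
    return condition


print(stopping_condition(60 * 5, 60 * 5))
```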

In [18]:
import pandas as pd

# SageMaker expects unique names for each job, model and endpoint. 
# If these names are not unique the execution will fail.
execution_input = ExecutionInput(schema={
    'TrainingJobName': str,
    'ModelName': str,
    'EndpointName': str,
})

taxi_data = pd.read_csv('nyc_taxi_values.csv', delimiter=',')

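# Note: record_set() converts the array to RecordIO-protobuf and uploads it to
# data_location when this cell runs, not when the workflow executes.
training_step = steps.TrainingStep(
    'Model Training',
    estimator=rcf,
    data=rcf.record_set(taxi_data.to_numpy().reshape(-1, 1)),  # one feature per row
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)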
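
The `execution_input[...]` values passed to the steps are placeholders, not strings. When the workflow is rendered to Amazon States Language, the SDK turns them into references to the execution input; we assume here that they take the form `"TrainingJobName.$": "$$.Execution.Input['TrainingJobName']"`. The sketch below shows how such a reference resolves against the input given to `workflow.execute`. `resolve_parameters` is a hypothetical helper that handles only this one path form.

```python
import re

# Matches references such as $$.Execution.Input['TrainingJobName'] (assumed format).
_EXEC_INPUT_REF = re.compile(r"^\$\$\.Execution\.Input\['([^']+)'\]$")


def resolve_parameters(parameters, execution_input):
    """Replace 'Key.$' entries that reference the execution input with their values."""
    resolved = {}
    for key, value in parameters.items():
        if key.endswith(".$"):
            match = _EXEC_INPUT_REF.match(value)
            if match is None:
                raise ValueError("unsupported path: " + value)
            # Drop the ".$" suffix and substitute the execution input value.
            resolved[key[:-2]] = execution_input[match.group(1)]
        else:
            resolved[key] = value
    return resolved


params = {"TrainingJobName.$": "$$.Execution.Input['TrainingJobName']",
          "StoppingCondition": {"MaxRuntimeInSeconds": 300}}
print(resolve_parameters(params, {"TrainingJobName": "stepfunctions-train-abc"}))
```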

In [20]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    result_path='$.ModelStepResults'
)
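
`result_path='$.ModelStepResults'` controls where this state's output goes: the result is inserted into the state's input under `ModelStepResults`, and the combined document is passed to the next state. The sketch below models that Amazon States Language behavior for top-level paths only, plus `$` (the result replaces the input, the default) and `None` (`null`, which discards the result). `apply_result_path` is a hypothetical helper, not part of the SDK.

```python
import copy


def apply_result_path(state_input, result, result_path):
    """Model Amazon States Language ResultPath for '$', '$.<key>' or None."""
    if result_path is None:
        # ResultPath: null discards the result and passes the input through.
        return copy.deepcopy(state_input)
    if result_path == "$":
        # The result replaces the input entirely.
        return copy.deepcopy(result)
    if result_path.startswith("$.") and "." not in result_path[2:]:
        # Insert the result under a top-level key, keeping the original input.
        output = copy.deepcopy(state_input)
        output[result_path[2:]] = copy.deepcopy(result)
        return output
    raise ValueError("only '$', '$.<key>' and None are modeled here")


step_input = {"TrainingJobName": "stepfunctions-train-abc"}
print(apply_result_path(step_input, {"ModelArn": "example-model-arn"}, "$.ModelStepResults"))
```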

In [21]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m4.xlarge'
)
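
`EndpointConfigStep` corresponds to SageMaker's `CreateEndpointConfig` API. As a rough sketch of the request it parameterizes (with the endpoint config reusing the unique model name, as in the cell above): the field names are those of `CreateEndpointConfig`, while `AllTraffic` is assumed to be the SDK's default variant name and `endpoint_config_request` is a hypothetical helper.

```python
def endpoint_config_request(config_name, model_name, instance_count, instance_type,
                            variant_name="AllTraffic"):
    """Build a CreateEndpointConfig-style request with a single production variant."""
    if instance_count < 1:
        raise ValueError("an endpoint needs at least one instance")
    return {
        "EndpointConfigName": config_name,
        "ProductionVariants": [{
            "VariantName": variant_name,
            "ModelName": model_name,
            "InitialInstanceCount": instance_count,
            "InstanceType": instance_type,
        }],
    }


request = endpoint_config_request("stepfunctions-model-abc", "stepfunctions-model-abc",
                                  1, "ml.m4.xlarge")
print(request["ProductionVariants"][0]["VariantName"])  # AllTraffic
```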

In [22]:
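endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
    # update=False calls CreateEndpoint, which fails if the endpoint already exists;
    # set update=True to roll a retrained model out to an existing endpoint.
    update=False
)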
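
In a retraining workflow the endpoint usually exists after the first run, so `CreateEndpoint` (used when `update=False`) fails on later executions, while `UpdateEndpoint` (used when `update=True`) switches the existing endpoint to the new configuration. The sketch below makes that decision using the Step Functions SageMaker service-integration resource ARNs. `existing_endpoints` stands in for a lookup you would normally do against the SageMaker API (for example boto3's `describe_endpoint`), and `endpoint_task` is hypothetical.

```python
CREATE_ENDPOINT = "arn:aws:states:::sagemaker:createEndpoint"
UPDATE_ENDPOINT = "arn:aws:states:::sagemaker:updateEndpoint"


def endpoint_task(endpoint_name, config_name, existing_endpoints):
    """Return an ASL Task state that creates or updates the endpoint as needed."""
    update = endpoint_name in existing_endpoints
    return {
        "Type": "Task",
        "Resource": UPDATE_ENDPOINT if update else CREATE_ENDPOINT,
        "Parameters": {"EndpointName": endpoint_name,
                       "EndpointConfigName": config_name},
        "End": True,
    }


first_run = endpoint_task("stepfunctions-endpoint", "stepfunctions-model-abc", set())
later_run = endpoint_task("stepfunctions-endpoint", "stepfunctions-model-def",
                          {"stepfunctions-endpoint"})
print(first_run["Resource"])
print(later_run["Resource"])
```

Because the SDK fixes `update` when the workflow is defined, one option is to run the workflow once with `update=False` and define it with `update=True` for subsequent retraining runs.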

In [23]:
workflow_definition = steps.Chain([
    training_step,
    model_step,
    endpoint_config_step,
    endpoint_step
])
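
`steps.Chain` links the states in order: in the generated Amazon States Language definition, each state's `Next` names the following state and the last state is marked `End`. The stdlib-only sketch below builds that structure for the four states above. The resource ARNs are the Step Functions SageMaker service-integration ARNs (`.sync` because the training step waits for completion); `chain` is a hypothetical helper that omits the `Parameters` the SDK also generates.

```python
import json


def chain(states):
    """Link (name, resource) pairs into a minimal ASL state machine definition."""
    if not states:
        raise ValueError("a chain needs at least one state")
    definition = {"StartAt": states[0][0], "States": {}}
    for i, (name, resource) in enumerate(states):
        state = {"Type": "Task", "Resource": resource}
        if i + 1 < len(states):
            state["Next"] = states[i + 1][0]  # hand off to the following state
        else:
            state["End"] = True               # the last state ends the execution
        definition["States"][name] = state
    return definition


definition = chain([
    ("Model Training", "arn:aws:states:::sagemaker:createTrainingJob.sync"),
    ("Save Model", "arn:aws:states:::sagemaker:createModel"),
    ("Create Model Endpoint Config", "arn:aws:states:::sagemaker:createEndpointConfig"),
    ("Update Model Endpoint", "arn:aws:states:::sagemaker:createEndpoint"),
])
print(json.dumps(definition, indent=2))
```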

In [25]:
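workflow = Workflow(
    name='MyMLWorkflow_{}'.format(id),
    definition=workflow_definition,
    # The role Step Functions assumes to run the state machine: it must trust
    # states.amazonaws.com, be allowed to call SageMaker, and be allowed to
    # iam:PassRole the SageMaker role. Here the SageMaker execution role is reused.
    role=role,
    execution_input=execution_input
)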
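
Since the notebook reuses the SageMaker execution role as the Step Functions role, that role's trust policy has to allow both service principals to assume it. The sketch below builds such a trust-policy document in the standard IAM JSON format. `trust_policy` is a hypothetical helper, and attaching the document (for example in the IAM console or with boto3's `update_assume_role_policy`) is left out.

```python
import json


def trust_policy(*services):
    """Build an IAM trust policy letting the given AWS services assume a role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": list(services)},
            "Action": "sts:AssumeRole",
        }],
    }


# A role shared by SageMaker training jobs and the Step Functions state machine.
policy = trust_policy("sagemaker.amazonaws.com", "states.amazonaws.com")
print(json.dumps(policy, indent=2))
```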

In [26]:
workflow.render_graph()

In [31]:
workflow.create()

'arn:aws:states:eu-central-1:559317267498:stateMachine:MyMLWorkflow_4ace0a82e7b44de0befc81944f449a28'
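
`workflow.create()` returns the ARN of the new state machine, which follows the general `arn:partition:service:region:account-id:resource` layout with a `stateMachine:<name>` resource. The small parsing sketch below can be used, for example, to confirm which region the workflow was created in. The helper and the example ARN (with a placeholder account ID) are illustrative.

```python
from typing import NamedTuple


class StateMachineArn(NamedTuple):
    partition: str
    region: str
    account: str
    name: str


def parse_state_machine_arn(arn):
    """Split a Step Functions state machine ARN into its components."""
    parts = arn.split(":")
    if (len(parts) != 7 or parts[0] != "arn" or parts[2] != "states"
            or parts[5] != "stateMachine"):
        raise ValueError("not a state machine ARN: " + arn)
    return StateMachineArn(partition=parts[1], region=parts[3],
                           account=parts[4], name=parts[6])


arn = "arn:aws:states:eu-central-1:123456789012:stateMachine:MyMLWorkflow_example"
print(parse_state_machine_arn(arn).region)  # eu-central-1
```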

In [32]:
execution = workflow.execute(
    inputs={
        'TrainingJobName': 'stepfunctions-train-{}'.format(id),  # each SageMaker training job requires a unique name
        'ModelName': 'stepfunctions-model-{}'.format(id),        # each model requires a unique name
        'EndpointName': 'stepfunctions-endpoint'                  # must not already exist, since the endpoint step uses update=False
    }
)
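
Because every name above is derived from the single `id` generated when the notebook started, executing the workflow a second time would reuse the same training job and model names, which SageMaker rejects. For repeated retraining runs, one option is to generate a fresh suffix per execution. The sketch below does that and checks each name against SageMaker's naming rule for these resources (at most 63 characters, alphanumerics optionally separated by hyphens, as we understand the documented pattern). `make_execution_inputs` is a hypothetical helper.

```python
import re
import uuid

# SageMaker name rule (as we understand the API docs): starts with an alphanumeric,
# hyphens only between alphanumerics. Length is checked separately.
_SAGEMAKER_NAME = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")
_MAX_NAME_LENGTH = 63


def make_execution_inputs(endpoint_name, suffix=None):
    """Build per-execution inputs with fresh training job and model names."""
    suffix = suffix or uuid.uuid4().hex
    inputs = {
        "TrainingJobName": "stepfunctions-train-{}".format(suffix),
        "ModelName": "stepfunctions-model-{}".format(suffix),
        "EndpointName": endpoint_name,
    }
    for key, name in inputs.items():
        if len(name) > _MAX_NAME_LENGTH or not _SAGEMAKER_NAME.match(name):
            raise ValueError("invalid {}: {!r}".format(key, name))
    return inputs


inputs = make_execution_inputs("stepfunctions-endpoint")
print(inputs["TrainingJobName"])
```

Each retraining run could then call `workflow.execute(inputs=make_execution_inputs('stepfunctions-endpoint'))`, provided the endpoint step is set up to update an existing endpoint.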

In [33]:
execution.render_progress()

In [41]:
execution.list_events(max_items=1, reverse_order=False, html=False)

[{'timestamp': datetime.datetime(2020, 10, 8, 8, 22, 39, 165000, tzinfo=tzlocal()),
  'type': 'ExecutionStarted',
  'id': 1,
  'previousEventId': 0,
  'executionStartedEventDetails': {'input': '{\n    "TrainingJobName": "stepfunctions-train-4ace0a82e7b44de0befc81944f449a28",\n    "ModelName": "stepfunctions-model-4ace0a82e7b44de0befc81944f449a28",\n    "EndpointName": "stepfunctions-endpoint"\n}',
   'inputDetails': {'truncated': False},
   'roleArn': 'arn:aws:iam::559317267498:role/service-role/AmazonSageMaker-ExecutionRole-20200115T112053'}}]
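
In the `ExecutionStarted` event, `executionStartedEventDetails['input']` holds the execution input as a JSON string, so it must be decoded before its fields can be used. The sketch below uses an event shaped like the output above, trimmed to the relevant fields and with shortened example names. `execution_input_from_events` is a hypothetical helper.

```python
import json


def execution_input_from_events(events):
    """Return the decoded input of the first ExecutionStarted event."""
    for event in events:
        if event.get("type") == "ExecutionStarted":
            return json.loads(event["executionStartedEventDetails"]["input"])
    raise LookupError("no ExecutionStarted event found")


events = [{
    "type": "ExecutionStarted",
    "id": 1,
    "previousEventId": 0,
    "executionStartedEventDetails": {
        "input": '{"TrainingJobName": "stepfunctions-train-abc", '
                 '"ModelName": "stepfunctions-model-abc", '
                 '"EndpointName": "stepfunctions-endpoint"}',
    },
}]
print(execution_input_from_events(events)["EndpointName"])  # stepfunctions-endpoint
```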

In [42]:
workflow.list_executions(html=True)

| Name | Status | Started | End Time |
|------|--------|---------|----------|
| ec52a66f-e17d-49a4-971d-9857ef47fc3b | SUCCEEDED | Oct 08, 2020 08:22:39.165 AM | Oct 08, 2020 08:26:27.861 AM |
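
The `Started` and `End Time` columns are formatted timestamps, so the run time of an execution can be computed by parsing them. For the execution above, the sketch below gives 3 minutes 48.696 seconds. The format string is inferred from the displayed values and assumes an English locale for the month abbreviation; `execution_duration` is a hypothetical helper.

```python
from datetime import datetime

# Format inferred from displayed values such as "Oct 08, 2020 08:22:39.165 AM".
DISPLAY_FORMAT = "%b %d, %Y %I:%M:%S.%f %p"


def execution_duration(started, ended):
    """Return the timedelta between two displayed execution timestamps."""
    return (datetime.strptime(ended, DISPLAY_FORMAT)
            - datetime.strptime(started, DISPLAY_FORMAT))


duration = execution_duration("Oct 08, 2020 08:22:39.165 AM",
                              "Oct 08, 2020 08:26:27.861 AM")
print(duration)  # 0:03:48.696000
```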


In [43]:
Workflow.list_workflows(html=True)

| Name | Creation Date |
|------|---------------|
| MyMLWorkflow_4ace0a82e7b44de0befc81944f449a28 | Oct 08, 2020 08:20:34.973 AM |
