In [None]:
!pip install stepfunctions

* Step Functions workflow를 생성하려면 먼저 Step Functions 실행용 IAM role을 만들어야 합니다 (아래 설정 셀의 `wf_role`에 해당 role의 ARN을 입력).

In [None]:
from urllib.parse import urlparse

import sagemaker
from sagemaker import get_execution_role
import stepfunctions
from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

# Roles and S3 locations used throughout the notebook — replace the placeholders.
sm_role = get_execution_role()  # SageMaker execution role, passed to the TensorFlow estimator
wf_role = 'your-StepFunctionsWorkflowExecutionRole-arn'  # role ARN passed to the Step Functions workflows

# Single source of truth for the S3 bucket/prefix, so the URIs below cannot drift apart.
bucket = 'your-bucket'
prefix = 'your-prefix'

# Raw CSV handed to the preprocessing Lambda as the execution's InputPath.
input_uri = 's3://{}/{}/raw_data.csv'.format(bucket, prefix)
# Location handed to the preprocessing Lambda as OutputPath; training reads `dataset_uri` from it.
# Assumes the dataset lives in the same bucket as the raw data — override `dataset_path`
# if it should go to a separate bucket.
dataset_path = 's3://{}/{}'.format(bucket, prefix)
dataset_uri = dataset_path + '/dataset.pkl.gz'

In [None]:
from sagemaker.tensorflow import TensorFlow

# TensorFlow 2.1 estimator that runs the MNIST training script on a single ml.m5.xlarge instance.
sf_estimator = TensorFlow(
    entry_point='../01_Experiments/mnist_simple_nn.py',  # path relative to this notebook
    framework_version='2.1.0',
    py_version='py3',
    role=sm_role,
    # NOTE(review): SageMaker SDK v1 argument names; newer SDK versions may expect
    # instance_count / instance_type instead — confirm against the installed version.
    train_instance_count=1,
    train_instance_type='ml.m5.xlarge',
)

## Simple pipeline

In [None]:
from stepfunctions.template.pipeline import TrainingPipeline

# Prebuilt training-pipeline template wrapping `sf_estimator` in a Step Functions workflow
# that runs under `wf_role`.
# NOTE(review): what the template stores under `s3_bucket` is not visible here — presumably
# pipeline artifacts; confirm.
pipeline = TrainingPipeline(
    estimator=sf_estimator,
    inputs=dataset_uri,  # training data location
    role=wf_role,
    s3_bucket=bucket,
)

In [None]:
# Preview the pipeline's state-machine graph before it is registered with `create()`.
pipeline.render_graph()

In [None]:
# Register the pipeline's state machine in AWS Step Functions.
pipeline.create()

In [None]:
# Start a run of the registered pipeline; keep the handle to track its progress.
execution = pipeline.execute()

In [None]:
# Show the pipeline execution's progress inline.
execution.render_progress()

# Building the workflow step by step

## Define the execution input

In [None]:
from stepfunctions.inputs import ExecutionInput

# Placeholders whose values are supplied when the workflow is executed (`full_wf.execute(inputs=...)`):
#   InputPath     - S3 URI of the raw data, forwarded to the preprocessing Lambda
#   OutputPath    - S3 location forwarded to the preprocessing Lambda as its output path
#   ExecutionName - shared name for the training job, model, endpoint config and endpoint
execution_input = ExecutionInput(
    schema={field: str for field in ('InputPath', 'OutputPath', 'ExecutionName')}
)

## From training to deployment

In [None]:
# One name, resolved from the execution input at runtime, shared by every SageMaker resource
# this run creates (training job, model, endpoint config, endpoint).
common_name = execution_input["ExecutionName"]

# The step classes are referenced unqualified (from `from stepfunctions.steps import *`), like
# LambdaStep/Wait/Choice elsewhere in this notebook. Going through the name `sagemaker` is
# ambiguous: the star import may rebind it to `stepfunctions.steps.sagemaker`, shadowing the
# SageMaker SDK imported in the first cell.
training_step = TrainingStep(
    state_id="Training",
    estimator=sf_estimator,
    job_name=common_name,
    # NOTE(review): fixed URI, assumed to be the file the preprocessing Lambda writes under
    # OutputPath (= dataset_path in the execute cell) — confirm.
    data=dataset_uri,
)

# Training instance type under either SageMaker SDK naming: `instance_type` (newer SDK) or
# `train_instance_type` (the v1 name the estimator was constructed with).
_estimator = training_step.estimator
_model_instance_type = getattr(_estimator, 'instance_type', None) or _estimator.train_instance_type

model_step = ModelStep(
    state_id="CreateModel",
    model=training_step.get_expected_model(),
    model_name=common_name,
    instance_type=_model_instance_type,
)

endpoint_conf_step = EndpointConfigStep(
    state_id='EndpointConfigure',
    endpoint_config_name=common_name,
    model_name=common_name,  # must match ModelStep's model_name
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',  # hosting instance, chosen independently of the training instance
)

endpoint_step = EndpointStep(
    state_id='Deploy',
    endpoint_name=common_name,
    endpoint_config_name=common_name,  # must match EndpointConfigStep's endpoint_config_name
)

In [None]:
# Deployment half of the workflow: train -> create model -> endpoint config -> deploy endpoint.
ml_steps_def = Chain([training_step, model_step, endpoint_conf_step, endpoint_step])

## Preprocessing steps using Lambda functions

In [None]:
# Start preprocessing by invoking a Lambda function (replace the placeholder ARN).
# All three execution-input fields are forwarded to it as its payload.
create_processing_step = LambdaStep(
    state_id="StartPreprocessing",
    parameters={  
        "FunctionName": "your-processing-fn-arn",
        "Payload": {
            "InputPath": execution_input['InputPath'],
            "OutputPath": execution_input['OutputPath'],
            "ExecutionName": execution_input['ExecutionName']
        }
    }
)

# Check the processing job's status via a second Lambda (replace the placeholder ARN).
# The job name is read from the first Lambda's response at `Payload.JobName`.
# NOTE(review): the choice state loops back to this step with *this* step's own output as its
# input, so the status Lambda presumably has to echo `JobName` inside its `Payload`; otherwise
# the second poll cannot resolve `Payload.JobName` — confirm against the Lambda implementation.
get_processing_status = LambdaStep(
    state_id="GetPreprocessingStatus",
    parameters={
        "FunctionName": "your-checkstatus-fn-arn",
        "Payload": {
            "JobName": create_processing_step.output()['Payload']['JobName']
        }
    }
)

# Pause (in seconds) between status checks. With the chain order used below
# (status check -> wait -> choice), the choice acts on a status fetched 60 s earlier.
wait_state = Wait(
    state_id="Wait",
    seconds=60
)

## Choice step — poll until the processing job finishes

In [None]:
# Terminal failure state for the preprocessing phase.
preprocessing_failure = Fail(
    state_id='Preprocessing Fail',
    error='Preprocessing has failed!',
    cause='See lambda error'
)

check_job_choice = Choice(
    state_id="IsPreProcessingComplete"
)

# Route on the status reported by the status-check Lambda:
#   InProgress / Stopping -> check again
#   Failed / Stopped      -> fail the workflow
#   Completed             -> continue into training and deployment
_processing_status_routes = {
    'InProgress': get_processing_status,
    'Stopping': get_processing_status,
    'Failed': preprocessing_failure,
    'Stopped': preprocessing_failure,
    'Completed': ml_steps_def,
}
_processing_status = get_processing_status.output()['Payload']['ProcessingJobStatus']
for _status, _next_step in _processing_status_routes.items():
    check_job_choice.add_choice(
        ChoiceRule.StringEquals(variable=_processing_status, value=_status),
        next_step=_next_step
    )

# Any other status value (e.g. an unexpected Lambda response) goes to the named Fail state
# instead of aborting the execution with States.NoChoiceMatched.
check_job_choice.default_choice(next_step=preprocessing_failure)

In [None]:
# Entry chain of the full workflow: start preprocessing -> check status -> wait -> choice.
# The choice state loops back to the status check or continues into `ml_steps_def`.
ml_preprocess_def = Chain(
    [create_processing_step, get_processing_status, wait_state, check_job_choice]
)

In [None]:
# Full workflow (preprocessing -> training -> deployment) under the Step Functions role `wf_role`.
full_wf = Workflow(name='my-workflow', role=wf_role, definition=ml_preprocess_def)

`create()`를 호출하는 시점에 workflow(state machine)가 Step Functions에 등록(배포)된다.

In [None]:
# Register (deploy) the full workflow's state machine in AWS Step Functions.
full_wf.create()

In [None]:
import time
# Start a run. The UTC timestamp makes ExecutionName differ between runs started in different
# seconds; that name is also used for the training job, model, endpoint config and endpoint.
timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
full_execution = full_wf.execute(
    inputs=dict(
        InputPath=input_uri,
        OutputPath=dataset_path,
        ExecutionName='sfworkflow-' + timestamp,
    )
)

In [None]:
# Show the full workflow execution's progress inline.
full_execution.render_progress()

In [None]:
# List the execution's history events, rendered as HTML.
# HTML rendering sometimes fails; in that case call it with html=False for the raw output.
full_execution.list_events(html=True)

# Data 업로드로 workflow trigger하기

In [None]:
import boto3
# Upload the local raw CSV to the workflow's input location (`input_uri`).
# NOTE(review): this section presumably relies on an S3 upload trigger configured outside
# this notebook to start the workflow — confirm.
s3_client = boto3.client('s3')
# Bucket and key come from `input_uri` rather than being repeated here, so the uploaded
# object is always the one passed to the workflow as InputPath.
_input_location = urlparse(input_uri)
s3_client.upload_file(
    '../00_Basics/raw_data.csv',
    _input_location.netloc,
    _input_location.path.lstrip('/'),
)

In [None]:
# List existing Step Functions workflows (state machines).
Workflow.list_workflows()

In [None]:
# Re-attach to an existing workflow by its state machine ARN (replace the placeholder).
wf_ref = Workflow.attach(state_machine_arn='your-state-machine-arn')

In [None]:
# List the executions of the attached workflow.
wf_ref.list_executions()

In [None]:
from stepfunctions.workflow import Execution
import datetime

In [None]:
# Handle to an existing execution, addressed by ARN (replace the placeholder), so its
# progress can be rendered. start_date and status are literal values supplied here rather
# than looked up — presumably only used for display.
exec_ref = Execution(
    workflow=wf_ref,
    execution_arn='your-execution-arn',
    status='RUNNING',
    start_date=datetime.datetime(2020, 7, 2),
)

In [None]:
# Render the attached execution's progress.
exec_ref.render_progress()