In [40]:
import sys
import uuid
import logging
import stepfunctions
import boto3
import sagemaker
import pandas as pd
import numpy as np
from time import gmtime, strftime, sleep         
import os
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import s3_input
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

In [41]:
# Define the Step Functions / SageMaker parameters shared by the later cells.

# Emit Step Functions SDK log records at INFO level.
stepfunctions.set_stream_logger(level=logging.INFO)

# SageMaker session and the values derived from it.
session = sagemaker.Session()
bucket = session.default_bucket()
region = boto3.Session().region_name

# Random hex token; only referenced by the disabled line below.
hex_id = uuid.uuid4().hex
# function_name = 'query-training-status-{}'.format(hex_id)

# IAM roles: one resolved from the SageMaker execution environment, and one
# fixed ARN that the Step Functions workflow is created with.
sagemaker_execution_role = sagemaker.get_execution_role()
workflow_execution_role = 'arn:aws:iam::851108988172:role/StepFunctionsWorkflowExecutionRole'

In [42]:
# Define the dataset.
# `project_name` doubles as the S3 key prefix for the training data and,
# later, for the estimator's output path.
project_name = 'cifar-10-data'
# Upload of the local TFRecord files into the bucket (currently disabled):
# boto3.Session().resource('s3').Bucket(bucket).Object('cifar-10-data/eval.tfrecords').upload_file('cifar-10-data/eval.tfrecords')
# boto3.Session().resource('s3').Bucket(bucket).Object('cifar-10-data/train.tfrecords').upload_file('cifar-10-data/train.tfrecords')
# boto3.Session().resource('s3').Bucket(bucket).Object('cifar-10-data/validation.tfrecords').upload_file('cifar-10-data/validation.tfrecords')
# S3 prefix handed to the training step as its input data location.
train_data = 's3://{}/{}/'.format(bucket, project_name)

In [43]:
# Configure the AWS SageMaker Estimator
# Hyperparameters and instance type consumed by the estimator built in the next cell.
hyperparameters = {'train-steps': 100}
instance_type = 'ml.m5.large'
# The block below is a disabled direct-training variant kept as a string
# literal rather than as comments; because it is the cell's last expression,
# the notebook echoes the string as the cell output. It is never executed.
"""estimator = sagemaker.estimator.Estimator(role=sagemaker_execution_role,
                      train_instance_count=1,
                      train_instance_type=instance_type,
                      image_name='sagemaker-tf-cifar10-example:latest',
                      hyperparameters=hyperparameters)
estimator.fit('s3://{}/cifar-10-data/'.format(bucket))"""

"estimator = sagemaker.estimator.Estimator(role=sagemaker_execution_role,\n                      train_instance_count=1,\n                      train_instance_type=instance_type,\n                      image_name='sagemaker-tf-cifar10-example:latest',\n                      hyperparameters=hyperparameters)\nestimator.fit('s3://{}/cifar-10-data/'.format(bucket))"

In [44]:
# container = get_image_uri(region, 'sagemaker-tf-cifar10-example:latest')

# Estimator backed by the custom CIFAR-10 training image in ECR.
# NOTE(review): the image URI hard-codes the account and the us-east-1 region,
# independently of the `region` value computed earlier -- confirm this is intended.
cif = sagemaker.estimator.Estimator(
    role=sagemaker_execution_role,
    image_name='851108988172.dkr.ecr.us-east-1.amazonaws.com/sagemaker-tf-cifar10-example:latest',
    hyperparameters=hyperparameters,
    train_instance_type=instance_type,
    train_instance_count=1,
    # Output location lives under the same project prefix as the input data.
    output_path='s3://{}/{}/output'.format(bucket, project_name),
)

In [45]:
# Build a Machine Learning Workflow
# Declare the three string placeholders each workflow execution has to supply;
# the steps below look them up by key.
execution_input = ExecutionInput(
    schema=dict(TrainingJobName=str, ModelName=str, EndpointName=str)
)

In [46]:
# Create a SageMaker Training Step
# Trains with the `cif` estimator on the S3 prefix in `train_data`; the job
# name is taken from the execution input.
training_step = TrainingStep(
    'Model Training',
    job_name=execution_input['TrainingJobName'],
    estimator=cif,
    data=train_data,
    wait_for_completion=True,
)

In [47]:
# Create the save-model step
# Uses the model the training step is expected to produce; this step's result
# is written under `$.ModelStepResults`.
model_step = ModelStep(
    'Save Model',
    model_name=execution_input['ModelName'],
    model=training_step.get_expected_model(),
    result_path='$.ModelStepResults',
)

In [48]:
# Create the endpoint *configuration* step
# The configuration reuses the model name as its own name.
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    model_name=execution_input['ModelName'],
    endpoint_config_name=execution_input['ModelName'],
    instance_type='ml.m5.xlarge',
    initial_instance_count=1,
)

In [49]:
# Endpoint step
# Points the endpoint at the configuration named after the model.
# NOTE(review): the state label says "Update" but update=False is passed; per
# the SDK documentation that flag selects creating a new endpoint -- confirm
# which behaviour is wanted.
endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_config_name=execution_input['ModelName'],
    endpoint_name=execution_input['EndpointName'],
    update=False,
)

In [50]:
# Link all steps
# Order: train -> save model -> endpoint config -> endpoint.
workflow_definition = steps.Chain(
    [training_step, model_step, endpoint_config_step, endpoint_step]
)

In [51]:
# Define the workflow object (it is created and executed in later cells)
# A fresh random hex suffix is appended to the workflow name.
workflow = Workflow(
    name='MyBYOC_{}'.format(uuid.uuid4().hex),
    role=workflow_execution_role,
    definition=workflow_definition,
    execution_input=execution_input,
)

In [52]:
# Create the workflow on AWS Step Functions. The call is left as a bare
# expression so that its return value is echoed as the cell output.
workflow.create()

[32m[INFO] Workflow created successfully on AWS Step Functions.[0m


'arn:aws:states:us-east-1:851108988172:stateMachine:MyBYOC_81d40346065a4d25af4ada500a8c93c7'

In [59]:
# Start one execution of the workflow.
# Each SageMaker training job, model and endpoint requires a unique name, so
# every input value gets its own freshly generated hex suffix.
execution = workflow.execute(
    inputs={
        input_key: name_template.format(uuid.uuid4().hex)
        for input_key, name_template in (
            ('TrainingJobName', 'BYOCJob-{}'),
            ('ModelName', 'BYOCModel-{}'),
            ('EndpointName', 'BYOCEndpoint-{}'),
        )
    }
)

[32m[INFO] Workflow execution started successfully on AWS Step Functions.[0m
