# 创建自动部署 TensorFlow 模型到 SageMaker Endpoint 的 Step Functions 状态机

## 介绍
该 Notebook 使用 AWS Step Functions Data Science SDK 创建一个自动部署 SageMaker TensorFlow 模型的工作流（workflow）。该工作流可以将存储在 S3 中的 TensorFlow 模型文件自动部署为 SageMaker Endpoint；当模型重新训练、模型文件更新后，还可以自动更新 SageMaker 上的模型服务。

In [30]:
!pip install stepfunctions



In [2]:
import sagemaker
from sagemaker.tensorflow.serving import Model
from sagemaker.session import Session

import stepfunctions
from stepfunctions import steps
from stepfunctions.steps import ModelStep, EndpointConfigStep, EndpointStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.steps.states import Catch

import tensorflow as tf

import logging
import uuid




In [3]:
# Random 32-character hex string; referenced later when naming the workflow.
# NOTE: this name shadows the builtin id(); it is kept because a later cell uses it.
id = uuid.uuid4().hex

sagemaker_session = Session()

# Emit Step Functions SDK log messages at INFO level and above.
stepfunctions.set_stream_logger(level=logging.INFO)

# Replace bucket and prefix with your own S3 bucket name and key prefix.
bucket = "sagemaker-techday-becky"
prefix = "yalla/model"

# IAM role used by SageMaker at run time.
# Replace with a role ARN if not running in an AWS SageMaker notebook.
sagemaker_execution_role = sagemaker.get_execution_role()

In [4]:
# Wrap the TensorFlow model archive stored in S3 as a SageMaker serving model.

# The serving framework version is taken from the TensorFlow installed in this kernel.
# NOTE(review): assumes a SageMaker serving image exists for that exact version - confirm.
framework_version = tf.__version__

model_name = "wdl_model.tar.gz"
# Resulting URI has the form s3://<bucket>/<prefix>/<model_name>.
model_data = "s3://" + "/".join((bucket, prefix, model_name))
sm_model = Model(
    role=sagemaker_execution_role,
    model_data=model_data,
    framework_version=framework_version,
    sagemaker_session=sagemaker_session,
)

In [5]:
# Declare the execution input schema for the Step Functions workflow:
# two string fields, ModelName and EndpointName, which the steps below
# reference as placeholders.
execution_input = ExecutionInput(schema={
    'ModelName': str,
    'EndpointName': str,
})

In [8]:
# Build the "Save Model" step: it takes sm_model and names the resulting
# SageMaker model after the ModelName execution input. The step's result is
# written to $.ModelStepResults.
model_step = ModelStep(
    'Save Model',
    model=sm_model,
    instance_type='ml.m5.xlarge',
    model_name=execution_input["ModelName"],
    result_path='$.ModelStepResults',
)

In [9]:
# Build the endpoint-configuration step. The configuration reuses the
# ModelName execution input as its own name and targets a single
# ml.m5.xlarge instance.
endpoint_config_step = EndpointConfigStep(
    "Create Model Endpoint Config",
    model_name=execution_input["ModelName"],
    endpoint_config_name=execution_input["ModelName"],
    instance_type='ml.m5.xlarge',
    initial_instance_count=1,
)

In [10]:
# Build the step that updates the endpoint named by the EndpointName
# execution input (update=True) to the endpoint configuration named by
# ModelName.
update_endpoint_step = EndpointStep(
    'Update Model Endpoint',
    update=True,
    endpoint_config_name=execution_input["ModelName"],
    endpoint_name=execution_input["EndpointName"],
)

In [12]:
# Build the step that creates the endpoint (update=False) from the same
# endpoint name and endpoint configuration name as the update step.
create_endpoint_step = EndpointStep(
    'create Model Endpoint',
    update=False,
    endpoint_config_name=execution_input["ModelName"],
    endpoint_name=execution_input["EndpointName"],
)

In [13]:
# Fallback path: if the update task fails (i.e. the endpoint does not exist
# yet), transition to the step that creates the endpoint instead.
# NOTE(review): States.TaskFailed is not specific to a missing endpoint, so
# other update failures would presumably take this path too - confirm that
# is acceptable.
update_endpoint_step.add_catch(
    Catch(
        next_step=create_endpoint_step,
        error_equals=["States.TaskFailed"],
    )
)

In [14]:
# Chain the steps in execution order:
# save model -> create endpoint config -> update endpoint.
# create_endpoint_step is not listed here; it is only reached through the
# catch attached to update_endpoint_step.
workflow_definition = steps.Chain([
    model_step,
    endpoint_config_step,
    update_endpoint_step
])

In [16]:
# Replace the account ID in this ARN with your own AWS account ID.
workflow_execution_role = 'arn:aws:iam::528326054333:role/StepFunctionsWorkflowExecutionRole'

# Define the workflow.
# The unique id is appended to the base name through the '{}' placeholder.
# Without the placeholder, str.format() silently ignores its argument and
# every run would reuse the same state machine name.
workflow = Workflow(
    name='step_functions_deploySMmodel_{}'.format(id),
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [17]:
# Create the Step Functions state machine from the workflow defined above.
# The value displayed below the log line is the ARN of the created state machine.

workflow.create()

[INFO] Workflow created successfully on AWS Step Functions.


'arn:aws:states:ap-northeast-1:528326054333:stateMachine:step_functions_deploySMmodel_081177fa4f6d4588a2891e53fdc66752'

In [None]:
# Test-run the Step Functions state machine.
# The code below is left commented out; uncomment it to start an execution.
# It passes a timestamped ModelName and a fixed EndpointName as inputs.

# import datetime

# curr_time = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
# execution = workflow.execute(
#     inputs={
#         "ModelName": "yalla-sm-model-{}".format(curr_time),
#         "EndpointName": "yalla-sm-endpoint",
#     }
# )