## Installation and initialization

#### Install the stepfunctions module

In [1]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

Requirement already up-to-date: stepfunctions in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.0.0.7)
Requirement not upgraded as not directly required: sagemaker>=1.42.8 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (1.51.4)
Requirement not upgraded as not directly required: pyyaml in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (5.3.1)
Requirement not upgraded as not directly required: boto3>=1.9.213 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (1.12.27)
Requirement not upgraded as not directly required: scipy>=0.19.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=1.42.8->stepfunctions) (1.1.0)
Requirement not upgraded as not directly required: numpy>=1.9.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=1.42.8->stepfunctions) (1.14.3)
Requirement not upgra

#### Initialize parameters

In [3]:
import uuid
import logging
import stepfunctions
import boto3
import sagemaker

# Common initialization
session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

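bucket = 'sagemaker-pipeline-mnist-datasets' # bucket used throughout the experiment
source_prefix = 'source/tf-mnist' # prefix where the source data is stored
output_prefix = 'output/tf-mnist' # prefix where the transformed data is stored

# Generate a UUID so that the name of each component is unique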
id = uuid.uuid4().hex
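
The hex suffix generated above is appended to job, model, and endpoint names later in the notebook. As a rough sketch of why the prefixes need to stay short: SageMaker training job, model, and endpoint names are limited to 63 characters drawn from letters, digits, and hyphens, and the suffix alone uses 32 of them. The helper `make_unique_name` is hypothetical and not part of any AWS SDK.

```python
import re
import uuid

# Character rule for SageMaker training job, model, and endpoint names.
SAGEMAKER_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")
SAGEMAKER_NAME_MAX_LENGTH = 63


def make_unique_name(prefix, suffix=None):
    """Hypothetical helper: join a prefix and a hex suffix, then validate the result."""
    suffix = suffix or uuid.uuid4().hex  # 32 hex characters
    name = "{}-{}".format(prefix, suffix)
    if len(name) > SAGEMAKER_NAME_MAX_LENGTH:
        raise ValueError("name is longer than 63 characters: {!r}".format(name))
    if not SAGEMAKER_NAME_PATTERN.match(name):
        raise ValueError("name contains invalid characters: {!r}".format(name))
    return name


example_name = make_unique_name("my-sm-pipeline-endpoint")
print(example_name, len(example_name))
```

Names with underscores, such as `query_training_status`, would be rejected by this check, which is one reason the SageMaker resource names in this notebook use hyphens.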

## Assign the required permissions
#### 1. Allow the notebook's role to create Step Functions components
Attach the `AWSStepFunctionsFullAccess` policy to the SageMaker notebook's role so that Step Functions workflows can be created from the notebook.
#### 2. Allow the notebook's role to create Glue jobs and Lambda functions, and to pass a role to a Lambda function
- Open the notebook's Role -> Permission -> choose one of its policies -> edit policy
- Add additional Policy -> for Service choose **IAM** -> for Action choose **PassRole** -> for Resource choose **Specific** -> set the role that may be passed to **query_training_status-role**
- Add additional Policy again -> for Service choose **Lambda** -> for Action choose **Write** -> for Resource choose **all resource**
- Add additional Policy again -> for Service choose **Glue** -> for Action choose **Write** -> for Resource choose **all resource**
- Review and Save changes

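#### 3. Create an IAM role for Step Functions so that it can operate on SageMaker
- Open the IAM console -> Roles -> Create Role
- For trusted entity choose **AWS Service** -> for service choose **Step Functions** -> Next Permission
- Click Next until you can enter the name `StepFunctionsWorkflowExecutionRole` -> **Create**

Next, grant this role permission to operate on SageMaker, invoke Lambda, start Glue job runs, and create EventBridge event rules, following the best practice of least privilege.

- Under Permission -> Attach Policies -> Create Policy
- Paste the policy below and replace the placeholders [YOUR_NOTEBOOK_ROLE_ARN], [YOUR_LAMBDA_FUNCTION_PREFIX], and [YOUR_GLUE_ETL_JOB_PREFIX]. The Lambda function and Glue job names carry a dynamic suffix, so only the prefix needs to be defined here.
- For example, [YOUR_LAMBDA_FUNCTION_PREFIX] = query-training-status and [YOUR_GLUE_ETL_JOB_PREFIX] = glue-mnist-etl. The Glue prefix has to match the name given to the job when it is created; this notebook later names its job `train-val-norm-job-` plus a UUID, so use `train-val-norm-job` if you keep that name.
- Review -> enter the name StepFunctionsWorkflowExecutionPolicy and create the policy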

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "[YOUR_NOTEBOOK_ROLE_ARN]",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateModel",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:CreateEndpoint",
                "sagemaker:StopTrainingJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:UpdateEndpoint",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:DeleteEndpoint"
            ],
            "Resource": [
                "arn:aws:sagemaker:*:*:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:DescribeRule",
                "events:PutRule",
                "events:PutTargets"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:[YOUR_LAMBDA_FUNCTION_PREFIX]*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:BatchStopJobRun",
                "glue:GetJobRuns"
            ],
            "Resource": "arn:aws:glue:*:*:job/[YOUR_GLUE_ETL_JOB_PREFIX]*"
        }
    ]
}
```
- Then return to the role's attach-policy window, select the policy you just created, and attach it
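
Filling in the placeholders by hand is error-prone, so a small script can do the substitution and confirm that the result is still valid JSON. The sketch below uses an abbreviated copy of the policy (only the statements that contain placeholders); the account ID and role name in the ARN are made up, and `render_policy` is a hypothetical helper.

```python
import json
import re

# Abbreviated copy of the policy above; only statements with placeholders are kept.
POLICY_TEMPLATE = """
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "[YOUR_NOTEBOOK_ROLE_ARN]"
        },
        {
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction"],
            "Resource": ["arn:aws:lambda:*:*:function:[YOUR_LAMBDA_FUNCTION_PREFIX]*"]
        },
        {
            "Effect": "Allow",
            "Action": ["glue:StartJobRun", "glue:GetJobRun"],
            "Resource": "arn:aws:glue:*:*:job/[YOUR_GLUE_ETL_JOB_PREFIX]*"
        }
    ]
}
"""


def render_policy(template, values):
    """Substitute [PLACEHOLDER] markers and parse the result as JSON."""
    rendered = template
    for placeholder, value in values.items():
        rendered = rendered.replace("[{}]".format(placeholder), value)
    leftover = re.findall(r"\[YOUR_[A-Z_]+\]", rendered)
    if leftover:
        raise ValueError("unreplaced placeholders: {}".format(sorted(set(leftover))))
    return json.loads(rendered)  # raises if the text is not valid JSON


policy = render_policy(POLICY_TEMPLATE, {
    "YOUR_NOTEBOOK_ROLE_ARN": "arn:aws:iam::123456789012:role/MyNotebookRole",  # made-up ARN
    "YOUR_LAMBDA_FUNCTION_PREFIX": "query-training-status",
    # The Glue job created later in this notebook is named train-val-norm-job-<uuid>.
    "YOUR_GLUE_ETL_JOB_PREFIX": "train-val-norm-job",
})
print(json.dumps(policy, indent=4))
```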

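#### 4. Create the role for the Glue job; it needs access to the bucket the job reads from and writes to

- Open the IAM console -> Roles -> Create Role
- For trusted entity choose **AWS Service** -> for service choose **Glue** -> **Next Permission**
- Select the `AmazonS3FullAccess` policy, then click Next through the remaining pages
- On the Review page, enter the name `AWS-Glue-S3-Bucket-Access` -> **Create Role**

#### 5. Create the role for the Lambda function; the function needs it for read-only access to SageMaker and for writing logs

- Open the IAM console -> Roles -> Create Role
- For trusted entity choose **AWS Service** -> for service choose **Lambda** -> **Next Permission**
- Select `AmazonSageMakerReadOnly` and `AWSLambdaBasicExecutionRole`, then click Next through the remaining pages
- On the Review page, enter the name `query_training_status-role` -> **Create Role**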

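## Prepare the data
- Download the MNIST dataset from the internet
- Process the dataset: for the train and validation sets, merge the labels and features into a single file per set; the test set is written with features only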

In [4]:
import pickle, gzip, urllib.request, json
import numpy as np

# Load the dataset
urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", "mnist.pkl.gz")
with gzip.open('mnist.pkl.gz', 'rb') as f:
    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')

In [5]:
# Merge the features and labels of the downloaded MNIST data and store the result in S3
def convert_data():
    data_partitions = [('train', train_set), ('validation', valid_set), ('test', test_set)]
    for data_partition_name, data_partition in data_partitions:
        print('{}: {} {}'.format(data_partition_name, data_partition[0].shape, data_partition[1].shape))
        labels = [t.tolist() for t in data_partition[1]]
        features = [t.tolist() for t in data_partition[0]]
        
        if data_partition_name != 'test':
            examples = np.insert(features, 0, labels, axis=1)  # insert the labels as column 0 of the feature matrix
        else:
            examples = features  # the test set is written without labels (why?)
        
        np.savetxt('data.csv', examples, delimiter=',')
        
        key = "{}/{}/examples".format(source_prefix,data_partition_name)
        url = 's3://{}/{}'.format(bucket, key)
        boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_file('data.csv')
        print('Done writing to {}'.format(url))
        
convert_data()

train: (50000, 784) (50000,)
Done writing to s3://sagemaker-pipeline-mnist-datasets/source/tf-mnist/train/examples
validation: (10000, 784) (10000,)
Done writing to s3://sagemaker-pipeline-mnist-datasets/source/tf-mnist/validation/examples
test: (10000, 784) (10000,)
Done writing to s3://sagemaker-pipeline-mnist-datasets/source/tf-mnist/test/examples
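
To make the file layout concrete: for train and validation, each CSV row holds the label followed by the 784 pixel values, while test rows hold pixel values only. The sketch below reproduces that layout on a tiny made-up dataset with plain lists, as a stand-in for the `np.insert(features, 0, labels, axis=1)` call; `prepend_labels` is a hypothetical helper and the toy values are not MNIST data.

```python
def prepend_labels(features, labels):
    """Return rows of [label, feature_0, feature_1, ...], one per example."""
    if len(features) != len(labels):
        raise ValueError("features and labels must have the same length")
    return [[label] + list(row) for row, label in zip(features, labels)]


# Two made-up examples with three "pixels" each (real MNIST rows have 784).
toy_features = [[0.0, 0.5, 1.0], [0.25, 0.0, 0.75]]
toy_labels = [7, 2]

toy_rows = prepend_labels(toy_features, toy_labels)
for row in toy_rows:
    print(",".join(str(value) for value in row))
```

Note that `np.savetxt` writes every value, including the label, as a float with its default format, so a consumer of these files has to cast column 0 back to an integer class.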


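## Create resources

### Create the Glue ETL job

- The Glue ETL job created here normalizes the features of the datasets, to reduce the influence of extreme values on training and prediction
- Glue is a serverless ETL service built on Spark; we write an ETL script and hand it to Glue to run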

In [7]:
from sagemaker.s3 import S3Uploader

# Upload the Glue script to S3
glue_script_location = S3Uploader.upload(local_path='./train_val_norm.py',
                               desired_s3_uri='s3://{}/{}'.format(bucket, 'glue_script'),
                               session=session)

job_name = 'train-val-norm-job-{}'.format(id) # name of the Glue job
glue_role = 'AWS-Glue-S3-Bucket-Access'  # the Glue role created in the permissions section

glue_client = boto3.client('glue')

response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to normalize the features of train and validation data',
    Role=glue_role, 
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
    GlueVersion='1.0',
    WorkerType='Standard',
    NumberOfWorkers=2,
    Timeout=60
)

IdempotentParameterMismatchException: An error occurred (IdempotentParameterMismatchException) when calling the CreateJob operation: Job with name 'train-val-norm-job-85f525994ce84c24a9716697288794de' already submitted with different configuration
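
The `IdempotentParameterMismatchException` above typically shows up when the cell is re-run and `create_job` is called again for a job name that was already submitted. One way to make the cell safe to re-run, assuming that reusing the existing job is acceptable, is to look the job up first. `get_job`, `create_job`, and `exceptions.EntityNotFoundException` are members of the boto3 Glue client; `ensure_glue_job` and `FakeGlueClient` are hypothetical, and the fake client exists only so the sketch runs without AWS credentials.

```python
def ensure_glue_job(glue_client, job_config):
    """Create the Glue job only if no job with that name exists yet.

    Returns True when a job was created and False when one already existed.
    """
    job_name = job_config["Name"]
    try:
        glue_client.get_job(JobName=job_name)
    except glue_client.exceptions.EntityNotFoundException:
        glue_client.create_job(**job_config)
        return True
    return False


class FakeGlueClient:
    """In-memory stand-in that mimics the three client members used above."""

    class exceptions:
        class EntityNotFoundException(Exception):
            pass

    def __init__(self):
        self.jobs = {}

    def get_job(self, JobName):
        if JobName not in self.jobs:
            raise self.exceptions.EntityNotFoundException(JobName)
        return {"Job": self.jobs[JobName]}

    def create_job(self, **config):
        self.jobs[config["Name"]] = config
        return {"Name": config["Name"]}


fake_client = FakeGlueClient()
demo_config = {"Name": "train-val-norm-job-demo", "Role": "AWS-Glue-S3-Bucket-Access"}
first_call = ensure_glue_job(fake_client, demo_config)   # job does not exist yet
second_call = ensure_glue_job(fake_client, demo_config)  # job is found and reused
print(first_call, second_call)
```

With a real client, the call would be `ensure_glue_job(boto3.client('glue'), {...})` with the same keyword arguments that the cell passes to `create_job`.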

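### Create the SageMaker estimator

- An estimator is an object that drives the stages of a SageMaker workflow, such as training and hosting
- Its configuration has to be defined, for example the training data, the training instance type, and the hyperparameters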

In [9]:
from sagemaker.tensorflow import TensorFlow

# Define the training configuration: instance type, hyperparameters, and so on
s3_output_location = 's3://{}/{}/{}'.format(bucket, output_prefix, 'tf-mninst-output')
model_dir = '/opt/ml/model'
train_instance_type = 'ml.m5.xlarge'
hyperparameters = {'epochs': 5, 'batch_size': 128, 'learning_rate': 0.01, 'other_para':0.1}

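# To monitor a metric emitted by the training algorithm, define metric_definitions and pass it to the TensorFlow estimator; matching metrics are parsed from the logs and published to CloudWatch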
metric_definitions = [{'Name': 'accuracy',
                       'Regex': 'accuracy=(.*?);'}]

# Create a TensorFlow estimator
tf_estimator = TensorFlow(
                       entry_point='my_train_1.py',
                       model_dir=model_dir,
                       output_path=s3_output_location,
                       train_instance_type=train_instance_type,
                       train_instance_count=1,
                       hyperparameters=hyperparameters,
                       role=sagemaker.get_execution_role(),
                       base_job_name='tf-scriptmode-mnist',
                       framework_version='2.0.0',
                       py_version='py3',
                       metric_definitions=metric_definitions,
                       script_mode=True)
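
The `Regex` entry in `metric_definitions` is applied to the training logs, and the first capture group becomes the metric value. The sketch below applies the same pattern locally with Python's `re` module. The sample log line is an assumption, since `my_train_1.py` is not shown here; the training script has to print something of the form `accuracy=<value>;` for the pattern to match.

```python
import re

METRIC_REGEX = r"accuracy=(.*?);"  # same pattern as in metric_definitions


def extract_metric(log_line, pattern=METRIC_REGEX):
    """Return the captured metric as a float, or None if the line does not match."""
    match = re.search(pattern, log_line)
    return float(match.group(1)) if match else None


sample_line = "epoch=3; accuracy=0.9712; loss=0.0931;"  # assumed log format
print(extract_metric(sample_line))
print(extract_metric("loss=0.0931;"))
```

Because the pattern is not anchored, a line containing `val_accuracy=0.95;` would match as well, so it may be worth printing distinct metric names if both are logged.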

## Define the Step Functions pipeline

#### Import the relevant modules

In [10]:
import stepfunctions
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

#### Define the schema of the Step Functions execution input

In [11]:
execution_input = ExecutionInput(schema={
    'TrainingJobName': str,
    'GlueJobName': str,
    'ModelName': str,
    'EndpointName': str
})
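
The schema lists the keys that each execution has to supply and their types. As a rough illustration of what that contract means, the sketch below checks an inputs dictionary against the same keys in plain Python. It does not reproduce how the SDK validates input; `check_execution_inputs` is a hypothetical helper and the sample values are made up.

```python
INPUT_SCHEMA = {
    "TrainingJobName": str,
    "GlueJobName": str,
    "ModelName": str,
    "EndpointName": str,
}


def check_execution_inputs(inputs, schema=INPUT_SCHEMA):
    """Return (missing_keys, wrong_type_keys) for an inputs dictionary."""
    missing = sorted(key for key in schema if key not in inputs)
    wrong_type = sorted(
        key for key, expected in schema.items()
        if key in inputs and not isinstance(inputs[key], expected)
    )
    return missing, wrong_type


good_inputs = {
    "TrainingJobName": "my-sm-pipeline-job-demo",
    "GlueJobName": "train-val-norm-job-demo",
    "ModelName": "my-sm-pipeline-model-demo",
    "EndpointName": "my-sm-pipeline-endpoint-demo",
}
bad_inputs = {"TrainingJobName": "my-sm-pipeline-job-demo", "GlueJobName": 42}

print(check_execution_inputs(good_inputs))
print(check_execution_inputs(bad_inputs))
```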

#### Define the Glue step

In [12]:
# Arguments to pass to the Glue job
source_path = 's3://{}/{}/'.format(bucket, source_prefix)
output_path = 's3://{}/{}/'.format(bucket, output_prefix)
train_prefix = 'train'
val_prefix = 'validation'

etl_step = steps.GlueStartJobRunStep(
    'Extract, Transform, Load',
    parameters={"JobName": execution_input['GlueJobName'],
                "Arguments":{
                    '--SOURCE_PATH': source_path,
                    '--OUTPUT_PATH': output_path,
                    '--TRAIN_PREFIX': train_prefix + '/',
                    '--VAL_PREFIX': val_prefix + '/'}
               }
)
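
The `Arguments` map reaches the Glue script as command-line options of the form `--SOURCE_PATH <value>`. The script `train_val_norm.py` is not shown in this notebook; inside Glue it would usually read these options with `awsglue.utils.getResolvedOptions`. The sketch below uses `argparse` as a local stand-in so it can run outside Glue, and `parse_job_arguments` is a hypothetical helper.

```python
import argparse


def parse_job_arguments(argv):
    """Parse the four options set by the Glue step and ignore anything else."""
    parser = argparse.ArgumentParser()
    for name in ("SOURCE_PATH", "OUTPUT_PATH", "TRAIN_PREFIX", "VAL_PREFIX"):
        parser.add_argument("--" + name, required=True)
    # Glue adds options of its own (for example --job-language), so unknown ones are skipped.
    args, _unknown = parser.parse_known_args(argv)
    return vars(args)


sample_argv = [
    "--job-language", "python",
    "--SOURCE_PATH", "s3://sagemaker-pipeline-mnist-datasets/source/tf-mnist/",
    "--OUTPUT_PATH", "s3://sagemaker-pipeline-mnist-datasets/output/tf-mnist/",
    "--TRAIN_PREFIX", "train/",
    "--VAL_PREFIX", "validation/",
]
job_args = parse_job_arguments(sample_argv)

# Both values end with a slash, so plain concatenation yields a valid S3 prefix.
train_input = job_args["SOURCE_PATH"] + job_args["TRAIN_PREFIX"]
print(train_input)
```

This is why the step appends `/` to `train_prefix` and `val_prefix`: the paths can then be joined without any further separator handling.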

#### Define the SageMaker training step

In [13]:
# Location of the training data
train_data = 's3://{}/{}/{}'.format(bucket, output_prefix, 'train')
validation_data = 's3://{}/{}/{}'.format(bucket, output_prefix, 'validation')

# The data channels are passed to the training step and describe where the training and validation data live
data_channels = {'train': train_data, 'validation': validation_data}


training_step = steps.TrainingStep(
    'Model Training', 
    estimator=tf_estimator,
    data=data_channels,
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)

#### Define the step that creates the SageMaker model

In [14]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    instance_type='ml.m5.xlarge',
    result_path='$.ModelStepResults'
)

#### Define the step that creates the endpoint configuration for deploying the model

In [15]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m4.xlarge'
)

#### Create the endpoint step

In [16]:
endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
    update=False
)

#### Build the workflow

In [17]:
workflow_definition = steps.Chain([
    etl_step,
    training_step,
    model_step,
    endpoint_config_step,
    endpoint_step
])
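
Conceptually, chaining steps means that each state names its successor and the last state ends the execution, which is how Amazon States Language expresses a linear workflow. The sketch below builds only that skeleton in plain Python. It is not the SDK's implementation, it omits the `Resource` and `Parameters` fields that real Task states carry, and `chain_to_asl` is a hypothetical helper.

```python
import json


def chain_to_asl(state_names):
    """Build a minimal state-machine skeleton that runs the states in order."""
    states = {}
    for index, name in enumerate(state_names):
        state = {"Type": "Task"}  # real Task states also need a Resource
        if index + 1 < len(state_names):
            state["Next"] = state_names[index + 1]
        else:
            state["End"] = True
        states[name] = state
    return {"StartAt": state_names[0], "States": states}


asl_skeleton = chain_to_asl([
    "Extract, Transform, Load",
    "Model Training",
    "Save Model",
    "Create Model Endpoint Config",
    "Update Model Endpoint",
])
print(json.dumps(asl_skeleton, indent=2))
```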

In [18]:
# Use the Step Functions role created earlier
workflow_execution_role = 'arn:aws:iam::935206693453:role/StepFunctionsWorkflowExecutionRole'

workflow = Workflow(
    name='My-SM-Pipline-{}'.format(id),
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [19]:
workflow.render_graph()

In [20]:
workflow.create()

[INFO] Workflow created successfully on AWS Step Functions.


'arn:aws:states:us-east-2:935206693453:stateMachine:My-SM-Pipline-85f525994ce84c24a9716697288794de'

In [21]:
execution = workflow.execute(
    inputs={
        'TrainingJobName': 'my-sm-pipeline-job-{}'.format(id), # Each Sagemaker Job requires a unique name,
        'GlueJobName': job_name,
        'ModelName': 'my-sm-pipeline-model-{}'.format(id),
        'EndpointName': 'my-sm-pipeline-endpoint-{}'.format(id)
    }
)

[INFO] Workflow execution started successfully on AWS Step Functions.
