# AWS Step Functions Data Science SDK と Amazon SageMaker でデータ準備、AutoGluon を使ったモデル学習、バッチ推論パイプラインを構築

このノートブックは、00-prepare-container-images.ipynb を実行してから実行してください。


1. [背景](#背景)
1. [セットアップ](#セットアップ)
1. [S3 バケットの準備](#S3-バケットの準備)
1. [データ](#データ)
1. [ステータス通知用 SNS Topic の準備](#ステータス通知用-SNS-Topic-の準備)
1. [ステータス通知用 Lambda 関数の準備](#ステータス通知用-Lambda-関数の準備)
1. [AWS Step Functions の準備](#AWS-Step-Functions-の準備)
1. [AWS Step Functions Workflow の実行](#AWS-Step-Functions-Workflow-の実行)
1. [S3 へのファイル作成を AWS Step Functions Workflow 実行用 Lambda 関数のトリガーに設定](#S3-へのファイル作成を-AWS-Step-Functions-Workflow-実行用-Lambda-関数のトリガーに設定)
1. [動作確認](#動作確認)
1. [リソースの削除](#リソースの削除)


---

## 背景

AWS Step Functions は、機械学習パイプラインの構築でよく使われます。AWS Step Functions Data Science SDK を使うと、Python でパイプラインを作ることができるため、データサイエンティストが自身のユースケースに最適な構成を簡単に構築できます。Step Functions を使った基本的なパイプライン構築方法については [こちらのサンプルノートブック](https://github.com/aws-samples/aws-ml-jp/blob/main/mlops/step-functions-data-science-sdk/model-train-evaluate-compare/step_functions_mlworkflow_scikit_learn_data_processing_and_model_evaluation_with_experiments.ipynb) をご参照ください。 

本ノートブックは、**データ準備、モデル学習、バッチ推論のパイプライン** を構築するためのサンプルノートブックです。扱うのはテーブル形式のデータで、モデルの学習には [AutoGluon-Tabular](https://auto.gluon.ai/stable/tutorials/tabular_prediction/index.html) を使用します。[こちらの記事](https://aws.amazon.com/jp/builders-flash/202201/autogluon-tabular-tutorials/?awsf.filter-name=*all) で日本語での簡単な使用方法が説明されています。サンプルデータとしては、公開されているカリフォルニア州の住宅データセットを使用します。ターゲット変数は、カリフォルニア州の地区の住宅価格の中央値です。

<img src="workflow.png" width="50%">

---

## セットアップ

このサンプルノートブックは長いので、実行したいセルにアクセスしやすいよう Table of Contents を作成する拡張機能をインストールすると便利です。以下のセルを実行したあと、このノートブックを開いているブラウザのタブをリロードすると Table of Contents の拡張機能が使えるようになります。以下のセルは、ノートブックインスタンスを再起動するまで再実行の必要はありません。

In [None]:
%%sh
pip install jupyter_contrib_nbextensions
jupyter contrib nbextension install --user
jupyter nbextension enable toc2/main

### Step Functions Data Science SDK をインストール

以下のセルを実行したら、**メニューの「Kernel」->「Restart」をクリックしてカーネルを再起動してください。**再起動後は以下のセルを再度実行する必要はないので、その下から作業を再開してください。

In [None]:

pip install -U awscli boto3 "sagemaker>=2.0.0"
pip install -U "stepfunctions==2.3.0"


SageMaker セッションを作成し、設定を開始します。

- 学習およびモデルデータに使用する S3 バケットとプレフィックスは、ノートブックインスタンス、トレーニング、およびホスティングと同じリージョン内にある必要があります。
- データへの学習およびホスティングアクセスを提供するために使用される IAM ロール arn を用います。 ノートブックインスタンス、学習インスタンス、および/またはホスティングインスタンスに複数のロールが必要な場合は、 `sagemaker.get_execution_role（）` を、適切な IAM ロール arn 文字列に置き換えてください。

**以下のセルの初めの方にある `mail_address` にご自身のメールアドレスを記載してから以下のセルを実行してください。**

In [None]:
import boto3
from datetime import datetime
from dateutil import tz
import json
import os
import pandas as pd
import sagemaker
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from time import sleep

%store -r

mail_address = "<SET YOUR E-MAIL ADDRESS>"

JST = tz.gettz('Asia/Tokyo')
timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
iam_client = boto3.client('iam', region_name=region)
sns_client = boto3.client('sns', region_name=region)
ecr_client = boto3.client('ecr', region_name=region)

role = sagemaker.get_execution_role()
account_id = boto3.client('sts').get_caller_identity().get('Account')

sagemaker_policy_name = project_name + '-' + user_name + '-policy'
prefix = f'sagemaker/{project_name}/{user_name}'
bucket_name = project_name + '-' + user_name + '-' + timestamp
bucket_name_trigger = project_name + '-' + user_name + '-trigger-' + timestamp

s3_client = boto3.client('s3', region_name=region)
lambda_client = boto3.client('lambda', region_name=region)

policy_arn_list = []
role_name_list = []
lambda_function_list = []

role_name = role.split('/')[-1]
iam_console_url = f'https://{region}.console.aws.amazon.com/iamv2/home#/roles/details/{role_name}?section=permissions'

from IPython.display import display, Markdown
text = f"""
以下の手順で IAM 関連の設定を実施してください。
1. <a href=\"policy/sagemaker-policy.json\" target=\"_blank\">policy/sagemaker-policy.json</a> の中身をコピー
1. <a href=\"https://{region}.console.aws.amazon.com/iam/home#/policies$new?step=edit\" target=\"_blank\">IAM Policy の作成</a>をクリックし、**JSON** タブをクリックしてから手順1でコピーした JSON をペーストして右下の **次のステップ：タグ** ボタンをクリック
1. 右下の **次のステップ：確認** ボタンをクリック
1. **名前** に **「{sagemaker_policy_name}」** を記載して、右下の **ポリシーの作成** ボタンをクリック
1.  <a href=\"{iam_console_url}\" target=\"_blank\">ノートブックインスタンスにアタッチされた IAM Role</a> を開く
1. **許可を追加** ボタンをクリックして **ポリシーをアタッチ** を選択
1. **その他の許可ポリシー** の検索ボックスで手順4 で作成した {sagemaker_policy_name} を検索して横にあるチェックボックスをオンにする
1. **ポリシーのアタッチ** をクリック
"""
display(Markdown(text))

IAM Role や Policy 作成用のヘルパー関数を作成します。

In [None]:
from time import sleep

def get_policy_arn(policy_name):
    marker = ''
    while True:
        if marker == '':
            response = iam_client.list_policies(Scope='Local')
        else:
            response = iam_client.list_policies(Scope='Local', Marker=marker)
        for content in response['Policies']:
            if policy_name == content['PolicyName']:
                return content['Arn']
        if 'Marker' in response:
            marker = response['Marker']
        else:
            break

    return ''


def detach_role_policies(role_name):
    try:
        response = iam_client.list_attached_role_policies(
            RoleName=role_name,
        )
    except Exception as ex:
        print(ex)
    policies = response['AttachedPolicies']

    for p in policies:
        response = iam_client.detach_role_policy(
            RoleName=role_name,
            PolicyArn=p['PolicyArn']
        )

            
def create_role(role_name, assume_role_policy):
    try:
        response = iam_client.create_role(
            Path = '/service-role/',
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps(assume_role_policy),
            MaxSessionDuration=3600*12 # 12 hours
        )
        role_arn = response['Role']['Arn']
    except Exception as ex:
        if "EntityAlreadyExists" in str(ex):
            detach_role_policies(role_name)
            response = iam_client.delete_role(
                RoleName = role_name,
            )
            response = iam_client.create_role(
                Path = '/service-role/',
                RoleName = role_name,
                AssumeRolePolicyDocument = json.dumps(assume_role_policy),
                MaxSessionDuration=3600*12 # 12 hours
            )
            role_arn = response['Role']['Arn']
        else:
            print(ex)
    sleep(10)
    return role_arn


def create_policy(policy_name, policy_json_name):
    with open('policy/' + policy_json_name, 'r') as f:
        policy_json = json.load(f)
    try:
        response = iam_client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_json),
        )
        policy_arn = response['Policy']['Arn']
    except Exception as ex:
        if "EntityAlreadyExists" in str(ex):
            response = iam_client.delete_policy(
                PolicyArn=get_policy_arn(policy_name)
            )
            response = iam_client.create_policy(
                PolicyName=policy_name,
                PolicyDocument=json.dumps(policy_json),
            )
            policy_arn = response['Policy']['Arn']
    policy_arn_list.append(policy_arn)
    
    sleep(10)
    return policy_arn


def create_policy_role(policy_name, policy_json_name, role_name, assume_role_policy):

    role_arn = create_role(role_name, assume_role_policy)
    policy_arn = create_policy(policy_name, policy_json_name)

    sleep(5)
    response = iam_client.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )

    role_name_list.append(role_name)
    policy_arn_list.append(policy_arn)
    sleep(10)
    return role_arn

## S3 バケットの準備
### Job 生成物格納用 S3 バケットの準備

SageMaker Jobs が生成したデータやモデルなどを保存する S3 バケットを作成します。セキュリティのため暗号化を有効にします。

In [None]:
def create_bucket(bucket_name):

    if region == 'us-east-1':
        response = s3_client.create_bucket(Bucket=bucket_name)
    else:
        location = {'LocationConstraint': region}
        response = s3_client.create_bucket(Bucket=bucket_name,
                                           CreateBucketConfiguration=location)
    sleep(10)
    response = s3_client.put_bucket_encryption(
        Bucket=bucket_name,
        ServerSideEncryptionConfiguration={
            'Rules': [
                {
                    'ApplyServerSideEncryptionByDefault': {
                        'SSEAlgorithm': 'AES256',
                    },
                },
            ]
        },
    )

    response = s3_client.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            'BlockPublicAcls': True,
            'IgnorePublicAcls': True,
            'BlockPublicPolicy': True,
            'RestrictPublicBuckets': True
        },
        ExpectedBucketOwner=account_id
    )
create_bucket(bucket_name)

### トリガーとなるファイルアップロード用 S3 バケットの準備

このサンプルノートブックでは、バッチ推論の入力ファイルをアップロードする S3 バケットを、SageMaker Jobs の出力保存用バケットとは別に用意します。同じバケットを利用してももちろんかまいません。

In [None]:
create_bucket(bucket_name_trigger)

## データ

このサンプルノートブックで使用するデータセットは、1990 年の米国国勢調査の結果を使って作成された StatLib リポジトリ (http://lib.stat.cmu.edu/datasets/) から入手したもので、各行が国勢調査ブロックグループに対応しています。ブロックグループとは、米国国勢調査局がサンプルデータを公表している最小の地理的単位で、人口は通常 600～3,000人です。 

### データの取得

AWS が用意した S3 バケットからデータをダウンロードして展開します。

In [None]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/california_housing/cal_housing.tgz .
!tar -xvzf  cal_housing.tgz

ダウンロードしたデータを DataFrame として読み込み、CSV 形式で保存します。

In [None]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]

target = "medianHouseValue"

cal_housing_df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)
rawdata_name = 'rawdata.csv'
cal_housing_df.to_csv(rawdata_name, index=None)
cal_housing_df

### データをS3にアップロードする
ML パイプラインのトリガーとなるデータアップロード用の S3 バケットにデータをアップロードします。S3 バケットにトリガーの設定をしていないため、この時点ではトリガーは発行されません。データセットを S3 にアップロードするには、 `sagemaker.Session.upload_data` 関数を使用します。 戻り値として入力した S3 のロケーションは、後で学習ジョブを実行するときに使用します。

In [None]:
raw_data_s3_path = sagemaker_session.upload_data(path=rawdata_name, bucket=bucket_name_trigger, key_prefix=prefix + "/rawdata")
print('Raw data S3 path:', raw_data_s3_path)

## ステータス通知用 SNS Topic の準備

ML パイプラインの中で、実行状況を通知するための SNS Topic を作成します。

In [None]:
sns_notification_topic_name =  project_name + '-notification-' + user_name
response = sns_client.create_topic(
    Name=sns_notification_topic_name
)
sns_notification_topic_name

このノートブックの初めのセットアップのところで設定したメールアドレスに通知が送信されるよう設定します。以下のセルを実行してから数分すると、設定したメールアドレスに「AWS Notification - Subscription Confirmation」というタイトルのメールが届きます。このメールの本文にある「Confirm subscription」のリンクをクリックして、SNS Topic のサブスクリプションを有効にします。

In [None]:
sns_notification_topic_arn = response['TopicArn']
response = sns_client.subscribe(
    TopicArn=sns_notification_topic_arn,
    Protocol='email',
    Endpoint = mail_address,
)


## ステータス通知用 Lambda 関数の準備

作成した SNS Topic にメッセージを送信する Lambda 関数を作成します。

In [None]:
 def create_lambda_function(function_name, file_name, role_arn, handler_name,
                            envs={}, py_version='python3.9'):

    with open(file_name+'.zip', 'rb') as f:
        zip_data = f.read()
        
    if function_exists(function_name):
        
        response = lambda_client.update_function_configuration(
            FunctionName=function_name,
            Environment={
                'Variables': envs
            },
        )
        sleep(10)
        response = lambda_client.update_function_code(
            FunctionName=function_name,
            ZipFile=zip_data,
            Publish=True,
        )
        
    else:
        response = lambda_client.create_function(
            FunctionName=function_name,
            Role=role_arn,
            Handler=handler_name+'.lambda_handler',
            Runtime=py_version,
            Code={
                'ZipFile':zip_data
            },
            Environment={
                'Variables': envs
            },
            Timeout=60*5, # 5 minutes
            MemorySize=128, # 128 MB
            Publish=True,
            PackageType='Zip',
        )
    lambda_function_list.append(function_name)
    return response['FunctionArn']

def function_exists(function_name):
    try:
        lambda_client.get_function(
            FunctionName=function_name,
        )
        return True
    except Exception as e:
        return False

In [None]:
lambda_notification_function_name  = project_name + '-notification-' + user_name
lambda_notification_policy_name = lambda_notification_function_name + '-policy'
lambda_notification_role_name = lambda_notification_function_name + '-role'
lambda_notification_json_name = 'lambda-notification-policy.json'

assume_role_policy = {
  "Version": "2012-10-17",
  "Statement": [{"Sid": "","Effect": "Allow","Principal": {"Service":"lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]
}

lambda_notification_role_arn = create_policy_role(lambda_notification_policy_name, lambda_notification_json_name,
                   lambda_notification_role_name, assume_role_policy)
sleep(10) # wait until IAM is created

以下のセルでは、Lambda 関数で使用するライブラリとソースコードを zip に固めています。Lmabda 関数を作成する際は、以下の処理を実行した環境と同じ Python のバージョンのランタイムを指定してください。2022年7月現在、conda_python3 カーネルの Python バージョンは 3.8 なので、Lambda 関数の Python バージョンも 3.8 を指定します。

In [None]:
def prepare_lambda_resource(function_name, code_path):
    !rm -rf $function_name
    !rm {function_name}.zip
    !mkdir $function_name
    !pip install pyyaml -t $function_name
    !cp {code_path}/index.py $function_name
    !cd $function_name && zip -r ../{function_name}.zip .
prepare_lambda_resource(lambda_notification_function_name, 'code/notification')

Lambda 関数を作成します。環境変数に、先ほど作成した SNS Topic の ARN を設定します。

In [None]:
envs = {
    'SNS_TOPIC_ARN': sns_notification_topic_arn
}
lambda_notification_function_arn = create_lambda_function(lambda_notification_function_name,
                                                   lambda_notification_function_name,
                                                   lambda_notification_role_arn,
                                                   'index',
                                                   envs,
                                                   py_version='python3.8')

## AWS Step Functions の準備

Step Functions Data Science SDK を使って、冒頭に示した Step Functions Workflow を作成します。

In [None]:
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.steps import (
    Chain,
    ProcessingStep,
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow

### IAM Role と Policy の作成

Step Functions の Workflow にセットする IAM Role を作成します。

In [None]:
import json

step_functions_policy_name = project_name + '-stepfunctions-' + user_name + '-policy'
step_functions_role_name = project_name + '-stepfunctions-' + user_name + '-role'
step_functions_policy_json_name = 'stepfunctions-policy.json'

assume_role_policy = {
      "Version": "2012-10-17",
      "Statement": [{"Sid": "","Effect": "Allow","Principal": {"Service":"states.amazonaws.com"},"Action": "sts:AssumeRole"}]
    }

workflow_execution_role = create_policy_role(step_functions_policy_name, step_functions_policy_json_name,
                   step_functions_role_name, assume_role_policy)
workflow_execution_role

### Step Functions Workflow 実行時のパラメータの準備

Step Functions Workflow 実行時に指定するパラメータのスキーマを定義します。

In [None]:
execution_input = ExecutionInput(
    schema={
        "TimeStamp": str,
        "PrepJobName": str,
        "PrepInput": str,
        "PrepOutput": str,
        "TrainJobName": str,
        "TrainInput": str,
        "TrainOutput": str,
        "PredJobName": str,
        "PredArgs": str,
        "PredInput": str,
        "PredOutput": str,
        "PostJobName": str,
        "PostInput": str,
        "PostOutput": str,
    }
)


### Data Preparation Step の作成

データ準備部分を作成します。まず SageMaker Processing 用の Processor を作成し、それを ProcessingStep のパラメタにセットします。このとき、Processing で使用するソースコードやコンテナイメージも指定します。

In [None]:
num_of_segment = 2

prep_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/prep"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, prep_timestamp),
)

prep_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=execution_input["PrepInput"],
        destination=input_dir,
        input_name="data"
    )
]

prep_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=execution_input["PrepOutput"],
        output_name="result",
    )
]

prep_processor = Processor(
        role=role,
        image_uri=prep_repository_uri,
#         entrypoint=["python3", f"{code_path}/prep.py"],
        instance_count=1, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
)

prep_step = ProcessingStep(
    "Data Preparation",
    processor=prep_processor,
    job_name=execution_input["PrepJobName"],
    inputs=prep_inputs,
    outputs=prep_outputs,
    container_arguments=['--num-of-dataset', str(num_of_segment)],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/prep.py"],
    wait_for_completion=True,
)


### Model Training Step の作成

データ準備の時と同様に、モデル学習用の ProcessingStep を作成します。データ準備との違いは、学習データ用の ProcessingInput に `s3_data_distribution_type='ShardedByS3Key'` が追加されていることです。これにより、指定された S3 パスにあるファイルが各インスタンスに均等に分配されます。ファイルの数と Processing Job で使用するインスタンス数を同じにすれば、1インスタンス1ファイルの状態を作ることができます。

In [None]:
train_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/train"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, train_timestamp),
)

train_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=execution_input["TrainInput"],
        destination=input_dir,
        input_name="data",
        s3_data_distribution_type='ShardedByS3Key'
    )
]

train_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=execution_input["TrainOutput"],
        output_name="result",
    )
]

train_processor = Processor(
        role=role,
        image_uri=train_repository_uri,
#         entrypoint=["python3", f"{code_path}/train.py"],
        instance_count=num_of_segment, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

train_step = ProcessingStep(
    "Model Training",
    processor=train_processor,
    job_name=execution_input["TrainJobName"],
    inputs=train_inputs,
    outputs=train_outputs,
    container_arguments=['--num-of-dataset', str(num_of_segment)],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/train.py"],
    wait_for_completion=True,
)

### バッチ推論 Step の作成

バッチ推論 Steo もモデル学習 Step と同様にファイルを各インスタンスに分散させて処理を行うため、ProcessingInput で `s3_data_distribution_type='ShardedByS3Key'` を使用しています。

In [None]:
pred_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/pred"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, pred_timestamp),
)

pred_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=execution_input["PredInput"],
        destination=input_dir,
        input_name="data",
        s3_data_distribution_type='ShardedByS3Key'
    )
]

pred_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=execution_input["PredOutput"],
        output_name="result",
    )
]

pred_processor = Processor(
        role=role,
        image_uri=train_repository_uri,
#         entrypoint=["python3", f"{code_path}/pred.py"],
        instance_count=num_of_segment, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

pred_step = ProcessingStep(
    "Batch Inference",
    processor=train_processor,
    job_name=execution_input["PredJobName"],
    inputs=pred_inputs,
    outputs=pred_outputs,
    container_arguments=execution_input["PredArgs"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/pred.py"],
    wait_for_completion=True,
)

### 後処理 Step の作成

In [None]:
post_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/post"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, post_timestamp),
)

post_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=execution_input["PostInput"],
        destination=input_dir,
        input_name="data"
    )
]

post_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=execution_input["PostOutput"],
        output_name="result",
    )
]

post_processor = Processor(
        role=role,
        image_uri=prep_repository_uri,
#         entrypoint=["python3", f"{code_path}/post.py"],
        instance_count=1, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

post_step = ProcessingStep(
    "Post Process",
    processor=train_processor,
    job_name=execution_input["PostJobName"],
    inputs=post_inputs,
    outputs=post_outputs,
    container_arguments=[
        '--num-of-dataset', str(num_of_segment)
    ],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/post.py"],
    wait_for_completion=True,
)

### 状況通知 Lambda Step の作成

ML パイプラインの状況を通知するための Lambda Step を作成します。Lambda 関数自体はすでに作成しているため、LambdaStep でその関数名を指定します。

In [None]:
from stepfunctions.steps.states import Retry

status_list = [
    "Training Completed.",
    "Postprocess Completed."
]
lambda_train_step = stepfunctions.steps.compute.LambdaStep(
    "Train Notification",
    parameters={
        "FunctionName": lambda_notification_function_name,
        "Payload": {
            "status": status_list[0],
            "param.$": "$"
        },
    },
)
lambda_train_step.add_retry(
    Retry(error_equals=["States.TaskFailed"], interval_seconds=15, max_attempts=2, backoff_rate=4.0)
)

lambda_post_step = stepfunctions.steps.compute.LambdaStep(
    "Post Notification",
    parameters={
        "FunctionName": lambda_notification_function_name,
        "Payload": {
            "status": status_list[1],
            "param.$": "$"
        },
    },
)
lambda_post_step.add_retry(
    Retry(error_equals=["States.TaskFailed"], interval_seconds=15, max_attempts=2, backoff_rate=4.0)
)

### Fail State の作成

ML パイプラインが失敗したら SNS Topic に通知するために、Catch State と SnsPublishStep を作成します。SnsPublishStep の代わりに、先ほど作成した状況通知 LambdaStep を使用してもかまいません。SNS Topic に送信するメッセージをカスタマイズする必要がなければ SnsPublishStep を使うと便利です。

In [None]:
workflow_name = project_name+"-" + user_name
failed_state = stepfunctions.steps.states.Fail(
    "ML Workflow failed", cause="SageMakerJobFailed"
)

sns_step = stepfunctions.steps.service.SnsPublishStep(
    "Error Message",
    comment = "error",
    parameters={
        "TopicArn": sns_notification_topic_arn,
        "Message.$": "$",
        "Subject": f"Workflow Error: {workflow_name}"
    }
)
sns_step.next(failed_state)

catch_state = stepfunctions.steps.states.Catch(
    error_equals=["States.ALL"],
    next_step=sns_step,
)

prep_step.add_catch(catch_state)
train_step.add_catch(catch_state)
pred_step.add_catch(catch_state)
post_step.add_catch(catch_state)

### Step Functions Workflow の作成

作成した各 Step を連結して Workflow を作成します。

In [None]:
from stepfunctions.workflow import Workflow

workflow_graph = Chain([prep_step, train_step, lambda_train_step, pred_step, post_step, lambda_post_step])


branching_workflow = Workflow(
    name=workflow_name,
    definition=workflow_graph,
    role=workflow_execution_role,
)

branching_workflow.create()
branching_workflow.update(workflow_graph)

### Step Functions Workflow の動作確認

パラメータを指定して Step Functions Workflow を実行します。表示されたリンクから AWS コンソールに移動して今実行した Workflow を確認してみましょう。

In [None]:
sfn_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')
job_name_prefix = project_name + '-' + user_name
data_timestamp =  datetime.now(JST).strftime('%Y%m')
prep_job_name = job_name_prefix + '-prep-' + sfn_timestamp
train_job_name = job_name_prefix + '-train-' + sfn_timestamp
pred_job_name = job_name_prefix + '-pred-' + sfn_timestamp
post_job_name = job_name_prefix + '-post-' + sfn_timestamp

prep_output_data = f's3://{bucket_name}/{prefix}/prep/{prep_job_name}'
train_output_data = f's3://{bucket_name}/{prefix}/train/{train_job_name}'
pred_output_data = f's3://{bucket_name}/{prefix}/pred/{pred_job_name}'
post_output_data = f's3://{bucket_name}/{prefix}/post/{post_job_name}'

pred_args = [
        '--num-of-dataset', str(num_of_segment),
        '--metrics-threshold', '10000',
        '--latest-model-path', train_output_data,
        '--previous-model-path', train_output_data
    ]

execution = branching_workflow.execute(
    inputs={
        "TimeStamp": data_timestamp,
        "PrepJobName": prep_job_name,
        "PrepInput": raw_data_s3_path,
        "PrepOutput": prep_output_data,
        "TrainJobName": train_job_name,
        "TrainInput": prep_output_data + '/train',
        "TrainOutput": train_output_data,
        "PredJobName": pred_job_name,
        "PredArgs": pred_args,
        "PredInput": prep_output_data + '/pred',
        "PredOutput": pred_output_data,
        "PostJobName": post_job_name,
        "PostInput": pred_output_data,
        "PostOutput": post_output_data,
    }
)
from IPython.display import display, Markdown
display(Markdown(f"<a href=\"https://{region}.console.aws.amazon.com/states/home?region={region}#/executions/details/{execution.execution_arn}\" target=\"_blank\">Step Functions のコンソール</a>"))

## AWS Step Functions Workflow 実行用 Lambda 関数の準備

前のセルでは、手動で Step Functions Workflow を実行しましたが、実際は S3 バケットへのファイルアップロードをトリガーに実行したいので、そのための Lambda 関数を作成します。

まずは、Lambda 関数で使用する IAM Policy と Role を作成します。

In [None]:
lambda_startsfn_function_name = project_name + '-startsfn-' + user_name
lambda_startsfn_policy_name = lambda_startsfn_function_name + '-policy'
lambda_startsfn_role_name = lambda_startsfn_function_name + '-role'
lambda_startsfn_json_name = 'lambda-startsfn-policy.json'

assume_role_policy = {
  "Version": "2012-10-17",
  "Statement": [{"Sid": "","Effect": "Allow","Principal": {"Service":["lambda.amazonaws.com"]},"Action": "sts:AssumeRole"}]
}
lambda_startsfn_role_arn = create_policy_role(lambda_startsfn_policy_name, lambda_startsfn_json_name,
                   lambda_startsfn_role_name, assume_role_policy)
sleep(10) # wait until IAM is created

Lambda 関数で使用するソースコードとライブラリを zip 圧縮します。

In [None]:
prepare_lambda_resource(lambda_startsfn_function_name, 'code/start-pipeline')

Step Functions Workflow 実行時に使用するパラメタを環境変数に設定して Lambda 関数を作成します。

In [None]:
envs = {
    'SNS_TOPIC_ARN': sns_notification_topic_arn,
    'STEPFUNCTION_ARN': branching_workflow.state_machine_arn,
    'ECR_PREP_REPO_URI': prep_repository_uri.split(':')[0],
    'ECR_TRAIN_REPO_URI': train_repository_uri.split(':')[0],
    'BUCKET_NAME': bucket_name,
    'SAGEMAKER_ROLE_ARN': role,
    'PREFIX': prefix,
}
lambda_startsfn_function_arn = create_lambda_function(lambda_startsfn_function_name,
                                                   lambda_startsfn_function_name,
                                                   lambda_startsfn_role_arn,
                                                   'index',
                                                   envs,
                                                   py_version='python3.8')
sleep(10)

## S3 へのファイル作成を AWS Step Functions Workflow 実行用 Lambda 関数のトリガーに設定

データアップロード用の S3 バケットの prefix 以下に xxx.run というファイルが作成されたらトリガーを発行して Lambda 関数を実行するように設定します。xxx.run ファイルはデータ準備の入力として使用するファイルと同じ場所に作成される想定です。作成されるファイルの拡張子を `suffix` で指定し、ファイルの作成を監視するパスを `prefix` で指定しています。

In [None]:
config = {
    'LambdaFunctionConfigurations': [{
        'LambdaFunctionArn': lambda_startsfn_function_arn,
        'Events': ['s3:ObjectCreated:*'],
        'Filter': {
            'Key': {
                'FilterRules': [
                    {
                        'Name': 'suffix',
                        'Value': '.run'
                    },
                    {
                        'Name': 'prefix',
                        'Value': prefix
                    }
                ]
            }
        }               
    }]
}
response = lambda_client.add_permission(
     FunctionName=lambda_startsfn_function_arn,
     StatementId='1',
     Action='lambda:InvokeFunction',
     Principal='s3.amazonaws.com',
     SourceArn=f'arn:aws:s3:::{bucket_name_trigger}',
     SourceAccount=account_id
 )
sleep(10)
response3 = s3_client.put_bucket_notification_configuration(
                            Bucket=bucket_name_trigger,
                            NotificationConfiguration=config)
sleep(5)

## 動作確認

test.run を S3 バケットにアップロードして、Step Functions Workflow が実行されるか確認してみましょう。

In [None]:
!aws s3 cp test.run s3://$bucket_name_trigger/$prefix/rawdata/

## リソースの削除

今回作成したリソースは基本的に利用時のみに料金が発生するものですが、意図しない課金を防ぐために、不要になったらこのノートブックで作成したリソースを削除しましょう。

### Step Functions Workflow の削除

In [None]:
workflow_list = Workflow.list_workflows()
workflow_arn = [d['stateMachineArn'] for d in workflow_list  if d['name']==workflow_name][0]
sfn_workflow = Workflow.attach(workflow_arn)
try:
    sfn_workflow.delete()
    print('Delete:', workflow_name)
except Exception as e:
    print(e)

### Lambda 関数の削除

In [None]:
lambda_function_list = list(set(lambda_function_list))
for f in lambda_function_list:
    lambda_client.delete_function(FunctionName=f)

### Amazon ECR リポジトリの削除

In [None]:
container_image_list = [
    train_repository_uri.split('/')[1].split(':')[0],
    prep_repository_uri.split('/')[1].split(':')[0]
]
for i in container_image_list:
    try:
        ecr_client.delete_repository(
            repositoryName=i,
            force=True
        )
        print('Delete:', i)
    except Exception as e:
        print(e)
        pass

### SNS Topic の削除

In [None]:
response = sns_client.delete_topic(
    TopicArn=sns_notification_topic_arn
)

### S3 バケットの削除

S3 バケットを削除したい場合は、以下のセルのコメントアウトを外してから実行してバケットを空にしてください。その後、S3 のコンソールからバケットの削除を実行してください。

In [None]:
def delete_all_keys_v2(bucket, prefix, dryrun=False):
    contents_count = 0
    marker = ''

    while True:
        if marker == '':
            response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        else:
            response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, ContinuationToken=marker)

        if 'Contents' in response:
            contents = response['Contents']
            contents_count = contents_count + len(contents)
            for content in contents:
                if not dryrun:
                    print("Deleting: s3://" + bucket + "/" + content['Key'])
                    s3_client.delete_object(Bucket=bucket, Key=content['Key'])
                else:
                    print("DryRun: s3://" + bucket + "/" + content['Key'])

        if 'NextContinuationToken' in response:
            marker = response['NextContinuationToken']
        else:
            break

    print(contents_count, 'file were deleted.')

delete_all_keys_v2(bucket_name, '')
delete_all_keys_v2(bucket_name_trigger, '')

### IAM Role と Policy の削除

In [None]:
role_name_list = list(set(role_name_list))
policy_arn_list = list(set(policy_arn_list))

for r in role_name_list:
    try:
        detach_role_policies(r)
        iam_client.delete_role(RoleName=r)
        print('IAM Role 削除完了:', r)
    except Exception as e:
        print(e)
        pass

for p in policy_arn_list:
    try:
        iam_client.delete_policy(PolicyArn=p)
        print('IAM Policy 削除完了:', p)
    except Exception as e:
        print(e)

# ノートブックインスタンスにアタッチしたポリシーの削除
sagemaker_policy_arn = get_policy_arn(sagemaker_policy_name)
response = iam_client.detach_role_policy(
    RoleName=role.split('/')[2],
    PolicyArn=sagemaker_policy_arn
)
print('\nこちらの IAM Policy は手動で削除してください。', sagemaker_policy_arn)

---
## Amazon SageMaker Job 単体テスト
### Amazon SageMaker Processing でデータ準備

このサンプルノートブックでは、Processing Job で任意のセットの学習データを作成して S3 にアップロードします。ここで設定した学習データのセット数に応じて後段の Training の並列数が変わります。

In [None]:
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput

num_of_segment = 2

prep_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

prep_job_name = project_name + '-' + user_name + '-prep-' + prep_timestamp
prep_input_data = raw_data_s3_path
prep_output_data = f's3://{bucket_name}/{prefix}/prep/{prep_job_name}'
code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/prep"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, prep_timestamp),
)

prep_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=prep_input_data,
        destination=input_dir,
        input_name="data"
    )
]

prep_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=prep_output_data,
        output_name="result",
    )
]

prep_processor = Processor(
        role=role,
        image_uri=prep_repository_uri,
        entrypoint=["python3", f"{code_path}/prep.py"],
        instance_count=1, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

prep_processor.run(
    job_name=prep_job_name,
    inputs=prep_inputs,
     outputs=prep_outputs,
    arguments=['--num-of-dataset', str(num_of_segment)],
    logs=False,
    wait=False
)
from IPython.display import display, Markdown
display(Markdown(f"<a href=\"https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}?region={region}&prefix={prefix}/prep/{prep_job_name}/&showversions=false\" target=\"_blank\">準備したデータ (S3)</a>"))

In [None]:
# inputs1 = sagemaker_session.upload_data(path=data_dir, bucket=bucket, key_prefix=prefix+'/1')
# inputs2 = sagemaker_session.upload_data(path=data_dir, bucket=bucket, key_prefix=prefix+'/2')
# print(inputs1)
# print(inputs2)

### Amazon SageMaker Processing で AutoGluon モデルの並列学習

テータ準備の Processing Job が完了したら、準備されたデータを使ってモデルを学習します。テータ準備の Processing Job の出力データを入力として、`s3_data_distribution_type='ShardedByS3Key'` としてデータ並列（今回は1データ1インスタンス）でモデルの学習を実行します。

In [None]:
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput

train_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

train_job_name = project_name + '-' + user_name + '-train-' + train_timestamp
train_input_data = prep_output_data + '/train'
train_output_data = f's3://{bucket_name}/{prefix}/train/{train_job_name}'
code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/train"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, train_timestamp),
)

train_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=train_input_data,
        destination=input_dir,
        input_name="data",
        s3_data_distribution_type='ShardedByS3Key'
    )
]

train_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=train_output_data,
        output_name="result",
    )
]

train_processor = Processor(
        role=role,
        image_uri=train_repository_uri,
        entrypoint=["python3", f"{code_path}/train.py"],
        instance_count=num_of_segment, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

train_processor.run(
    job_name=train_job_name,
    inputs=train_inputs,
     outputs=train_outputs,
    arguments=['--num-of-dataset', str(num_of_segment)],
    logs=False,
    wait=False
)
from IPython.display import display, Markdown
display(Markdown(f"<a href=\"https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}?region={region}&prefix={prefix}/train/{train_job_name}/&showversions=false\" target=\"_blank\">学習済みモデル (S3)</a>"))

### Amazon SageMaker Processing でバッチ推論

In [None]:
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput


pred_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

pred_job_name = project_name + '-' + user_name + '-pred-' + pred_timestamp
pred_input_data = prep_output_data + '/pred'
pred_output_data = f's3://{bucket_name}/{prefix}/pred/{pred_job_name}'
code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/pred"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, pred_timestamp),
)

pred_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=pred_input_data,
        destination=input_dir,
        input_name="data",
        s3_data_distribution_type='ShardedByS3Key'
    )
]

pred_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=pred_output_data,
        output_name="result",
    )
]

pred_processor = Processor(
        role=role,
        image_uri=train_repository_uri,
        entrypoint=["python3", f"{code_path}/pred.py"],
        instance_count=num_of_segment, 
        instance_type="ml.m5.xlarge",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

pred_processor.run(
    job_name=pred_job_name,
    inputs=pred_inputs,
     outputs=pred_outputs,
    arguments=[
        '--num-of-dataset', str(num_of_segment),
        '--metrics-threshold', '10000',
        '--latest-model-path', train_output_data,
        '--previous-model-path', train_output_data
              ],
    logs=False,
    wait=False
)
from IPython.display import display, Markdown
display(Markdown(f"<a href=\"https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}?region={region}&prefix={prefix}/pred/{pred_job_name}/&showversions=false\" target=\"_blank\">バッチ推論結果 (S3)</a>"))

### Amazon SageMaker Processing で後処理

In [None]:
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput

post_timestamp = datetime.now(JST).strftime('%Y%m%d-%H%M%S')

post_job_name = project_name + '-' + user_name + '-post-' + post_timestamp
post_input_data = pred_output_data
post_output_data = f's3://{bucket_name}/{prefix}/post/{post_job_name}'
code_path = '/opt/ml/processing/input/code'
input_dir = '/opt/ml/processing/input/data'
output_dir = '/opt/ml/processing/output'

SCRIPT_LOCATION = "code/post"

code_s3_path = sagemaker_session.upload_data(
    SCRIPT_LOCATION,
    bucket=bucket_name,
    key_prefix=os.path.join(prefix, SCRIPT_LOCATION, post_timestamp),
)

post_inputs = [
    ProcessingInput(
        input_name='code',
        source=code_s3_path,
        destination=code_path),
    ProcessingInput(
        source=post_input_data,
        destination=input_dir,
        input_name="data"
    )
]

post_outputs = [
    ProcessingOutput(
        source=output_dir,
        destination=post_output_data,
        output_name="result",
    )
]

post_processor = Processor(
        role=role,
        image_uri=prep_repository_uri,
        entrypoint=["python3", f"{code_path}/post.py"],
        instance_count=1, 
#         instance_type="ml.m5.xlarge",
        instance_type="local",
        volume_size_in_gb=16,
        volume_kms_key=None,
        output_kms_key=None,
        max_runtime_in_seconds=86400,  # default is 24 hours(60*60*24)
        sagemaker_session=None,
        env=None,
        network_config=None
    )

post_processor.run(
    job_name=post_job_name,
    inputs=post_inputs,
     outputs=post_outputs,
    arguments=['--num-of-dataset', str(num_of_segment)],
    logs=False,
    wait=False
)
from IPython.display import display, Markdown
display(Markdown(f"<a href=\"https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}?region={region}&prefix={prefix}/post/{post_job_name}/&showversions=false\" target=\"_blank\">後処理データ (S3)</a>"))