# AWS Step Functions Data Science SDK で Amazon SageMaker Training Job を並列実行する

## 目次

1. [背景](#1.-背景)
1. [セットアップ](#2.-セットアップ)
1. [データ](#3.-データ)
1. [SageMaker Estimator の準備](#4.-SageMaker-Estimator-の準備)
1. [AWS Step Functions の準備](#5.-AWS-Step-Functions-の準備)
1. [リソースの削除](#6.-リソースの削除)

---

## 1. 背景

AWS Step Functions は、機械学習パイプラインの構築でよく使われます。AWS Step Functions Data Science SDK を使うと、Python でパイプラインを作ることができるため、データサイエンティストが自身のユースケースに最適な構成を簡単に構築できます。Step Functions を使った基本的なパイプライン構築方法については [こちらのサンプルノートブック](https://github.com/aws-samples/aws-ml-jp/blob/main/mlops/step-functions-data-science-sdk/model-train-evaluate-compare/step_functions_mlworkflow_scikit_learn_data_processing_and_model_evaluation_with_experiments.ipynb) をご参照ください。 

本ノートブックは、**学習コードは共通で良いが複数の学習データを使ってそれぞれのモデルを学習させたい** ユースケースにピッタリなサンプルノートブックです。モデル学習の並列実行に Step Functions の [Map State](https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-map-state.html) を使用します。サンプルデータとしては MNIST を使用します。MNISTは、手書き文字の分類に広く使用されているデータセットです。 70,000個のラベル付きの28x28ピクセルの手書き数字のグレースケール画像で構成されています。 データセットは、60,000個のトレーニング画像と10,000個のテスト画像に分割されます。 手書きの数字 0から9の合計10のクラスがあります。 

<img src="workflow.png" width="50%">

学習スクリプトでは PyTorch を使用しています。SageMaker の PyTorch の詳細については、[sagemaker-pytorch-containers](https://github.com/aws/sagemaker-pytorch-containers) と [sagemaker-python-sdk](https://github.com/aws/sagemaker-python-sdk) のレポジトリをご参照ください。

---

## 2. セットアップ

SageMaker セッションを作成し、設定を開始します。

- 学習およびモデルデータに使用する S3 バケットとプレフィックスは、ノートブックインスタンス、トレーニング、およびホスティングと同じリージョン内にある必要があります。
- データへの学習およびホスティングアクセスを提供するために使用される IAM ロール arn を用います。 ノートブックインスタンス、学習インスタンス、および/またはホスティングインスタンスに複数のロールが必要な場合は、 `sagemaker.get_execution_role（）` を、適切な IAM ロール arn 文字列に置き換えてください。


In [None]:
import boto3
import sagemaker
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
iam_client = boto3.client('iam', region_name=region)

bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()
account_id = boto3.client('sts').get_caller_identity().get('Account')

project_name = 'sagemaker-parallel-training-map'
user_name = 'demo'
sagemaker_policy_name = project_name + '-' + user_name + '-policy'
prefix = f'sagemaker/{project_name}/{user_name}'

from IPython.display import display, Markdown
text = f"""
以下の手順で IAM 関連の設定を実施してください。
1. <a href=\"policy/sagemaker-policy.json\" target=\"_blank\">policy/sagemaker-policy.json</a> の中身をコピー
1. <a href=\"https://{region}.console.aws.amazon.com/iam/home#/policies$new?step=edit\" target=\"_blank\">IAM Policy の作成</a>をクリックし、**JSON** タブをクリックしてから手順1でコピーした JSON をペーストして右下の **次のステップ：タグ** ボタンをクリック
1. 右下の **次のステップ：確認** ボタンをクリック
1. **名前** に **{sagemaker_policy_name}** を記載して、右下の **ポリシーの作成** ボタンをクリック
1.  <a href=\"https://us-east-1.console.aws.amazon.com/sagemaker/home?region={region}#/notebook-instances\" target=\"_blank\">ノートブックインスタンス一覧</a> を開いてこのノートブックを実行しているノートブックをクリック
1. **アクセス許可と暗号化** の部分に表示されている IAM ロールへのリンクをクリック
1. **アクセス許可を追加** をクリックして **ポリシーをアタッチ** を選択
1. **その他の許可ポリシー** の検索ボックスで手順4 で作成した {sagemaker_policy_name} を検索して横にあるチェックボックスをオンにする
1. **ポリシーのアタッチ** をクリック
"""
display(Markdown(text))

このノートブックのコードは、以前からのノートブックインスタンスで実行する場合と、SageMaker Studio のノートブックで実行する場合とで挙動が異なります。以下のセルを実行することで、いまの実行環境が以前からのノートブックインスタンスなのか、SageMaker Studio のノートブックなのかを判定して、`on_studio`に記録します。この結果に基づいて、以降のノートブックの実行を次のように変更します。

- データセットの展開先を変更します。SageMaker Studio を利用する場合、home のディレクトリは EFS をマウントして実現されており、データセットを展開する際にやや時間を要します。そこで home 以外のところへ展開するようにします。

In [None]:
import os, json
NOTEBOOK_METADATA_FILE = "/opt/ml/metadata/resource-metadata.json"
if os.path.exists(NOTEBOOK_METADATA_FILE):
    with open(NOTEBOOK_METADATA_FILE, "rb") as f:
        metadata = json.loads(f.read())
        domain_id = metadata.get("DomainId")
        on_studio = True if domain_id is not None else False
print("Is this notebook runnning on Studio?: {}".format(on_studio))

## 3. データ
### 3.1. データの取得

In [None]:
!aws s3 cp s3://fast-ai-imageclas/mnist_png.tgz . --no-sign-request
if on_studio:
    !tar -xzf mnist_png.tgz -C /opt/ml --no-same-owner
else:
    !tar -xvzf  mnist_png.tgz

In [None]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch
import os

root_dir_studio = '/opt/ml'
data_dir = os.path.join(root_dir_studio,'data') if on_studio else 'data'
training_dir = os.path.join(root_dir_studio,'mnist_png/training') if on_studio else 'mnist_png/training'
test_dir = os.path.join(root_dir_studio,'mnist_png/testing') if on_studio else 'mnist_png/testing'

os.makedirs(data_dir, exist_ok=True)

training_data = datasets.ImageFolder(root=training_dir,
                            transform=transforms.Compose([
                            transforms.Grayscale(),
                            transforms.ToTensor(),
                            transforms.Normalize((0.1307,), (0.3081,))]))
test_data = datasets.ImageFolder(root=test_dir,
                            transform=transforms.Compose([
                            transforms.Grayscale(),
                            transforms.ToTensor(),
                            transforms.Normalize((0.1307,), (0.3081,))]))

training_data_loader = DataLoader(training_data, batch_size=len(training_data))
training_data_loaded = next(iter(training_data_loader))
torch.save(training_data_loaded, os.path.join(data_dir, 'training.pt'))

test_data_loader = DataLoader(test_data, batch_size=len(test_data))
test_data_loaded = next(iter(test_data_loader))
torch.save(test_data_loaded, os.path.join(data_dir, 'test.pt'))

### 3.2.データをS3にアップロードする
データセットを S3 にアップロードするには、 `sagemaker.Session.upload_data` 関数を使用します。 戻り値として入力した S3 のロケーションは、後で学習ジョブを実行するときに使用します。このサンプルでは、学習データを 2セット用意して並列で学習を実行するため、2ヶ所にデータをアップロードします。

In [None]:
inputs1 = sagemaker_session.upload_data(path=data_dir, bucket=bucket, key_prefix=prefix+'/1')
inputs2 = sagemaker_session.upload_data(path=data_dir, bucket=bucket, key_prefix=prefix+'/2')
print(inputs1)
print(inputs2)

## 4. SageMaker Estimator の準備

学習の条件を設定するため、Estimator クラスの子クラスの PyTorch オブジェクトを作成します。 ここでは、PyTorchスクリプト、IAMロール、および（ジョブごとの）ハードウェア構成を渡す PyTorch Estimator を定義しています。また合わせてローカルの `source_dir` を指定することで、依存するスクリプト群をコンテナにコピーして、学習時に使用することが可能です。

ハイパーパラメータは Step Functions 実行時に渡すため、ここでは設定しません。（ここで設定したものは後ですべて上書きされて無効になります）

In [None]:
from sagemaker.pytorch import PyTorch


instance_type = 'ml.m5.xlarge'

estimator = PyTorch(
                    entry_point="mnist.py",
                    role=role,
                    framework_version='1.8.0',
                    py_version='py3',
                    instance_count=1,
                    instance_type=instance_type,
#                     hyperparameters={
#                         'batch-size':128,
#                         'lr': 0.01,
#                         'epochs': 1,
#                         'backend': 'gloo'
#                     }
                   )

## 5. AWS Step Functions の準備

前の手順で作成した Estimator を使って Step Functions の TrainingStep を作成し、その後 Workflow を作成します。

In [None]:
import boto3
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    ModelStep,
    ProcessingStep,
    TrainingStep,
    TransformStep,
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow

### 5.1 IAM Role と Policy の作成

Step Functions の Workflow にセットする IAM Role を作成します。

In [None]:
from time import sleep

policy_arn_list = []
role_name_list = []

def get_policy_arn(policy_name):
    next_token = ''
    while True:
        if next_token == '':
            response = iam_client.list_policies(Scope='Local')
        else:
            response = iam_client.list_policies(Scope='Local', Marker=next_token)
        for content in response['Policies']:
            if policy_name == content['PolicyName']:
                return content['Arn']
        if 'Marker' in response:
            next_token = response['Marker']
        else:
            break

    return ''


def detach_role_policies(role_name):
    try:
        response = iam_client.list_attached_role_policies(
            RoleName=role_name,
        )
    except Exception as ex:
        print(ex)
    policies = response['AttachedPolicies']

    for p in policies:
        response = iam_client.detach_role_policy(
            RoleName=role_name,
            PolicyArn=p['PolicyArn']
        )

            
def create_role(role_name, assume_role_policy):
    try:
        response = iam_client.create_role(
            Path = '/service-role/',
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps(assume_role_policy),
            MaxSessionDuration=3600*12 # 12 hours
        )
        role_arn = response['Role']['Arn']
    except Exception as ex:
        if "EntityAlreadyExists" in str(ex):
            detach_role_policies(role_name)
            response = iam_client.delete_role(
                RoleName = role_name,
            )
            response = iam_client.create_role(
                Path = '/service-role/',
                RoleName = role_name,
                AssumeRolePolicyDocument = json.dumps(assume_role_policy),
                MaxSessionDuration=3600*12 # 12 hours
            )
            role_arn = response['Role']['Arn']
        else:
            print(ex)
    sleep(10)
    return role_arn


def create_policy(policy_name, policy_json_name):
    with open('policy/' + policy_json_name, 'r') as f:
        policy_json = json.load(f)
    try:
        response = iam_client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_json),
        )
        policy_arn = response['Policy']['Arn']
    except Exception as ex:
        if "EntityAlreadyExists" in str(ex):
            response = iam_client.delete_policy(
                PolicyArn=get_policy_arn(policy_name)
            )
            response = iam_client.create_policy(
                PolicyName=policy_name,
                PolicyDocument=json.dumps(policy_json),
            )
            policy_arn = response['Policy']['Arn']
    policy_arn_list.append(policy_arn)
    
    sleep(10)
    return policy_arn


def create_policy_role(policy_name, policy_json_name, role_name, assume_role_policy):

    role_arn = create_role(role_name, assume_role_policy)
    policy_arn = create_policy(policy_name, policy_json_name)

    sleep(5)
    response = iam_client.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )

    role_name_list.append(role_name)
    policy_arn_list.append(policy_arn)
    sleep(10)
    return role_arn

In [None]:
import json

step_functions_policy_name = project_name + '-stepfunctions-' + user_name + '-policy'
step_functions_role_name = project_name + '-stepfunctions-' + user_name + '-role'
step_functions_policy_json_name = 'stepfunctions-policy.json'

assume_role_policy = {
      "Version": "2012-10-17",
      "Statement": [{"Sid": "","Effect": "Allow","Principal": {"Service":"states.amazonaws.com"},"Action": "sts:AssumeRole"}]
    }

workflow_execution_role = create_policy_role(step_functions_policy_name, step_functions_policy_json_name,
                   step_functions_role_name, assume_role_policy)
workflow_execution_role

### 5.2 Step Functions Workflow 実行時のパラメータの準備

Step Functions Workflow 実行時に指定するパラメータのスキーマを定義します。すべての学習ジョブに共通でセットしたいパラメータは `ExecutionInput` で、学習ジョブごとに変えたいパラメータは `StepInput` で定義します。

In [None]:
execution_input = ExecutionInput(
    schema={
        "TrainingParameters": dict,
    }
)

step_input = StepInput(
    schema={
        "TrainingJobName": str,
        "TrainingInput": str,
        "TrainingOutput": str,
    }
)

### 5.3 TrainingStep の作成

TrainingStep を作成します。学習データ、学習ジョブ名、学習済みモデルを保存するパスを、先ほど作成した ExecutionInput と StepInput を使って設定します。 

In [None]:
training_step = steps.TrainingStep(
    "SageMaker Training Step",
    estimator=estimator,
    data={"training": sagemaker.TrainingInput(step_input["TrainingInput"])},
    job_name=step_input["TrainingJobName"],
    hyperparameters=execution_input["TrainingParameters"],
    output_data_config_path=step_input["TrainingOutput"],
    wait_for_completion=True,
)

### 5.4 Step Functions Workflow の作成

作成した TrainingStep を使って Map State を作成し、続けて Workflow を作成します。

In [None]:
from stepfunctions.workflow import Workflow

param_name = 'Jobs'

training_map = steps.states.Map(
    "SageMaker training Map",
    iterator=training_step,
    items_path=f'$.{param_name}',
)

workflow_graph = Chain([training_map])
workflow_name = project_name+"-" + user_name

branching_workflow = Workflow(
    name=workflow_name,
    definition=workflow_graph,
    role=workflow_execution_role,
)

branching_workflow.create()
branching_workflow.update(workflow_graph)

### 5.5 学習スクリプトを S3 にアップロード

並列実行される学習ジョブで使用する学習スクリプトを tar.gz で固めて S3 にアップロードします。

In [None]:
TRAINNING_SCRIPT_LOCATION = "source.tar.gz"
!tar zcvf $TRAINNING_SCRIPT_LOCATION mnist.py

train_code = sagemaker_session.upload_data(
    TRAINNING_SCRIPT_LOCATION,
    bucket=bucket,
    key_prefix=os.path.join(user_name, prefix, "code"),
)
train_code

### 5.6 Step Functions Workflow 実行時のパラメータの作成

Workflow 実行時に指定する、学習データのS3パス、学習ジョブ名、学習済みモデルを保存するS3パス、ハイパーパラメータなどの情報を作成します。Map State に渡す情報（今回は `input_params` ）は dict のリストとして作成します。

In [None]:
from datetime import datetime
from dateutil import tz

JST = tz.gettz('Asia/Tokyo')
timestamp = datetime.now(tz=JST).strftime('%Y%m%d-%H%M%S')

input_data_list = [inputs1, inputs2]

input_params = []

for i in range(2):
    id = str(i+1)
    job_name_prefix = f'sfn-map-test-{timestamp}'
    job_name = f'{job_name_prefix}-{id}'
    input_params.append(
        {
            'TrainingJobName': job_name,
            'TrainingInput': input_data_list[i],
            'TrainingOutput':  f's3://{bucket}/{job_name_prefix}/output/result/{id}'
        }
    )
    
input_params_dict = {}
input_params_dict['TrainingParameters'] = {
    "sagemaker_program": "mnist.py",
    "sagemaker_submit_directory": train_code,
    'batch-size':'128',
    'lr': '0.01',
    'epochs': '1',
    'backend': 'gloo'}
input_params_dict[param_name] = input_params
input_params_dict

### 5.7 Step Functions Workflow の実行

作成したパラメータを使って Step Functions Workflow を実行します。表示されたリンクから AWS コンソールに移動して今実行した Workflow を確認してみましょう。

In [None]:
execution = branching_workflow.execute(
    inputs=input_params_dict
)
from IPython.display import display, Markdown
display(Markdown(f"<a href=\"https://{region}.console.aws.amazon.com/states/home?region={region}#/statemachines/view/arn:aws:states:us-east-1:{account_id}:stateMachine:{workflow_name}\" target=\"_blank\">Step Functions のコンソール</a>"))

## 6. リソースの削除

このノートブックで作成したリソースを削除しましょう。

### Step Functions Workflow の削除

In [None]:
workflow_list = Workflow.list_workflows()
workflow_arn = [d['stateMachineArn'] for d in workflow_list  if d['name']==workflow_name][0]
sfn_workflow = Workflow.attach(workflow_arn)
try:
    sfn_workflow.delete()
    print('Delete:', workflow_name)
except Exception as e:
    print(e)

### IAM Role と Policy の削除

In [None]:
role_name_list = list(set(role_name_list))
policy_arn_list = list(set(policy_arn_list))

for r in role_name_list:
    try:
        detach_role_policies(r)
        iam_client.delete_role(RoleName=r)
        print('IAM Role 削除完了:', r)
    except Exception as e:
        print(e)
        pass

for p in policy_arn_list:
    try:
        iam_client.delete_policy(PolicyArn=p)
        print('IAM Policy 削除完了:', p)
    except Exception as e:
        print(e)

# ノートブックインスタンスにアタッチしたポリシーの削除
sagemaker_policy_arn = get_policy_arn(sagemaker_policy_name)
response = iam_client.detach_role_policy(
    RoleName=role.split('/')[2],
    PolicyArn=sagemaker_policy_arn
)
print('\nこちらの IAM Policy は手動で削除してください。', sagemaker_policy_arn)