## 機械学習パイプラインを構築した前処理、学習、推論の自動化

このノートブックでは、AWS StepFunctions を活用し、2 で活用した前処理や、学習、推論といったそれぞれのジョブを一つのパイプラインにまとめます。その際、エラーハンドリングを行う機構も構築します。ワークフローの定義を Python で記述できる [AWS StepFunctions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/) を使います。ワークフローは構築したパイプラインはそのまま AWS 上に構築することができ、CloudFormation のテンプレートとしてもエクスポートできます。

## AWS StepFunctions Data Science SDK のインストール

In [None]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

## データの準備、s3 へアップロード
今回も元のデータが S3 に保存されている状態からパイプラインを開始したいと思います。また、実行環境も前回と同様に前処理用のコンテナを準備氏、学習と推論用には AWS が提供している `scikit-learn` が事前にインストールされているコンテナを活用します。

### 使用するライブラリなどの読み込み

In [None]:
import boto3
import sagemaker
import io
import uuid
import logging

import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

import stepfunctions
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath

stepfunctions.set_stream_logger(level=logging.INFO)

AWS StepFunctions が各ジョブを実行する権限は、`TitanicPipelineAppStack-stepfunctionsworkflowexect` で定義されているので、AWS コンソールから確認して下記に ARN を記入下さい。

In [None]:
role = get_execution_role()
workflow_execution_role = '<Your StepFunctions Workflow Role>'

### S3 へのデータのアップロード

In [None]:
sagemaker_session = sagemaker.Session()
input_train = sagemaker_session.upload_data(path='./data/train.csv', key_prefix='kaggle-ml-pipeline/data')
input_test = sagemaker_session.upload_data(path='./data/test.csv', key_prefix='kaggle-ml-pipeline/data')

### データ前処理用のコンテナの準備

In [None]:
!docker build -t sagemaker-kaggle-titanic-preprocess ./scripts/preprocess

import boto3

# boto3の機能を使ってリポジトリ名に必要な情報を取得する
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name
tag = ':latest'

# SageMakerFullAccess を使っているから repository 名の中に sagemaker が含まれている必要がある
ecr_repository = f'sagemaker-kaggle-titanic-preprocess'
image_uri = f'{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_repository+tag}'

!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
 
# リポジトリの作成
# すでにある場合はこのコマンドは必要ない
!aws ecr create-repository --repository-name $ecr_repository
 
!docker build -t {ecr_repository} .
!docker tag {ecr_repository + tag} $image_uri
!docker push $image_uri

print(f'コンテナは {image_uri} へ登録されています。')

## ワークフロー定義の準備
ワークフローへ渡す設定について、スキーマを定義します。

In [None]:
# SageMaker へは学習ジョブ、モデル、エンドポイントへそれぞれユニークな名前を用います。 
execution_input = ExecutionInput(
    schema={
    'ModelName': str,
    'TrainPreprocessingJobName': str,
    'TrainingJobName': str, 
    'TestPreprocessingJobName': str,
    'TransformJobName': str
    }
)

### 学習ステップの定義
前回行った、前処理と学習と同様の内容を実施しますが、今回はノートブック経由で SDK にてジョブを発行するのではなく、それぞれのステップとして定義し、最後にワークフローへと統合していきます。前処理ジョブの定義は前回と同様です。

In [None]:
job_name = f'sagemaker-kaggle-preprocessing-train'
output_s3_path = 's3://' + sagemaker_session.default_bucket() + '/kaggle-ml-pipeline'

processing_input_dir = '/opt/ml/processing/input'
processing_code_dir = '/opt/ml/processing/input/code'
processing_output_dir = '/opt/ml/processing/output'


PREPROCESSING_SCRIPT_LOCATION = './scripts/preprocess/preprocess_script/preprocess.py'

input_code = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket=sagemaker_session.default_bucket(),
    key_prefix= 'kaggle-ml-pipeline/preprocess/code',
)

output_s3_path_preprocess = output_s3_path + '/preprocessed'

processor = ScriptProcessor(base_job_name=job_name,
                                   image_uri=image_uri,
                                   command=['python3'],
                                   role=role,
                                   instance_count=1,
                                   instance_type='ml.c5.xlarge'
                                  )

定義した前処理ジョブをワークフローに組み込めるようにステップとします。

In [None]:
from stepfunctions.steps import ProcessingStep

train_preprocess_step = ProcessingStep(
    'Preprocess for Training Step', 
    processor=processor,
    job_name=execution_input["TrainPreprocessingJobName"],
    inputs=[
        ProcessingInput(source=input_code, destination=processing_code_dir, input_name="code"),
        ProcessingInput(source=input_train, destination=processing_input_dir, input_name="train_data"),
    ],
    outputs=[ProcessingOutput(source=processing_output_dir, destination=output_s3_path_preprocess, output_name="processed_train_data")],
    container_arguments=[
                  '--data_type', 'train',
                  '--input_dir',processing_input_dir,
                  '--output_dir',processing_output_dir
                      ],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocess.py"]
)

学習ジョブの定義も前回と同様です。

In [None]:
from sagemaker.sklearn.estimator import SKLearn

output_s3_path_train = output_s3_path + '/train'

sklearn = SKLearn(
    entry_point='scripts/train/train.py',
    framework_version="0.23-1",
    train_instance_type="ml.m5.xlarge",
    output_path=output_s3_path_train,
    role=role)

train_input = output_s3_path_preprocess + '/train.csv'

学習ジョブもステップとして定義します。

In [None]:
from stepfunctions.steps import TrainingStep

training_step = TrainingStep(
    'Train Step', 
    estimator=sklearn,
    data={'train': train_input},
    job_name=execution_input['TrainingJobName']  
)

In [None]:
training_step.get_expected_model()

### 推論ステップの定義

学習済モデルを推論で活用するために、モデル作成ステップを定義します。前回は Jupyter ノートブック上で学習ジョブと推論ジョブを呼び出していた(学習用ジョブの Estimator インスタンスをそのまま推論でも活用していた)ため、モデル作成を明示的には行っていませんでしたが、今回はパイプラインの中で独立したステップとして実行するために、モデル作成ステップを追加しています。

In [13]:
model_step = steps.ModelStep(
    state_id='Save model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

推論用の前処理は前回と同様であり、こちらもステップとして定義します。

In [14]:
job_name = f'sagemaker-kaggle-preprocessing-test'

processor = ScriptProcessor(base_job_name=job_name,
                                   image_uri=image_uri,
                                   command=['python3'],
                                   role=role,
                                   instance_count=1,
                                   instance_type='ml.c5.xlarge'
                                  )

In [15]:
test_preprocess_step = steps.ProcessingStep(
    'Preprocess for Test Step', 
    processor=processor,
    job_name=execution_input["TestPreprocessingJobName"],
    inputs=[
        ProcessingInput(source=input_code, destination=processing_code_dir, input_name="code"),
        ProcessingInput(source=input_test, destination=processing_input_dir, input_name="test_data"),
    ],
    outputs=[ProcessingOutput(source=processing_output_dir, destination=output_s3_path_preprocess, output_name="processed_test_data")],
    container_arguments=[
                  '--data_type', 'test',
                  '--input_dir',processing_input_dir,
                  '--output_dir',processing_output_dir
                      ],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocess.py"]
)

バッチ変換ジョブもステップとして定義します。

In [30]:
test_input = output_s3_path_preprocess + '/test.csv'

output_s3_path_inference = output_s3_path + '/batch_inference'
transformer = sklearn.transformer(instance_count=1,
                                  instance_type='ml.m5.xlarge',
                                  output_path=output_s3_path_inference)

transform_step = steps.TransformStep(
    'Transform Input Dataset',
    transformer=sklearn.transformer(
        instance_count=1,
        instance_type='ml.m5.large'
    ),
    job_name=execution_input['TransformJobName'],     
    model_name=execution_input['ModelName'], 
    data=test_input,
    content_type='text/csv'
)

各ステップでのジョブが失敗した場合に対応する `Catch` ステップを定義します。

In [31]:
failed_state_sagemaker_processing_failure = stepfunctions.steps.states.Fail(
    "ML Workflow failed", cause="SageMakerProcessingJobFailed"
)

catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_processing_failure,
)

train_preprocess_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_processing)
test_preprocess_step.add_catch(catch_state_processing)
transform_step.add_catch(catch_state_processing)

各ステップをワークフローとして定義し、内容をレンダリングして確認します。

In [32]:
workflow_definition = steps.Chain([train_preprocess_step, training_step, model_step, test_preprocess_step, transform_step])

workflow = Workflow(
    name="titanic-ml-pipeline16",
    definition=workflow_definition,
    role=workflow_execution_role,
)

workflow.render_graph()

定義したワークフローは AWS CloudFormation のテンプレートとして出力できるため、Terraform など、サードパーティ OSS 含めた各種ツールでのデプロイにも流用できます。

In [33]:
template = workflow.get_cloudformation_template()



### Workflow の実行
`Chain` メソッドでまとめあげた各ステップを `workflow` として実行します。

In [34]:
workflow.create()

execution = workflow.execute(
    inputs={
        "ModelName": 'model-{}'.format(uuid.uuid1().hex),
        "TrainPreprocessingJobName": 'train-preprocess-job-{}'.format(uuid.uuid1().hex), 
        "TrainingJobName": 'training-job-{}'.format(uuid.uuid1().hex),
        "TestPreprocessingJobName": 'test-preprocess-job-{}'.format(uuid.uuid1().hex),
        "TransformJobName": 'transform-job-{}'.format(uuid.uuid1().hex),
    }
)

[32m[INFO] Workflow created successfully on AWS Step Functions.[0m
[32m[INFO] Workflow execution started successfully on AWS Step Functions.[0m
