# Amazon SageMaker Processing と AWS Step Functions Data Science SDK で機械学習ワークフローを構築する (SageMaker Studio)

Amazon SageMaker Processing を使うと、データの前/後処理やモデル評価のワークロードを Amazon SageMaker platform 上で簡単に実行することができます。Processingジョブは Amazon Simple Storage Service (Amazon S3) から入力データをダウンロードし、処理結果を Amazon S3 にアップロードします。

Step Functions SDK は AWS Step Function と Amazon SageMaker を使って、データサイエンティストが機械学習ワークフローを簡単に作成して実行するためのものです。詳しい情報は以下のドキュメントをご参照ください。

* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

AWS Step Functions Data Science SDK の SageMaker Processing Step [ProcessingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/sagemaker.html#stepfunctions.steps.sagemaker.ProcessingStep) によって、AWS Step Functions ワークフローで実装された Sageaker Processing を機械学習エンジニアが直接システムに統合することができます。

このノートブックは、SageMaker Processing Job を使ってデータの前処理、モデルの学習、モデルの精度評価の機械学習ワークフローを AWS Step Functions Data Science SDK を使って作成する方法をご紹介します。大まかな流れは以下の通りです。

1. AWS Step Functions Data Science SDK の `ProcessingStep` を使ってデータの前処理、特徴量エンジニアリング、学習用とテスト用への分割を行う scikit-learn スクリプトを実行する SageMaker Processing Job を実行
1. AWS Step Functions Data Science SDK の `TrainingStep` を使って前処理された学習データを使ったモデルの学習を実行
1. AWS Step Functions Data Science SDK の `ProcessingStep` を使って前処理したテスト用データを使った学習済モデルの評価を実行
1. AWS Step Functions Data Science SDK の `LambdaStep`を使って最新のモデルと過去のモデルの評価指標の比較を実行


このノートブックで使用するデータは [Census-Income KDD Dataset](https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29) です。このデータセットから特徴量を選択し、データクレンジングを実施し、二値分類モデルの利用できる形にデータを変換し、最後にデータを学習用とテスト用に分割します。このノートブックではロジスティック回帰モデルを使って、国勢調査の回答者の収入が 5万ドル以上か 5万ドル未満かを予測します。このデータセットはクラスごとの不均衡が大きく、ほとんどのデータに 5万ドル以下というラベルが付加されています。

## Setup

このノートブックを実行するのに必要なライブラリをインストールします。

In [None]:
# Import the latest sagemaker, stepfunctions and boto3 SDKs
import sys

!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker==2.50.0"
!{sys.executable} -m pip install -qU "stepfunctions==2.0.0"
!{sys.executable} -m pip install sagemaker-experiments
!{sys.executable} -m pip show sagemaker stepfunctions

### 必要なモジュールのインポート

In [None]:
import io
import logging
import os
import random
import time

import boto3
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    ModelStep,
    ProcessingStep,
    TrainingStep,
    TransformStep,
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow

import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import image_uris
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.s3 import S3Uploader
from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.analytics import ExperimentAnalytics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker
import time

# SageMaker Session
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
role = get_execution_role()

account_id = boto3.client('sts').get_caller_identity().get('Account')
bucket = sagemaker_session.default_bucket()

次に、ノートブックから Step Functions を実行するための IAM ロール設定を行います。

### ノートブックインスタンスの IAM ロールに権限を追加

https://sagemaker-immersionday.workshop.aws/ja/lab3/option2.html に従って、

1. AWS CodeBuild の信頼関係を追加
1. https://github.com/aws-samples/amazon-sagemaker-immersion-day/blob/master/iam-policy-sm-cb.txt のポリシーを IAM ロールに追加

続けて、IAM ロールの画面で
1.  `AWSStepFunctionsFullAccess` のポリシーも追加します。

次に Step Functions で使用する実行ロールを作成します。

### Step Functions の実行ロールの作成

 作成した Step Functions ワークフローは、AWS の他のサービスと連携するための IAM ロールを必要とします。

1. [IAM console](https://console.aws.amazon.com/iam/) にアクセス
2. 左側のメニューの **ロール** を選択し **ロールの作成** をクリック
3. **ユースケースの選択** で **Step Functions** をクリック
4. **次のステップ：アクセス権限** **次のステップ：タグ** **次のステップ：確認**をクリック
5. **ロール名** に `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` と入力して **ロールの作成** をクリック

次に、作成したロールに AWS マネージド IAM ポリシーをアタッチします。

1. [IAM console](https://console.aws.amazon.com/iam/) にアクセス
2. 左側のメニューの **ロール** を選択
3. 先ほど作成した `AmazonSageMaker-StepFunctionsWorkflowExecutionRole`を検索
4. **ポリシーをアタッチします** をクリックして `CloudWatchEventsFullAccess` を検索
5. `CloudWatchEventsFullAccess` の横のチェックボックスをオンにして **ポリシーのアタッチ** をクリック

次に、別の新しいポリシーをロールにアタッチします。ベストプラクティスとして、以下のステップで特定のリソースのみのアクセス権限とこのサンプルを実行するのに必要なアクションのみを有効にします。

1. 左側のメニューの **ロール** を選択
1. 先ほど作成した `AmazonSageMaker-StepFunctionsWorkflowExecutionRole`を検索
1. **ポリシーをアタッチします** をクリックして **ポリシーの作成** をクリック
1. **JSON** タブをクリックして以下の内容をペースト<br>
NOTEBOOK_ROLE_ARN の部分をノートブックインスタンスで使用している IAM ロールの ARN に置き換えてください。

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:DescribeRule",
                "events:PutRule"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "NOTEBOOK_ROLE_ARN",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "batch:DescribeJobs",
                "batch:SubmitJob",
                "batch:TerminateJob",
                "dynamodb:DeleteItem",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "ecs:DescribeTasks",
                "ecs:RunTask",
                "ecs:StopTask",
                "glue:BatchStopJobRun",
                "glue:GetJobRun",
                "glue:GetJobRuns",
                "glue:StartJobRun",
                "lambda:InvokeFunction",
                "sagemaker:CreateEndpoint",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:CreateHyperParameterTuningJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateProcessingJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:CreateTransformJob",
                "sagemaker:DeleteEndpoint",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DescribeHyperParameterTuningJob",
                "sagemaker:DescribeProcessingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:DescribeTransformJob",
                "sagemaker:ListProcessingJobs",
                "sagemaker:ListTags",
                "sagemaker:StopHyperParameterTuningJob",
                "sagemaker:StopProcessingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:StopTransformJob",
                "sagemaker:UpdateEndpoint",
                "sns:Publish",
                "sqs:SendMessage"
            ],
            "Resource": "*"
        }
    ]
}
```

5. **次のステップ：タグ** **次のステップ：確認**をクリック
6. **名前** に `AmazonSageMaker-StepFunctionsWorkflowExecutionPolicy` と入力して **ポリシーの作成** をクリック
7. 左側のメニューで **ロール** を選択して `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` を検索
8. **ポリシーをアタッチします** をクリック
9. 前の手順で作成した `AmazonSageMaker-StepFunctionsWorkflowExecutionPolicy` ポリシーを検索してチェックボックスをオンにして **ポリシーのアタッチ** をクリック
11. AmazonSageMaker-StepFunctionsWorkflowExecutionRole の **Role ARN** をコピーして以下のセルにペースト

In [None]:
# paste the AmazonSageMaker-StepFunctionsWorkflowExecutionRole ARN from above
workflow_execution_role = "arn:aws:iam::"+account_id+":role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole"

### Step Functions ワークフロー実行時の入力スキーマ作成

Step Functions ワークフローを実行する際に、パラメタなどを引数として渡すことができます。ここではそれらの引数のスキーマを作成します。

In [None]:
# SageMaker expects unique names for each job, model and endpoint.
# If these names are not unique the execution will fail. Pass these
# dynamically for each execution using placeholders.
execution_input = ExecutionInput(
    schema={
        "PreprocessingJobName": str,
        "PreprocessingInputData": str,
        "PreprocessingOutputDataTrain": str,
        "PreprocessingOutputDataTest": str,
        "TrainingJobName": str,
        "TrainingParameters": dict,
        "TrainingOutputModel": str,
        "ExperimentName": str,
        "EvaluationProcessingJobName": str,
        "EvaluationProcessingOutput": str,
        "EvaluationExperimentArgs": list,
    }
)

### Amazon SageMaker Experiments のセットアップ

このノートブックでは、モデルの評価メトリクスを記録するために Amazon SageMaker Experiments を使用します。`experiment_name` に任意の Experiment 名をセットして以下のセルを実行してください。

In [None]:
experiment_name = "sfn-evaluate-model"

# create the experiment if it doesn't exist
try:
    experiment_evaluate = Experiment.load(experiment_name=experiment_name)
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        experiment_evaluate = Experiment.create(
            experiment_name=experiment_name, 
            description="model evaluation", 
            sagemaker_boto_client=boto3.client('sagemaker'))

print(experiment_evaluate.experiment_name)

## データの前処理と特徴量エンジニアリング

データクレンジング 、前処理、特徴量エンジニアリングのスクリプトの前に、データセットの初めの 20行をのぞいてみましょう。ターゲット変数は `income` 列です。選択する特徴量は `age`, `education`, `major industry code`, `class of worker`, `num persons worked for employer`, `capital gains`, `capital losses`,  `dividends from stocks` です。

In [None]:
import pandas as pd

input_data = "s3://sagemaker-sample-data-{}/processing/census/census-income.csv".format(region)
df = pd.read_csv(input_data, nrows=10)
df.head(n=10)

scikit-learn の前処理スクリプトを実行するために `SKLearnProcessor`を作成します。これは、SageMaker が用意している scikit-learn のコンテナイメージを使って Processing ジョブを実行するためのクラスです。

In [None]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    max_runtime_in_seconds=1200,
)

以下のセルを実行すると `preprocessing.py` が作成されます。これは前処理のためのスクリプトです。以下のセルを書き換えて実行すれば、`preprocessing.py` が上書き保存されます。このスクリプトでは、以下の処理が実行されます。

* 重複データやコンフリクトしているデータの削除
* ターゲット変数 `income` 列をカテゴリ変数から 2つのラベルを持つ列に変換
* `age` と `num persons worked for employer` をビニングして数値からカテゴリ変数に変換
* 連続値である`capital gains`, `capital losses`, `dividends from stocks` を学習しやすいようスケーリング
* `education`, `major industry code`, `class of worker`を学習しやすいようエンコード
* データを学習用とテスト用に分割し特徴量とラベルの値をそれぞれ保存

学習スクリプトでは、前処理済みの学習用データとラベル情報を使用してモデルを学習します。また、モデル評価スクリプトでは学習済みモデルと前処理済みのテスト用データトラベル情報を使用してモデルを評価します。

In [None]:
%%writefile preprocessing.py

import argparse
import os
import warnings

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelBinarizer, KBinsDiscretizer
from sklearn.preprocessing import PolynomialFeatures
from sklearn.compose import make_column_transformer

from sklearn.exceptions import DataConversionWarning

warnings.filterwarnings(action="ignore", category=DataConversionWarning)


columns = [
    "age",
    "education",
    "major industry code",
    "class of worker",
    "num persons worked for employer",
    "capital gains",
    "capital losses",
    "dividends from stocks",
    "income",
]
class_labels = [" - 50000.", " 50000+."]


def print_shape(df):
    negative_examples, positive_examples = np.bincount(df["income"])
    print(
        "Data shape: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    input_data_path = os.path.join("/opt/ml/processing/input", "census-income.csv")

    print("Reading input data from {}".format(input_data_path))
    df = pd.read_csv(input_data_path)
    df = pd.DataFrame(data=df, columns=columns)
    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)
    df.replace(class_labels, [0, 1], inplace=True)

    negative_examples, positive_examples = np.bincount(df["income"])
    print(
        "Data after cleaning: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )

    split_ratio = args.train_test_split_ratio
    print("Splitting data into train and test sets with ratio {}".format(split_ratio))
    X_train, X_test, y_train, y_test = train_test_split(
        df.drop("income", axis=1), df["income"], test_size=split_ratio, random_state=0
    )

    preprocess = make_column_transformer(
        (
            ["age", "num persons worked for employer"],
            KBinsDiscretizer(encode="onehot-dense", n_bins=10),
        ),
        (
            ["capital gains", "capital losses", "dividends from stocks"],
            StandardScaler(),
        ),
        (
            ["education", "major industry code", "class of worker"],
            OneHotEncoder(sparse=False),
        ),
    )
    print("Running preprocessing and feature engineering transformations")
    train_features = preprocess.fit_transform(X_train)
    test_features = preprocess.transform(X_test)

    print("Train data shape after preprocessing: {}".format(train_features.shape))
    print("Test data shape after preprocessing: {}".format(test_features.shape))

    train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv")
    train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")

    test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    print("Saving training features to {}".format(train_features_output_path))
    pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)

    print("Saving test features to {}".format(test_features_output_path))
    pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)

    print("Saving training labels to {}".format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)

    print("Saving test labels to {}".format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)

前処理用スクリプトを S3 にアップロードします。Processing ジョブの Input にアップロードした S3 パスを指定することで、ジョブ起動後にそのパスから前処理用スクリプトが Processing 用コンテナにダウンロードされます。

In [None]:
PREPROCESSING_SCRIPT_LOCATION = "preprocessing.py"

input_code = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket=bucket,
    key_prefix="data/sklearn_processing/code",
)

### `ProcessingStep` の作成

それでは、SageMaker Processing ジョブを起動するための [ProcessingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/sagemaker.html#stepfunctions.steps.sagemaker.ProcessingStep) を作成しましょう。

このステップは、前の手順で定義した SKLearnProcessor に入力と出力の情報を追加して使用します。

#### [ProcessingInputs](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ProcessingInput) と [ProcessingOutputs](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ProcessingOutput)  オブジェクトを作成して SageMaker Processing ジョブに入力と出力の情報を追加

In [None]:
inputs = [
    ProcessingInput(
        source=execution_input["PreprocessingInputData"], destination="/opt/ml/processing/input", input_name="input-1"
    ),
    ProcessingInput(
        source=input_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]

outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/train",
        destination=execution_input["PreprocessingOutputDataTrain"],
        output_name="train_data",
    ),
    ProcessingOutput(
        source="/opt/ml/processing/test",
        destination=execution_input["PreprocessingOutputDataTest"],
        output_name="test_data",
    ),
]

####  `ProcessingStep` の作成

In [None]:

processing_step = ProcessingStep(
    "SageMaker pre-processing step",
    processor=sklearn_processor,
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs,
    outputs=outputs,
    container_arguments=["--train-test-split-ratio", "0.2"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
)

## 前処理済みデータを使ったモデルの学習

学習スクリプト `train.py` を使って学習ジョブを実行するための `SKLearn` インスタンスを作成します。これはあとで `TrainingStep` を作成する際に使用します。

In [None]:
from sagemaker.sklearn.estimator import SKLearn

sklearn = SKLearn(
    entry_point="train.py",
    train_instance_type="ml.m5.xlarge",
    role=role,
    framework_version="0.20.0",
    py_version="py3",
)

学習スクリプト `train.py` は、ロジスティック回帰モデルを学習し、学習済みモデルを `/opt/ml/model` に保存します。Amazon SageMaker は、学習ジョブの最後に `/opt/ml/model` に保存されているモデルを `model.tar.gz` に圧縮して S3 にアップロードします。

In [None]:
%%writefile train.py

import os
import argparse

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.externals import joblib

if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", type=str, default='1.0')
    args, _ = parser.parse_known_args()
    
    print(args.c)
    
    training_data_directory = "/opt/ml/input/data/train"
    train_features_data = os.path.join(training_data_directory, "train_features.csv")
    train_labels_data = os.path.join(training_data_directory, "train_labels.csv")
    print("Reading input data")
    X_train = pd.read_csv(train_features_data, header=None)
    y_train = pd.read_csv(train_labels_data, header=None)

    model = LogisticRegression(class_weight="balanced", solver="lbfgs", C=float(args.c), verbose=1)
    print("Training LR model")
    model.fit(X_train, y_train)
    model_output_directory = os.path.join("/opt/ml/model", "model.joblib")
    print("Saving model to {}".format(model_output_directory))
    joblib.dump(model, model_output_directory)

学習用スクリプトを source.tar.gz に固めて S3 にアップロードします。

In [None]:
TRAINNING_SCRIPT_LOCATION = "source.tar.gz"
!tar zcvf $TRAINNING_SCRIPT_LOCATION train.py

train_code = sagemaker_session.upload_data(
    TRAINNING_SCRIPT_LOCATION,
    bucket=bucket,
    key_prefix="data/sklearn_train/code",
)
train_code

### `TrainingStep` の作成

In [None]:
training_step = steps.TrainingStep(
    "SageMaker Training Step",
    estimator=sklearn,
    data={"train": sagemaker.TrainingInput(execution_input["PreprocessingOutputDataTrain"], content_type="text/csv")},
    job_name=execution_input["TrainingJobName"],
    hyperparameters=execution_input["TrainingParameters"],
    wait_for_completion=True,
)

## モデルの評価

`evaluation.py` はモデル評価用のスクリプトです。このスクリプトは scikit-learn と Amazon SageMake Experiments を用いるため、カスタムコンテナを利用できる`ScriptProcessor` を使用します。このスクリプトは学習済みモデルとテスト用データセットを入力として受け取り、各分類クラスの分類評価メトリクス、precision、リコール、F1スコア、accuracy と ROC AUC が記載された JSON ファイルを出力します。

パラメタや学習データを変えて複数のモデルを学習させ、それらの精度を比較する際に評価メトリクスの情報を Amazon SageMaker Experiments で管理しておくと、作成した複数のモデルを比較しやすくなります。以下のモデル評価用のスクリプトでは、モデルの評価メトリクスを算出してそれらを Experiment に登録し、また、最新の学習済みモデルが既存のモデルと比べて良いかどうかを評価メトリクスを使って判定し、その結果も Experiment に記録しています。学習ジョブや Processing ジョブの中で Experiments の Tracker を使う際は、`Tracker.load() ` を使用することでジョブが使用している Trial を自動的に読み出して tracking を開始します。

In [None]:
%%writefile evaluation.py

import json
import os
import tarfile
import argparse

import pandas as pd

from sklearn.externals import joblib
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

from smexperiments.tracker import Tracker
from smexperiments.trial import Trial
from sagemaker.analytics import ExperimentAnalytics
import sagemaker


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    
    parser.add_argument('--experiment-name', type=str, default=None,
                        help='Experiment name')

    args = parser.parse_args()
    
    model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
    print("Extracting model from path: {}".format(model_path))
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    print("Loading model")
    model = joblib.load("model.joblib")

    print("Loading test input data")
    test_features_data = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_data = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    X_test = pd.read_csv(test_features_data, header=None)
    y_test = pd.read_csv(test_labels_data, header=None)
    predictions = model.predict(X_test)

    print("Creating classification evaluation report")
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict["accuracy"] = accuracy_score(y_test, predictions)
    report_dict["roc_auc"] = roc_auc_score(y_test, predictions)

    print(args.experiment_name)
    trial_component_analytics = ExperimentAnalytics(
        experiment_name=args.experiment_name,
        sort_by="parameters.accuracy",
        sort_order="Descending",# Ascending or Descending
    )
    
    df = trial_component_analytics.dataframe()
    is_best = 0
    try:
        best_acc = df.iloc[0]['accuracy'] 
        if best_acc < report_dict["accuracy"]:
            print('This model is the best ever!!')
            is_best = 1
        else:
            print('This model is not so good.')
    except:
        is_best = 1
        print('This model is the first one.')
    
    print('Recording metrics to Experiments...')
    with Tracker.load() as processing_tracker: # Tracker requires with keyword
        processing_tracker.log_parameters({ "accuracy": report_dict["accuracy"], 
                                                                           "roc_auc": report_dict["roc_auc"], 
                                                                           "is_best": is_best})

    print("Classification report:\n{}".format(report_dict))

    evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
    print("Saving classification report to {}".format(evaluation_output_path))

    with open(evaluation_output_path, "w") as f:
        f.write(json.dumps(report_dict))

評価用スクリプトを S3 にアップロードします。

In [None]:
MODELEVALUATION_SCRIPT_LOCATION = "evaluation.py"

input_evaluation_code = sagemaker_session.upload_data(
    MODELEVALUATION_SCRIPT_LOCATION,
    bucket=bucket,
    key_prefix="data/sklearn_processing/code",
)

モデル評価用の ProcessingStep の入力と出力オブジェクトを作成します。

In [None]:

inputs_evaluation = [
    ProcessingInput(
        source=execution_input["PreprocessingOutputDataTest"],
        destination="/opt/ml/processing/test",
        input_name="input-1",
    ),
    ProcessingInput(
        source=execution_input["TrainingOutputModel"],
        destination="/opt/ml/processing/model",
        input_name="input-2",
    ),
    ProcessingInput(
        source=input_evaluation_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]

outputs_evaluation = [
    ProcessingOutput(
        source="/opt/ml/processing/evaluation",
        destination=execution_input["EvaluationProcessingOutput"],
        output_name="evaluation",
    ),
]

モデルの評価結果を SageMaker Experiments で管理するため、Experiments のライブラリを含んだコンテナイメージを作成します。

In [None]:
!mkdir -p docker/processing/

In [None]:
%%writefile docker/processing/Dockerfile
FROM public.ecr.aws/docker/library/python:3.7-slim-buster
    
ENV AWS_DEFAULT_REGION us-east-1

RUN pip3 install --upgrade pip
RUN pip3 install -qU boto3 pandas==0.25.3 scikit-learn==0.20.0 sagemaker-experiments sagemaker

ENTRYPOINT ["python3", "/opt/ml/processing/input/code/evaluation.py"]

In [None]:
ecr_repository = 'sagemaker-studio-sf-model-evaluate'
tag = ':latest'
uri_suffix = 'amazonaws.com'
processing_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)
processing_image_name = ecr_repository + tag

SageMaker Studio では Docker イメージのビルドを行うことができないため、Docker イメージを CodeBuild でビルドし、ECR にプッシュする sm-docker をインストールして利用します。

In [None]:
!pip install sagemaker-studio-image-build

In [None]:
!cd ./docker/processing/ && sm-docker build . --repository $processing_image_name

データの前処理の際は SKProcessor を使用しましたが、モデルの評価にはカスタムコンテナを使用するため ScriptProcessor を使用します。

In [None]:
from sagemaker.processing import ScriptProcessor

model_evaluation_processor = ScriptProcessor(
    command=['python3'],
    image_uri=processing_repository_uri,
    role=role,
    sagemaker_session=sagemaker_session,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    max_runtime_in_seconds=1200
)

ProcessingStep の引数 experiment_config に Experiment 名をセットすることで、起動した Processing ジョブの中で自動的に Trial が作成されます。

In [None]:
processing_evaluation_step = ProcessingStep(
    "SageMaker Processing Model Evaluation step",
    processor=model_evaluation_processor,
    job_name=execution_input["EvaluationProcessingJobName"],
    inputs=inputs_evaluation,
    outputs=outputs_evaluation,
    experiment_config={
             "ExperimentName":execution_input["ExperimentName"]},
    container_arguments=execution_input["EvaluationExperimentArgs"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/evaluation.py"],
)

## モデルの評価結果に応じた処理

モデルの評価ジョブでの評価結果をもとに、Lambda 関数で後処理を行います。後処理としては例えば、Slack にモデルの評価結果をポストしたり、別の Workflow を起動したりなどが考えられます。このノートブックでは、評価結果に応じて表示するテキストの内容を変えています。

このノートブックでは、Experiment で管理されている情報を取得するために `sagemaker.analytics.ExperimentAnalytics` を使用するため、必要なライブラリが入ったコンテナを使って Lambda 関数を実行します。

In [None]:
!mkdir -p docker/lambda/app

In [None]:
%%writefile ./docker/lambda/app/app.py

import sys
import time
import os
import glob
import numpy as np
import json
import boto3
from sagemaker.analytics import ExperimentAnalytics

def handler(event, context):

    experiment_name = event['experiment-name']
    job_name = event['evaluation-job-name']
    
    print('job_name: ', job_name)
    
    search_expression = {
        "Filters":[
            {
                "Name": "TrialComponentName",
                "Operator": "Contains",
                "Value": job_name,
            }
        ],
    }

    trial_component_analytics = ExperimentAnalytics(
        experiment_name=experiment_name,
        search_expression=search_expression,
    )
    
    df = trial_component_analytics.dataframe()
    print('is_best: ', str(df['is_best']))

    result = False
    if int(df['is_best']) > 0:
        print('This model is the best ever!')
        result = True
    else:
        print('This model is not so good!')
    
    return {
        'statusCode'        : 200,
        'result':result
    }

In [None]:
%%writefile ./docker/lambda/Dockerfile

# Define custom function directory
ARG FUNCTION_DIR="/function"

FROM public.ecr.aws/docker/library/python:3.7-slim-buster
    
# Include global arg in this stage of the build
ARG FUNCTION_DIR

# Install aws-lambda-cpp build dependencies
RUN apt-get update && \
  apt-get install -y \
  g++ \
  make \
  cmake \
  unzip \
  libcurl4-openssl-dev \
  libopencv-dev
  
RUN pip3 install --upgrade pip
RUN pip3 install -qU boto3 pandas==0.25.3 sagemaker-experiments sagemaker

# Copy function code
RUN mkdir -p ${FUNCTION_DIR}
COPY app/ ${FUNCTION_DIR}/

# Install the function's dependencies
RUN pip install \
    --target ${FUNCTION_DIR} \
        awslambdaric


# Set working directory to function root directory
WORKDIR ${FUNCTION_DIR}


ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "app.handler" ]

In [None]:
ecr_repository_lambda = 'sagemaker-studio-sf-lambda'
tag = ':latest'
uri_suffix = 'amazonaws.com'
lambda_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository_lambda + tag)
lambda_image_name = ecr_repository_lambda + tag

In [None]:
!cd ./docker/lambda/ && sm-docker build . --repository $lambda_image_name

###### コンテナイメージを使って AWS Lambda 関数を作成

Lambda で使用するコンテナイメージができたら、コンテナイメージを使って Lambda 関数を作成します。ここからは、ノートブックインスタンスを離れて、AWS Lambda のコンソール操作になります。AWS コンソールから AWS Lambda のコンソールにアクセスしてください。その後、以下の手順を実施して先ほど作成したコンテナイメージが動作する Lambda 関数を作成してください。

1. AWS Lambda コンソールで、「関数の作成」をクリック
1. 「以下のいずれかのオプションを選択して、関数を作成します。」と書かれた部分で「コンテナイメージ」を選択
1. 「関数名」に任意の関数名を入力
1. 「イメージを参照」ボタンをクリックして先ほど作成したコンテナイメージを選択<br>
「Amazon ECR イメージリポジトリ」のプルダウンメニューに作成したはずのリポジトリがなかったり、コンテナイメージがない場合は一つ上のセルの実行時に何らかのエラーが出ている可能性があるので確認してください。
1. 「関数の作成」をクリック

#### Lambda 関数の IAM ロールに Experiment 参照権限を追加

1. Lambda コンソールで先ほど作成した関数の詳細画面を表示し、設定タブをクリックして左側のメニューの「アクセス権限」をクリック
1. **ロール名** と書かれた部分にあるリンクをクリック（IAM のコンソールが表示される）
1. **ポリシーをアタッチします** をクリックして **ポリシーの作成** をクリック
1. **JSON** タブをクリックして以下の内容をペースト

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "sagemaker:Search",
            "Resource": "*"
        }
    ]
}
```

5. **次のステップ：タグ** **次のステップ：確認**をクリック
6. **名前** に `SageMakerSearchPolicy` と入力して **ポリシーの作成** をクリック
7. 作成したポリシーを検索して IAM ロールにアタッチ

Lambda 関数が作成できたら、以下のセルの `FunctionName` に関数名を入力して実行します。

In [None]:
from stepfunctions.steps.states import Retry
lambda_step = stepfunctions.steps.compute.LambdaStep(
    "Query Evaluation Results",
    parameters={
        "FunctionName": 'query_experiment_and_evaluate_container',
        "Payload": {
            "experiment-name": execution_input["ExperimentName"],
             "evaluation-job-name": execution_input["EvaluationProcessingJobName"],
            
        },
    },
)
lambda_step.add_retry(
    Retry(error_equals=["States.TaskFailed"], interval_seconds=15, max_attempts=2, backoff_rate=4.0)
)

## Fail 状態の作成
いずれかのステップが失敗したときにワークフローが失敗だとわかるように `Fail` 状態を作成します。

In [None]:
failed_state_sagemaker_processing_failure = stepfunctions.steps.states.Fail(
    "ML Workflow failed", cause="SageMakerProcessingJobFailed"
)

#### ワークフローの中のエラーハンドリングを追加

エラーハンドリングのために [Catch Block](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/states.html#stepfunctions.steps.states.Catch) を使用します。もし いずれかの Step が失敗したら、`Fail` 状態に遷移します。

In [None]:
catch_state_processing = stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=failed_state_sagemaker_processing_failure,
)

processing_step.add_catch(catch_state_processing)
processing_evaluation_step.add_catch(catch_state_processing)
training_step.add_catch(catch_state_processing)
lambda_step.add_catch(catch_state_processing)

## `Workflow` の作成と実行

ここまでで Step Functions のワークフローを作成する準備が完了しました。それでは、ワークフローを作成して実行してみましょう。

In [None]:
from datetime import datetime, timezone, timedelta

JST = timezone(timedelta(hours=+9), 'JST')

timestamp = datetime.now(JST).strftime("%Y-%m-%d-%H-%M-%S")

# Generate unique names for Pre-Processing Job, Training Job, and Model Evaluation Job for the Step Functions Workflow
training_job_name = "scikit-learn-training-{}".format(
    timestamp
)  # Each Training Job requires a unique name
preprocessing_job_name = "scikit-learn-sm-preprocessing-{}".format(
    timestamp
)  # Each Preprocessing job requires a unique name,
evaluation_job_name = "scikit-learn-sm-evaluation-{}".format(
    timestamp
)  # Each Evaluation Job requires a unique name
timestamp

Processing ジョブの出力を保存する S3 パスを作成します。

In [None]:
# 前処理ジョブ用
s3_bucket_base_uri = "{}{}".format("s3://", bucket)
output_data = "{}/{}-{}".format(s3_bucket_base_uri, "data/sklearn_processing/output", timestamp)
preprocessed_training_data = "{}/{}".format(output_data, "train_data")

# モデル評価ジョブ用
preprocessed_testing_data = "{}/{}".format(output_data, "test_data")
model_data_s3_uri = "{}/{}/{}".format(s3_bucket_base_uri, training_job_name, "output/model.tar.gz")
output_model_evaluation_s3_uri = "{}/{}/{}".format(
    s3_bucket_base_uri, training_job_name, "evaluation"
)

Chain を使って各 Step を連結してワークフローを作成します。既存のワークフローを変更する場合は、update() を実行します。ログに ERROR が表示された場合は、以下のセルを再度実行してください。

In [None]:
workflow_graph = Chain([processing_step, training_step, processing_evaluation_step, lambda_step])
# workflow_graph = Chain([training_step])
# workflow_graph = Chain([processing_evaluation_step,  lambda_step])
# workflow_graph = Chain([ lambda_step])
branching_workflow = Workflow(
    name="SageMakerProcessingWorkflow",
    definition=workflow_graph,
    role=workflow_execution_role,
)

branching_workflow.create()
branching_workflow.update(workflow_graph)

パラメタを指定して、ワークフローを実行します。

In [None]:
# Execute workflow
execution = branching_workflow.execute(
    inputs={
        "PreprocessingJobName": preprocessing_job_name,  # Each pre processing job (SageMaker processing job) requires a unique name,
        "PreprocessingInputData": input_data,
        "PreprocessingOutputDataTrain": output_data+'/train_data',
        "PreprocessingOutputDataTest": output_data+'/test_data',
        "TrainingJobName": training_job_name,  # Each Sagemaker Training job requires a unique name,
        "TrainingParameters": {
                                     "sagemaker_program": "train.py",
                                     "sagemaker_submit_directory":  train_code,
                                      "c": '1.1'
},
        "TrainingOutputModel": model_data_s3_uri,
        "ExperimentName": experiment_evaluate.experiment_name,
        "EvaluationProcessingJobName": evaluation_job_name,  # Each SageMaker processing job requires a unique name,
        "EvaluationProcessingOutput": output_model_evaluation_s3_uri,
        "EvaluationExperimentArgs": ['--experiment-name', experiment_evaluate.experiment_name]
    }
)

以下のセルを実行することで、Workflow の進行状況がわかります。実行開始から12分程度で完了します。

In [None]:
execution.render_progress()

Workflow の実行が完了したら、Experiment の中をのぞいてみましょう。まずは Experiment のデータを ExperimentAnalytics を使って読み出します。

In [None]:
search_expression = {
    "Filters":[
        {
            "Name": "TrialComponentName",
            "Operator": "Contains",
            "Value": evaluation_job_name,
        }
    ],
}

trial_component_analytics = ExperimentAnalytics(
    experiment_name=experiment_evaluate.experiment_name,
    sort_by="parameters.accuracy",
#     search_expression=search_expression,
#     sort_by="metrics.acc.max",
#     sort_order="Ascending",# Ascending or Descending
#     metric_names=['metric1', 'metric2'],
#     parameter_names=['accuracy', 'roc_auc'],
    input_artifact_names=[]
)

読み出したデータを DataFrame 形式に変換して表示します。accuracy や roc_auc が記録されていることがわかります。

In [None]:
import pandas as pd
df = trial_component_analytics.dataframe()
pd.set_option('display.max_columns', None)
df

### ワークフローの出力を確認

Amazon S3 から `evaluation.json` を取得して確認します。ここにはモデルの評価レポートが書かれています。なお、以下のセルは Step Functions でワークフローの実行が完了してから（`evaluation.json` が出力されてから）実行してください。

In [None]:
workflow_execution_output_json = execution.get_output(wait=True)

In [None]:
from sagemaker.s3 import S3Downloader
import json

evaluation_s3_uri = output_model_evaluation_s3_uri + '/evaluation.json'
evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
evaluation_output_dict = json.loads(evaluation_output)
print(json.dumps(evaluation_output_dict, sort_keys=True, indent=4))

## Step Functions Workflow を実行のみ行う場合

ここまでで、Step Functions Data Science SDK を使って Workflow を作成し、実行するところまでご紹介しました。実際に使用する際は、作成済みの Workflow をパラメタを指定して実行する部分を繰り返すことになります。ここでは、作成済みの Workflow を実行する部分のみを抜き出してご紹介します。

Workflow の ARN を使って既存の Workflow を読み出します。ARN はわからないけれど Workflow 名はわかる、という場合は以下のセルを実行することで ARN を取得することができます。（`workflow_name` に Workflow 名を入れてください）

In [None]:
workflow_name = 'SageMakerProcessingWorkflow'
workflow_list = Workflow.list_workflows()
workflow_arn = [d['stateMachineArn'] for d in workflow_list  if d['name']==workflow_name][0]
workflow_arn

Workflow を読み出して `execute` で実行します。実行する際にパラメタを指定することができます。SageMaker ジョブはユニークな名前である必要があるため、このノートブックでは datetime を使ってジョブ名を生成しています。以下のセルの `train_code` には、学習ジョブで使用するスクリプトをアップロードしてある S3 パスを設定してください。

In [None]:
from datetime import datetime, timezone, timedelta
from stepfunctions.workflow import Workflow

region = sagemaker_session.boto_region_name
input_data = "s3://sagemaker-sample-data-{}/processing/census/census-income.csv".format(region)
# train_code = 's3://xxx/code/source.tar.gz'

bucket = sagemaker_session.default_bucket()

JST = timezone(timedelta(hours=+9), 'JST')

timestamp = datetime.now(JST).strftime("%Y-%m-%d-%H-%M-%S")

# Generate unique names for Pre-Processing Job, Training Job, and Model Evaluation Job for the Step Functions Workflow
training_job_name = "scikit-learn-training-{}".format(
    timestamp
)  # Each Training Job requires a unique name
preprocessing_job_name = "scikit-learn-sm-preprocessing-{}".format(
    timestamp
)  # Each Preprocessing job requires a unique name,
evaluation_job_name = "scikit-learn-sm-evaluation-{}".format(
    timestamp
)  # Each Evaluation Job requires a unique name

# 前処理ジョブ用
s3_bucket_base_uri = "{}{}".format("s3://", bucket)
output_data = "{}/{}-{}".format(s3_bucket_base_uri, "data/sklearn_processing/output", timestamp)
preprocessed_training_data = "{}/{}".format(output_data, "train_data")

# モデル評価ジョブ用
preprocessed_testing_data = "{}/{}".format(output_data, "test_data")
model_data_s3_uri = "{}/{}/{}".format(s3_bucket_base_uri, training_job_name, "output/model.tar.gz")
output_model_evaluation_s3_uri = "{}/{}/{}".format(
    s3_bucket_base_uri, training_job_name, "evaluation"
)

existing_workflow = Workflow.attach(workflow_arn)
execution = existing_workflow.execute(
    inputs={
        "PreprocessingJobName": preprocessing_job_name,  # Each pre processing job (SageMaker processing job) requires a unique name,
        "PreprocessingInputData": input_data,
        "PreprocessingOutputDataTrain": output_data+'/train_data',
        "PreprocessingOutputDataTest": output_data+'/test_data',
        "TrainingJobName": training_job_name,  # Each Sagemaker Training job requires a unique name,
        "TrainingParameters": {
                                     "sagemaker_program": "train.py",
                                     "sagemaker_submit_directory":  train_code,
                                      "c": '1.1'
        },
        "TrainingOutputModel": model_data_s3_uri,
        "ExperimentName": experiment_evaluate.experiment_name,
        "EvaluationProcessingJobName": evaluation_job_name,  # Each SageMaker processing job requires a unique name,
        "EvaluationProcessingOutput": output_model_evaluation_s3_uri,
        "EvaluationExperimentArgs": ['--experiment-name', experiment_evaluate.experiment_name]
    }
)

以下のセルを実行することで、Workflow の進行状況がわかります。

In [None]:
execution.render_progress()

## リソースの削除

このノートブックの実行が終わったら、不要なリソースを削除することを忘れないでください。以下のコードのコメントアウトを外してから実行すると、このノートブックで作成した Step Functions のワークフローを削除することができます。ノートブックインスタンス、各種データを保存した S3 バケットも不要であれば削除してください。

### Step Functions Workflow の削除

In [None]:
# branching_workflow.delete()

### Experiment の削除

In [None]:
sm = boto3.Session().client('sagemaker')
def cleanup(experiment):
    for trial_summary in experiment.list_trials():
        trial = Trial.load(sagemaker_boto_client=sm, trial_name=trial_summary.trial_name)
        for trial_component_summary in trial.list_trial_components():
            tc = TrialComponent.load(
                sagemaker_boto_client=sm,
                trial_component_name=trial_component_summary.trial_component_name)
            trial.remove_trial_component(tc)
            try:
                # comment out to keep trial components
                tc.delete()
            except:
                # tc is associated with another trial
                continue
            # to prevent throttling
            time.sleep(.5)
        trial.delete()
    experiment.delete()

In [None]:
cleanup(experiment_evaluate)