# Amazon SageMaker ハンズオン：Scikit-learn コンテナを用いて Iris データセットの 学習・推論を行なう


このチュートリアルでは、[Scikit-learn](https://scikit-learn.org/stable/) を SageMaker で使用する方法を、ビルド済みのコンテナを利用して紹介します。Scikit-learn は人気の Python 機械学習フレームワークです。分類、回帰、クラスタリング、次元削減、データ/フィーチャの前処理のための多くの異なるアルゴリズムが含まれています。

[sagemaker-python-sdk](https://github.com/aws/sagemaker-python-sdk)モジュールを使えば、既存のscikit-learnコードを簡単に利用することができます。ここでは、Irisデータセットを用いてScikit-learnのモデルを学習し、予測値を生成することを試します。Scikit-learn コンテナの詳細については、[sagemaker-scikit-learn-containers](https://github.com/aws/sagemaker-scikit-learn-container) リポジトリや [sagemaker-python-sdk](https://github.com/aws/sagemaker-python-sdk) リポジトリを参照してください。

## 0. 事前準備：ライブラリのインストール / セットアップ

In [None]:
!pip install -U sagemaker

In [6]:
# S3 prefix
prefix = "DEMO-scikit-iris"

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()

# 1. Amazon SageMaker Training Job - Amazon SageMaker上でモデルを学習する

## 1.1 学習用データを Amazon S3 にアップロード 
Amazon SageMaker でモデルの学習を行うには、まずAmazon S3と呼ばれるAWSのストレージのサービスに対して学習に使用したいデータをアップロードする必要があります。ここでは、機械学習用のデータセットとして有名なIrisデータセットをサンプルデータとして使用します。

あらかじめ Amazon SageMaker 側が用意している Amazon S3バケット内のサンプルデータをダウンロードします。

In [20]:
import boto3
import numpy as np
import pandas as pd
import os

os.makedirs("./data", exist_ok=True)

s3_client = boto3.client("s3")
s3_client.download_file(
    f"sagemaker-example-files-prod-{region}", "datasets/tabular/iris/iris.data", "./data/iris.csv"
)

df_iris = pd.read_csv("./data/iris.csv", header=None)
df_iris[4] = df_iris[4].map({"Iris-setosa": 0, "Iris-versicolor": 1, "Iris-virginica": 2})
iris = df_iris[[4, 0, 1, 2, 3]].to_numpy()
np.savetxt("./data/iris.csv", iris, delimiter=",", fmt="%1.1f, %1.3f, %1.3f, %1.3f, %1.3f")

ローカルにデータを取得したら、SageMaker Python SDK が提供するツールを使用して、データをお客様の AWSアカウント内にあるAmazon S3 バケットにアップロードします。

Amazon SageMakerの環境を立ち上げる際に、実は「Amazon SageMaker 用の Amazon S3 バケット」が作成されます。今回はその S3 バケットに対してデータをアップロードしていきます。

In [10]:
WORK_DIRECTORY = "data"

train_input = sagemaker_session.upload_data(
    WORK_DIRECTORY, key_prefix="{}/{}".format(prefix, WORK_DIRECTORY)
)

## 1.2. トレーニング用の Scikit-learn スクリプトを作成する（書き換える） <a class="anchor" id="create_sklearn_script"></a>

お客様の中には、ローカルPCでモデル開発を行う際 Jupyter Notebook上でモデルの前処理〜学習〜推論を行われる方もいらっしゃると思います。
Amazon SageMaker を用いた場合でも、引き続きJupyter Notebook上で任意の処理を実行できます。ただ、トレーニングを学習用インスタンスで実行する場合は、学習用スクリプト（Pythonファイル）を作成していただく必要があります。

### 書き換えが必要な理由
Amazon SageMaker では、オブジェクトストレージ Amazon S3 をデータ保管に利用します。例えば、S3 上の学習データを指定すると、自動的に Amazon SageMaker の学習用インスタンスにデータがダウンロードされ、トレーニングスクリプトが実行されます。トレーニングスクリプトを実行した後に、指定したディレクトリにモデルを保存すると、自動的にモデルがS3にアップロードされます。

トレーニングスクリプトを SageMaker に持ち込む場合は、以下の点を修正する必要があります。

* 学習用インスタンスにダウンロードされた学習データのロード
* 学習が完了したときのモデルの保存
* これらの修正は、トレーニングスクリプトを任意の環境に持ち込む際の修正と変わらないでしょう。例えば、自身のPCに持ち込む場合も、/home/user/data のようなディレクトリからデータを読み込んで、/home/user/model にモデルを保存したいと考えるかもしれません。同様のことを SageMaker で行う必要があります。

### 書き換え方法

（詳細には[こちらのサンプルノートブック](https://github.com/aws-samples/aws-ml-jp/blob/main/sagemaker/workshop/lab_bring-your-own-model/tensorflow/tensorflow.ipynb)などを参照ください）

SageMaker は `SKLearn` Estimatorを用いて scikit-learn スクリプトを実行することができます。SageMaker 上で実行すると、以下のような学習環境のプロパティにアクセスするための便利な環境変数が利用できます：

* `SM_MODEL_DIR`：: モデルの成果物を書き込むディレクトリへのパスを表す文字列。このフォルダに保存された成果物はトレーニングジョブ終了後にS3にアップロードされ、モデルのホスティングに使用される。
* `SM_OUTPUT_DIR`： 出力結果を書き込むファイルシステムのパスを表す文字列です。出力結果にはモデルのチェックポイントファイルや、グラフ、その他の保存ファイルが含まれます。これらの成果物は圧縮され、モデルの成果物と同じS3プレフィックスでS3にアップロードされます。

また、 `SKLearn` Estimator の `fit()` メソッドを呼び出す際に 'train' と 'test' の2つの入力チャンネルを使用したとすると、以下の環境変数が設定されます。

* `SM_CHANNEL_TRAIN`: 'train' channel のデータを含むディレクトリへのパスを表す文字列。
* `SM_CHANNEL_TEST`: 'test' channel のデータを含むディレクトリへのパスを表す文字列。

典型的なトレーニングスクリプトは、入力チャンネルからデータをロードし、ハイパーパラメータでトレーニングを設定し、モデルをトレーニングし、後でホストできるようにモデルを `model_dir` に保存します。ハイパーパラメータは引数としてスクリプトに渡され、 `argparse.ArgumentParser` インスタンスで取得することができる。例えば、このノートブックで実行するスクリプトは以下のようになります：

```python
from __future__ import print_function

import argparse
import joblib
import os
import pandas as pd

from sklearn import tree


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here. In this simple example we are just including one hyperparameter.
    parser.add_argument('--max_leaf_nodes', type=int, default=-1)

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
    raw_data = [ pd.read_csv(file, header=None, engine="python") for file in input_files ]
    train_data = pd.concat(raw_data)

    # labels are in the first column
    train_y = train_data.iloc[:, 0]
    train_X = train_data.iloc[:, 1:]

    # Here we support a single hyperparameter, 'max_leaf_nodes'. Note that you can add as many
    # as your training my require in the ArgumentParser above.
    max_leaf_nodes = args.max_leaf_nodes

    # Now use scikit-learn's decision tree classifier to train the model.
    clf = tree.DecisionTreeClassifier(max_leaf_nodes=max_leaf_nodes)
    clf = clf.fit(train_X, train_y)

    # Print the coefficients of the trained classifier, and save the coefficients
    joblib.dump(clf, os.path.join(args.model_dir, "model.joblib"))


def model_fn(model_dir):
    """Deserialized and return fitted model
    
    Note that this should have the same name as the serialized model in the main method
    """
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf
```

※ 補足

Scikit-learn コンテナはお客様が作成したトレーニングスクリプトをインポートするため、コンテナがトレーニングコードを実行する際に誤って実行しないように、常にトレーニングコードをメインガード `(if __name__=='__main__':)` に記述する必要があります。モデルの学習時に使用できる環境変数の詳細については、https://github.com/aws/sagemaker-containers をご覧ください。

## 1.3. SageMaker Estimator の作成 
作成した学習スクリプトを使って実際にモデルを学習していきます。Amazon SageMakerでは、学習の際に`Estimator` と呼ばれる、モデルの学習や評価、推論などで決まった処理を抽象化したインターフェースを使用します。

今回使用するScikit-learnや、よく機械学習で用いられるフレームワークであるPyTorchやTensorFlowなどに関しては、そのフレームワーク専用のEstimatorを用意しており、そちらを使うことで1から定義するよりもセットアップが簡単です。

SageMaker上でScikit-learn学習スクリプトを実行するために、いくつかのコンストラクタ引数を受け付けるsagemaker.sklearn.estimator.sklearn estimatorを作成します。

* entry_point： SageMaker が学習と予測のために実行する Python スクリプトへのパス。
* role： IAM ロールの ARN。
* instance_type (オプション)： 学習用の SageMaker インスタンスタイプ
* sagemaker_session（オプション）： SageMaker での学習に使用するセッション。
* hyperparameters (オプション)： ハイパーパラメータとして train 関数に渡される辞書。

SKLearn Estimatorの詳細はこちらをご覧ください。 https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn

In [11]:
from sagemaker.sklearn.estimator import SKLearn

# Scikit-learn の バージョンを指定
FRAMEWORK_VERSION = "1.2-1"

# 学習に使用したいスクリプトファイル名を指定
script_path = "scikit_learn_iris.py"

sklearn = SKLearn(
    entry_point=script_path, 
    framework_version=FRAMEWORK_VERSION,
    instance_type="ml.c4.xlarge",
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters={"max_leaf_nodes": 30},
)

## 1.4. Iris データで SKLearn Estimator をトレーニング（Amazon SageMaker Training Job）
学習は簡単で、Estimatorに対してfit()を呼び出すだけです！これにより、SageMakerのトレーニングジョブが開始され、データをダウンロードし、（提供されたスクリプトファイル内の）scikit-learnコードを起動し、スクリプトが作成したモデルの成果物を保存します。

In [None]:
sklearn.fit({"train": train_input})

### （補足）Amazon SageMaker Training Jobでトレーニングする時のメリット
「Jupyter Notebook 上 で学習する場合と何が違うの？」と思ったお客様もいらっしゃるかもしれません。Amazon SageMaker Training Jobを用いることのメリットをいくつか紹介しておきます。
* **学習専用のインスタンスを立ち上げられる**：「普段は不要だけれど、学習の時だけGPUを使いたい」「学習の時に大量のサーバーを使ってささっと学習を終わらせたい」というような柔軟な使い方が可能になります。
* **学習の内容が自動で記録される**：Notebook上だけでモデルの試行錯誤を行なっている場合、「いつどんなパラメータでどんなデータを用いて学習したモデルがどこに保管されているか...」というのが曖昧になりがちです（少なくとも記録が属人化します）。Amazon SageMaker Training Jobとして実行することで、どんなデータ・どんなハイパーパラメータで学習したかなどの学習に関する情報が自動で記録され、後から見返すことができます。
* **SageMakerのその他幅広い機能を使うことができる**：例えばMLOpsの機能など、SageMakerに備わっている様々な便利機能を用いて機械学習の開発の効率を高めることができます。

## 2. 推論の実行 - 学習されたモデルを使って推論を実行（ Amazon SageMaker で学習したモデルを用いて Amazon SageMaker で推論するパターン）

### 2.1. モデルのデプロイ 
モデルを SageMaker ホスティングにデプロイするには、fit()を実行したEstimator（すなわち学習が終了したEstimator）に対して deploy() を実行するだけです。これにより、リアルタイム推論エンドポイントが立ち上がります。

In [None]:
predictor = sklearn.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge")

### 2.2. データをピックアップして実際に推論してみる
それでは実際に推論をしてみましょう。ここでは、トレーニングに使用したデータを抽出し、それに対して予測を行います。

（本来であれば、学習に使用していないテスト用のデータを使うべきですが、今回は「デプロイされたエンドポイントを使って推論がちゃんと実行できる」ことを確かめるためにこのように対応しています。）

In [21]:
import itertools
import pandas as pd

shape = pd.read_csv("data/iris.csv", header=None)

a = [50 * i for i in range(3)]
b = [40 + i for i in range(10)]
indices = [i + j for i, j in itertools.product(a, b)]

test_data = shape.iloc[indices[:-1]]
test_X = test_data.iloc[:, 1:]
test_y = test_data.iloc[:, 0]

予測を行うには、deploy()から返されたpredictor上でpredict()を呼び出し、予測を行うデータを渡します。エンドポイントからの出力は、分類予測の数値表現を返します。元のデータセットでは、これらは3つの花のカテゴリ名ですが、この例ではラベルは数値です。解析した元のラベルと比較をしています。

In [None]:
print(predictor.predict(test_X.values))
print(test_y.values)

無事結果が返ってきました。（学習に使ったデータをそのまま推論に使用しているので当たり前ですが、）実際のラベルと予測されたラベルが一致していることが確認できました。

### 2.3. エンドポイントのクリーンアップ
predict()メソッドを実行すると、エンドポイント、すなわち推論用のインスタンスが立ち上がっています。エンドポイントを使い終わったら、それを削除してリソースを解放し、追加コストが発生しないようにしましょう。

In [None]:
predictor.delete_endpoint()

## 3. 学習されたモデルを使って推論を実行（ Amazon SageMaker の外で学習されたモデルを用いて Amazon SageMaker で推論するパターン）

お客様によってはすでにどこかで学習されたモデルをAmazon SageMaker上で推論に用いたい、という場合があると思います。ここからはその手順について見てみましょう。（ちなみにこのようなパターンを「BYOM；Bring Your Own Model」と呼びます。）

### 3.1. 準備：モデルをSageMaker Studio Notebookインスタンスにダウンロード
**今回は「あらかじめローカルで学習されたモデル」を模擬するために、先ほどAmazon SageMaker Trainingで学習したモデルのファイルを用いることにします。**

まずは先ほどの学習結果のモデルがどこに保存されているのかを確認します。


In [None]:
sklearn.model_data

では、こちらのモデルをダウンロードしていきます。

In [None]:
os.makedirs("./model/", exist_ok=True)
sagemaker.s3.S3Downloader.download(
    s3_uri=sklearn.model_data,
    local_path='./model/'
)

ご覧の通り"model.tar.gz"というファイル名が確認できたかと思います。Amazon SageMaker Trainingでは、モデルの出力結果は「tar.gz ファイル」の形で圧縮されて出力されています。こちらを解凍することで実際のモデルファイルを確認できます。

In [43]:
# tar.gz の解凍
import tarfile
with tarfile.open('./model/model.tar.gz') as tar:
    tar.extractall('./model/')

### 3.2. 準備：ダウンロードしたモデルの読み込みを試してみる
SageMaker Training で学習したモデルファイル（今回はjoblibファイル）は当然ながら SageMaker Studio Notebook上でも Loadすればそのまま使用できます。

In [None]:
import joblib
filename = './model/model.joblib'
sk_local_model = joblib.load(filename)

In [None]:
sk_local_model.predict(test_X.values)

### 3.3. tar.gz に圧縮したモデルを S3 にアップロードする
ではここから、手元にあるモデルファイル（`./model/model.joblib`）を使ってAmazon SageMakerの推論エンドポイントを立てる方法を学んでいきます。
まずは、手元にあるモデルファイルを Amazon S3にアップロードしていきます。

In [None]:
# 圧縮
%cd /root/sagemaker-handson-sklearn/model
!rm model.tar.gz
!tar zcvf model.tar.gz ./*
%cd ..

In [None]:
sess = sagemaker.session.Session()
bucket = sess.default_bucket()
import datetime
timestamp = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))).strftime('%Y%m%d%H%M%S')
model_s3_path = f's3://{bucket}/iris-model-{timestamp}'

In [None]:
model_s3_uri = sagemaker.s3.S3Uploader.upload(
    local_path = '/root/sagemaker-handson-sklearn/model/model.tar.gz',
    desired_s3_uri = model_s3_path
)
print(model_s3_uri)

### 3.4. S3 にアップロードしたモデルを SageMaker 管理のモデルとして登録する
Amazon SageMaker で推論エンドポイントを立てるためには、作成したモデルを Amazon SageMaker側に登録させる必要があります。Amazon SageMaker Trainingで学習したモデルは、学習完了時に自動的に登録がなされますが、今回のBYOMパターンの場合は明示的に登録する設定が必要です。

Scikit-Learn で作成したモデルは `SKLearnModel` で読み込みます。

### 3.4.1. モデル登録 - 推論用スクリプトを作成する
BYOMの場合は、学習の時に学習用スクリプトを作成したのと同様に、推論用のスクリプトを作成する必要があります。このスクリプトの書き方にはお作法があり、最低限以下の二つの関数を定義する必要があります。
* `model_fn(model_dir)`：`model_dir`の中には、先ほどAmazon S3にアップロードしたモデルが入っています。こちらをLoadする処理を記載します。
* `predict_fn(input_data, model)`：ロードしたモデルに対してデータを入力する処理を記載します。

In [57]:
%%writefile ./inference.py
import os, joblib
def model_fn(model_dir):
    model = joblib.load(os.path.join(model_dir, 'model.joblib'))
    return model
def predict_fn(input_data, model):
    return model.predict(input_data)

Writing ./inference.py


In [60]:
from sagemaker.sklearn import SKLearnModel
sklearn_model = SKLearnModel(
    entry_point='./inference.py',
    model_data=model_s3_uri,
    role= role,
    framework_version = '1.2-1',
    py_version='py3',
)

### 3.4.2. 登録したモデルのデプロイ

In [None]:
endpoint_name =  'iris-model-Endpoint'
sklearn_predictor = sklearn_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large',
    endpoint_name=endpoint_name
)

デプロイに成功したら、先ほどと同じテストデータを入力してみます。

In [None]:
sklearn_predictor.predict(test_X.values)

無事同じ出力を得ることができました。

### 3.5. デプロイしたモデルを使ってSageMakerの外から推論する
このようにしてデプロイしたモデルは、Amazon SageMaker のノートブック以外の環境から利用することができるようになります。
AWSの様々なサービスのAPIを利用するためのライブラリであるAWS SDK（Pythonの場合は「Boto3」という名前のライブラリ）を使用することで可能になります。

適切な権限があればどこからでも実行できますが、ここでは簡単のためにSageMaker Studio Notebook上で実行してみます。試してみたい方は、AWS Lambdaなどの別サービスに下記コードをコピペしてみてください。

In [67]:
# Boto3のインポート
import boto3

In [90]:
import json
client = boto3.client('sagemaker-runtime')
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=str(test_X.values.tolist()),
    ContentType='application/json'
)

In [None]:
response = np.array(json.load(response['Body']))
response

### 3.6. エンドポイントのクリーンアップ
終わったらエンドポイントを消去します。

In [None]:
sklearn_predictor.delete_endpoint()

##  4. (Optional) 推論方式の変更 - サーバーレス推論を実行してみる
Amazon SageMaker では現在4つの推論方式をサポートしています。これまでみてきたエンドポイントのパターンは「リアルタイム推論」でした。常にエンドポイントが立ち上がっているため最もリアルタイムに推論の結果を返すことができます。

その他にも、推論を行いたいパターンに応じて推論方式を選択できます。一度SageMaker側にモデルを登録すれば、簡単に推論の方式を変えられます。今回はサーバーレス推論のパターンを試してみましょう。

その他の推論方式を含む詳細については、こちらのドキュメントをご覧ください。https://sagemaker.readthedocs.io/en/stable/overview.html#sagemaker-serverless-inference

### 4.1. サーバーレス推論に関する設定値を決定

In [92]:
from sagemaker.serverless import ServerlessInferenceConfig

# デフォルトの設定値を参照
serverless_config = ServerlessInferenceConfig()

# 設定したい場合はこちら
# serverless_config = ServerlessInferenceConfig(
#   memory_size_in_mb=4096,
#   max_concurrency=10,
# )

### 4.2. サーバーレス推論エンドポイントのデプロイ

In [None]:
serverless_predictor = sklearn_model.deploy(serverless_inference_config=serverless_config)

### 4.3. 推論の実行

In [None]:
import boto3, json
client = boto3.client('sagemaker-runtime')
response = client.invoke_endpoint(
    EndpointName=serverless_predictor.endpoint,
    Body=str(test_X.values.tolist()),
    ContentType='application/json'
)

## 5. (Optional) バッチ変換 <a class="anchor" id="batch_transform"></a>
ここまで、リクエストが来たものに対して同期的に推論を実行し予測結果を返すエンドポイントを見てきました。ただ、ユースケースによっては、推論の頻度は低いものの大量のデータを一気に推論にかけてしまいたい、というケースもあるかと思います。そんな時に最適なのが、バッチ変換（Batch Transform）という推論方式です。通常時インスタンスは稼働せず、バッチ変換のリクエストが来た時に推論用インスタンスを立て、終了したら自動でインスタンスは消去されます。

In [None]:
# Define an SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn.transformer(instance_count=1, instance_type="ml.m5.xlarge")

### 5.1. 入力データの準備
トレーニングデータから100行のランダムサンプルを10個抽出し、ラベル（Y）から特徴（X）を分割し、入力データをS3の所定の場所にアップロードします。

In [None]:
%%bash
# Randomly sample the iris dataset 10 times, then split X and Y
mkdir -p batch_data/XY batch_data/X batch_data/Y
for i in {0..9}; do
    cat data/iris.csv | shuf -n 100 > batch_data/XY/iris_sample_${i}.csv
    cat batch_data/XY/iris_sample_${i}.csv | cut -d',' -f2- > batch_data/X/iris_sample_X_${i}.csv
    cat batch_data/XY/iris_sample_${i}.csv | cut -d',' -f1 > batch_data/Y/iris_sample_Y_${i}.csv
done

In [None]:
# Upload input data from local file system to S3
batch_input_s3 = sagemaker_session.upload_data("batch_data/X", key_prefix=prefix + "/batch_input")

### 5.2.変換ジョブの実行 <a class="anchor" id="run_transform_job"></a>
Amazon S3にあるデータをまとめて推論します。

In [None]:
# Start a transform job and wait for it to finish
transformer.transform(batch_input_s3, content_type="text/csv")
print("Waiting for transform job: " + transformer.latest_transform_job.job_name)
transformer.wait()

### 5.3.出力データの確認

In [None]:
# Download the output data from S3 to local file system
batch_output = transformer.output_path
!mkdir -p batch_data/output
!aws s3 cp --recursive $batch_output/ batch_data/output/
# Head to see what the batch output looks like
!head batch_data/output/*

In [None]:
%%bash
# For each sample file, compare the predicted labels from batch output to the true labels
for i in {1..9}; do
    diff -s batch_data/Y/iris_sample_Y_${i}.csv \
        <(cat batch_data/output/iris_sample_X_${i}.csv.out | sed 's/[["]//g' | sed 's/, \|]/\n/g') \
        | sed "s/\/dev\/fd\/63/batch_data\/output\/iris_sample_X_${i}.csv.out/"
done