# Amazon SageMaker Studio ノートブックジョブ機能ハンズオン

本ハンズオンは SageMaker Studio ノートブックの `Python 3 (Data Science 2.0)` カーネル、`ml.t3.medium (2 vCPU + 4 GiB)` インスタンスで検証しています。

In [None]:
# parameters for notebook job
env = "local"
experiment_name = "diamond-price-prediction"
train_data_uri = ""
unseen_data_uri = ""
random_seed = 42

In [None]:
base_run_name = "experiment-run-" + env

## 事前準備

本ハンズオンを実施するには SageMaker の実行ロールに `sagemaker.amazonaws.com` および `events.amazon.aws.com` に対する信頼関係をセットする必要があります。
後者は Amazon EventBridge サービスの URL です。
ノートブックジョブを定期実行する際、裏側で EventBridge の仕組みを利用するため必要となります。

以降のセルでハンズオン用の実行ロールを作成します。
その他、セットする必要がある IAM ポリシーのリストなど、権限周りの最新情報は[公式ドキュメント](https://docs.aws.amazon.com/sagemaker/latest/dg/scheduled-notebook-policies.html)を参照してください。


### ライブラリのインポート

In [None]:
!pip install --upgrade "boto3>=1,<2" "sagemaker>=2,<3" "pycaret>=2,<3" "awswrangler>=2,<3"

In [None]:
import sys
import json
from pprint import pprint
from logging import (
    getLogger,
    StreamHandler,
    INFO,
)
# aws
import boto3
from botocore.exceptions import ClientError
from sagemaker.session import Session
from sagemaker.utils import unique_name_from_base
from sagemaker.s3 import (
    S3Uploader,
    S3Downloader,
)
import awswrangler as wr
from sagemaker.experiments import Run
# ml
from pycaret.datasets import get_data
from pycaret.regression import (
    setup,
    compare_models,
    models,
    create_model,
    tune_model,
    plot_model,
    predict_model,
    finalize_model,
    save_model,
    load_model,
    pull,
)
from pycaret.utils import check_metric
import numpy as np
import pandas as pd

logger = getLogger(__name__)
handler = StreamHandler(sys.stdout)
handler.setLevel(INFO)
logger.addHandler(handler)

iam = boto3.resource('iam')
sts = boto3.client('sts')

sagemaker_session = Session()
bucket = sagemaker_session.default_bucket()
prefix = "NotebookJobExample"
account_id = sts.get_caller_identity().get("Account")
region = boto3.session.Session().region_name

### ハンズオン実行用の IAM ポリシーとロールの作成

#### IAM ポリシーの作成

In [None]:
if env == "local":
    policy_name = "AmazonSageMakerStudioNotebookJobExamplePolicy"
    description = "This policy is required to execute a hands-on notebook for SageMaker Studio Notebook Job, " \
        + "https://github.com/kmotohas/amazon-sagemaker-studio-notebook-job-example ."

    policy_document = {
       "Version":"2012-10-17",
       "Statement":[
          {
             "Effect":"Allow",
             "Action":"iam:PassRole",
             "Resource":"arn:aws:iam::*:role/*",
             "Condition":{
                "StringLike":{
                   "iam:PassedToService":[
                      "sagemaker.amazonaws.com",
                      "events.amazonaws.com"
                   ]
                }
             }
          },
          {
             "Effect":"Allow",
             "Action":[
                "events:TagResource",
                "events:DeleteRule",
                "events:PutTargets",
                "events:DescribeRule",
                "events:PutRule",
                "events:RemoveTargets",
                "events:DisableRule",
                "events:EnableRule"
             ],
             "Resource":"*",
             "Condition":{
                "StringEquals":{
                   "aws:ResourceTag/sagemaker:is-scheduling-notebook-job":"true"
                }
             }
          },
          {
             "Effect":"Allow",
             "Action":[
                "s3:CreateBucket",
                "s3:PutBucketVersioning",
                "s3:PutEncryptionConfiguration"
             ],
             "Resource":"arn:aws:s3:::sagemaker-automated-execution-*"
          },
          {
             "Effect":"Allow",
             "Action":[
                "sagemaker:ListTags"
             ],
             "Resource":[
                "arn:aws:sagemaker:*:*:user-profile/*",
                "arn:aws:sagemaker:*:*:space/*",
                "arn:aws:sagemaker:*:*:training-job/*",
                "arn:aws:sagemaker:*:*:pipeline/*"
             ]
          },
          {
             "Effect":"Allow",
             "Action":[
                "sagemaker:AddTags"
             ],
             "Resource":[
                "arn:aws:sagemaker:*:*:training-job/*",
                "arn:aws:sagemaker:*:*:pipeline/*"
             ]
          },
          {
             "Effect":"Allow",
             "Action":[
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:CreateVpcEndpoint",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcEndpoints",
                "ec2:DescribeVpcs",
                "ecr:BatchCheckLayerAvailability",
                "ecr:BatchGetImage",
                "ecr:GetDownloadUrlForLayer",
                "ecr:GetAuthorizationToken",
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:GetEncryptionConfiguration",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:GetObject",
                "sagemaker:DescribeDomain",
                "sagemaker:DescribeUserProfile",
                "sagemaker:DescribeSpace",
                "sagemaker:DescribeStudioLifecycleConfig",
                "sagemaker:DescribeImageVersion",
                "sagemaker:DescribeAppImageConfig",
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:Search",
                "sagemaker:CreatePipeline",
                "sagemaker:DescribePipeline",
                "sagemaker:DeletePipeline",
                "sagemaker:StartPipelineExecution"
             ],
             "Resource":"*"
          }
       ]
    }

    policy_arn: str = None

    try:
        policy = iam.create_policy(
            PolicyName=policy_name,
            Description=description,
            PolicyDocument=json.dumps(policy_document),
        )
        logger.info(f"Created policy {policy.arn}")
        policy_arn = policy.arn
    except ClientError as e:
        logger.warning(f"Cloudn't create policy {policy_name}")
        if e.response["Error"]["Code"] == "EntityAlreadyExists":
            logger.warning(f"{policy_name} already exists")
            policy_arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"
            logger.warning(policy_arn)
        else:
            logger.exception(e)
            raise

#### IAM ロールの作成

In [None]:
if env == "local":
    role_name = "AmazonSageMakerStudioNotebookJobExampleRole"

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "sagemaker.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            },
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "events.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

    role_arn: str = None

    try:
        role = iam.create_role(
            RoleName=role_name,
            Path="/service-role/",
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
        logger.info(f"Created role {role.arn}")
        role_arn = role.arn
    except ClientError as e:
        logger.warning(f"Cloudn't create role {role_name}")
        if e.response["Error"]["Code"] == "EntityAlreadyExists":
            logger.warning(f"{role_name} already exists")
            role_arn = f"arn:aws:iam::{account_id}:role/service-role/{role_name}"
            logger.warning(role_arn)
        else:
            logger.exception(e)
            raise

#### ロールにポリシーをアタッチ

In [None]:
if env == "local":
    try:
        iam.Role(role_name).attach_policy(PolicyArn=policy_arn)
        logger.info(f"Attached policy {policy_arn} to role {role_name}")
    except ClientError:
        logger.exception(f"Couldn't attach policy {policy_arn} to role {role_name}")
        raise

## サンプルデータの準備

ダイアモンドの価格データセットを用います。
以降のコードは [PyCaret のビギナー向け回帰モデル開発チュートリアル](https://pycaret.gitbook.io/docs/get-started/tutorials)に沿って記述されています。

In [None]:
if env == "local":
    dataset = get_data("diamond")

サンプル数は6000、予測対象の Price を含むカラム数は8つ。

In [None]:
if env == "local":
    print(dataset.shape)

学習用のトレーニングデータと予測用のデータに分けます。

In [None]:
if env == "local":
    train_data = dataset.sample(frac=0.9, random_state=int(random_seed))
    unseen_data = dataset.drop(train_data.index)

    train_data.reset_index(drop=True, inplace=True)
    unseen_data.reset_index(drop=True, inplace=True)

    print(f"Data for Modeling: {train_data.shape}")
    print(f"Unseen Data For Predictions: {unseen_data.shape}")

Pandas DataFrame を CSV として保存します。
ここでは S3 への CSV のエクスポートを AWS SDK for Pandas (awswrangler) を用いています。

In [None]:
if env == "local":
    train_data_uri = f"s3://{bucket}/{prefix}/data/train"
    unseen_data_uri = f"s3://{bucket}/{prefix}/data/unseen"
    wr.s3.to_csv(train_data, train_data_uri + "/0001.csv", index=False)
    wr.s3.to_csv(unseen_data, unseen_data_uri + "/0001.csv", index=False)
    logger.info(f"Training data is uploaded to {train_data_uri}/0001.csv")
    logger.info(f"Unseen data is uploaded to {unseen_data_uri}/0001.csv")

ノートブックジョブ実行のためにデータセットを S3 にアップロードします。

#### ノートブックジョブ実行時は S3 からデータを取得する

In [None]:
if env == "job":
    train_data = wr.s3.read_csv(train_data_uri)
    unseen_data = wr.s3.read_csv(unseen_data_uri)

## SageMaker Experiments を用いた PyCaret の実験管理

ここでは [PyCaret](https://pycaret.gitbook.io/docs/) という AutoML ツールを用いてダイヤモンド価格予測モデルを構築します。
また、実験管理のために SageMaker Experiments というツールを用います。

SageMaker Experiments はフルマネージドのサービスであって、実感管理のために追加のサーバーを管理することなく利用できます。
SageMaker Studio の UI から管理画面を開くことができ、実験結果を閲覧したり、異なる実験間のメトリクスを比較したりできます。

SageMaker Python SDK ではそれぞれの実験を `Run` として扱い、`log_parameter`、`log_parameters`、`log_metric`、`log_file`、`log_artifact` などのメソッドを用いて実験のメトリクスやパラメーター、入出力ファイル、アーティファクトを記録します。

In [None]:
%%time
#from sagemaker.experiments import Run

run_name = unique_name_from_base(base_run_name)
output_path = f"s3://{bucket}/{prefix}/{experiment_name}/{run_name}/"

with Run(
    experiment_name=experiment_name,
    run_name=run_name,
) as run:
    # パラメーターをまとめて記録
    run.log_parameters(
        {
            "num_train_samples": len(train_data),
            "num_test_samples": len(unseen_data),
        }
    )
    
    # 入力ファイルを S3 にアップロードして記録
    #run.log_file(file_path="train_data.csv", name="Train Data", is_output=False)
    #run.log_file(file_path="unseen_data.csv", name="Unseen Data", is_output=False)
    
    # PyCaret の環境の初期化と特徴量変換パイプラインの作成
    pycaret_experiment = setup(
        data=train_data,
        target="Price",
        session_id=int(random_seed),
        silent=True,
    )
    
    # 20以上のモデルをデフォルトのハイパーパラメーターで学習して比較し、最良のモデルを判別
    logger.info("comparing models...")
    best_model = compare_models()
    run.log_parameter("best_model", best_model.__class__.__name__)
    
    # 事前定義済みのパラメーター空間をランダムグリッドサーチで探索してチューニング
    logger.info("tuning best model...")
    tuned_model = tune_model(best_model)
    
    # テストデータとしてホールドアウトされたサンプルで性能評価
    logger.info("evaluating model on test data")
    test_predictions = predict_model(tuned_model)  # 推論結果は返すがメトリクスは表示するだけ
    
    # MAE, MSE, RMSE, R2, RMSLE, MAPE といったメトリクスの計算結果のテーブルを取得
    test_metrics = pull()  # pycaret.regression.pull() は直前に表示されたテーブルを返す
    # テストデータに対する評価メトリクスを記録
    for k, v in test_metrics.to_dict("records")[0].items():
        if k != "Model":
            run.log_metric(name=f"Test:{k}", value=v)
            
    # 性能評価のためのプロットを作成
    logger.info("generating plots...")
    residuals = plot_model(tuned_model, save=True)
    prediction_error = plot_model(tuned_model, plot="error", save=True)
    feature_importance = plot_model(tuned_model, plot="feature", save=True)
    # 作成したプロットを記録
    run.log_file(file_path=residuals, name="Residuals", is_output=True)
    run.log_file(file_path=prediction_error, name="Prediction Error", is_output=True)
    run.log_file(file_path=feature_importance, name="Feature Importance", is_output=True)
    
    # ホールドアウトサンプルもすべて含めて学習
    logger.info("finalizing model...")
    final_model = finalize_model(tuned_model)
    # モデルのハイパーパラメーターを記録
    model_parameters = final_model.get_params()
    
    def convert_value_types_to_log_parameters(params: dict):
        """ log_parameters で記録できるようキャストする """
        for k, v in params.items():
            if "numpy" in str(type(v)):
                params[k] = v.item()
            elif type(v) == bool or v is None:
                params[k] = str(v)
        return params
    
    model_parameters = convert_value_types_to_log_parameters(model_parameters)
    run.log_parameters(model_parameters)
    
    # 学習したモデルを用いて予測対象のデータセットに対して予測を実行
    logger.info("making predictions on unceen data...")
    predictions = predict_model(final_model, data=unseen_data)
    predictions.to_csv("predictions.csv", index=False)
    # 予測結果の CSV を記録
    run.log_file(file_path="predictions.csv", name="predictions.csv", is_output=True)
    
    # 学習したモデルと特徴量変換パイプラインを pickle として保存
    logger.info("saving final model...")
    save_model(final_model, "model")
    # 保存した pickle を S3 にアップロードして記録
    run.log_file(file_path="model.pkl", name="model.pkl", is_output=True)