# SageMaker Pipelines

Amazon SageMaker Pipelinesでは以下をサポートしています。

* **Pipeline** - SageMakerジョブのオーケストレーションとリソース作成に用いるステップや条件の有向非巡回グラフ。
* **Processing Job Step** - 特徴量エンジニアリング、データ検証、モデル評価、モデル解釈などのデータ処理ワークロードを実行するための、SageMakerのシンプルなマネージドエクスペリエンス。
* **Training Job Step** - トレーニングデータセットからのサンプルを提示することで、予測を行うためのモデルを訓練する反復的なプロセス。
* **Conditional Step** - パイプライン内の条件付き実行分岐を提供する。
* **Registering Model** - Amazon SageMakerでデプロイできるモデルを作成するために使用されるモデルレジストリ内にモデルパッケージリソースを作成する。
* **Parameterized Execution** - パラメータを与えることでパイプラインの実行を調整可能にする。
* **Transform Job Step** - データセットのバッチ変換。データセットのトレーニングや推論を妨げるノイズやバイアスを除去するためにデータを前処理したり、大規模なデータセットの推論結果を得たり、永続的なエンドポイントを必要としないときに推論を実行するために利用する。

# SageMaker Studio拡張機能

SageMaker Studioは、実験、トレーニングジョブ、パイプラインなどのSageMakerリソースを視覚的に検査するための豊富な機能を備えています。

![](img/sm_studio_extensions_pipelines.png)



# BERTのパイプライン

処理（Processing）ステップでは、事前学習済み BERT モデルを使用して `review_body` テキストから BERT 埋め込みを作成するための特徴量エンジニアリングを実行し、データセットをトレーニング、検証、およびテストファイルに分割します。
Tensorflow のトレーニングに最適化するために、ファイルを TFRecord 形式で保存します。

トレーニング（Training）ステップでは、BERTモデルをCustomer Reviews Datasetに合わせてファインチューニングし、入力の `review_body` に対する `star_rating` を予測するために新しい分類レイヤーを追加します。

評価（Evaluation）ステップでは、トレーニングされたモデルとテストデータセットを入力とし、分類評価メトリクスを含むJSONファイルを生成します。

条件（Condition）ステップでは、評価ステップで求めたモデルの精度がある閾値を超えた場合に、このモデルを登録します。

![](img/bert_sagemaker_pipeline.png)

ここで作成するパイプラインは、前処理、トレーニング、評価、モデル登録という典型的な機械学習アプリケーションのパターンに従っています。

![A typical ML Application pipeline](img/pipeline-full.png)

In [None]:
from botocore.exceptions import ClientError

import os
import sagemaker
import logging
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

import botocore.config

config = botocore.config.Config(
    user_agent_extra='dsoaws/1.0'
)

sm = boto3.Session().client(service_name="sagemaker", 
                            region_name=region,
                            config=config)
s3 = boto3.Session().client(service_name="s3", 
                            region_name=region,
                            config=config)
featurestore_runtime = boto3.Session().client(service_name="sagemaker-featurestore-runtime", 
                                              region_name=region,
                                              config=config)

# S3パスをセット（パブリックS3バケット）

In [None]:
s3_public_path_tsv = "s3://amazon-reviews-pds/tsv"

In [None]:
%store s3_public_path_tsv

# S3出力先パスをセット（アカウント内のプライベートS3バケット）

In [None]:
s3_private_path_tsv = "s3://{}/amazon-reviews-pds/tsv".format(bucket)
print(s3_private_path_tsv)

In [None]:
%store s3_private_path_tsv

# パブリックS3バケットからアカウント内のプライベートS3バケットにデータをコピー
データセット全体はとても大きいので、ここでは待ち時間を減らすため3つのファイルのみをコピーしましょう。

In [None]:
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Digital_Software_v1_00.tsv.gz"
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz"
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Gift_Card_v1_00.tsv.gz"

# パイプラインを `Experiment` として追跡

In [None]:
import time

In [None]:
%store -r pipeline_name

try:
    print("Using existing pipeline: {}".format(pipeline_name))
except NameError:
    timestamp = int(time.time())
    pipeline_name = "BERT-pipeline-{}".format(timestamp)
    print("Created Pipeline Name: " + pipeline_name)

In [None]:
running_executions = 0
completed_executions = 0

try:
    existing_pipeline_executions_response = sm.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortOrder="Descending",
    )

    if "PipelineExecutionSummaries" in existing_pipeline_executions_response.keys():
        if len(existing_pipeline_executions_response["PipelineExecutionSummaries"]) > 0:
            execution = existing_pipeline_executions_response["PipelineExecutionSummaries"][0]
            if "PipelineExecutionStatus" in execution:
                if execution["PipelineExecutionStatus"] == "Executing":
                    running_executions = running_executions + 1
                else:
                    completed_executions = completed_executions + 1

            print(
                "[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed.".format(
                    running_executions, completed_executions
                )
            )
    else:
        print("[OK] Please continue.")
except:
    pass

if running_executions == 0:
    timestamp = int(time.time())
    pipeline_name = "BERT-pipeline-{}".format(timestamp)
    print("Created Pipeline Name: " + pipeline_name)

In [None]:
print(pipeline_name)

In [None]:
%store pipeline_name

In [None]:
%store -r pipeline_experiment_name

from smexperiments.experiment import Experiment

try:
    pipeline_experiment_name
except NameError:
    pipeline_experiment = Experiment.create(
        experiment_name=pipeline_name,
        description="Amazon Customer Reviews BERT Pipeline Experiment",
        sagemaker_boto_client=sm,
    )
    pipeline_experiment_name = pipeline_experiment.experiment_name
    print("Created Pipeline Experiment Name: {}".format(pipeline_experiment_name))

In [None]:
print(pipeline_experiment_name)

In [None]:
%store pipeline_experiment_name

# `Trial` の作成

In [None]:
from smexperiments.trial import Trial

In [None]:
%store -r pipeline_trial_name

try:
    pipeline_trial_name
except NameError:
    timestamp = int(time.time())
    pipeline_trial = Trial.create(
        trial_name="trial-{}".format(timestamp), experiment_name=pipeline_experiment_name, sagemaker_boto_client=sm
    )
    pipeline_trial_name = pipeline_trial.trial_name
    print("Created Trial Name: {}".format(pipeline_trial_name))

In [None]:
print(pipeline_trial_name)

In [None]:
%store pipeline_trial_name

# パイプライン実行の調整に用いるパラメータを定義

ワークフローパラメータを定義することで、パイプラインをパラメータ化し、パイプラインの定義を変更することなく、パイプラインの実行時に使用される設定を変化させることができます。

サポートされているパラメータタイプは以下の通りです。

* `ParameterString` - Pythonの `str` 型を表す。
* `ParameterInteger` - Pythonの `int` 型を表す。
* `ParameterFloat` - Pythonの `float` 型を表す。

これらのパラメータは、パイプラインの実行時にオーバーライド可能なデフォルト値の提供をサポートしています。
指定するデフォルト値は、そのパラメータの型のインスタンスでなければなりません。

In [None]:
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger,
    ParameterFloat
)

# 特徴量エンジニアリングステップ

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

![](img/prepare_dataset_bert.png)

In [None]:
raw_input_data_s3_uri = "s3://{}/amazon-reviews-pds/tsv/".format(bucket)
print(raw_input_data_s3_uri)

In [None]:
!aws s3 ls $raw_input_data_s3_uri

# パイプラインパラメータを定義

これらのパラメータは、パイプライン全体で使用されます。

## BERT の最大シーケンス長
最大シーケンス長 `MaxSeqLength` は、レビューテキストの単語数分布に基づいて選択しました。

![](img/max_seq_length_viz.png)

In [None]:
import time

timestamp = int(time.time())

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)

processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge",
)

max_seq_length = ParameterInteger(
    name="MaxSeqLength",
    default_value=64,
)

balance_dataset = ParameterString(
    name="BalanceDataset",
    default_value="True",
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="reviews-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="reviews-feature-group-" + str(timestamp),
)

`SKLearnProcessor` のインスタンスを作成し、それを `ProcessingStep` で使用します。

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
)

### パイプラインステップのキャッシュをセットアップ

[ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) フォーマットを使用して、一定期間のパイプラインステップをキャッシュします。 

SageMakerパイプラインステップのキャッシュの詳細はこちらを参照してください: https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html

In [None]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

最後に、Scikit Learnプロセッサーのインスタンスを使って `ProcessingStep` を構築します。
これは、入出力のチャンネルと、処理ステップが実行されたときに走るコード `preprocess-scikit-text-to-bert-feature-store.py` を含みます。
既存のPython SDKに慣れている人にとっては、プロセッサーインスタンスの `run` メソッドと非常によく似ていることがわかるかと思います。

ステップ自体の入力データとして `ProcessingStep` に渡される `processing_input` パラメータに注目してください。
この入力データは、プロセッサーのインスタンスが実行される際に使用されます。

また、処理ジョブの出力設定で指定されている `"bert-train"`、`"bert-validation"`、`"bert-test"` という名前のチャンネルにも注意してください。
このようなステップの `Properties` は、後続のステップで使用することができ、実行時にはランタイムの値に解決されます。
特に、トレーニングステップを定義する際には、このような使い方をします。

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs = [
    ProcessingInput(
        input_name="raw-input-data",
        source=input_data,
        destination="/opt/ml/processing/input/data/",
        s3_data_distribution_type="ShardedByS3Key",
    )
]

processing_outputs = [
    ProcessingOutput(
        output_name="bert-train",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/bert/train",
    ),
    ProcessingOutput(
        output_name="bert-validation",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/bert/validation",
    ),
    ProcessingOutput(
        output_name="bert-test",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/bert/test",
    ),
]

processing_step = ProcessingStep(
    name="Processing",
    code="preprocess-scikit-text-to-bert-feature-store.py",
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        "--train-split-percentage",
        str(train_split_percentage.default_value),
        "--validation-split-percentage",
        str(validation_split_percentage.default_value),
        "--test-split-percentage",
        str(test_split_percentage.default_value),
        "--max-seq-length",
        str(max_seq_length.default_value),
        "--balance-dataset",
        str(balance_dataset.default_value),
        "--feature-store-offline-prefix",
        str(feature_store_offline_prefix.default_value),
        "--feature-group-name",
        str(feature_group_name.default_value),
    ],
    cache_config=cache_config
)

print(processing_step)

# トレーニングステップ

![Define a Training Step to Train a Model](img/pipeline-3.png)

In [None]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.c5.9xlarge")

train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

# トレーニングのハイパーパラメータのセットアップ

なお、`max_seq_length` は、上記の処理ステップ用のハイパーパラメータから再利用されています。

In [None]:
epochs = ParameterInteger(name="Epochs", default_value=1)

learning_rate = ParameterFloat(name="LearningRate", default_value=0.00001)

epsilon = ParameterFloat(name="Epsilon", default_value=0.00000001)

train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=128)

validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=128)

test_batch_size = ParameterInteger(name="TestBatchSize", default_value=128)

train_steps_per_epoch = ParameterInteger(name="TrainStepsPerEpoch", default_value=50)

validation_steps = ParameterInteger(name="ValidationSteps", default_value=50)

test_steps = ParameterInteger(name="TestSteps", default_value=50)

train_volume_size = ParameterInteger(name="TrainVolumeSize", default_value=256)

use_xla = ParameterString(
    name="UseXLA",
    default_value="True",
)

use_amp = ParameterString(
    name="UseAMP",
    default_value="True",
)

freeze_bert_layer = ParameterString(
    name="FreezeBERTLayer",
    default_value="False",
)

enable_sagemaker_debugger = ParameterString(
    name="EnableSageMakerDebugger",
    default_value="False",
)

enable_checkpointing = ParameterString(
    name="EnableCheckpointing",
    default_value="False",
)

enable_tensorboard = ParameterString(
    name="EnableTensorboard",
    default_value="False",
)

input_mode = ParameterString(
    name="InputMode",
    default_value="File",
)

run_validation = ParameterString(
    name="RunValidation",
    default_value="True",
)

run_test = ParameterString(
    name="RunTest",
    default_value="False",
)

run_sample_predictions = ParameterString(
    name="RunSamplePredictions",
    default_value="False",
)

### モデルパフォーマンスを追跡するためのメトリクスのセットアップ

次のサンプルのログの行は...
```
[step: 100] val_loss: 0.55 - val_accuracy: 74.64%
```

...次のメトリクスをCloudWatchに生成します。

`validation:loss` =  0.55

`validation:accuracy` = 74.64

<img src="img/cloudwatch_validation_metrics.png" align="left">

In [None]:
metrics_definitions = [
    {"Name": "train:loss", "Regex": "loss: ([0-9\\.]+)"},
    {"Name": "train:accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
    {"Name": "validation:loss", "Regex": "val_loss: ([0-9\\.]+)"},
    {"Name": "validation:accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
]

### デバッガーとプロファイラーのセットアップ

こちらのドキュメントの通りにDebuggerのルールを定義します:  https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-built-in-rules.html

In [None]:
from sagemaker.debugger import Rule, ProfilerRule, rule_configs
from sagemaker.debugger import DebuggerHookConfig
from sagemaker.debugger import ProfilerConfig, FrameworkProfile

debugger_hook_config = DebuggerHookConfig(
    s3_output_path="s3://{}".format(bucket),
)

profiler_config = ProfilerConfig(
    system_monitor_interval_millis=500,
    framework_profile_params=FrameworkProfile(local_path="/opt/ml/output/profiler/", start_step=5, num_steps=10),
)

In [None]:
rules = [ProfilerRule.sagemaker(rule_configs.ProfilerReport())]

### Estimatorの作成

Estimatorと入力データセットの設定を行います。
典型的なトレーニングスクリプトでは、入力チャンネルからデータを読み込み、ハイパーパラメータを使ってトレーニングを設定し、モデルをトレーニングして、後でホストできるようにモデルを`model_dir`に保存します。

また、トレーニングしたモデルを保存するモデルパスも指定します。

なお、渡された `train_instance_type` パラメータは、パイプラインの他の場所でも使用されたり、渡されたりする可能性があります。
この例では、`train_instance_type` はestimatorに渡されます。

In [None]:
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="tf_bert_reviews.py",
    source_dir="src",
    role=role,
    instance_count=train_instance_count,  # 少なくともこの数の入力ファイルがあることを確認してください。利用可能なデータがないとShardedByS3Key分散戦略のジョブは失敗します。
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version="py37",
    framework_version="2.3.1",
    hyperparameters={
        "epochs": epochs,
        "learning_rate": learning_rate,
        "epsilon": epsilon,
        "train_batch_size": train_batch_size,
        "validation_batch_size": validation_batch_size,
        "test_batch_size": test_batch_size,
        "train_steps_per_epoch": train_steps_per_epoch,
        "validation_steps": validation_steps,
        "test_steps": test_steps,
        "use_xla": use_xla,
        "use_amp": use_amp,
        "max_seq_length": max_seq_length,
        "freeze_bert_layer": freeze_bert_layer,
        "enable_sagemaker_debugger": enable_sagemaker_debugger,
        "enable_checkpointing": enable_checkpointing,
        "enable_tensorboard": enable_tensorboard,
        "run_validation": run_validation,
        "run_test": run_test,
        "run_sample_predictions": run_sample_predictions,
    },
    input_mode=input_mode,
    metric_definitions=metrics_definitions,
    debugger_hook_config=debugger_hook_config,
    profiler_config=profiler_config,
    rules=rules,
)

### トレーニンングステップの設定

最後に、estimatorインスタンスを使って、`TrainingStep` を構築します。
`TrainingStep`の入力としては、先行する `ProcessingStep` の `Properties` を用います。
既存のPython SDKに精通している人にとっては、estimatorの`fit`メソッドに非常に似ていることがわかるかと思います。

特に、`TrainingStep` には、`"train"`、`"validation"`、`"test"`の出力チャンネルの `S3Uri` を渡しています。
ワークフローステップの `properties` 属性は、describeコールの対応するレスポンスのオブジェクトモデルにマッチします。
これらのプロパティは、プレースホルダー値として参照することができ、実行時に解決されるか、または記入されます。
例えば、`ProcessingStep` の `properties` 属性は、[DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) レスポンスオブジェクトのオブジェクトモデルにマッチします。

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["bert-train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["bert-validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["bert-test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config=cache_config,
)

print(training_step)

# 評価ステップ

![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)

まず、モデルの評価を行うProcessingのステップで指定される評価スクリプトを開発します．

評価スクリプト `evaluation.py` は、トレーニング済みモデルとテストデータセットを入力とし、accuracyなどの分類評価メトリクスを含むJSONファイルを生成します。

パイプラインの実行後には、生成された `evaluation.json` を調べて分析を行います。

評価スクリプトでは、以下を行います。

* モデルの読み込み
* テストデータの読み込み
* テストデータに対する複数の予測を実行
* accuracyなどを含んだ分類レポートを作成する
* 評価レポートを評価ディレクトリに保存

次に、`ScriptProcessor`プロセッサーのインスタンスを作成し、それを`ProcessingStep`で使用します。

`processing_instance_type` パラメータがプロセッサーに渡されていることに注意してください。


In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=7200,
)

プロセッサーのインスタンスを使用して、`ProcessingStep` を構築します。
同時に、入出力チャンネルと、パイプライン実行の際に実行されるコードを渡します。
既存のPython SDKに慣れている人にとっては、プロセッサーインスタンスの `run` メソッドと非常によく似ていることがわかるかと思います。

`TrainingStep` と `ProcessingStep` の `properties` 属性は、それぞれ [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) と [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) レスポンスオブジェクトのオブジェクトモデルにマッチします。

In [None]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(name="EvaluationReport", output_name="metrics", path="evaluation.json")

In [None]:
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    code="evaluate_model_metrics.py",
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model",
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingInputs["raw-input-data"].S3Input.S3Uri,
            destination="/opt/ml/processing/input/data",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="metrics", s3_upload_mode="EndOfJob", source="/opt/ml/processing/output/metrics/"
        ),
    ],
    job_arguments=[
        "--max-seq-length",
        str(max_seq_length.default_value),
    ],
    property_files=[evaluation_report],
)

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

print(model_metrics)

# モデル登録ステップ

![](img/pipeline-5.png)

トレーニングステップで使用したestimatorインスタンスを使って、`RegisterModel`のインスタンスを構築します。
パイプラインで `RegisterModel` を実行した結果はモデルパッケージ（Model Package）になります。
モデルパッケージは、推論に必要なすべての要素をパッケージ化した、再利用可能なモデルアーティファクトの抽象化です。
主に、使用する推論イメージを定義する推論仕様と、オプションとしてモデルの重みのパスで構成されています。

モデルパッケージグループは、モデルパッケージの集合体です。
特定のMLビジネス問題のためにモデルパッケージグループを作成し、そこにバージョンやモデルパッケージを追加していくことができます。
通常、お客様はSageMakerワークフローパイプラインのためにModelPackageGroupを作成し、ワークフローパイプラインを実行するたびにバージョン/モデルパッケージをグループに追加し続けることを想定しています。

`RegisterModel`の構造は、既存のPython SDKに慣れている人にとっては、Estimatorインスタンスの`register`メソッドと非常によく似ていることがわかると思います。

特に、`TrainingStep` のプロパティから `S3ModelArtifacts` を渡しています。
`TrainingStep`の`properties`属性は、[DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html)レスポンスオブジェクトのオブジェクトモデルと一致しています。

後にモデルレジストリやCI/CD作業で使用する特定のモデルパッケージグループ名を提供したことに注意してください。

In [None]:
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.4xlarge")

deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [None]:
model_package_group_name = f"BERT-Reviews-{timestamp}"

print(model_package_group_name)

In [None]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version="2.3.1",
    instance_type=deploy_instance_type,
    image_scope="inference",
)
print(inference_image_uri)

In [None]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    image_uri=inference_image_uri,  # デフォルトではトレーニングイメージを使用するため、必ず指定する
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=["ml.m5.4xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# デプロイステップのためのモデルの作成

![](img/pipeline-5.png)


In [None]:
from sagemaker.model import Model

model_name = "bert-model-{}".format(timestamp)

model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

In [None]:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type,
)

In [None]:
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=create_inputs,
)

# 条件付きデプロイステップ
![](img/pipeline-6.png)

最後に、評価ステップで測定したモデルのaccuracyが所定の閾値を超えた場合にのみ、このモデルを登録するようにしたいと思います。
`ConditionStep`により、ステップのプロパティの条件に基づいて、パイプラインのDAG内での条件付き実行をサポートします。

以降では、次の3つを実行します。

* 評価ステップの出力に含まれるaccuracy値に対して、`ConditionGreaterThan`を定義
* `ConditionStep` の条件リストで、この条件を使用
* `RegisterModel` ステップコレクションを `ConditionStep` の `if_steps` に渡す

In [None]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

min_accuracy_value = ParameterFloat(name="MinAccuracyValue", default_value=0.10)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value,  # accuracy
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step],  # 条件を満たしたらモデル登録に進む
    else_steps=[],  # 満たさなかったらパイプラインを終了する
)

# パラメータ、ステップ、条件のパイプラインを定義する

これらをワークフローのパイプラインに結びつけて、実行したり、スケジューリングしたりできるようにしましょう。

パイプラインには、`name`、`parameters`、`steps`が必要です。
名前は `(account, region)` のペアの中でユニークでなければならないので、名前にタイムスタンプを加えます。

以下に注意してください。

* 定義時に使用されているすべてのパラメータが存在する必要がある。
* パイプラインに渡されるステップは、実行順である必要はない。SageMaker Workflowサービスが、ステップ間の _データ依存性_ のDAGを解決する。
* ステップは、パイプラインのステップリストまたは単一の条件ステップのif/elseリストのいずれかに一意でなければならない。

# SageMakerへパイプラインを送信して実行

パイプラインの定義をワークフローサービスに送信してみましょう。
渡されたロールは、ステップで定義されたすべてのジョブを作成するために、ワークフローサービスによって使用されます。

# パイプラインを作成

### _以下の `WARNING` は無視してください。_

In [None]:
from sagemaker.workflow.pipeline import Pipeline

existing_pipelines = 0

existing_pipelines_response = sm.list_pipelines(
    PipelineNamePrefix=pipeline_name,
    SortOrder="Descending",
)

if "PipelineSummaries" in existing_pipelines_response.keys():
    if len(existing_pipelines_response["PipelineSummaries"]) > 0:
        existing_pipelines = existing_pipelines + 1
        print("[INFO] You already have created {} pipeline with name {}.".format(existing_pipelines, pipeline_name))
    else:
        pass

if existing_pipelines == 0:  # パイプラインの作成は一度だけ
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            input_data,
            processing_instance_count,
            processing_instance_type,
            max_seq_length,
            balance_dataset,
            train_split_percentage,
            validation_split_percentage,
            test_split_percentage,
            feature_store_offline_prefix,
            feature_group_name,
            train_instance_type,
            train_instance_count,
            epochs,
            learning_rate,
            epsilon,
            train_batch_size,
            validation_batch_size,
            test_batch_size,
            train_steps_per_epoch,
            validation_steps,
            test_steps,
            train_volume_size,
            use_xla,
            use_amp,
            freeze_bert_layer,
            enable_sagemaker_debugger,
            enable_checkpointing,
            enable_tensorboard,
            input_mode,
            run_validation,
            run_test,
            run_sample_predictions,
            min_accuracy_value,
            model_approval_status,
            deploy_instance_type,
            deploy_instance_count,
        ],
        steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
        sagemaker_session=sess,
    )

    pipeline.create(role_arn=role)["PipelineArn"]
    print("Created pipeline with name {}".format(pipeline_name))
else:
    print(
        "****************************************************************************************************************"
    )
    print(
        "You have already create a pipeline with the name {}. This is OK. Please continue to the next cell.".format(
            pipeline_name
        )
    )
    print(
        "****************************************************************************************************************"
    )

### _ここの `WARNING` は無視してください。_

# パイプラインの実行

### _以下での `WARNING` は無視してください。_

In [None]:
running_executions = 0
completed_executions = 0

if existing_pipelines > 0:
    existing_pipeline_executions_response = sm.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortOrder="Descending",
    )

    if "PipelineExecutionSummaries" in existing_pipeline_executions_response.keys():
        if len(existing_pipeline_executions_response["PipelineExecutionSummaries"]) > 0:
            execution = existing_pipeline_executions_response["PipelineExecutionSummaries"][0]
            if "PipelineExecutionStatus" in execution:
                if execution["PipelineExecutionStatus"] == "Executing":
                    running_executions = running_executions + 1
                else:
                    completed_executions = completed_executions + 1

            print(
                "[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed.".format(
                    running_executions, completed_executions
                )
            )
    else:
        pass
else:
    pass

if running_executions == 0:  # リソースの使用を制限するため同時のパイプライン実行はひとつに絞る
    execution = pipeline.start(
        parameters=dict(
            InputData=raw_input_data_s3_uri,
            ProcessingInstanceCount=1,
            ProcessingInstanceType="ml.c5.2xlarge",
            MaxSeqLength=64,
            BalanceDataset="True",
            TrainSplitPercentage=0.9,
            ValidationSplitPercentage=0.05,
            TestSplitPercentage=0.05,
            FeatureStoreOfflinePrefix=feature_store_offline_prefix,
            FeatureGroupName=feature_group_name,
            LearningRate=0.000012,
            TrainInstanceType="ml.c5.9xlarge",
            TrainInstanceCount=1,
            Epochs=1,
            Epsilon=0.00000001,
            TrainBatchSize=128,
            ValidationBatchSize=128,
            TestBatchSize=128,
            TrainStepsPerEpoch=50,
            ValidationSteps=50,
            TestSteps=50,
            TrainVolumeSize=256,
            UseXLA="True",
            UseAMP="True",
            FreezeBERTLayer="False",
            EnableSageMakerDebugger="False",
            EnableCheckpointing="False",
            EnableTensorboard="False",
            InputMode="File",
            RunValidation="True",
            RunTest="False",
            RunSamplePredictions="False",
            MinAccuracyValue=0.10,
            ModelApprovalStatus="PendingManualApproval",
            DeployInstanceType="ml.m5.4xlarge",
            DeployInstanceCount=1,
        )
    )
    running_executions = running_executions + 1
    print("Started pipeline {}.  Ignore any warnings above.".format(pipeline_name))
    print(execution.arn)
else:
    print(
        "********************************************************************************************************************"
    )
    print(
        "You have already launched {} pipeline execution(s).  This is OK.  Please continue to see the next cell.".format(
            running_executions
        )
    )
    print(
        "********************************************************************************************************************"
    )

### _^^ ここ ^^ での `WARNING` はすべて無視してください。_

# パイプラインの完了まで待機してください。

### _次のセルの実行には40分程度かかります。しばらくお待ちください。_

In [None]:
%%time

import time
from pprint import pprint

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

while pipeline_execution_status == "Executing":
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    except Exception as e:
        print("Please wait...")
        time.sleep(30)

pprint(executions_response)

### _^^ ここ ^^ のパイプライン実行が完了するまでお待ちください。_

# 完了したらパイプライン実行ステップとステータスを表示

In [None]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]

print("Pipeline execution status {}".format(pipeline_execution_status))
print("Pipeline execution arn {}".format(pipeline_execution_arn))

In [None]:
from pprint import pprint

steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

pprint(steps)

# パイプラインで生成されたすべてのアーティファクトを表示

In [None]:
processing_job_name = None
training_job_name = None

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(steps["PipelineExecutionSteps"]):
    print(execution_step)
    # We are doing this because there appears to be a bug of this LineageTableVisualizer handling the Processing Step
    if execution_step["StepName"] == "Processing":
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        print(processing_job_name)
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step["StepName"] == "Train":
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        print(training_job_name)
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)

## パイプライン実行をExperimentのTrialとして追加

In [None]:
# -aws-processing-job is the default name assigned by ProcessingJob
processing_job_tc = "{}-aws-processing-job".format(processing_job_name)
print(processing_job_tc)

In [None]:
response = sm.associate_trial_component(TrialComponentName=processing_job_tc, TrialName=pipeline_trial_name)

In [None]:
# -aws-training-job is the default name assigned by TrainingJob
training_job_tc = "{}-aws-training-job".format(training_job_name)
print(training_job_tc)

In [None]:
response = sm.associate_trial_component(TrialComponentName=training_job_tc, TrialName=pipeline_trial_name)

# SageMaker Debuggerの結果を分析

In [None]:
restored_estimator = sagemaker.estimator.Estimator.attach(training_job_name)

In [None]:
from IPython.core.display import display, HTML

display(
    HTML(
        '<b>Review <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}?prefix={}/">S3 Debugger Output Data</a></b>'.format(
            bucket, restored_estimator.base_job_name
        )
    )
)

# SageMaker Debuggerのプロファイリングレポートをダウンロード

In [None]:
profiler_report_s3_uri = "s3://{}/{}/rule-output/ProfilerReport/profiler-output".format(
    bucket, restored_estimator.base_job_name
)

In [None]:
!aws s3 ls $profiler_report_s3_uri/

In [None]:
!aws s3 cp --recursive $profiler_report_s3_uri ./generated_profiler_report/

In [None]:
from IPython.core.display import display, HTML

display(
    HTML('<b>Review <a target="blank" href="./generated_profiler_report/profiler-report.html">Profiler Report</a></b>')
)

### _^^ Click `Trust HTML` in the profiler-report.html tab that opens ^^_

# SageMaker Studio内でプロファイリングレポートをレビュー

![SageMaker Studio Extensions](img/studio_pipeline_training_debugger_assigned.png)

# SageMaker Studio内でパイプラインをレビュー
![SageMaker Studio Extensions](img/sm_studio_extensions_pipelines.png)

# リソースを解放

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>