* CodeCommitのリポジトリは同様のものを使う -> バケットも共通
* パイプラインは新しいものを使う
* Lambdaはそのまま

# 2. MLOpsエンジニアによる実験パイプラインの構築
本ノートブックでは、データサイエンティストの実験を支援する実験パイプラインを構築します。
この実験パイプラインの構築は、MLOpsエンジニアによって実行されます。
AWSマネジメントコンソールから実施することができますが、今回はノートブック上でboto3を用いて構築を行います。

### 参考：MLOpsエンジニア

https://docs.aws.amazon.com/wellarchitected/latest/machine-learning-lens/mloe-02.html

MLOps engineer — Builds and manages automation pipelines to operationalize the ML platform and ML pipelines for fully/partially automated CI/CD pipelines. These pipelines automate building Docker images, model training, and model deployment. MLOps engineers also have a role in overall platform governance such as data / model lineage, as well as infrastructure and model monitoring.

MLOpsエンジニア - 完全/部分的に自動化されたCI/CDパイプラインのためのMLプラットフォームとMLパイプラインを運用するための自動化パイプラインを構築し管理する。これらのパイプラインは、Dockerイメージの構築、モデルのトレーニング、およびモデルのデプロイを自動化します。また、MLOpsエンジニアは、データ/モデルのリネージ、インフラストラクチャやモデルのモニタリングなど、プラットフォーム全体のガバナンスを担う役割も担っています。

## 0. 事前準備（手動）

構築作業のために、このノートブックを実行しているIAMroleに対して、いくつか権限が必要になります。
これらの権限を付与するために、手動でIAMfullAccessを付与してください。（実際の運用の際は最小権限を考慮ください）


* CodeCommitのCreate
* LambdaのCreate, SFnの実行
* SFnのCreate
* Lambda用、SFn用のIAMを作成するための権限
* S3バケットのCreate

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.attach_role_policy

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()
region = sagemaker_session.boto_region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')

In [None]:
print(role)
print(region)
print(account_id)

## IAMは設定済みのため、削除

## 【削除】1. S3バケット作成、データ配置
実験のデータを格納するためのs3バケットを格納します。
このバケットは、LambdaがStepFunctionsにソースコードを連携するためにも利用します。

## 【削除】2.CodeCommitリポジトリの作成
モジュール化されたソースコードを管理するためのリポジトリを作成します。
機械学習プロジェクトごとにリポジトリを用意する想定です。

## 【削除】3.AWS Lambdaの構築
コードがpushされた時に、コンフィグファイル（experiment.yml）に指定されたパイプラインを起動するためのLambda関数を構築します。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.create_function

## 【削除】4. LambdaとCodeCommitの連携
CodeCommitのプロジェクト用リポジトリにコードがpushされた場合にLambdaが起動するように、LambdaとCodeCommitの連携をします。

Lambda、CodeCommitのコンソール画面から、トリガーが設定されていることが確認できます。

# 5.0 Experiments を作成する
state machine に指定するため、事前に作成が必要

In [None]:
import sys
!{sys.executable} -m pip install sagemaker-experiments

In [None]:
from smexperiments.experiment import Experiment

In [None]:
#experiment_name = "sfn-evaluate-model"
experiment_name = "demo-exp-project1"


# create the experiment if it doesn't exist
try:
    experiment_evaluate = Experiment.load(experiment_name=experiment_name)
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        experiment_evaluate = Experiment.create(
            experiment_name=experiment_name, 
            description="model evaluation", 
            sagemaker_boto_client=boto3.client('sagemaker'))

print(experiment_evaluate.experiment_name)

## 5.StepFunctionsのステートマシン作成
今回は作成済みのstate machineをデプロイしますが、作成には Workflow Studio を利用するのもよいでしょう。

https://aws.amazon.com/jp/blogs/news/new-aws-step-functions-workflow-studio-a-low-code-visual-tool-for-building-state-machines/

### 5-1. ロールの作成と、カスタムポリシーアタッチ

### ロール作成
新パイプライン用の実行ロールを作成します。

demo-StepFunctions-ExperimentPipeline-Role2

In [None]:
iam_client = boto3.client('iam')

In [None]:
import json

In [None]:
step_functions_role_name = 'demo-StepFunctions-ExperimentPipeline-Role2'

assume_role_policy = {
      "Version": "2012-10-17",
      "Statement": {"Sid": "",
                    "Effect": "Allow",
                    "Principal": {"Service": ["states.amazonaws.com",
                                              "sagemaker.amazonaws.com"
                                             ]
                                 },
                    "Action": "sts:AssumeRole"
                   }
    }

response = iam_client.create_role(
    Path = '/service-role/',
    RoleName = step_functions_role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy),
    MaxSessionDuration=3600*12 # 12 hours
)

step_functions_role_arn = response['Role']['Arn']

In [None]:
step_functions_role_arn

### ポリシー作成

以下の権限を持つカスタムポリシーを作成します。
* StateMachineのアップデートのための、CloudWatchEvent権限
* SageMakerのProcessingジョブ発行
* S3からのファイル読み込み
* CloudWatch Logsへ記録

In [None]:
import json

step_functions_policy_name = 'demo-StepFunctions-ExperimentPipeline-Policy2'
custom_policy ={
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:DescribeRule",
                "events:PutRule",
                "sagemaker:CreateTrial",
                "sagemaker:CreateProcessingJob",
                "sagemaker:CreateTrainingJob",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:GetObject",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": step_functions_role_arn,
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        }
    ]
}

response = iam_client.create_policy(
    PolicyName=step_functions_policy_name,
    PolicyDocument=json.dumps(custom_policy),
)

step_functions_policy_arn = response['Policy']['Arn']

In [None]:
step_functions_policy_arn

作成したカスタムポリシーをロールにアタッチします。

In [None]:
response = iam_client.attach_role_policy(
    RoleName=step_functions_role_name,
    PolicyArn=step_functions_policy_arn
)

### 5-2. state_definition.jsonを作成'
visual editorで作成することもできます。ここでは簡単に作成済みのjsonから構築します。

修正中ステートマシン（学習ジョブ版）

In [None]:
state_definition = {
  "Comment": "A description of my state machine",
  "StartAt": "CreateTrial",
  "States": {
    "CreateTrial": {
      "Type": "Task",
      "Next": "Preprocess",
      "Parameters": {
        "ExperimentName.$": "$$.Execution.Input['experiment']",
        "TrialName.$": "$$.Execution.Input['trial']"
      },
      "Resource": "arn:aws:states:::aws-sdk:sagemaker:createTrial"
    },
    "Preprocess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
      "Parameters": {
        "ProcessingInputs": [
          {
            "InputName": "input",
            "AppManaged": False,
            "S3Input": {
              "S3Uri.$": "$$.Execution.Input['pipeline']['input_data_uri']",
              "LocalPath": "/opt/ml/processing/input",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          },
          {
            "InputName": "code",
            "AppManaged": False,
            "S3Input": {
              "S3Uri.$": "$$.Execution.Input['preprocess']['code_s3']",
              "LocalPath": "/opt/ml/processing/input/code",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          }
        ],
        "ProcessingOutputConfig": {
          "Outputs": [
            {
              "OutputName": "output",
              "AppManaged": False,
              "S3Output": {
                "S3Uri.$": "$$.Execution.Input['preprocess']['output_data_uri']",
                "LocalPath": "/opt/ml/processing/output",
                "S3UploadMode": "EndOfJob"
              }
            }
          ]
        },
        "AppSpecification": {
          "ImageUri.$": "$$.Execution.Input['preprocess']['ImageUri']",
          "ContainerArguments.$": "$$.Execution.Input['preprocess']['args']",
          "ContainerEntrypoint.$": "States.Array('python3', $$.Execution.Input['preprocess']['ContainerEntrypoint'])"
        },
        "ProcessingResources": {
          "ClusterConfig": {
            "InstanceCount.$": "$$.Execution.Input['preprocess']['InstanceCount']",
            "InstanceType.$": "$$.Execution.Input['preprocess']['InstanceType']",
            "VolumeSizeInGB.$": "$$.Execution.Input['preprocess']['VolumeSizeInGB']"
          }
        },
        "RoleArn": step_functions_role_arn,
        "ProcessingJobName.$": "States.Format('{}-preprocess', $$.Execution.Input['id'])"
      },
      "Next": "train"
    },
    "train": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "AlgorithmSpecification": {
          "MetricDefinitions": [
            {
              "Name": "train:coefficient",
              "Regex": ".*?coefficient: (.*?)"
            },
            {
              "Name": "train:intercept ",
              "Regex": ".*?intercept: (.*?)"
            }
          ],
          "TrainingImage.$": "$$.Execution.Input['train']['ImageUri']",
          "TrainingInputMode": "File"
        },
        "HyperParameters": {
          "sagemaker_program.$": "States.Format('/opt/ml/input/data/train_code/{}', $$.Execution.Input['train']['code'])"
        },
        "InputDataConfig": [
          {
            "ChannelName": "preprocess",
            "ContentType": "text/csv",
            "DataSource": {
              "S3DataSource": {
                "S3DataDistributionType": "FullyReplicated",
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://demo-exp-pipeline-project1/demo-exp-project1_TeamRole_2022-04-04T02:58:25.379+0000_729803bc930a08a719ac0f74c316937b5515ea91/preprocess/"
              }
            }
          },
          {
            "ChannelName": "train_code",
            "ContentType": "text/csv",
            "DataSource": {
              "S3DataSource": {
                "S3DataDistributionType": "FullyReplicated",
                "S3DataType": "S3Prefix",
                "S3Uri.$": "$$.Execution.Input['train']['code_s3']"
              }
            }
          }
        ],
        "OutputDataConfig": {
          "S3OutputPath.$": "$$.Execution.Input['train']['output_data_uri']"
        },
        "ResourceConfig": {
          "InstanceCount.$": "$$.Execution.Input['train']['InstanceCount']",
          "InstanceType.$": "$$.Execution.Input['train']['InstanceType']",
          "VolumeSizeInGB.$": "$$.Execution.Input['train']['VolumeSizeInGB']"
        },
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 86400
        },
        "ExperimentConfig": {
          "ExperimentName.$": "$$.Execution.Input['experiment']",
          "TrialName.$": "$$.Execution.Input['trial']",
          "TrialComponentDisplayName": "train"
        },
        "RoleArn": step_functions_role_arn,
        "TrainingJobName.$": "States.Format('{}-train', $$.Execution.Input['id'])"
      },
      "Next": "predict"
    },
    "predict": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
      "Parameters": {
        "ProcessingInputs": [
          {
            "InputName": "input_preprocess",
            "AppManaged": False,
            "S3Input": {
              "S3Uri.$": "$$.Execution.Input['preprocess']['output_data_uri']",
              "LocalPath": "/opt/ml/processing/input/preprocess",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          },
          {
            "InputName": "input_train",
            "AppManaged": False,
            "S3Input": {
              "S3Uri.$": "$.ModelArtifacts.S3ModelArtifacts",
              "LocalPath": "/opt/ml/processing/input/train",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          },
          {
            "InputName": "code",
            "AppManaged": False,
            "S3Input": {
              "S3Uri.$": "$$.Execution.Input['predict']['code_s3']",
              "LocalPath": "/opt/ml/processing/input/code",
              "S3DataType": "S3Prefix",
              "S3InputMode": "File",
              "S3DataDistributionType": "FullyReplicated",
              "S3CompressionType": "None"
            }
          }
        ],
        "ProcessingOutputConfig": {
          "Outputs": [
            {
              "OutputName": "output",
              "AppManaged": False,
              "S3Output": {
                "S3Uri.$": "$$.Execution.Input['predict']['output_data_uri']",
                "LocalPath": "/opt/ml/processing/output",
                "S3UploadMode": "EndOfJob"
              }
            }
          ]
        },
        "AppSpecification": {
          "ImageUri.$": "$$.Execution.Input['predict']['ImageUri']",
          "ContainerArguments.$": "$$.Execution.Input['predict']['args']",
          "ContainerEntrypoint.$": "States.Array('python3', $$.Execution.Input['predict']['ContainerEntrypoint'])"
        },
        "ProcessingResources": {
          "ClusterConfig": {
            "InstanceCount.$": "$$.Execution.Input['predict']['InstanceCount']",
            "InstanceType.$": "$$.Execution.Input['predict']['InstanceType']",
            "VolumeSizeInGB.$": "$$.Execution.Input['predict']['VolumeSizeInGB']"
          }
        },
        "RoleArn": step_functions_role_arn,
        "ProcessingJobName.$": "States.Format('{}-predict', $$.Execution.Input['id'])"
      },
      "Next": "evaluate"
    },
    "evaluate": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "AlgorithmSpecification": {
          "MetricDefinitions": [
            {
              "Name": "test:accuracy",
              "Regex": ".*?accuracy: (0.\\d+).*?"
            },
            {
              "Name": "test:roc_auc ",
              "Regex": ".*?roc_auc: (0.\\d+).*?"
            }
          ],
          "TrainingImage.$": "$$.Execution.Input['evaluate']['ImageUri']",
          "TrainingInputMode": "File"
        },
        "HyperParameters": {
          "sagemaker_program.$": "States.Format('/opt/ml/input/data/evaluate_code/{}', $$.Execution.Input['evaluate']['code'])"
        },
        "InputDataConfig": [
          {
            "ChannelName": "preprocess",
            "ContentType": "text/csv",
            "DataSource": {
              "S3DataSource": {
                "S3DataDistributionType": "FullyReplicated",
                "S3DataType": "S3Prefix",
                "S3Uri.$": "$$.Execution.Input['preprocess']['output_data_uri']"
              }
            }
          },
          {
            "ChannelName": "predict",
            "ContentType": "text/csv",
            "DataSource": {
              "S3DataSource": {
                "S3DataDistributionType": "FullyReplicated",
                "S3DataType": "S3Prefix",
                "S3Uri.$": "$$.Execution.Input['predict']['output_data_uri']"
              }
            }
          },
          {
            "ChannelName": "evaluate_code",
            "ContentType": "text/csv",
            "DataSource": {
              "S3DataSource": {
                "S3DataDistributionType": "FullyReplicated",
                "S3DataType": "S3Prefix",
                "S3Uri.$": "$$.Execution.Input['evaluate']['code_s3']"
              }
            }
          }
        ],
        "OutputDataConfig": {
          "S3OutputPath.$": "$$.Execution.Input['evaluate']['output_data_uri']"
        },
        "ResourceConfig": {
          "InstanceCount.$": "$$.Execution.Input['evaluate']['InstanceCount']",
          "InstanceType.$": "$$.Execution.Input['evaluate']['InstanceType']",
          "VolumeSizeInGB.$": "$$.Execution.Input['evaluate']['VolumeSizeInGB']"
        },
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 86400
        },
        "ExperimentConfig": {
          "ExperimentName.$": "$$.Execution.Input['experiment']",
          "TrialName.$": "$$.Execution.Input['trial']",
          "TrialComponentDisplayName": "evaluate"
        },
        "RoleArn": step_functions_role_arn,
        "TrainingJobName.$": "States.Format('{}-evaluate', $$.Execution.Input['id'])"
      },
      "End": True
    }
  }
}

In [None]:
### jsonファイル作成
with open('state_definition.json', mode='wt', encoding='utf-8') as file:
    json.dump(state_definition, file, ensure_ascii=False, indent=4)

### 5-3. StepFunctionsの実験パイプラインを構築
(注意)ロール作成後即座に実行すると、作成が間に合わずエラーになる場合がある。その場合少し待って再度実行する

In [None]:
import json
stepfunctions = boto3.client('stepfunctions')

stepfunctions.create_state_machine(
    name='exp-preprocess-train-predict-evaluate2',
    definition=open("state_definition.json").read(),
    roleArn=step_functions_role_arn
)

以上で、MLOpsエンジニアによって実験パイプラインが構築されました。
データサイエンティストはこの実験パイプラインを利用して、実験環境であるコンテナやハードウェアであるインスタンスタイプを指定して、実験の試行錯誤を行うことができます。
次のノートブックでは、ノートブックから.pyファイルへのモジュール化を行います。