# Pipelines Sample

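## Building an ML pipeline for churn prediction with SageMaker Pipelines

### Scenario

Using a telephone-line churn dataset (one record per line, together with whether that customer churned or not),
we build an ML pipeline with SageMaker Pipelines.
See [this notebook](https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/xgboost_customer_churn/xgboost_customer_churn.ipynb) for details on the data.

We split the original 3,333 rows into three parts of 1,111 rows each and treat them as the data obtained on day 1, day 2, and day 3, respectively.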

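#### Day 1

First, using the day-1 data, we develop, train, evaluate, and deploy a model by hand (without SageMaker Pipelines).

#### Day 2

The day-2 data arrives, so we retrain on the combined day-1 and day-2 data and evaluate the new model. If it scores higher than the day-1 model, we deploy the day-2 model. Running all of this by hand is tedious, so we set it up to run on SageMaker Pipelines and then execute the pipeline. We also register the model in the model registry.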
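
The day-2 rule boils down to two small decisions: which days' data to train on, and whether the retrained model replaces the current one. Below is a minimal sketch in plain Python with hypothetical helper names `training_days` and `should_deploy`. It uses `>=` because the pipeline's condition step later in this notebook uses `ConditionGreaterThanOrEqualTo`, so a tie also favors the new model. The AUC values are made up.

```python
from typing import List


def training_days(day: int) -> List[int]:
    """Days whose data is used when retraining on `day` (data accumulates day by day)."""
    return list(range(1, day + 1))


def should_deploy(new_auc: float, current_auc: float) -> bool:
    """Replace the current model only if the retrained one scores at least as well."""
    return new_auc >= current_auc


print(training_days(2))            # [1, 2]
print(should_deploy(0.91, 0.89))   # True
print(should_deploy(0.88, 0.89))   # False
```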



### Preparation
First, download the data and split it into three parts.

In [None]:
import os, json, sagemaker, pandas as pd, numpy as np
from sklearn.model_selection import train_test_split
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.inputs import TrainingInput
from sagemaker import get_execution_role
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep,TrainingStep
from sagemaker.estimator import Estimator
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.transformer import Transformer
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.pipeline import Pipeline

In [None]:
# Download the data
![ -e DKD2e_data_sets.zip ] && rm DKD2e_data_sets.zip
!wget http://dataminingconsultant.com/DKD2e_data_sets.zip
!unzip -o DKD2e_data_sets.zip

In [None]:
# Inspect the data we will use
df = pd.read_csv('./Data sets/churn.txt')
df.head()

In [None]:
# Separate churned and non-churned rows before splitting so that churners are spread evenly across the splits
df_true = df[df['Churn?']=='True.'].reset_index(drop=True)
df_false = df[df['Churn?']=='False.'].reset_index(drop=True)

In [None]:
# Shuffle before splitting
df_true_shuffle = df_true.sample(frac=1, random_state=42)
df_false_shuffle = df_false.sample(frac=1, random_state=42)

In [None]:
# Split into three parts, taking one third of the *shuffled* churned and non-churned rows for each day
split_num = 3
split_df_list = []
for i in range(split_num):
    idx_min_true, idx_max_true = i*len(df_true_shuffle)//split_num, (i+1)*len(df_true_shuffle)//split_num
    idx_min_false, idx_max_false = i*len(df_false_shuffle)//split_num, (i+1)*len(df_false_shuffle)//split_num
    tmp_df = pd.concat([df_true_shuffle.iloc[idx_min_true:idx_max_true],
                        df_false_shuffle.iloc[idx_min_false:idx_max_false]], axis=0)
    split_df_list.append(tmp_df)
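
The index arithmetic in the loop above can be checked in isolation. The small sketch below (hypothetical helper `chunk_bounds`) uses the same `i*n//k` formula: the chunks are contiguous, cover every row exactly once, and differ in size by at most one. Because the loop applies it separately to churned and non-churned rows, each day ends up with roughly one third of each class.

```python
from typing import List, Tuple


def chunk_bounds(n: int, k: int) -> List[Tuple[int, int]]:
    """[start, end) pairs splitting n rows into k contiguous chunks via i*n//k."""
    return [(i * n // k, (i + 1) * n // k) for i in range(k)]


print(chunk_bounds(10, 3))  # [(0, 3), (3, 6), (6, 10)]
```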

In [None]:
# Write each split to a local CSV file
shots_dir = './rawdata/'
os.makedirs(shots_dir, exist_ok=True)
local_csvfile_list = []
for i,split_df in enumerate(split_df_list):
    file_name = f'{shots_dir}day_{str(i+1)}.csv'
    split_df.to_csv(file_name,index=False)
    local_csvfile_list.append(file_name)
print(*local_csvfile_list)

## Day 1: preprocessing, training, evaluation, deployment, and prediction by hand
### Preprocessing
We run the same preprocessing as [this notebook](https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/xgboost_customer_churn/xgboost_customer_churn.ipynb), but on SageMaker Processing, using the built-in scikit-learn container.
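
The built-in XGBoost container expects CSV training data with the label in the first column and no header row. The referenced notebook's preprocessing ends by moving the label to the first column and writing the CSV without a header, and we assume `./preprocess/preprocess.py` (not shown here) does the same. The stdlib sketch below covers only that final layout step; the helper name `to_xgboost_csv` and the sample rows are made up for illustration, and the one-hot encoding and column dropping from the referenced notebook are omitted.

```python
import csv
import io
from typing import Dict, List


def to_xgboost_csv(rows: List[Dict[str, str]], label_col: str, positive: str = "True.") -> str:
    """Emit headerless CSV with a 1/0 label in column 0 followed by the remaining columns."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        label = 1 if row[label_col] == positive else 0
        features = [value for key, value in row.items() if key != label_col]
        writer.writerow([label, *features])
    return buf.getvalue()


sample = [
    {"Account Length": "128", "VMail Message": "25", "Churn?": "False."},
    {"Account Length": "107", "VMail Message": "26", "Churn?": "True."},
]
print(to_xgboost_csv(sample, "Churn?"), end="")
# 0,128,25
# 1,107,26
```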

In [None]:
# Define the Processor
ROLE = get_execution_role()
HANDSON_NAME = 'sagemaker-pipelines-sample'
PRE_PROCESS_JOBNAME = f'{HANDSON_NAME}-pre-processing'
sklearn_processor = SKLearnProcessor(
    base_job_name = PRE_PROCESS_JOBNAME,
    framework_version='0.23-1',
    role=ROLE,
    instance_type='ml.m5.xlarge',instance_count=1
)

BUCKET = sagemaker.session.Session().default_bucket()
RAWDATA_PREFIX = shots_dir.replace('./','').replace('/','')
RAWDATA_S3_URI = f's3://{BUCKET}/{HANDSON_NAME}/{RAWDATA_PREFIX}'

# Input definitions
RAWCSV_S3_URI = sagemaker.s3.S3Uploader.upload(local_csvfile_list[0],RAWDATA_S3_URI)
PRE_PROCESSING_RAW_INPUT_DIR = '/opt/ml/processing/input/raw'

# Output definitions
PRE_PROCESSING_TRAIN_OUTPUT_DIR = '/opt/ml/processing/output/train'
PRE_PROCESSING_VALID_OUTPUT_DIR = '/opt/ml/processing/output/valid'
PRE_PROCESSING_TEST_OUTPUT_DIR = '/opt/ml/processing/output/test'

sklearn_processor.run(code='./preprocess/preprocess.py',
                      # Every ProcessingInput is copied from S3 to the processing instance; `destination` sets where it is placed.
                      inputs=[
                          ProcessingInput( 
                              source=RAWCSV_S3_URI,
                              destination=PRE_PROCESSING_RAW_INPUT_DIR
                          ),
                      ],
                      # Everything under each `source` directory on the processing instance is uploaded to S3 (the instance creates these directories automatically).
                      outputs=[
                          ProcessingOutput(
                              output_name = 'train',
                              source=PRE_PROCESSING_TRAIN_OUTPUT_DIR,
                          ),
                          ProcessingOutput(
                              output_name = 'valid',
                              source=PRE_PROCESSING_VALID_OUTPUT_DIR,
                          ),
                          ProcessingOutput(
                              output_name = 'test',
                              source=PRE_PROCESSING_TEST_OUTPUT_DIR,
                          )
                      ],
                      # Arguments telling the script where the input CSV was placed on the processing instance
                      # and where it should write its outputs
                      arguments=[
                          '--raw-input-dir',PRE_PROCESSING_RAW_INPUT_DIR,
                          '--train-output-dir',PRE_PROCESSING_TRAIN_OUTPUT_DIR,
                          '--valid-output-dir',PRE_PROCESSING_VALID_OUTPUT_DIR,
                          '--test-output-dir',PRE_PROCESSING_TEST_OUTPUT_DIR,
                      ]
                     )
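
`arguments` only passes strings on the command line; the script has to read them itself. The script is not shown here, so this is a hypothetical sketch of how `preprocess.py` might parse these flags with `argparse`. The three `--lasttime-*` flags are the extra ones the day-2 pipeline step passes, so they are optional in this sketch.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw-input-dir", required=True)
    parser.add_argument("--train-output-dir", required=True)
    parser.add_argument("--valid-output-dir", required=True)
    parser.add_argument("--test-output-dir", required=True)
    # Passed only by the day-2 pipeline step; absent on day 1.
    parser.add_argument("--lasttime-train-input-dir", default=None)
    parser.add_argument("--lasttime-valid-input-dir", default=None)
    parser.add_argument("--lasttime-test-input-dir", default=None)
    return parser.parse_args(argv)


# The same list the day-1 cell passes via `arguments=`.
args = parse_args([
    "--raw-input-dir", "/opt/ml/processing/input/raw",
    "--train-output-dir", "/opt/ml/processing/output/train",
    "--valid-output-dir", "/opt/ml/processing/output/valid",
    "--test-output-dir", "/opt/ml/processing/output/test",
])
print(args.raw_input_dir)             # /opt/ml/processing/input/raw
print(args.lasttime_train_input_dir)  # None
```

`argparse` turns dashes in flag names into underscores, which is why the script reads `args.raw_input_dir`.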

### Training
We use XGBoost with the same hyperparameters as [this notebook](https://github.com/aws-samples/amazon-sagemaker-examples-jp/blob/master/xgboost_customer_churn/xgboost_customer_churn.ipynb), trained with SageMaker Training.
We use the built-in XGBoost container.

In [None]:
TRAIN_CSV_S3_URI = sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'] + '/train.csv'
VALID_CSV_S3_URI = sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][1]['S3Output']['S3Uri'] + '/valid.csv'
TEST_CSV_S3_URI = sklearn_processor.latest_job.describe()['ProcessingOutputConfig']['Outputs'][2]['S3Output']['S3Uri'] + '/test.csv'
print(TRAIN_CSV_S3_URI)
print(VALID_CSV_S3_URI)
print(TEST_CSV_S3_URI)
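
The cell above picks the outputs by list position (`[0]`, `[1]`, `[2]`). Looking them up by `OutputName` instead does not depend on the order in which the outputs are returned. The sketch below runs against a made-up response trimmed to the fields used here (in the notebook it would come from `sklearn_processor.latest_job.describe()`); the helper name `output_uris_by_name` is hypothetical.

```python
from typing import Dict


def output_uris_by_name(describe_response: dict) -> Dict[str, str]:
    """Map each processing output's OutputName to its S3 URI."""
    outputs = describe_response["ProcessingOutputConfig"]["Outputs"]
    return {o["OutputName"]: o["S3Output"]["S3Uri"] for o in outputs}


# Made-up response, deliberately out of order, with the same shape as the real one.
fake_response = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {"OutputName": "valid", "S3Output": {"S3Uri": "s3://example-bucket/job/output/valid"}},
            {"OutputName": "train", "S3Output": {"S3Uri": "s3://example-bucket/job/output/train"}},
            {"OutputName": "test", "S3Output": {"S3Uri": "s3://example-bucket/job/output/test"}},
        ]
    }
}
uris = output_uris_by_name(fake_response)
print(uris["train"] + "/train.csv")  # s3://example-bucket/job/output/train/train.csv
```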

In [None]:
CONTENT_TYPE='text/csv'
TRAIN_S3_INPUT = TrainingInput(TRAIN_CSV_S3_URI, content_type=CONTENT_TYPE)
VALID_S3_INPUT = TrainingInput(VALID_CSV_S3_URI, content_type=CONTENT_TYPE)

In [None]:
XGB_CONTAINER = sagemaker.image_uris.retrieve("xgboost", sagemaker.session.Session().boto_region_name, "1.2-1")

In [None]:
TRAIN_JOBNAME = f'{HANDSON_NAME}-training'
MODEL_S3_URI = f's3://{BUCKET}/{TRAIN_JOBNAME}'
HYPERPARAMETERS = {
    "max_depth":"5",
    "eta":"0.2",
    "gamma":"4",
    "min_child_weight":"6",
    "subsample":"0.8",
    "objective":"binary:logistic",
    "num_round":"100"
}
xgb = Estimator(
    XGB_CONTAINER,
    ROLE,
    base_job_name = TRAIN_JOBNAME,
    hyperparameters=HYPERPARAMETERS,
    instance_count=1, 
    instance_type='ml.m5.xlarge',
    output_path = MODEL_S3_URI
)

In [None]:
xgb.fit({'train': TRAIN_S3_INPUT, 'validation': VALID_S3_INPUT})

### Model evaluation
* Metric: AUC
* Runs on SageMaker Processing
* Uses the built-in XGBoost container
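
AUC is the probability that a randomly chosen churned customer gets a higher score than a randomly chosen non-churned one. `./postprocess/postprocess.py` is not shown; it likely calls something like `sklearn.metrics.roc_auc_score`, but that is an assumption. The stdlib sketch below computes the same quantity by counting correctly ordered (positive, negative) pairs, with ties counted as half. It is quadratic, which is fine for a few hundred test rows. It also shows the JSON shape that the day-2 condition step reads via `regression_metrics.auc.value`; that layout is inferred from the path, not from the script.

```python
import json
from typing import Sequence


def roc_auc(y_true: Sequence[int], y_score: Sequence[float]) -> float:
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for y, s in zip(y_true, y_score) if y == 1]
    neg = [s for y, s in zip(y_true, y_score) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


auc = roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(auc)  # 0.75

# Assumed evaluation.json layout, matching the json_path used by the day-2 condition.
report = {"regression_metrics": {"auc": {"value": auc}}}
print(json.dumps(report))  # {"regression_metrics": {"auc": {"value": 0.75}}}
```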

In [None]:
MODEL_DATA_S3_URI = xgb.model_data
POST_PROCESS_JOBNAME = f'{HANDSON_NAME}-post-processing'
POST_PROCESSING_OUTPUT_S3_URI = f's3://{BUCKET}/{POST_PROCESS_JOBNAME}/post_processing_output'
POST_PROCESSING_MODEL_DIR = '/opt/ml/processing/model'
POST_PROCESSING_INPUT_DIR = '/opt/ml/processing/input'
POST_PROCESSING_OUTPUT_DIR = '/opt/ml/processing/output'

eval_processor = ScriptProcessor(
    base_job_name = POST_PROCESS_JOBNAME,
    image_uri=XGB_CONTAINER,
    command=["python3"],
    instance_type='ml.m5.xlarge',
    instance_count=1,
    role=ROLE,
)
eval_processor.run(
    code = './postprocess/postprocess.py',
    inputs=[
        ProcessingInput( 
            source=TEST_CSV_S3_URI,
            destination=POST_PROCESSING_INPUT_DIR
        ),
        ProcessingInput(
            source=MODEL_DATA_S3_URI,
            destination=POST_PROCESSING_MODEL_DIR
        )
    ],
    outputs=[
        ProcessingOutput(
            source=POST_PROCESSING_OUTPUT_DIR,
            destination=POST_PROCESSING_OUTPUT_S3_URI
        )
    ],
    arguments=[
        '--input-dir',POST_PROCESSING_INPUT_DIR,
        '--output-dir',POST_PROCESSING_OUTPUT_DIR,
        '--model-dir',POST_PROCESSING_MODEL_DIR,
    ]
)


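## Day 2: build a pipeline
Day 2 repeats almost the same steps as day 1, so we automate them with a pipeline.
The pipeline also compares the model trained on the day-1 data with the model retrained after adding the day-2 data, and deploys whichever performs better.
The uppercase variables used so far hold the values passed to SageMaker; the ones that change between runs (the input data and the previous model's locations) become parameter instances that Pipelines can handle.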

In [None]:
RAWCSV_S3_URI = sagemaker.s3.S3Uploader.upload(local_csvfile_list[1],RAWDATA_S3_URI)

In [None]:
TRAIN_CSV_S3_URI

In [None]:
# Define the preprocessing step
sklearn_processor = SKLearnProcessor(
    base_job_name = PRE_PROCESS_JOBNAME,
    framework_version='0.23-1',
    role=ROLE,
    instance_type='ml.m5.xlarge',instance_count=1
)

rawcsv_s3_uri_param = ParameterString(name='RawCsvS3Uri',default_value=RAWCSV_S3_URI)


LASTTIME_PRE_PROCESSING_TRAIN_DATA_INPUT_DIR = '/opt/ml/processing/input/train'
LASTTIME_PRE_PROCESSING_VALID_DATA_INPUT_DIR = '/opt/ml/processing/input/valid'
LASTTIME_PRE_PROCESSING_TEST_DATA_INPUT_DIR = '/opt/ml/processing/input/test'
lasttime_pre_processing_train_data_s3_uri_param = ParameterString(name='lasttime-pre-processing-train-data-S3-URI',default_value=TRAIN_CSV_S3_URI)
lasttime_pre_processing_valid_data_s3_uri_param = ParameterString(name='lasttime-pre-processing-valid-data-S3-URI',default_value=VALID_CSV_S3_URI)
lasttime_pre_processing_test_data_s3_uri_param = ParameterString(name='lasttime-pre-processing-test-data-S3-URI',default_value=TEST_CSV_S3_URI)

pre_process_step = ProcessingStep(
    code='./preprocess/preprocess.py',
    name=f'{PRE_PROCESS_JOBNAME}_step',
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=rawcsv_s3_uri_param,
            destination=PRE_PROCESSING_RAW_INPUT_DIR
        ),
        ProcessingInput(
            source=lasttime_pre_processing_train_data_s3_uri_param,
            destination=LASTTIME_PRE_PROCESSING_TRAIN_DATA_INPUT_DIR
        ),
        ProcessingInput(
            source=lasttime_pre_processing_valid_data_s3_uri_param,
            destination=LASTTIME_PRE_PROCESSING_VALID_DATA_INPUT_DIR
        ),
        ProcessingInput(
            source=lasttime_pre_processing_test_data_s3_uri_param,
            destination=LASTTIME_PRE_PROCESSING_TEST_DATA_INPUT_DIR
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name = 'train',
            source=PRE_PROCESSING_TRAIN_OUTPUT_DIR,
        ),
        ProcessingOutput(
            output_name = 'valid',
            source=PRE_PROCESSING_VALID_OUTPUT_DIR,
        ),
        ProcessingOutput(
            output_name = 'test',
            source=PRE_PROCESSING_TEST_OUTPUT_DIR,
        )
    ],
    job_arguments=[
        '--raw-input-dir',PRE_PROCESSING_RAW_INPUT_DIR,
        '--lasttime-train-input-dir',LASTTIME_PRE_PROCESSING_TRAIN_DATA_INPUT_DIR,
        '--lasttime-valid-input-dir',LASTTIME_PRE_PROCESSING_VALID_DATA_INPUT_DIR,
        '--lasttime-test-input-dir',LASTTIME_PRE_PROCESSING_TEST_DATA_INPUT_DIR,
        '--train-output-dir',PRE_PROCESSING_TRAIN_OUTPUT_DIR,
        '--valid-output-dir',PRE_PROCESSING_VALID_OUTPUT_DIR,
        '--test-output-dir',PRE_PROCESSING_TEST_OUTPUT_DIR,   
    ]
)
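
On day 2 the step receives both the new raw CSV and the previous train/valid/test splits. How `preprocess.py` combines them is not shown. One plausible approach, sketched below with hypothetical helpers, is to split only the new rows and append each part to the matching previous split. That keeps every earlier test row in the test set, so the retrained model never trains on rows the previous model was evaluated on, and both models are later scored on the same merged test set. The 70/20/10 ratio comes from the referenced notebook and is an assumption here.

```python
import random
from typing import Dict, List

SPLITS = ("train", "valid", "test")


def split_rows(rows: List[str], seed: int = 42) -> Dict[str, List[str]]:
    """Shuffle a copy of rows and split it 70/20/10 using integer arithmetic."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    a, b = len(shuffled) * 7 // 10, len(shuffled) * 9 // 10
    return {"train": shuffled[:a], "valid": shuffled[a:b], "test": shuffled[b:]}


def merge_with_lasttime(new_rows: List[str], lasttime: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Append each split of the new rows to the matching previous split."""
    new_splits = split_rows(new_rows)
    return {name: lasttime[name] + new_splits[name] for name in SPLITS}


previous = {"train": ["d1-a"] * 7, "valid": ["d1-b"] * 2, "test": ["d1-c"]}
merged = merge_with_lasttime([f"d2-{i}" for i in range(10)], previous)
print({name: len(rows) for name, rows in merged.items()})  # {'train': 14, 'valid': 4, 'test': 2}
```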

In [None]:
# Define the training step
xgb = Estimator(
    XGB_CONTAINER,
    ROLE,
    base_job_name = TRAIN_JOBNAME,
    hyperparameters=HYPERPARAMETERS,
    instance_count=1, 
    instance_type='ml.m5.xlarge',
    output_path = MODEL_S3_URI
)

train_step = TrainingStep(
    name=f"{TRAIN_JOBNAME}_step",
    estimator=xgb,
    inputs={
        "train": TrainingInput(
            s3_data=pre_process_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type=CONTENT_TYPE
        ),
        "validation": TrainingInput(
            s3_data=pre_process_step.properties.ProcessingOutputConfig.Outputs[
                "valid"
            ].S3Output.S3Uri,
            content_type=CONTENT_TYPE
        )
    },
)

In [None]:
thistime_train_eval_processor = ScriptProcessor(
    base_job_name = f'{HANDSON_NAME}-thistime-train-eval',  # SageMaker job names allow hyphens, not underscores
    image_uri=XGB_CONTAINER,
    command=['python3'],
    instance_type='ml.m5.xlarge',
    instance_count=1,
    role=ROLE,
)
thistime_train_eval_report = PropertyFile(
    name='thistime_train_evaluation_report',
    output_name='thistime_train_evaluation',
    path='evaluation.json'
)

thistime_train_eval_step = ProcessingStep(
    code='./postprocess/postprocess.py',
    name=f'{HANDSON_NAME}_thistime_train_eval_step',
    processor=thistime_train_eval_processor,
    inputs=[
        ProcessingInput(
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination=POST_PROCESSING_MODEL_DIR
        ),
        ProcessingInput(
            source=pre_process_step.properties.ProcessingOutputConfig.Outputs[
                'test'
            ].S3Output.S3Uri,
            destination=POST_PROCESSING_INPUT_DIR
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='thistime_train_evaluation',
            source=POST_PROCESSING_OUTPUT_DIR
        ),
    ],
    property_files=[thistime_train_eval_report],
    job_arguments=[
        '--input-dir',POST_PROCESSING_INPUT_DIR,
        '--output-dir',POST_PROCESSING_OUTPUT_DIR,
        '--model-dir',POST_PROCESSING_MODEL_DIR,        
    ]
)

In [None]:
lasttime_train_eval_processor = ScriptProcessor(
    base_job_name = f'{HANDSON_NAME}-lasttime-train-eval',  # SageMaker job names allow hyphens, not underscores
    image_uri=XGB_CONTAINER,
    command=['python3'],
    instance_type='ml.m5.xlarge',
    instance_count=1,
    role=ROLE,
)
lasttime_train_eval_report = PropertyFile(
    name='lasttime_train_evaluation_report',
    output_name='lasttime_train_evaluation',
    # Same script and output dir as the this-time evaluation step, so the file it writes has the same name
    path='evaluation.json'
)
lasttime_train_model_s3_uri_param = ParameterString(name='lasttime-train-model-S3-URI',default_value=MODEL_DATA_S3_URI)
lasttime_train_eval_step = ProcessingStep(
    code='./postprocess/postprocess.py',
    name=f'{HANDSON_NAME}_lasttime_train_eval_step',
    processor=lasttime_train_eval_processor,
    inputs=[
        ProcessingInput(
            source=lasttime_train_model_s3_uri_param,
            destination=POST_PROCESSING_MODEL_DIR
        ),
        ProcessingInput(
            source=pre_process_step.properties.ProcessingOutputConfig.Outputs[
                'test'
            ].S3Output.S3Uri,
            destination=POST_PROCESSING_INPUT_DIR
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='lasttime_train_evaluation',
            source=POST_PROCESSING_OUTPUT_DIR
        ),
    ],
    property_files=[lasttime_train_eval_report],
    job_arguments=[
        '--input-dir',POST_PROCESSING_INPUT_DIR,
        '--output-dir',POST_PROCESSING_OUTPUT_DIR,
        '--model-dir',POST_PROCESSING_MODEL_DIR,        
    ]
)

In [None]:
model = Model(
    image_uri=XGB_CONTAINER,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker.session.Session(),
    role=ROLE,
)
model_inputs = CreateModelInput(
    instance_type="ml.m5.large",
    # accelerator_type="ml.eia1.medium",
)
create_model_step = CreateModelStep(
    name=f"{HANDSON_NAME}-create-model-step",
    model=model,
    inputs=model_inputs,
)

In [None]:
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=thistime_train_eval_step,
        property_file=thistime_train_eval_report,
        json_path="regression_metrics.auc.value",
    ),
    right=JsonGet(
        step=lasttime_train_eval_step,
        property_file=lasttime_train_eval_report,
        json_path="regression_metrics.auc.value",
    ),
)
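
`JsonGet` tells Pipelines to open the `PropertyFile` written by a step and read one value from it; the comparison itself happens in the service at run time. Below is a conceptual stdlib model of that lookup and the `>=` check. The helper name is hypothetical, the real `json_path` is a JsonPath expression while this sketch handles only plain dotted keys, and the AUC numbers are made up. It also shows why each property file must point at a file the script actually writes: a missing file or key makes the lookup fail.

```python
import json


def json_get(document: dict, dotted_path: str):
    """Walk a dotted path such as 'regression_metrics.auc.value' through nested dicts."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]  # raises KeyError if the key is missing
    return value


path = "regression_metrics.auc.value"
thistime = json.loads('{"regression_metrics": {"auc": {"value": 0.92}}}')
lasttime = json.loads('{"regression_metrics": {"auc": {"value": 0.90}}}')
print(json_get(thistime, path) >= json_get(lasttime, path))  # True
```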

cond_step = ConditionStep(
    name=f'{HANDSON_NAME}_condition',
    conditions=[cond_gte],
    if_steps=[create_model_step],
    else_steps=[], 
)

In [None]:
pipeline_name = f"{HANDSON_NAME}-pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # Every parameter referenced by a step must be declared here
        rawcsv_s3_uri_param,
        lasttime_pre_processing_train_data_s3_uri_param,
        lasttime_pre_processing_valid_data_s3_uri_param,
        lasttime_pre_processing_test_data_s3_uri_param,
        lasttime_train_model_s3_uri_param
    ],
    steps=[
        pre_process_step,
        train_step,
        thistime_train_eval_step,
        lasttime_train_eval_step,
        # create_model_step runs only through cond_step's if_steps, so it is not listed on its own
        cond_step
    ],
)
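
Pipelines works out the execution order from the data dependencies between steps (the `properties` references above), not from their position in `steps`. The sketch below uses Python's `graphlib` (3.9+) with short step labels of our own and edges read off the cells above; each wave lists steps that could run at the same time. The last-time evaluation needs only the preprocessing output, so it can run alongside training.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# step -> steps whose outputs it consumes (short labels, not the real step names)
DEPENDENCIES: Dict[str, Set[str]] = {
    "train": {"pre_process"},
    "thistime_eval": {"train", "pre_process"},
    "lasttime_eval": {"pre_process"},
    "condition": {"thistime_eval", "lasttime_eval"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into waves whose members have all their inputs ready."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in execution_waves(DEPENDENCIES):
    print(wave)
# ['pre_process']
# ['lasttime_eval', 'train']
# ['thistime_eval']
# ['condition']
```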

In [None]:
definition = json.loads(pipeline.definition())
definition
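
Every parameter a step refers to must also appear in `parameters=`. In the definitions we have inspected, a parameter reference shows up as `{"Get": "Parameters.<Name>"}` and declared parameters are listed under the top-level `Parameters` key; treat that layout as an assumption. Under it, a small recursive check (hypothetical helper names) can flag references to undeclared parameters before calling `upsert`. The fake definition below is trimmed and made up.

```python
from typing import Any, Set


def referenced_parameters(node: Any) -> Set[str]:
    """Collect names from {"Get": "Parameters.<name>"} entries anywhere in a definition."""
    found: Set[str] = set()
    if isinstance(node, dict):
        get = node.get("Get")
        if isinstance(get, str) and get.startswith("Parameters."):
            found.add(get[len("Parameters."):])
        for value in node.values():
            found |= referenced_parameters(value)
    elif isinstance(node, list):
        for item in node:
            found |= referenced_parameters(item)
    return found


def undeclared_parameters(definition: dict) -> Set[str]:
    declared = {p["Name"] for p in definition.get("Parameters", [])}
    return referenced_parameters(definition) - declared


fake_definition = {
    "Parameters": [{"Name": "lasttime-train-model-S3-URI", "Type": "String"}],
    "Steps": [{
        "Name": "pre-process",
        "Type": "Processing",
        "Arguments": {"ProcessingInputs": [
            {"S3Input": {"S3Uri": {"Get": "Parameters.RawCsvS3Uri"}}},
        ]},
    }],
}
print(undeclared_parameters(fake_definition))  # {'RawCsvS3Uri'}
```

In the notebook, `undeclared_parameters(definition)` on the dict from the previous cell should return an empty set.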

In [None]:
pipeline.upsert(role_arn=ROLE)

In [None]:
print(pre_process_step)
print(train_step)
print(thistime_train_eval_step)
print(lasttime_train_eval_step)
print(create_model_step)
print(cond_step)

In [None]:
rawcsv_s3_uri_param