# AI Platform でのトレーニングと推論

このノートブックでは、TensorFlow 2.1 のモデルを GCP の AI Platform を使用してトレーニングと推論を実行する方法を学びます。

AI Platform Training/Prediction はト ML モデルのトレーニングとデプロイを行うマネージドサービスです。 <br>
インフラの用意やプロビジョニング、メンテナンスを行うことなく、シームレスに ML の環境を利用することができます。

## 目的
- TensorFlow 2系を用いたモデルの作り方を学習する
- AI Platform Training を利用して、クラウド上での分散学習を行う方法を学習する
- AI Platform Prediction を利用して、オートスケールする推論環境へのデプロイと推論の実行方法を学ぶ

## 環境の準備

In [None]:
import os, json, math, subprocess
import numpy as np
import shutil
import tensorflow as tf
print("TensorFlow version: ",tf.version.VERSION)

cmd = 'gcloud config list project --format "value(core.project)"'
PROJECT = subprocess.Popen(
      cmd, stdout=subprocess.PIPE,
      shell=True, universal_newlines=True).stdout.readlines()[0].rstrip('\n')
print("Your current GCP Project Name is: {}".format(PROJECT))
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
BUCKET = "{}-keras".format(PROJECT)

# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["REGION"] = REGION
os.environ["BUCKET"] = BUCKET # DEFAULT BUCKET WILL BE <PROJECT ID>-keras
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # SET TF ERROR LOG VERBOSITY

## Python の trainer パッケージを作成する

AI Platformのトレーニングに送信する `trainer` パッケージの作成をはじめましょう。

AI Platform Training のトレーニングジョブを実行する際には、クラウド上での実行のためにコードをまとめ上げてクラウドに送られます。<br>
そのために、コードは Python のパッケージにまとめられる必要があります。

Python のパッケージは、一つ以上の `.py` ファイルと、ディレクトリがパッケージであることを示すために `__init__.py` ファイルを含める必要があります。<br>
`__init__.py` は 初期実行を含めることもできますが、今回は必要ありませんので空のファイルとしておきます。

In [None]:
%%bash

# Remove files from previous notebook runs.
rm -rf taxifare_tf2

mkdir taxifare_tf2
touch taxifare_tf2/__init__.py

#### model.py ファイルを作成する

AI Platformに送信するファイルには、`__init__.py` 以外に二つの `.py` ファイルを含めます。<br>
- `model.py`: Keras モデルの定義 
- `task.py` : コマンドラインからの引数をパースし、`model.py` へ渡す実行ファイル

ノートブックのセルを `%%writefile` マジックからはじめると、指定された名前のファイルへセルの内容を書き込みます。<br>
`-a` フラグを追加すると、既に存在するファイルへ上書きを行います。<br>
そのため、以下の`%%writefile`から始まるセルは、 **かならず一回のみの実行**としてください。

最初に、必要なパッケージのインストールを行いましょう。

In [None]:
%%writefile taxifare_tf2/model.py

import datetime
import logging
import os

import numpy as np
import tensorflow as tf
from tensorflow import feature_column as fc
from tensorflow.keras import layers
from tensorflow.keras import models

# set TF error log verbosity
logging.getLogger("tensorflow").setLevel(logging.INFO)

まずは、すべてのデータ列名、予測するデータ列（ラベル）名、そしてデフォルトの値を定義しましょう。

In [None]:
%%writefile -a taxifare_tf2/model.py 

CSV_COLUMNS = [
    'fare_amount',
    'hourofday',
    'dayofweek',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude'
]

LABEL_COLUMN = 'fare_amount'

DEFAULTS = [[0.0], [0], [0], [0.0], [0.0], [0.0], [0.0]]

次に、使用する特徴量と予測するラベルを定義し、トレーニング用データセットを読み込みます。

In [None]:
%%writefile -a taxifare_tf2/model.py 

def features_and_labels(row_data):
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label  # features, label

# load the training data
def load_dataset(pattern, batch_size=1, mode=tf.estimator.ModeKeys.EVAL):
    dataset = (tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
               .map(features_and_labels) # features, label
              )
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.shuffle(1000)
    dataset = dataset.prefetch(1) # take advantage of multi-threading; 1=AUTOTUNE
    return dataset

また、同様に DNN モデルを設計します。

In [None]:
%%writefile -a taxifare_tf2/model.py 

def rmse(y_true, y_pred):
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true))) 

def build_model():
    INPUT_COLS = [
    'hourofday',
    'dayofweek',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
]

    # input layer
    inputs = {
        colname : tf.keras.layers.Input(name=colname, shape=(), dtype='float32')
           for colname in INPUT_COLS
    }
    feature_columns = {
        colname : tf.feature_column.numeric_column(colname)
           for colname in INPUT_COLS
    }
    
    # the constructor for DenseFeatures takes a list of numeric columns
    # The Functional API in Keras requires that you specify: LayerConstructor()(inputs)
    dnn_inputs = tf.keras.layers.DenseFeatures(feature_columns.values())(inputs)

    # two hidden layers of [32, 8] just in like the BQML DNN
    h1 = tf.keras.layers.Dense(32, activation='relu', name='h1')(dnn_inputs)
    h2 = tf.keras.layers.Dense(8, activation='relu', name='h2')(h1)

    # final output is a linear activation because this is regression
    output = tf.keras.layers.Dense(1, activation='linear', name='fare')(h2)
    model = tf.keras.models.Model(inputs, output)
    model.compile(optimizer='adam', loss='mse', metrics=[rmse, 'mse'])
    return model

最後に、トレーニングプロセスを管理する関数を作成します。<br>
`args` の辞書は、 トレーニングの際に`task.py` を通してコマンドラインから渡されます。

In [None]:
%%writefile -a taxifare_tf2/model.py 

def train_and_export_model(args):
    TRAIN_BATCH_SIZE = 128 
    NUM_TRAIN_EXAMPLES = 50000 * args['train_epochs']
    NUM_EVALS = args['train_epochs']
    NUM_EVAL_EXAMPLES = 15000

    strategy = tf.distribute.MirroredStrategy()

    with strategy.scope():
        model = build_model()

    trainds = load_dataset(args['train_data_path'],
                           TRAIN_BATCH_SIZE * 4,
                           tf.estimator.ModeKeys.TRAIN)
    evalds = load_dataset(args['eval_data_path'],
                          1000,
                          tf.estimator.ModeKeys.EVAL).take(NUM_EVAL_EXAMPLES//1000)

    steps_per_epoch = NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS)

    callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath=args['output_dir']),
                tf.keras.callbacks.TensorBoard(args['output_dir'])]

    history = model.fit(trainds,
                        verbose=2,
                        validation_data=evalds,
                        epochs=NUM_EVALS,
                        steps_per_epoch=steps_per_epoch,
                        callbacks=callbacks)


#### `task.py` ファイルを作成する

次に、 `task.py` ファイルを作成します<br>
このファイルは、`model.py` の `train_and_export_model` 関数が使用するパラメータを、コマンドラインの引数を渡す役割を持ちます。

In [None]:
%%writefile taxifare_tf2/task.py 

import argparse
import json
import os

from . import model


if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()
    
    parser.add_argument(
        "--train_data_path",
        help = "GCS or local path to training data",
        required = True
    )
    parser.add_argument(
        "--train_epochs",
        help = "Steps to run the training job for (default: 5)",
        type = int,
        default = 5
    )
    parser.add_argument(
        "--eval_data_path",
        help = "GCS or local path to evaluation data",
        required = True
    )
    parser.add_argument(
        "--output_dir",
        help = "GCS location to write checkpoints and export models",
        required = True
    )
    parser.add_argument(
        "--job-dir",
        help = "This is not used by our model, but it is required by gcloud",
    )
    args = parser.parse_args().__dict__

    model.train_and_export_model(args)

## モデルをローカルでテストする

AI Platform でトレーニングを行う前に、パッケージが動作するかどうかをローカルで確認しましょう。<br>

In [None]:
!rm -rf ./taxi_model
!python3 -m taxifare_tf2.task \
    --train_data_path=../data/taxi-train.csv \
    --eval_data_path=../data/taxi-valid.csv \
    --train_epochs=3 \
    --output_dir=./local_taxi_model

## Cloud AI Platform でトレーニングを実行する

全てがローカルで動作することが確認できましたので、いよいよクラウドで実行しましょう。<br>
まずは、トレーニングに使用するデータを GCS のバケットに移します。

In [30]:
!gsutil mb gs://{BUCKET}
!gsutil -m cp -r ../data/ gs://{BUCKET}/taxifare_example/

Creating gs://takumi-keras/...
ServiceException: 409 Bucket takumi-keras already exists.
CommandException: No URLs matched


クラウドにジョブを送信する際には、 [`gcloud ai-platform jobs submit training [ジョブ名]`](https://cloud.google.com/sdk/gcloud/reference/ml-engine/jobs/submit/training) を使用し、AI Platform に渡す追加のパラメータをいくつか指定します。
- ジョブ名: ジョブに付けるユニークな識別子。実行時の時間を付けてユニークにするのが一般的。
- package-path: GCS 上の Python パッケージのパス
- runtime-version: 使用するTensorFlow のバージョン
- python-version: 使用する Python のバージョン。現在は、 TF2.1 では Python 3.7 のみがサポートされています、
- region: トレーニングを行うリージョン。AI Platform Training がサポートされているリージョンについては、[こちら](https://cloud.google.com/ml-engine/docs/tensorflow/regions) を参照してください。
- scale-tier: トレーニングを行うインフラの構成。詳細は[こちら](https://cloud.google.com/ai-platform/training/docs/machine-types)を参照してください。

パラメータ名の無い `-- \` 以下には、 task.py に渡す引数を指定することができます、

In [None]:
OUTDIR = "gs://{}/taxifare_example/saved_model/".format(BUCKET)
TRAIN_DATA_PATH = "gs://{}/taxifare_example/data/taxi-train.csv".format(BUCKET)
EVAL_DATA_PATH = "gs://{}/taxifare_example/data/taxi-valid.csv".format(BUCKET)

!gsutil -m rm -rf {OUTDIR} # start fresh each time
!gcloud ai-platform jobs submit training taxifare_$(date -u +%y%m%d_%H%M%S) \
    --package-path=taxifare_tf2 \
    --module-name=taxifare_tf2.task \
    --job-dir=gs://{BUCKET}/taxifare_example \
    --python-version=3.7 \
    --runtime-version=2.1 \
    --region={REGION}\
    --scale-tier BASIC \
    -- \
    --train_data_path={TRAIN_DATA_PATH} \
    --eval_data_path={EVAL_DATA_PATH}  \
    --train_epochs=20 \
    --output_dir={OUTDIR}

## Tensorboard でトレーニングをモニタリングする

以下の手順でTensorboard を実行してください。
- 以下のセルを実行し、出力を Cloud Shell に貼り付けて実行
- Cloud Shell 右上の Web Preview -> Preview on Port 8080 をクリック
- Tensorboard を確認する（正しく表示されない場合は、しばらく待ち、ブラウザを更新してください

In [None]:
print('tensorboard --logdir {} --port 8080'.format(OUTDIR))

トレーニングは 5-7 分ほどで完了します。<br>
完了したら、以下のセルを実行し、 `SavedModel` が保存されていることを確認してください。

In [None]:
!gsutil ls -r {OUTDIR}*

## モデルをデプロイする

では、エクスポートされた `SavedModel` をデプロイして、REST APIで呼び出せるようにしましょう。<br>
そうすることによって、需要に応じてオートスケールする推論環境を使用することができます。

AI Platform Prediction はバージョニング機能を持っています。<br>
はじめにモデルフォルダを作成し、そのフォルダの中にモデルのバージョン (`v1`) を作成します。バージョンのデプロイには５分ほどの時間がかかります。


In [None]:
VERSION='v1'

!gcloud ai-platform models create taxifare --regions us-central1
!gcloud ai-platform versions create {VERSION} --model taxifare \
    --origin {OUTDIR} \
    --python-version=3.7 \
    --runtime-version 2.1

Monitor the model creation at [GCP Console > AI Platform](https://console.cloud.google.com/mlengine/models/taxifare/) でモニタリングをし、モデルのバージョン (`v1`)が作られたら次に進んでください。

モデルを使って推論をするために、新しいデータを用意する必要があります。<br>
では、午後6時に[ウォールストリートからブライアント公園まで](https://www.google.com/maps/dir/%E3%82%A6%E3%82%A9%E3%83%BC%E3%83%AB%E8%A1%97,+%E3%82%A2%E3%83%A1%E3%83%AA%E3%82%AB%E5%90%88%E8%A1%86%E5%9B%BD+New+York/Bryant+Park,+%E3%83%8B%E3%83%A5%E3%83%BC%E3%83%A8%E3%83%BC%E3%82%AF+%E3%83%8B%E3%83%A5%E3%83%BC%E3%83%A8%E3%83%BC%E3%82%AF%E5%B7%9E+%E3%82%A2%E3%83%A1%E3%83%AA%E3%82%AB%E5%90%88%E8%A1%86%E5%9B%BD/@40.7302283,-74.0247121,13z/data=!3m1!4b1!4m13!4m12!1m5!1m1!1s0x89c25a165bedccab:0x2cb2ddf003b5ae01!2m2!1d-74.0088256!2d40.7060361!1m5!1m1!1s0x89c259aae7a0b1bd:0xb49cafb82537f1a7!2m2!1d-73.9832326!2d40.7535965)の4マイルの距離をタクシー移動する際の料金を予測してみましょう。<br>

In [None]:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json

credentials = GoogleCredentials.get_application_default()
api = discovery.build("ml", "v1", credentials = credentials,
            discoveryServiceUrl = "https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json")

request_data = {"instances":
  [
      {
        "dayofweek": 1,
        "hourofday": 18,
        "pickup_longitude": -73.0101638,
        "pickup_latitude": 40.7059409,
        "dropoff_longitude": -73.9834672,
        "dropoff_latitude": 40.7532916,
      }
  ]
}

parent = "projects/{}/models/taxifare".format(PROJECT) # use default version

response = api.projects().predict(body = request_data, name = parent).execute()
print("response = {0}".format(response))

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License