<h1> 特徴量エンジニアリング</h1>

このnotebookでは、特徴量エンジニアリングをどのように機械学習に取り入れるのかを学習していきましょう。

<ul>
<li> 特徴量の列を扱う </li>
<li> TensoFlowでフィーチャークロスを追加する </li>
<li> BigQueryでデータを読み込む </li>
<li> Dataflowを使用してデータセットを作成する </li>
<li> wide-and-deepモデルを利用する </li>

</ul>

In [None]:
import tensorflow as tf
import apache_beam as beam
import shutil
print(tf.__version__)

<h2> プロジェクトとバケットを環境変数を入力する</h2>

以下に注意してください:
<ol>
<li> Project IDは使用するプロジェクトを表す*ユニーク*な文字列です（プロジェクト名とは異なります）。GCPコンソールのホーム画面から確認することができます。 </li>
<li> クラウド上での機械学習のトレーニングでは、モデルファイルを保存したり復元したりというプロセスを含んでいます。まだバケットを持っていない場合、GCPコンソールからバケットを一つ作成してください。Project IDを先頭につけることでユニークなバケット名をつけることが一般的です。またコスト上の理由から、単一のRegionでバケットを作成するのがよいでしょう。</li>
</ol>
自分のProject IDとバケット名に<b>以下のセルの変数を変更してください。</b>


In [None]:
import os
PROJECT = 'cloud-training-demos' # ご自分のProject IDに変更してください。
BUCKET = 'cloud-training-demos-ml' # ご自分のバケット名に変更してください。選択したリージョンのリージョナルなバケットを使用してください。
REGION = 'us-central1' # Cloud MLEを利用できるリージョンを選択してください https://cloud.google.com/ml-engine/docs/regions

In [None]:
# for bash
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = '1.8' 

## ensure we're using python2 env
os.environ['CLOUDSDK_PYTHON'] = 'python2'

In [None]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

## ensure we predict locally with our current Python environment
gcloud config set ml_engine/local_python `which python`

<h2> 2. データを取得するクエリを作成する</h2>

timestampからいくつかの追加のカラムを作成し取得しましょう。

In [None]:
def create_query(phase, EVERY_N):
  if EVERY_N == None:
    EVERY_N = 4 #use full dataset
    
  #select and pre-process fields
  base_query = """
SELECT
  (tolls_amount + fare_amount) AS fare_amount,
  DAYOFWEEK(pickup_datetime) AS dayofweek,
  HOUR(pickup_datetime) AS hourofday,
  pickup_longitude AS pickuplon,
  pickup_latitude AS pickuplat,
  dropoff_longitude AS dropofflon,
  dropoff_latitude AS dropofflat,
  passenger_count*1.0 AS passengers,
  CONCAT(STRING(pickup_datetime), STRING(pickup_longitude), STRING(pickup_latitude), STRING(dropoff_latitude), STRING(dropoff_longitude)) AS key
FROM
  [nyc-tlc:yellow.trips]
WHERE
  trip_distance > 0
  AND fare_amount >= 2.5
  AND pickup_longitude > -78
  AND pickup_longitude < -70
  AND dropoff_longitude > -78
  AND dropoff_longitude < -70
  AND pickup_latitude > 37
  AND pickup_latitude < 45
  AND dropoff_latitude > 37
  AND dropoff_latitude < 45
  AND passenger_count > 0
  """
  
  #add subsampling criteria by modding with hashkey
  if phase == 'train': 
    query = "{} AND ABS(HASH(pickup_datetime)) % {} < 2".format(base_query,EVERY_N)
  elif phase == 'valid': 
    query = "{} AND ABS(HASH(pickup_datetime)) % {} == 2".format(base_query,EVERY_N)
  elif phase == 'test':
    query = "{} AND ABS(HASH(pickup_datetime)) % {} == 3".format(base_query,EVERY_N)
  return query
    
print(create_query('valid', 100)) #example query using 1% of data

もし結果を試してみたければ、上のクエリをhttps://bigquery.cloud.google.com/table/nyc-tlc:yellow.trips から実行してみましょう（LIMIT 10を忘れずに最後に加えてください）

<h2> 3. BigQueryからDataflowジョブを通して前処理を行う</h2>

このコードはBigQueryから読み込みを行い、そのままのデータをGoogle Cloud Storageに保存します。追加の前処理とクレンジングをDataflowの中で行うことができますが、その際は同じ前処理を毎回の推論のたびに忘れずに行う必要があります。<br>
そのため、毎回実行してくれるtf.transformを使用するか、TensorFlowのモデルの中で前処理をする方がよいでしょう。この機能については、他のnotebookで取り扱います。ここでは、シンプルにデータをBigQueryからDataflowを通してCSVファイルに書き出します。

BQからTensorFlowに直接データを読み込むこともできますが（https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader をご確認ください）、CSVファイルに書き出してからファイルからトレーニングを行う方が便利でしょう。これをスケーラブルに行うために、Dataflowを使いましょう。

これらの処理をクラウドで行うため、GCPコンソール上でジョブ(https://console.cloud.google.com/dataflow)のステータスを確認することができます。前処理のジョブがスタートするまでには、数分かかる場合があります。

In [None]:
%%bash
if gsutil ls | grep -q gs://${BUCKET}/taxifare/ch4/taxi_preproc/; then
  gsutil -m rm -rf gs://$BUCKET/taxifare/ch4/taxi_preproc/
fi

まず、データの前処理を行うための関数を定義しましょう。

In [None]:
import datetime

####
# Arguments:
#   -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
#     which each row is represented as a python dictionary
# Returns:
#   -rowstring: a comma separated string representation of the record with dayofweek
#     converted from int to string (e.g. 3 --> Tue)
####
def to_csv(rowdict):
  days = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
  CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
  rowdict['dayofweek'] = days[rowdict['dayofweek']]
  rowstring = ','.join([str(rowdict[k]) for k in CSV_COLUMNS])
  return rowstring


####
# Arguments:
#   -EVERY_N: Integer. Sample one out of every N rows from the full dataset.
#     Larger values will yield smaller sample
#   -RUNNER: 'DirectRunner' or 'DataflowRunner'. Specfy to run the pipeline
#     locally or on Google Cloud respectively. 
# Side-effects:
#   -Creates and executes dataflow pipeline. 
#     See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
####
def preprocess(EVERY_N, RUNNER):
  job_name = 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  print('Launching Dataflow job {} ... hang on'.format(job_name))
  OUTPUT_DIR = 'gs://{0}/taxifare/ch4/taxi_preproc/'.format(BUCKET)

  #dictionary of pipeline options
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S'),
    'maxNumWorkers': 8,
    'project': PROJECT,
    'runner': RUNNER
  }
  #instantiate PipelineOptions object using options dictionary
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  #instantantiate Pipeline object using PipelineOptions
  with beam.Pipeline(options=opts) as p:
      for phase in ['train', 'valid']:
        query = create_query(phase, EVERY_N) 
        outfile = os.path.join(OUTPUT_DIR, '{}.csv'.format(phase))
        (
          p | 'read_{}'.format(phase) >> beam.io.Read(beam.io.BigQuerySource(query=query))
            | 'tocsv_{}'.format(phase) >> beam.Map(to_csv)
            | 'write_{}'.format(phase) >> beam.io.Write(beam.io.WriteToText(outfile))
        )
  print("Done")

それでは、パイプラインをローカルで実行してみましょう。この処理には<b>5分ほど</b>かかります。終了時には"Done"と表示されます。

In [None]:
preprocess(50*10000, 'DirectRunner') 

In [None]:
%%bash
gsutil ls gs://$BUCKET/taxifare/ch4/taxi_preproc/

## 4. BeamパイプラインをCloud Dataflow上で実行する

クラウド上で大規模なデータを処理するパイプラインを実行しましょう

In [None]:
%%bash
if gsutil ls | grep -q gs://${BUCKET}/taxifare/ch4/taxi_preproc/; then
  gsutil -m rm -rf gs://$BUCKET/taxifare/ch4/taxi_preproc/
fi

以下のステップは<b>15-20分</b>の時間がかかります。[GCPコンソール上で](https://console.cloud.google.com/dataflow)ジョブの進捗を確認しましょう。

In [None]:
preprocess(50*100, 'DataflowRunner') 
#全データセットを実行する際には、1つ目の引数をNoneに変更してください

ジョブが終了したら、Google Cloud Storage上に作成されているファイルを確認してみましょう。

In [None]:
%%bash
gsutil ls -l gs://$BUCKET/taxifare/ch4/taxi_preproc/

In [None]:
%%bash
#print first 10 lines of first shard of train.csv
gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/train.csv-00000-of-*" | head

## 5. 新しいデータを使用してモデルを作成する

ローカルで開発を行うために、前処理をしたデータの一つめのシャードを取得しましょう

In [None]:
%%bash
if [ -d sample ]; then
  rm -rf sample
fi
mkdir sample
gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/train.csv-00000-of-*" > sample/train.csv
gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/valid.csv-00000-of-*" > sample/valid.csv

ここでは、INPUT_COLUMNSの中に2つの新しい特徴量、特徴量エンジニアリングを行った3つの特徴量、そしてバケット化とフィーチャークロスを含んだモデルがあります。

In [None]:
%%bash
grep -A 20 "INPUT_COLUMNS =" taxifare/trainer/model.py

In [None]:
%%bash
grep -A 50 "build_estimator" taxifare/trainer/model.py

In [None]:
%%bash
grep -A 15 "add_engineered(" taxifare/trainer/model.py

ローカル上のサンプルデータを使って、新しいモデルが上手く動くかどうか試してみましょう。(<b>5分ほど</b>の時間がかかります)

In [None]:
%%bash
rm -rf taxifare.tar.gz taxi_trained
export PYTHONPATH=${PYTHONPATH}:${PWD}/taxifare
python -m trainer.task \
  --train_data_paths=${PWD}/sample/train.csv \
  --eval_data_paths=${PWD}/sample/valid.csv  \
  --output_dir=${PWD}/taxi_trained \
  --train_steps=10 \
  --job-dir=/tmp

In [None]:
%%bash
ls taxi_trained/export/exporter/

```saved_model_cli```を利用して、エクスポートされたsignatureを見ることができます。ここでは、モデルは特徴量エンジニアリングを施した特徴量を必要としないことに注目してください。latdiff、londiff、そして二点のユークリッド距離は、serving_input_fn内の```add_engineered```のおかげで計算されます。

In [None]:
%%bash
model_dir=$(ls ${PWD}/taxi_trained/export/exporter | tail -1)
saved_model_cli show --dir ${PWD}/taxi_trained/export/exporter/${model_dir} --all

In [None]:
%%writefile /tmp/test.json
{"dayofweek": "Sun", "hourofday": 17, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403, "passengers": 2}

In [None]:
%%bash
model_dir=$(ls ${PWD}/taxi_trained/export/exporter)
gcloud ai-platform local predict \
  --model-dir=${PWD}/taxi_trained/export/exporter/${model_dir} \
  --json-instances=/tmp/test.json

## 5. クラウド上でトレーニングを行う

下のセルは、ジョブを送信するとすぐに結果を返しますが、この処理には<b> 10-15分</b>の時間がかかります。ジョブの進捗は[GCPコンソールのAI Platform](https://console.cloud.google.com/mlengine)上で確認し、ジョブが完了するのを待ってください。

In [None]:
%%bash
OUTDIR=gs://${BUCKET}/taxifare/ch4/taxi_trained
JOBNAME=lab4a_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
  --region=$REGION \
  --module-name=trainer.task \
  --package-path=${PWD}/taxifare/trainer \
  --job-dir=$OUTDIR \
  --staging-bucket=gs://$BUCKET \
  --scale-tier=BASIC \
  --runtime-version=$TFVERSION \
  -- \
  --train_data_paths="gs://$BUCKET/taxifare/ch4/taxi_preproc/train*" \
  --eval_data_paths="gs://${BUCKET}/taxifare/ch4/taxi_preproc/valid*"  \
  --train_steps=5000 \
  --output_dir=$OUTDIR

RMSEが8.33249となり、前回の9.3よりも改善されました。もちろん、より大きなデータセットに対して学習と検証をしてみるまで結果はわかりませんが、これは期待が持てそうです。<br>
しかしその前に、ハイパーパラメータ・チューニングを行いましょう。

<b>コンソールでジョブのモニタリングを行い、完了するまで次に進まないでください。</b>

In [None]:
%%bash
gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1

In [None]:
%%bash
model_dir=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
saved_model_cli show --dir ${model_dir} --all

In [None]:
%%bash
model_dir=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
gcloud ai-platform local predict \
  --model-dir=${model_dir} \
  --json-instances=/tmp/test.json

### オプション: モデルをクラウドにデプロイする

In [None]:
%%bash
MODEL_NAME="feateng"
MODEL_VERSION="v1"
MODEL_LOCATION=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
echo "Run these commands one-by-one (the very first time, you'll create a model and then create a version)"
#gcloud ai-platform versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
#gcloud ai-platform models delete ${MODEL_NAME}
gcloud ai-platform models create ${MODEL_NAME} --regions $REGION
gcloud ai-platform versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version $TFVERSION

In [None]:
%%bash
gcloud ai-platform predict --model=feateng --version=v1 --json-instances=/tmp/test.json

<h2> 6. ハイパーパラメータ・チューニング</h2>

<a href="hyperparam.ipynb">ハイパーパラメータ・チューニングのnotebook</a>を開いて、モデルにどのようなパラメータを利用するか決めましょう。それに基づいて、以下のハイパーパラメータを選択しました：

<ol>
<li> train_batch_size: 512 </li>
<li> nbuckets: 16 </li>
<li> hidden_units: "64 64 64 8" </li>    
</ol>

これにより、RMSEは5となり、8.3のモデルと比べて大きく改善することができました。それでは、大規模なデータセットに対して学習を行ってみましょう。

# オプション: 200万行のデータセットを使ってクラウド上でトレーニングを実行する

以下を実行すると、10個のワーカー（STANDARD_1 tier)を稼働させて200万行のデータを使ったトレーニングが実行され、20分ほどの時間がかかります。モデル自体は上のものと全く同じです。<br>
唯一のち外は、入力データ（より大規模なデータセット）とCloud ML Engineのtierです。（STANDARD_1はBASICより10倍のパフォーマンスを発揮します）Dataflowで前処理が15分ほどかかるので、パブリックなバケットから、処理済みのCSVファイルを取得して利用します。

分散学習をする際には、num_epochsではなくtrain_stepsを指定しましょう。それぞれの分散ワーカーは全体のデータセットの量を知りませんが、train_steps = num_rows × num_epochs ÷ train_batch_sizeで計算することができます。<br>
今回のケースでは、 $2141023 × 100 ÷ 512 = 418168$のtrain_stepsとなります


In [None]:
%%bash

## WARNING -- this uses significant resources and is optional.

OUTDIR=gs://${BUCKET}/taxifare/feateng2m
JOBNAME=lab4a_$(date -u +%y%m%d_%H%M%S)
TIER=STANDARD_1 
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=${PWD}/taxifare/trainer \
   --job-dir=$OUTDIR \
   --staging-bucket=gs://$BUCKET \
   --scale-tier=$TIER \
   --runtime-version=$TFVERSION \
   -- \
   --train_data_paths="gs://cloud-training-demos/taxifare/ch4/taxi_preproc/train*" \
   --eval_data_paths="gs://cloud-training-demos/taxifare/ch4/taxi_preproc/valid*"  \
   --output_dir=$OUTDIR \
   --train_steps=418168 \
   --train_batch_size=512 --nbuckets=16 --hidden_units="64 64 64 8"

### Tensorboardを開く

<ol>
<li>以下を実行し、出力をCloud Shellで実行する
<li>Cloud Shell上のWeb Previewをクリックする
<li>Preview on port 8080をクリックする
</ol>


In [None]:
OUTDIR='gs://{0}/taxifare/feateng2m'.format(BUCKET)
print('tensorboard --port=8080 --logdir={0}'.format(OUTDIR))

200万行のデータセットで学習したモデルのRMSEは \$3.03となりました。  以下のグラフが改善のステップを示しています。

In [None]:
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

df = pd.DataFrame({'Lab' : pd.Series(['1a', '2-3', '4a', '4b', '4c']),
              'Method' : pd.Series(['Heuristic Benchmark', 'tf.learn', '+Feature Eng.', '+ Hyperparam', '+ 2m rows']),
              'RMSE': pd.Series([8.026, 9.4, 8.3, 5.0, 3.03]) })

plt.figure(figsize=(10,8))
ax = sns.barplot(data = df, x = 'Method', y = 'RMSE')
ax.set_ylabel('RMSE (dollars)')
ax.set_xlabel('Labs/Methods')
plt.plot(np.linspace(0, 4, 1000), [5] * 1000, 'b');

In [None]:
%%bash
gsutil -m mv gs://${BUCKET}/taxifare/ch4/ gs://${BUCKET}/taxifare/ch4_1m/

Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License