# Estimators - Distributed training and monitoring

This is a timed lab from the Coursera/Google GCP Data Engineering course on Serverless ML. It has been updated to Python 3 and reworked to be more readable and usable.

This notebook calls ```train_and_evaluate``` instead of hand-coding an ML pipeline. That lets evaluation run as part of the training loop instead of as a separate step. It also adds the failure handling that distributed training requires.

Use TensorBoard to monitor the training.

*Note*: It would be convenient to run this on Colaboratory, but BigQuery is not easily available there. It may be reachable as an API call with a token, but that has not been verified here. For that reason, this notebook needs to be run on Datalab.

In [17]:
import datalab.bigquery as bq
import tensorflow as tf
import numpy as np
import shutil
import os

# Print TensorFlow version and the path.  Data files will be in the "TaxiData" subfolder, but we need to know what the path is from the notebook.
print("TensorFlow Version: {}".format(tf.__version__))
print("Relative Path     : {}".format(os.getcwd()))

TensorFlow Version: 1.8.0
Relative Path     : /content/datalab/notebooks


## Input

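Read the data in batches rather than all at once. Pandas requires the whole dataset to fit in memory; the ```tf.data``` Dataset API streams it from disk instead.

The ```read_dataset``` function below returns an input function for the Estimator. Its ```mode``` argument distinguishes training from evaluation: training data is shuffled and repeated indefinitely, while evaluation data is read exactly once. The ```batch_size``` argument sets how many rows go into each batch.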
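The idea of reading in batches can be sketched without TensorFlow. The generator below is a plain-Python analogue, not the `tf.data` API; `iter_batches` and the sample rows are illustrative only.

```python
import csv
import io
from itertools import islice


def iter_batches(lines, batch_size):
    """Yield lists of parsed CSV rows, at most batch_size rows per list."""
    reader = csv.reader(lines)
    while True:
        # Pull the next batch_size rows; only this batch is held in memory.
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


# Five in-memory rows stand in for a file too large to load at once.
sample = io.StringIO("1,a\n2,b\n3,c\n4,d\n5,e\n")
batch_sizes = [len(batch) for batch in iter_batches(sample, batch_size=2)]
print(batch_sizes)  # [2, 2, 1]
```

The last batch is smaller than the others because the input ran out, which is also what happens at the end of a single evaluation pass.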

In [18]:
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

def read_dataset(filename, mode, batch_size = 512):
  def _input_fn():
    def decode_csv(value_column):
      columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
      features = dict(zip(CSV_COLUMNS, columns))
      label = features.pop(LABEL_COLUMN)
      return features, label
    
    # Create list of files that match pattern
    file_list = tf.gfile.Glob(filename)

    # Create dataset from file list
    dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
    
    if mode == tf.estimator.ModeKeys.TRAIN:
      num_epochs = None # indefinitely
      dataset = dataset.shuffle(buffer_size = 10 * batch_size)
    else:
      num_epochs = 1 # end-of-input after this

    dataset = dataset.repeat(num_epochs).batch(batch_size)
    return dataset.make_one_shot_iterator().get_next()
  return _input_fn
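
To make the `decode_csv` step concrete, here is a plain-Python analogue of what it does to a single line: split the fields, fill empty ones from the defaults, cast each value to the type of its default, and separate the label from the features. It does not call `tf.decode_csv`; `decode_line` and the sample row are made up for illustration, and the sketch ignores CSV quoting.

```python
CSV_COLUMNS = ['fare_amount', 'pickuplon', 'pickuplat', 'dropofflon',
               'dropofflat', 'passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]


def decode_line(line):
    """Turn one CSV line into a (features, label) pair."""
    values = []
    for raw, (default,) in zip(line.split(','), DEFAULTS):
        if raw == '':
            # Empty field: fall back to the column default.
            values.append(default)
        else:
            # Cast to the type of the default (float or str).
            values.append(type(default)(raw))
    features = dict(zip(CSV_COLUMNS, values))
    label = features.pop(LABEL_COLUMN)
    return features, label


# Hypothetical row with the dropofflon field left empty.
features, label = decode_line('12.5,-73.98,40.75,,40.76,2,row-1')
print(label)                   # 12.5
print(features['dropofflon'])  # -74.0
```

The `key` column stays in the feature dictionary; it is simply not referenced by any feature column later on.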

## Create features out of input data

For now, pass the numeric input columns through unchanged. The ```key``` column is not used as a feature.

In [19]:
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
  # Nothing to add (yet!)
  return feats

feature_cols = add_more_features(INPUT_COLUMNS)

## train_and_evaluate

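Write a ```serving_input_fn``` specific to this problem. It declares the inputs the exported model expects at prediction time: the five numeric features, without the label or the key.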

In [20]:
def serving_input_fn():
  feature_placeholders = {
    'pickuplon' : tf.placeholder(tf.float32, [None]),
    'pickuplat' : tf.placeholder(tf.float32, [None]),
    'dropofflat' : tf.placeholder(tf.float32, [None]),
    'dropofflon' : tf.placeholder(tf.float32, [None]),
    'passengers' : tf.placeholder(tf.float32, [None]),
  }
  features = {
      key: tf.expand_dims(tensor, -1)
      for key, tensor in feature_placeholders.items()
  }
  return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
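
The `tf.expand_dims(tensor, -1)` call appends a dimension of size 1, so a batch of shape `[batch]` becomes `[batch, 1]`, with one single-element row per example. A plain-Python analogue using nested lists (no TensorFlow involved; `expand_last_dim` and the sample values are illustrative only):

```python
def expand_last_dim(batch):
    """Wrap every scalar in its own one-element list: [n] -> [n, 1]."""
    return [[value] for value in batch]


# Two hypothetical prediction requests, stored column by column.
request = {'pickuplon': [-73.98, -73.95], 'passengers': [1.0, 2.0]}
expanded = {key: expand_last_dim(values) for key, values in request.items()}
print(expanded['passengers'])  # [[1.0], [2.0]]
```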

The ```train_and_evaluate``` function below is more general than the one in the Coursera/Google class: it takes the file paths as arguments instead of hard-coding them. Make sure ```serving_input_fn``` is defined before running it, since the exporter refers to it.

In [22]:
def train_and_evaluate(training_file, validation_file, output_dir, num_train_steps):

  estimator = tf.estimator.LinearRegressor(
                       model_dir = output_dir,
                       feature_columns = feature_cols)
  train_spec=tf.estimator.TrainSpec(
                       input_fn = read_dataset(training_file, mode = tf.estimator.ModeKeys.TRAIN),
                       max_steps = num_train_steps)
  exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
  eval_spec=tf.estimator.EvalSpec(
                       input_fn = read_dataset(validation_file, mode = tf.estimator.ModeKeys.EVAL),
                       steps = None,
                       start_delay_secs = 1, # start evaluating after N seconds
                       throttle_secs = 10,  # evaluate every N seconds
                       exporters = exporter)
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
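
Whether `tf.estimator.train_and_evaluate` runs locally or as part of a cluster is determined by the `TF_CONFIG` environment variable, which the Estimator's `RunConfig` reads when it is constructed. With the variable unset, as in this notebook, training runs locally. Below is a minimal sketch of what one worker's `TF_CONFIG` could look like. The host names, port, and the `make_tf_config` helper are hypothetical, and on a managed service the variable is normally set for you.

```python
import json
import os

# Hypothetical cluster layout: one chief, two workers, one parameter server.
CLUSTER = {
    'chief': ['host0.example.com:2222'],
    'worker': ['host1.example.com:2222', 'host2.example.com:2222'],
    'ps': ['host3.example.com:2222'],
}


def make_tf_config(task_type, task_index):
    """Build the JSON string describing the cluster and this process's role."""
    return json.dumps({
        'cluster': CLUSTER,
        'task': {'type': task_type, 'index': task_index},
    })


# Each process in the cluster sets its own role before creating the Estimator.
os.environ['TF_CONFIG'] = make_tf_config('worker', 0)
parsed = json.loads(os.environ['TF_CONFIG'])
print(parsed['task'])
```

Every process would run the same training code; only the `task` entry differs between them.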

Run the training.

In [21]:
OUTDIR = 'taxi_trained'

In [23]:
shutil.rmtree(OUTDIR, ignore_errors = True) # start fresh each time
train_and_evaluate('/content/datalab/notebooks/TaxiData/taxi-train.csv',
                   '/content/datalab/notebooks/TaxiData/taxi-valid.csv',
                   OUTDIR, num_train_steps = 5000)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f11b97a3a58>, '_log_step_count_steps': 100, '_num_ps_replicas': 0, '_save_checkpoints_steps': None, '_num_worker_replicas': 1, '_model_dir': 'taxi_trained', '_save_summary_steps': 100, '_evaluation_master': '', '_train_distribute': None, '_task_id': 0, '_master': '', '_service': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_is_chief': True, '_task_type': 'worker', '_tf_random_seed': None, '_keep_checkpoint_every_n_hours': 10000, '_global_id_in_cluster': 0}
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 10 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow

## Monitoring with TensorBoard

After training, we can run TensorBoard to see what happened.

In [24]:
from google.datalab.ml import TensorBoard

TensorBoard().start('./{}'.format(OUTDIR))
TensorBoard().list()

| | logdir | pid | port |
|---|---|---|---|
| 0 | ./taxi_trained | 9372 | 38493 |
| 1 | ./taxi_trained | 14172 | 55015 |
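
The listing shows two TensorBoard processes for the same log directory, presumably because the start cell was run more than once. One way to avoid that is to check the listing before starting a new one. The sketch below is duck-typed: `start_once` is a hypothetical helper that accepts any object with `list()` and `start(logdir)` methods, and `FakeTensorBoard` is a stand-in so the example runs without Datalab. It assumes the listing can be indexed by `'logdir'` and may lack that column when nothing is running.

```python
def start_once(tensorboard, logdir):
    """Start TensorBoard for logdir unless one is already listed for it.

    Returns True if a new instance was started, False otherwise.
    """
    listing = tensorboard.list()
    # Assumption: an empty listing may have no 'logdir' column at all.
    running = list(listing['logdir']) if 'logdir' in listing else []
    if logdir in running:
        return False
    tensorboard.start(logdir)
    return True


class FakeTensorBoard:
    """Minimal stand-in that mimics list()/start() for this example."""

    def __init__(self):
        self._logdirs = []

    def list(self):
        return {'logdir': list(self._logdirs)} if self._logdirs else {}

    def start(self, logdir):
        self._logdirs.append(logdir)


fake = FakeTensorBoard()
first = start_once(fake, './taxi_trained')   # starts a new instance
second = start_once(fake, './taxi_trained')  # already running, skipped
print(first, second)  # True False
```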


Stop every running TensorBoard instance by its process ID (pid).

In [25]:
for pid in TensorBoard.list()['pid']:
    TensorBoard().stop(pid)
    print('Stopped TensorBoard with pid {}'.format(pid))

Stopped TensorBoard with pid 9372
Stopped TensorBoard with pid 14172


Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.