<a href="https://colab.research.google.com/github/rkrissada/100DayOfMLCode/blob/master/day_060_distributed_training_and_monitoring.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Distributed training and monitoring

Here we refactor the code to call `train_and_evaluate` instead of hand-coding our ML pipeline. This allows us to carry out evaluation as part of the training loop instead of as a separate step. It also adds the failure handling that is necessary for distributed training.

In [0]:
import tensorflow as tf
import numpy as np
import shutil

### Input

Read the input data in a more general way, so that we are reading it in batches. Instead of using Pandas, we will use the `tf.data` Datasets API.

In [0]:
# CSV schema: every column name, in file order, paired with the value that
# read_dataset passes to tf.decode_csv as that column's record default.
_csv_schema = [
    ('fare_amount', [0.0]),
    ('pickuplon', [-74.0]),
    ('pickuplat', [40.0]),
    ('dropofflon', [-74.0]),
    ('dropofflat', [40.7]),
    ('passengers', [1.0]),
    ('key', ['nokey']),
]
# Split the pairs back into the two parallel lists used by the input function.
csv_columns, defaults = (list(part) for part in zip(*_csv_schema))
# Column that is popped out of the parsed features and used as the label.
label_column = 'fare_amount'

def read_dataset(filename, mode, batch_size = 512):
  """Build an Estimator input function that reads CSV rows in batches.

  Args:
    filename: File name or glob pattern; expanded with tf.gfile.Glob when the
      returned function is called.
    mode: A tf.estimator.ModeKeys value. In TRAIN mode the rows are shuffled
      and repeated indefinitely; in any other mode they are read exactly once.
    batch_size: Number of rows per batch. Also sizes the shuffle buffer
      (10 * batch_size) in TRAIN mode.

  Returns:
    A zero-argument function that builds the dataset and returns the
    (features, label) result of a one-shot iterator's get_next().

  Raises:
    IOError: Raised by the returned function when `filename` matches no files.
  """
  def _input_fn():
    def decode_csv(value_column):
      # Parse one CSV line into per-column tensors, then split off the label.
      columns = tf.decode_csv(value_column, record_defaults = defaults)
      features = dict(zip(csv_columns, columns))
      label = features.pop(label_column)
      return features, label

    file_list = tf.gfile.Glob(filename)
    if not file_list:
      # Fail fast with a clear message rather than silently building a
      # pipeline over an empty list of files.
      raise IOError('No files matched pattern: {}'.format(filename))

    dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

    if mode == tf.estimator.ModeKeys.TRAIN:
      num_epochs = None  # repeat(None) repeats the data indefinitely
      dataset = dataset.shuffle(buffer_size = 10 * batch_size)
    else:
      num_epochs = 1  # a single pass over the data

    dataset = dataset.repeat(num_epochs).batch(batch_size)
    return dataset.make_one_shot_iterator().get_next()
  return _input_fn

### Create features out of input data

For now, pass these through.

In [0]:
# One numeric feature column per raw input field, in the order listed here.
input_columns = list(map(
    tf.feature_column.numeric_column,
    ('pickuplon', 'pickuplat', 'dropofflat', 'dropofflon', 'passengers'),
))

def add_more_features(feats):
  """Feature-engineering hook; for now it hands `feats` back unchanged."""
  # No derived features yet: the same object that was passed in is returned.
  return feats

# Feature columns handed to the estimator; currently identical to
# input_columns because add_more_features returns its argument unchanged.
feature_cols = add_more_features(input_columns)

### train_and_evaluate

In [0]:
def serving_input_fn():
  """Describe the inputs the exported model accepts at serving time.

  Returns:
    A tf.estimator.export.ServingInputReceiver whose receiver tensors are one
    float32 placeholder of shape [None] per input field, and whose features
    are those same placeholders with a trailing dimension added.
  """
  field_names = ('pickuplon', 'pickuplat', 'dropofflat', 'dropofflon', 'passengers')

  # Raw tensors fed by the caller of the exported model.
  feature_placeholders = {}
  for name in field_names:
    feature_placeholders[name] = tf.placeholder(tf.float32, [None])

  # The model is given each placeholder with an extra last axis.
  features = {}
  for name, placeholder in feature_placeholders.items():
    features[name] = tf.expand_dims(placeholder, -1)

  return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

In [0]:
def train_and_evaluate(output_dir, num_train_steps):
  """Train a LinearRegressor on the taxi CSVs, evaluating and exporting as it goes.

  Args:
    output_dir: Directory passed to the estimator as its model_dir.
    num_train_steps: Passed to the TrainSpec as max_steps.
  """
  estimator = tf.estimator.LinearRegressor(
    model_dir = output_dir,
    feature_columns = feature_cols)
  train_spec = tf.estimator.TrainSpec(
    input_fn = read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN),
    max_steps = num_train_steps)
  exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
  eval_spec = tf.estimator.EvalSpec(
    # Must be EVAL, not TRAIN: in TRAIN mode read_dataset shuffles and repeats
    # the data indefinitely, and with steps = None evaluation only stops when
    # the input is exhausted, so it would never finish.
    input_fn = read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL),
    steps = None,
    start_delay_secs = 1,
    throttle_secs = 10,
    exporters = exporter)
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

In [0]:
# Delete the output directory from any earlier run (errors during removal are
# ignored), then train into it.
outdir = 'taxi_trained'
shutil.rmtree(path = outdir, ignore_errors = True)
train_and_evaluate(output_dir = outdir, num_train_steps = 5000)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'taxi_trained', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7faa85f82908>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Star

### Monitoring with TensorBoard

In [0]:
# TensorBoard helper from the google.datalab package.
from google.datalab.ml import TensorBoard
# Point TensorBoard at the same directory the training cell used as outdir.
TensorBoard().start('./taxi_trained')
# NOTE(review): presumably displays the TensorBoard instances currently running — confirm.
TensorBoard().list()

In [0]:
# Stop every TensorBoard instance whose pid appears in TensorBoard.list().
running_pids = TensorBoard.list()['pid']
for pid in running_pids:
    TensorBoard().stop(pid)
    print('Stopped TensorBoard with pid {}'.format(pid))