# Anatomy of Tensorflow Estimator Class
[Canned Estimators](https://www.tensorflow.org/programmers_guide/estimators)

In [1]:
from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils
import tensorflow.contrib.learn as tflearn
import tensorflow.contrib.layers as tflayers
import tensorflow.contrib.metrics as tfmetrics
import tensorflow as tf
import numpy as np
import os


  from ._conv import register_converters as _register_converters


In [2]:
import os
# Export the GCS bucket name: the %%bash cells read it as $BUCKET and the
# training cell reads it back with os.environ['BUCKET'].
os.environ.update({'BUCKET': 'telemar-flights'})

## Provide an input function
A TensorFlow Estimator needs an input function: a callback that takes no arguments and returns a `(features, labels)` pair.

In [3]:
# Column order of the flights CSV files read by read_dataset.
CSV_COLUMNS = [
    'ontime', 'dep_delay', 'taxiout', 'distance', 'avg_dep_delay', 'avg_arr_delay',
    'carrier', 'dep_lat', 'dep_lon', 'arr_lat', 'arr_lon', 'origin', 'dest',
]
# Column the classifier is trained to predict.
LABEL_COLUMN = 'ontime'
# One record default per CSV column for tf.decode_csv; the type of each default
# selects the parsed dtype (float for numeric columns, string for categorical).
DEFAULTS = [
    [0.0], [0.0], [0.0], [0.0], [0.0], [0.0],  # ontime .. avg_arr_delay
    ['na'],                                    # carrier
    [0.0], [0.0], [0.0], [0.0],                # dep_lat, dep_lon, arr_lat, arr_lon
    ['na'], ['na'],                            # origin, dest
]

def read_dataset(filename, mode=tf.estimator.ModeKeys.EVAL, batch_size=512, num_training_epochs=10):
  """Create an Estimator input function that streams CSV records.

  Args:
    filename: one path or a file pattern, resolved with
      tf.train.match_filenames_once.
    mode: a tf.estimator.ModeKeys value. In TRAIN mode the files are read
      `num_training_epochs` times; in any other mode they are read once.
    batch_size: maximum number of lines pulled per read_up_to call.
    num_training_epochs: number of passes over the files in TRAIN mode.

  Returns:
    A zero-argument callable returning (features, label), where features maps
    every CSV column except LABEL_COLUMN to its parsed tensor.
  """
  def _input_fn():
    epochs = num_training_epochs if mode == tf.estimator.ModeKeys.TRAIN else 1

    # Queue of matching file names, cycled `epochs` times.
    matched_files = tf.train.match_filenames_once(filename)
    file_queue = tf.train.string_input_producer(matched_files, num_epochs=epochs, shuffle=True)

    # Read up to batch_size lines and split each line into its columns; the
    # expand_dims gives every parsed column a trailing dimension of size 1.
    line_reader = tf.TextLineReader()
    _, lines = line_reader.read_up_to(file_queue, num_records=batch_size)
    parsed_columns = tf.decode_csv(tf.expand_dims(lines, -1), record_defaults=DEFAULTS)

    feature_dict = dict(zip(CSV_COLUMNS, parsed_columns))
    label = feature_dict.pop(LABEL_COLUMN)
    return feature_dict, label

  return _input_fn

# Provide a model
## First: define a helper function
Here we can select the features to build models on.  
Use `tf.feature_column.categorical_column_with_vocabulary_file` (documentation [here](https://www.tensorflow.org/api_docs/python/tf/feature_column/categorical_column_with_vocabulary_file)) to create two new categorical features, `origin` and `dest`, whose vocabularies are read from files.

In [4]:
def get_features_raw(origin_file, dest_file):
    """Declare the raw feature columns of the flights model.

    Args:
      origin_file: vocabulary file for the `origin` airport column.
      dest_file: vocabulary file for the `dest` airport column.

    Returns:
      (real, sparse): dicts keyed by feature name holding, respectively, the
      numeric columns and the categorical columns.
    """
    numeric_names = ('dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' +
                     ',dep_lat,dep_lon,arr_lat,arr_lon').split(',')
    real = dict((name, tf.feature_column.numeric_column(name)) for name in numeric_names)

    carrier_codes = 'AS,B6,WN,HA,OO,F9,NK,EV,DL,UA,US,AA,MQ,VX'.split(',')
    carrier_col = tf.feature_column.categorical_column_with_vocabulary_list(
        'carrier', vocabulary_list=carrier_codes, dtype=tf.string)
    # Airport vocabularies come from files; their sizes are inferred from the files.
    origin_col = tf.feature_column.categorical_column_with_vocabulary_file('origin', origin_file)
    dest_col = tf.feature_column.categorical_column_with_vocabulary_file('dest', dest_file)

    sparse = {'carrier': carrier_col, 'origin': origin_col, 'dest': dest_col}
    return real, sparse

def get_features(origin_file, dest_file):
    """Return the (real, sparse) feature-column dicts used by the model.

    Currently a pass-through to get_features_raw.
    """
    real, sparse = get_features_raw(origin_file, dest_file)
    return real, sparse

## Wide and Deep
Use `tf.feature_column.crossed_column` (documented [here](https://www.tensorflow.org/api_docs/python/tf/feature_column/crossed_column)) to create the following features:
- `dep_loc` as cross of `d_dep_lat` and `d_dep_lon`; determine its bucket size from the discretization of `d_dep_lat` and `d_dep_lon`;
- `arr_loc` as cross of `d_arr_lat` and `d_arr_lon`; determine its bucket size from the discretization of `d_arr_lat` and `d_arr_lon`;
- `dep_arr` as cross of `dep_loc` and `arr_loc`; determine its bucket size from the discretization of `dep_loc` and `arr_loc`;
- `ori_dest` as cross of `origin` and `dest`, using 1000 buckets.

In [5]:
def parse_hidden_units(s):
    """Parse a comma-separated layer spec such as '64,32' into [64, 32].

    Whitespace around items is ignored and empty items (for example from a
    trailing comma) are skipped, so '64, 32,' also yields [64, 32].

    Args:
      s: comma-separated string of hidden-layer widths.

    Returns:
      list of int, one width per hidden layer, in order.

    Raises:
      ValueError: if an item is not an integer, a width is not positive, or
        no width is given at all.
    """
    units = [int(item) for item in s.split(',') if item.strip()]
    if not units:
        raise ValueError('hidden_units must list at least one layer width, got %r' % (s,))
    if any(width <= 0 for width in units):
        raise ValueError('hidden_units widths must be positive, got %r' % (s,))
    return units

def create_embed(sparse_col):
    """Wrap a categorical column in an embedding sized by its number of buckets.

    The embedding dimension is 1 + round(log2(num_buckets)); when the bucket
    count cannot be determined it falls back to 10.

    Args:
      sparse_col: a categorical feature column (vocabulary, crossed, ...).

    Returns:
      A tf.feature_column.embedding_column over `sparse_col`.
    """
    dim = 10  # default when the number of buckets is unknown
    # `bucket_size` only exists on tf.contrib.layers columns. tf.feature_column
    # categorical columns expose their size as `_num_buckets` (a private TF 1.x
    # property), so without this fallback every embedding silently used dim=10.
    nbins = getattr(sparse_col, 'bucket_size', None)
    if nbins is None:
        nbins = getattr(sparse_col, '_num_buckets', None)
    if nbins:  # skip None and 0 (log2(0) would be -inf)
        dim = 1 + int(round(np.log2(nbins)))
    return tf.feature_column.embedding_column(sparse_col, dimension=dim)
  
def wide_and_deep_model(output_dir, origin_file, dest_file, nbuckets=5, hidden_units='64,32', learning_rate=0.01):
    """Build a wide-and-deep classifier that predicts whether a flight is on time.

    The linear ("wide") part receives the categorical columns, including the
    feature crosses. The DNN ("deep") part receives the numeric columns plus
    an embedding of every categorical column.

    Args:
      output_dir: model_dir in which the estimator stores checkpoints.
      origin_file: vocabulary file for the `origin` airport column.
      dest_file: vocabulary file for the `dest` airport column.
      nbuckets: number of boundaries used to discretize latitude and longitude;
        each bucketized column therefore has nbuckets + 1 buckets.
      hidden_units: comma-separated DNN layer widths, e.g. '64,32'.
      learning_rate: learning rate of the FTRL optimizer (linear part); the
        Adagrad optimizer of the DNN part uses a quarter of it.

    Returns:
      A tf.estimator.DNNLinearCombinedClassifier.
    """
    real, sparse = get_features(origin_file, dest_file)

    # the lat/lon columns can be discretized to yield "air traffic corridors"
    latbuckets = np.linspace(20.0, 50.0, nbuckets).tolist()  # USA
    lonbuckets = np.linspace(-120.0, -70.0, nbuckets).tolist()  # USA
    disc = {}
    disc.update({
        'd_{}'.format(key): tf.feature_column.bucketized_column(real[key], latbuckets)
        for key in ['dep_lat', 'arr_lat']
    })
    disc.update({
        'd_{}'.format(key): tf.feature_column.bucketized_column(real[key], lonbuckets)
        for key in ['dep_lon', 'arr_lon']
    })

    # nbuckets boundaries split each axis into nbuckets + 1 buckets, so a
    # lat x lon cross has (nbuckets + 1)**2 distinct values. A hash space of
    # only nbuckets**2 would force avoidable collisions between corridors.
    bins_per_axis = nbuckets + 1
    loc_buckets = bins_per_axis * bins_per_axis

    # cross columns that make sense in combination
    sparse['dep_loc'] = tf.feature_column.crossed_column(
        [disc['d_dep_lat'], disc['d_dep_lon']], loc_buckets)
    sparse['arr_loc'] = tf.feature_column.crossed_column(
        [disc['d_arr_lat'], disc['d_arr_lon']], loc_buckets)
    # departure corridor x arrival corridor: (nbuckets + 1)**4 combinations
    sparse['dep_arr'] = tf.feature_column.crossed_column(
        [sparse['dep_loc'], sparse['arr_loc']], loc_buckets * loc_buckets)
    sparse['ori_dest'] = tf.feature_column.crossed_column(
        [sparse['origin'], sparse['dest']], hash_bucket_size=1000)

    # create embeddings of all the sparse columns for the deep part
    embed = {
        colname: create_embed(col)
        for colname, col in sparse.items()
    }
    real.update(embed)

    estimator = tf.estimator.DNNLinearCombinedClassifier(
        model_dir=output_dir,
        # materialize the dict views so the estimator gets plain lists
        linear_feature_columns=list(sparse.values()),
        dnn_feature_columns=list(real.values()),
        dnn_hidden_units=parse_hidden_units(hidden_units),
        loss_reduction=tf.losses.Reduction.SUM_OVER_BATCH_SIZE,
        linear_optimizer=tf.train.FtrlOptimizer(learning_rate=learning_rate),
        dnn_optimizer=tf.train.AdagradOptimizer(learning_rate=learning_rate * 0.25))

    return estimator

# Provide a function for REST API
Inference requests will arrive as JSON, so the serving input function declares one placeholder per raw input feature.

In [10]:
def serving_input_fn():
    """Return the serving_input_receiver_fn used when exporting the model.

    The returned function is invoked by Estimator.export_savedmodel inside the
    export graph. It declares one rank-1 placeholder per raw feature (float32
    for numeric features, string for categorical ones), which a client fills
    with a batch of values. Each placeholder is expanded to shape (batch, 1)
    before being handed to the model's feature columns.

    Returns:
      A zero-argument callable returning a tf.estimator.export.ServingInputReceiver.
    """
    float_keys = ('dep_delay,taxiout,distance,avg_dep_delay,avg_arr_delay' +
                  ',dep_lat,dep_lon,arr_lat,arr_lon').split(',')
    string_keys = 'carrier,origin,dest'.split(',')

    def _serving_input_receiver_fn():
        # Placeholders are created here, inside the export graph, not in
        # whatever graph happens to be the default when serving_input_fn()
        # is called.
        feature_placeholders = {
            key: tf.placeholder(tf.float32, [None], name=key)
            for key in float_keys
        }
        feature_placeholders.update({
            key: tf.placeholder(tf.string, [None], name=key)
            for key in string_keys
        })

        features = {
            key: tf.expand_dims(tensor, -1)
            for key, tensor in feature_placeholders.items()
        }
        return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

    return _serving_input_receiver_fn

# Add custom metrics

In [11]:
def my_rmse(predictions, labels, **args):
  """Extra evaluation metric: RMSE between P(on time) and the 0/1 label.

  Meant for tf.contrib.estimator.add_metrics, which passes the estimator's
  prediction dict and the labels.

  Args:
    predictions: prediction dict of the classifier; its 'probabilities' entry
      holds one column per class.
    labels: the `ontime` label tensor.

  Returns:
    {'rmse': (value_tensor, update_op)} as returned by tf.metrics.
  """
  # column 1 of 'probabilities' is the probability of class 1 (on time)
  prob_ontime = predictions['probabilities'][:, 1]

  # tf.metrics expects (labels, predictions) in that order; pass them by
  # keyword and cast labels to match the float32 probabilities.
  return {'rmse': tf.metrics.root_mean_squared_error(
      labels=tf.cast(labels, tf.float32), predictions=prob_ontime)}

## Run a small training session on Datalab

In [7]:
%%bash
# Stage a small local sample of the train/test data so a quick training run is possible.
echo "reading from $BUCKET"

DATA_DIR=data/flights
rm -rf "$DATA_DIR"
mkdir -p "$DATA_DIR"

for STEP in train test; do
  # Quote the URL so the wildcard is expanded by gsutil against the bucket,
  # not by the local shell against the current directory.
  gsutil cp "gs://${BUCKET}/flights/chapter8/output/${STEP}Flights-00001*.csv" full.csv
  # keep only the first 10003 lines of the shard
  head -10003 full.csv > "${DATA_DIR}/${STEP}.csv"
  rm -f full.csv
done

ls -l "$DATA_DIR"

reading from telemar-flights
total 1684
-rw-r--r-- 1 root root 749937 Jun 19 10:09 test.csv
-rw-r--r-- 1 root root 969431 Jun 19 10:09 train.csv


Copying gs://telemar-flights/flights/chapter8/output/trainFlights-00001-of-00007.csv...
/ [0 files][    0.0 B/108.4 MiB]                                                -- [0 files][ 85.9 MiB/108.4 MiB]                                                - [1 files][108.4 MiB/108.4 MiB]                                                \
Operation completed over 1 objects/108.4 MiB.                                    
Copying gs://telemar-flights/flights/chapter8/output/testFlights-00001-of-00007.csv...
/ [0 files][    0.0 B/732.4 KiB]                                                / [1 files][732.4 KiB/732.4 KiB]                                                
Operation completed over 1 objects/732.4 KiB.                                    


In [12]:
%%bash
# Remove the output directory of any previous run before training again.
rm -r -f -- trained_model

In [None]:
# Train the wide-and-deep model on the local sample, evaluate it, and export a
# SavedModel for serving. Relies on BUCKET being set in os.environ by an
# earlier cell and on the functions defined in the cells above.
import json
import os

import tensorflow as tf
# NOTE(review): learn_runner is not used in this cell — confirm it can be dropped.
from tensorflow.contrib.learn.python.learn import learn_runner

BUCKET = os.environ['BUCKET']

# Run configuration: local sample data, vocabulary files on GCS, and hyperparameters.
arguments = {'traindata': 'data/flights/train.csv',
             'evaldata': 'data/flights/test.csv',
             'origin_file': os.path.join('gs://'+BUCKET,'flights/chapter8/keys/origin.txt'),
             'dest_file': os.path.join('gs://'+BUCKET,'flights/chapter8/keys/dest.txt'),
             'num_training_epochs': 1,
             'batch_size': 100,
             'nbuckets': 5,  
             'hidden_units': '64,64,64,16,4', # Architecture of DNN part of wide-and-deep network
             'learning_rate': 0.001 }

output_dir = 'trained_model'
# when hp-tuning, we need to use different output directories for different runs
# (the trial id is read from TF_CONFIG['task']['trial']; without it, joining
# with '' only appends a trailing slash: 'trained_model/')
output_dir = os.path.join(
    output_dir,
    json.loads(
        os.environ.get('TF_CONFIG', '{}')
    ).get('task', {}).get('trial', '')
)
 

# run
# start from an empty default graph so re-running this cell does not accumulate ops
tf.reset_default_graph()
tf.logging.set_verbosity(tf.logging.INFO)

# create estimator
estimator = wide_and_deep_model(output_dir,
                                arguments['origin_file'],
                                arguments['dest_file'], 
                                arguments['nbuckets'],
                                arguments['hidden_units'],
                                arguments['learning_rate'])

# attach the custom RMSE metric to the estimator's evaluation metrics
estimator = tf.contrib.estimator.add_metrics(estimator, 
                                             my_rmse)

# training reads the train file num_training_epochs times in batches of batch_size
train_spec = tf.estimator.TrainSpec(input_fn=read_dataset(arguments['traindata'], 
                                                          mode=tf.estimator.ModeKeys.TRAIN, 
                                                          batch_size=arguments['batch_size'], 
                                                          num_training_epochs=arguments['num_training_epochs']))

# evaluation uses read_dataset's defaults (EVAL mode, one pass, batch_size=512);
# steps=None evaluates until that input is exhausted
eval_spec = tf.estimator.EvalSpec(input_fn=read_dataset(arguments['evaldata']),
                                  steps = None,
                                  start_delay_secs = 10, # start evaluating after N seconds
                                  throttle_secs = 600)  # evaluate every N seconds

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

# serving_input_fn() returns the receiver function that export_savedmodel calls;
# the SavedModel is written under <output_dir>/Servo
estimator.export_savedmodel(os.path.join(output_dir,'Servo'),
                            serving_input_receiver_fn=serving_input_fn())

INFO:tensorflow:vocabulary_size = 322 in origin is inferred from the number of elements in the vocabulary_file gs://telemar-flights/flights/chapter8/keys/origin.txt.
INFO:tensorflow:vocabulary_size = 322 in dest is inferred from the number of elements in the vocabulary_file gs://telemar-flights/flights/chapter8/keys/dest.txt.
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_task_type': 'worker', '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f8604811d90>, '_evaluation_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 0, '_tf_random_seed': None, '_master': '', '_num_worker_replicas': 1, '_task_id': 0, '_log_step_count_steps': 100, '_model_dir': 'trained_model/', '_global_id_in_cluster': 0, '_save_summary_steps': 100}
INFO:tensorflo