# Developing, Training, and Deploying a TensorFlow model on Google Cloud Platform (completely within Jupyter)


In Chapter 9 of [Data Science on the Google Cloud Platform](http://shop.oreilly.com/product/0636920057628.do), I trained a TensorFlow Estimator model to predict flight delays.

In this notebook, we'll modernize the workflow:
* Use eager mode for TensorFlow development
* Use tf.data to write the input pipeline
* Run the notebook as-is on Cloud using Deep Learning VM or Kubeflow pipelines
* Deploy the trained model to Cloud ML Engine as a web service

The combination of eager mode, tf.data and DLVM/KFP makes this workflow a lot easier.
We don't need to deal with Python packages or Docker containers.

In [1]:
# change these to try this notebook out
# In "production", these will be replaced by the parameters passed to papermill

# Bucket name (without the gs:// prefix); the trained model is written to
# gs://<BUCKET>/flights/trained_model further down, so this must be filled in.
BUCKET = ''
# Exported as $PROJECT and handed to `gcloud config set project`.
PROJECT = 'ma-poc-automation'
# Exported as $REGION and handed to `gcloud config set compute/region`.
REGION = 'us-central1'
# When True: train for only 10 steps, use a small eval batch size, and print
# the feature-column keys.
DEVELOP_MODE = True
# Number of boundary points used to discretize latitude/longitude; also used
# to size the hash buckets of the location crosses.
NBUCKETS = 5


In [2]:
import os

# Make the notebook parameters visible to shell cells (%%bash and ! lines)
# as $BUCKET, $PROJECT and $REGION.
os.environ.update(BUCKET=BUCKET, PROJECT=PROJECT, REGION=REGION)

In [3]:
%%bash
# Point the gcloud CLI at the project and region exported from the
# notebook parameters ($PROJECT and $REGION are set via os.environ).
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


## Creating the input data pipeline

In [4]:
# Location of the input CSV files.
DATA_BUCKET = "gs://cloud-training-demos/flights/chapter8/output/"
# File patterns: training reads the files whose names start with "train",
# validation reads the ones whose names start with "test".
TRAIN_DATA_PATTERN = "{}train*".format(DATA_BUCKET)
VALID_DATA_PATTERN = "{}test*".format(DATA_BUCKET)

### Use tf.data to read the CSV files



In [6]:
import os, json, math
import numpy as np
import tensorflow as tf
# Record the TensorFlow version this notebook was executed with.
print("Tensorflow version {}".format(tf.__version__))

Tensorflow version 1.15.0


In [7]:
# Field names of one CSV record, in file order.
CSV_COLUMNS = [
    'ontime', 'dep_delay', 'taxiout', 'distance', 'avg_dep_delay', 'avg_arr_delay',
    'carrier', 'dep_lat', 'dep_lon', 'arr_lat', 'arr_lon', 'origin', 'dest',
]
# The field the model is trained to predict.
LABEL_COLUMN = 'ontime'
# Per-field default values, aligned with CSV_COLUMNS: the three string fields
# default to 'na', every other field defaults to 0.0.
DEFAULTS = [
    ['na'] if name in ('carrier', 'origin', 'dest') else [0.0]
    for name in CSV_COLUMNS
]

def decode_csv(line):
  """Parse one CSV text line into a {column_name: value} dictionary.

  The line is decoded with tf.io.decode_csv using DEFAULTS as the record
  defaults, and the resulting values are keyed by the names in CSV_COLUMNS.
  """
  parsed_fields = tf.io.decode_csv(line, DEFAULTS)
  # Pair the names with the parsed values position by position.
  return {name: value for name, value in zip(CSV_COLUMNS, parsed_fields)}

def load_dataset(pattern):
  """Return a dataset of decoded CSV records read from the files matching `pattern`.

  Matching files are read as text lines, interleaved with a cycle length of 16,
  and every line is converted to a dictionary by decode_csv().
  """
  matching_files = tf.data.Dataset.list_files(pattern)
  text_lines = matching_files.interleave(tf.data.TextLineDataset, cycle_length=16)
  return text_lines.map(decode_csv)

In [9]:
%%writefile example_input.json
{"dep_delay": 14.0, "taxiout": 13.0, "distance": 319.0, "avg_dep_delay": 25.863039, "avg_arr_delay": 27.0, "carrier": "WN", "dep_lat": 32.84722, "dep_lon": -96.85167, "arr_lat": 31.9425, "arr_lon": -102.20194, "origin": "DAL", "dest": "MAF"}
{"dep_delay": -9.0, "taxiout": 21.0, "distance": 301.0, "avg_dep_delay": 41.050808, "avg_arr_delay": -7.0, "carrier": "EV", "dep_lat": 29.984444, "dep_lon": -95.34139, "arr_lat": 27.544167, "arr_lon": -99.46167, "origin": "IAH", "dest": "LRD"}

Overwriting example_input.json


In [11]:
def features_and_labels(features):
  """Split one decoded record into the model inputs and the training label.

  Args:
    features: dict mapping column name to value; must contain 'ontime'.

  Returns:
    A tuple (inputs, label). `inputs` is a new dict with every entry of
    `features` except 'ontime'; `label` is the value stored under 'ontime'.

  Raises:
    KeyError: if `features` has no 'ontime' entry.
  """
  # Work on a shallow copy: popping from the caller's dict would make a second
  # call on the same record fail with KeyError.
  inputs = dict(features)
  label = inputs.pop('ontime') # this is what we will train for
  return inputs, label
  
def prepare_dataset(pattern, batch_size, truncate=None, mode=tf.estimator.ModeKeys.TRAIN):
  """Build the batched (features, label) input pipeline for training or evaluation.

  Args:
    pattern: file pattern of the CSV files, passed to load_dataset().
    batch_size: number of records per batch.
    truncate: if not None, keep only the first `truncate` batches. The limit is
      applied after batching, so it counts batches, not individual records.
    mode: a tf.estimator.ModeKeys value. Only TRAIN shuffles the records and
      repeats the data indefinitely; any other mode makes a single ordered pass.

  Returns:
    A tf.data dataset yielding (features dict, label) batches.
  """
  dataset = load_dataset(pattern)
  dataset = dataset.map(features_and_labels)
  # Cache the decoded records so later passes do not re-read and re-parse the files.
  dataset = dataset.cache()
  if mode == tf.estimator.ModeKeys.TRAIN:
    dataset = dataset.shuffle(1000)
    dataset = dataset.repeat()
  dataset = dataset.batch(batch_size)
  if truncate is not None:
    dataset = dataset.take(truncate)
  # Prefetch last, so that batches dropped by take() are never prefetched.
  dataset = dataset.prefetch(10)
  return dataset



## Create TensorFlow wide-and-deep model

We'll create feature columns, and do some discretization and feature engineering.
See the book for details.

In [12]:
from tensorflow import feature_column

# Dense inputs: each continuous field becomes a numeric column keyed by its name.
real = dict(
    (name, feature_column.numeric_column(name))
    for name in [
        'dep_delay', 'taxiout', 'distance', 'avg_dep_delay', 'avg_arr_delay',
        'dep_lat', 'dep_lon', 'arr_lat', 'arr_lon',
    ]
)

# Categorical inputs: carrier uses a fixed vocabulary, while the origin and
# destination codes are hashed into 1000 buckets each.
sparse = {
    'carrier': feature_column.categorical_column_with_vocabulary_list(
        key='carrier',
        vocabulary_list=['AS', 'VX', 'F9', 'UA', 'US', 'WN', 'HA',
                         'EV', 'MQ', 'DL', 'OO', 'B6', 'NK', 'AA']),
    'origin': feature_column.categorical_column_with_hash_bucket(
        key='origin', hash_bucket_size=1000),
    'dest': feature_column.categorical_column_with_hash_bucket(
        key='dest', hash_bucket_size=1000),
}

### Feature engineering

In [13]:
# Bucket boundaries for latitude and longitude (NBUCKETS evenly spaced points).
latbuckets = np.linspace(20.0, 50.0, num=NBUCKETS).tolist()  # USA
lonbuckets = np.linspace(-120.0, -70.0, num=NBUCKETS).tolist() # USA

# Discretized coordinates, keyed d_dep_lat, d_arr_lat, d_dep_lon, d_arr_lon.
disc = {
    'd_{}'.format(key): feature_column.bucketized_column(real[key], boundaries)
    for boundaries, keys in (
        (latbuckets, ['dep_lat', 'arr_lat']),
        (lonbuckets, ['dep_lon', 'arr_lon']),
    )
    for key in keys
}

# cross columns that make sense in combination
# (lat x lon gives a location cell; dep_arr is built from the two location
# crosses, so it must be created after them)
sparse['dep_loc'] = feature_column.crossed_column(
    keys=[disc['d_dep_lat'], disc['d_dep_lon']], hash_bucket_size=NBUCKETS*NBUCKETS)
sparse['arr_loc'] = feature_column.crossed_column(
    keys=[disc['d_arr_lat'], disc['d_arr_lon']], hash_bucket_size=NBUCKETS*NBUCKETS)
sparse['dep_arr'] = feature_column.crossed_column(
    keys=[sparse['dep_loc'], sparse['arr_loc']], hash_bucket_size=NBUCKETS ** 4)
sparse['ori_dest'] = feature_column.crossed_column(
    keys=['origin', 'dest'], hash_bucket_size=1000)

# embed all the sparse columns (10 dimensions each) and add the embeddings to
# the dense column set under the same keys
embed = dict(
    (name, feature_column.embedding_column(categorical, dimension=10))
    for name, categorical in sparse.items()
)
real.update(embed)

if DEVELOP_MODE:
    print(sparse.keys())
    print(real.keys())

dict_keys(['carrier', 'origin', 'dest', 'dep_loc', 'arr_loc', 'dep_arr', 'ori_dest'])
dict_keys(['dep_delay', 'taxiout', 'distance', 'avg_dep_delay', 'avg_arr_delay', 'dep_lat', 'dep_lon', 'arr_lat', 'arr_lon', 'carrier', 'origin', 'dest', 'dep_loc', 'arr_loc', 'dep_arr', 'ori_dest'])


### Serving

This serving input function defines the request format of the deployed model: at prediction time, every one of the fields below must be supplied.

In [14]:
def serving_input_fn():
    """Describe the inputs the exported model accepts at prediction time.

    Creates one float32 placeholder per numeric field and one string
    placeholder per categorical field, each of shape [None]. The same
    dictionary is used both as the receiver tensors and as the model
    features, i.e. no transformation is applied in between.
    """
    numeric_fields = [
        'dep_delay', 'taxiout', 'distance', 'avg_dep_delay', 'avg_arr_delay',
        'dep_lat', 'dep_lon', 'arr_lat', 'arr_lon',
    ]
    string_fields = ['carrier', 'origin', 'dest']

    receiver_tensors = {}
    # All the real-valued columns
    for field in numeric_fields:
        receiver_tensors[field] = tf.placeholder(tf.float32, [None])
    for field in string_fields:
        receiver_tensors[field] = tf.placeholder(tf.string, [None])

    # no transformations: features and receiver tensors are the same dict
    return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)

## Train the model and evaluate once in a while

The Estimator also writes checkpoints to the model directory while it trains.

In [None]:
# Output location of checkpoints and exported models, derived from BUCKET.
model_dir = 'gs://{bucket}/flights/trained_model'.format(bucket=BUCKET)
# Shell cells read the location from $OUTDIR.
os.environ['OUTDIR'] = model_dir  # needed for deployment
print('Writing trained model to', model_dir)

In [None]:
# Recursively delete whatever is already stored under $OUTDIR.
# NOTE(review): this is destructive, and $OUTDIR is derived from BUCKET --
# confirm BUCKET is set to the intended bucket before running this cell.
!gsutil -m rm -rf $OUTDIR

In [19]:
# Wide-and-deep classifier: the linear part is fed the sparse (categorical and
# crossed) columns, the DNN part is fed the numeric and embedded columns.
# The dict views are copied into lists so the estimator holds a fixed snapshot
# of the columns rather than a live view of the dictionaries.
estimator = tf.estimator.DNNLinearCombinedClassifier(
        model_dir = model_dir,
        linear_feature_columns = list(sparse.values()),
        dnn_feature_columns = list(real.values()),
        dnn_hidden_units = [64, 32]
)

train_batch_size = 64
eval_batch_size = 100 if DEVELOP_MODE else 10000
# 10 steps is only a smoke test; the full run trains for
# (1000000 // train_batch_size) steps.
num_steps = 10 if DEVELOP_MODE else (1000000 // train_batch_size)


def train_input_fn():
    """Input function for training: shuffled, repeated batches of the training files."""
    # Return the Dataset itself; the Estimator creates and manages the iterator,
    # so the deprecated make_one_shot_iterator() call is not needed.
    return prepare_dataset(TRAIN_DATA_PATTERN, train_batch_size)


def eval_input_fn():
    """Input function for evaluation: a single ordered pass over the validation files."""
    # The third argument caps the pipeline at eval_batch_size*10 batches.
    return prepare_dataset(VALID_DATA_PATTERN, eval_batch_size, eval_batch_size*10, tf.estimator.ModeKeys.EVAL)


train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps = num_steps)
# Export a servable model (using serving_input_fn) at every evaluation.
exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=10, exporters=exporter)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'gs://hostedkfp-default-bmeiqdunw2/flights/trained_model', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f2c3a7738d0>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:N

({'accuracy': 0.234,
  'accuracy_baseline': 0.766,
  'auc': 0.5,
  'auc_precision_recall': 0.883,
  'average_loss': 26.094366,
  'label/mean': 0.766,
  'loss': 2609.4365,
  'precision': 0.0,
  'prediction/mean': 1.260668e-06,
  'recall': 0.0,
  'global_step': 10},
 [b'gs://hostedkfp-default-bmeiqdunw2/flights/trained_model/export/exporter/1587687502'])

Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.