<h1> Structured data prediction using Cloud ML Engine </h1>

This notebook illustrates:
<ol>
<li> Creating datasets for Machine Learning using Dataflow
<li> Creating a model using the high-level Estimator API
<li> Training on Cloud ML Engine
<li> Deploying the model
<li> Predicting with the model
</ol>

In [3]:
import shutil
import numpy as np
import tensorflow as tf
# change these to try this notebook out
BUCKET = 'tflbikesbucketregional'
PROJECT = 'TFLBikeUse-philipkm'
PROJECT_ID = 'tflbikeuse-philipkm'
REGION = 'us-central1'
# earlier settings, kept for reference
#BUCKET = 'tflbikeuse'
#PROJECT = 'TFLBikeUse'
#REGION = 'us-central1'

In [4]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

<h2> Create ML dataset using Dataflow (I didn't do this)</h2>
<p>
We can use Cloud Dataflow to read in the BigQuery data and write it out as CSV files. 

Instead of using Beam/Dataflow, I had three other options:
<ol>
<li> Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows me to explore the data, so we could have avoided much of the hand-coding of Python/Seaborn calls above as well!
<li> Read from BigQuery directly using TensorFlow.
<li> Use the BigQuery console (http://bigquery.cloud.google.com) to run a query and save the result as a CSV file. For larger datasets, you may have to select the option to "allow large results" and save the result into a CSV file on Google Cloud Storage.
</ol>
<p>
However, in this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.
<p>
Note that I have set in_test_mode=True in the following code. This runs the pipeline locally on a small subset of the data (the query is limited to 100 rows). The full results were instead copied into your bucket with this command, run in an earlier cell:
<pre>
gsutil -m cp -R gs://cloud-training-demos/babyweight gs://${BUCKET}
</pre>
If you are running in your own project, set in_test_mode=False; after you launch this, the notebook will appear to hang. Go to the Dataflow section of the GCP web console and monitor the running job. It took about <b>30 minutes</b> for me with autoscaling to 15 workers. Qwiklabs accounts won't be able to scale to that level, so doing it on Qwiklabs would have taken 3-4 hours. Hence the shortcut.

In [7]:
#PHILM: I skipped this part
import datetime
import os
import apache_beam as beam

def to_csv(rowdict):
    # pull columns from BQ and create a line
    import hashlib
    import copy
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

    # create synthetic data where we assume that no ultrasound has been performed,
    # so we don't know the sex of the baby. Assume we can tell single births from
    # multiple births, but that the exact number is hard to determine reliably
    # in the absence of an ultrasound.
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)

    no_ultrasound['is_male'] = 'Unknown'
    if rowdict['plurality'] > 1:
        no_ultrasound['plurality'] = 'Multiple(2+)'
    else:
        no_ultrasound['plurality'] = 'Single(1)'

    # Change the plurality column to strings
    w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

    # Write out two rows for each input row, one with ultrasound and one without
    for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        # hash the columns to form a key (hashlib needs bytes in Python 3)
        key = hashlib.sha224(data.encode('utf-8')).hexdigest()
        yield '{},{}'.format(data, key)

def preprocess(in_test_mode):
    job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    # run locally in test mode, on Cloud Dataflow otherwise
    RUNNER = 'DirectRunner' if in_test_mode else 'DataflowRunner'
    p = beam.Pipeline(RUNNER, options=opts)
    query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """

    if in_test_mode:
        query = query + ' LIMIT 100'

    # hash-based split on year+month: roughly 3/4 of the months go to train, 1/4 to eval
    for step in ['train', 'eval']:
        if step == 'train':
            selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) < 3'.format(query)
        else:
            selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) = 3'.format(query)

        (p
         | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
         | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
         | '{}_out'.format(step) >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step)))
        )

    job = p.run()

preprocess(in_test_mode=True)

<img src="dataflow.png" width="500"/>
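The template query above splits train and eval by hashing the year and month: rows with `MOD(ABS(hashmonth),4) < 3` go to training and rows with `= 3` to evaluation, so every row from a given month lands in the same split and roughly three quarters of the months are used for training. A minimal sketch of that rule in plain Python follows. It assumes SHA-256 (truncated to a signed 64-bit integer) as a stand-in for BigQuery's `FARM_FINGERPRINT`, because FarmHash is not in the standard library, so the actual month-to-split assignments will differ from BigQuery's; `month_hash` and `split_for` are hypothetical helper names.

```python
import hashlib


def month_hash(year, month):
    # Stand-in for FARM_FINGERPRINT(CONCAT(CAST(year AS STRING), CAST(month AS STRING))).
    # ASSUMPTION: SHA-256 truncated to a signed 64-bit integer, not FarmHash.
    key = '{}{}'.format(year, month).encode('utf-8')
    return int.from_bytes(hashlib.sha256(key).digest()[:8], 'big', signed=True)


def split_for(year, month):
    # Same rule as the WHERE clauses: MOD(ABS(hashmonth), 4) < 3 -> train, = 3 -> eval
    return 'train' if abs(month_hash(year, month)) % 4 < 3 else 'eval'


# Every row from the same month gets the same split, so no month is shared by both sets
months = [(year, month) for year in range(2001, 2009) for month in range(1, 13)]
splits = {ym: split_for(*ym) for ym in months}
print('train months:', sum(s == 'train' for s in splits.values()),
      'eval months:', sum(s == 'eval' for s in splits.values()))
```

Because the hash is deterministic, rerunning the pipeline reproduces the same split without storing any random state.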

<h2> Check data is present </h2>

In [20]:
%bash
#gsutil ls gs://tflbikeuse/eval*
#gsutil cat -r 0-256 gs://tflbikeuse/eval*
#gsutil ls gs://tflbikeuse/train*
#gsutil cat -r 0-256 gs://tflbikeuse/train*
gsutil cat gs://tflbikesbucketregional/train*|wc -l

4758633
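As a rough check on training length: `wc -l` counts 4,758,633 lines across the `train*` shards. Since `read_dataset` reads with `skip_header_lines=1`, each shard presumably starts with a header line, so the record count is slightly lower. The training cell further down uses `batch_size=512` and `max_steps=10000`, which works out to a little over one pass through the data:

```python
# Numbers taken from this notebook: the line count from `wc -l` above,
# batch_size from read_dataset and max_steps from the TrainSpec.
train_lines = 4758633
batch_size = 512
max_steps = 10000

examples_seen = batch_size * max_steps  # one full batch per step (allow_smaller_final_batch=False)
epochs = examples_seen / train_lines    # approximate, because header lines are included in the count
print('examples seen: {:,}, epochs: {:.2f}'.format(examples_seen, epochs))
```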


<h2> Create TensorFlow model using TensorFlow's Estimator API </h2>
<p>
First, write an input_fn to read the data.

In [5]:
CSV_COLUMNS = 'precipIntensity,temperature,badWeather,hour,dayofweek,bikesused,key,weekend'.split(',')
LABEL_COLUMN = 'bikesused'
KEY_COLUMN = 'key'
DEFAULTS = [[0.0],[15.0],["False"],[0],['Monday'],[0.5], ['nokey'],["False"]]

def read_dataset(prefix, pattern, batch_size=512):
    # use prefix (and the optional shard pattern) to create the filename glob
    if pattern:
        filename = 'gs://{}/{}*{}*'.format(BUCKET, prefix, pattern)
    else:
        filename = 'gs://{}/{}*'.format(BUCKET, prefix)
    print(filename)
    if prefix == 'train':
        mode = tf.estimator.ModeKeys.TRAIN
        num_epochs = None # indefinitely
    else:
        mode = tf.estimator.ModeKeys.EVAL
        num_epochs = 1 # end-of-input after this

    # the actual input function passed to TensorFlow
    def _input_fn():
        # could be a path to one file or a file pattern.
        input_file_names = tf.train.match_filenames_once(filename)
        filename_queue = tf.train.string_input_producer(
            input_file_names, shuffle=True, num_epochs=num_epochs)

        # read CSV, skipping the header line of each file
        reader = tf.TextLineReader(skip_header_lines=1)
        _, value = reader.read_up_to(filename_queue, num_records=batch_size)
        if mode == tf.estimator.ModeKeys.TRAIN:
            value = tf.train.shuffle_batch([value], batch_size, capacity=10*batch_size,
                                           min_after_dequeue=batch_size, enqueue_many=True,
                                           allow_smaller_final_batch=False)
        value_column = tf.expand_dims(value, -1)
        columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
        features = dict(zip(CSV_COLUMNS, columns))
        features.pop(KEY_COLUMN)  # the key identifies a row; it is not a feature
        label = features.pop(LABEL_COLUMN)
        return features, label

    return _input_fn
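To see what `_input_fn` does to a single record without starting a TensorFlow session, here is a plain-Python approximation of `tf.decode_csv` plus the key/label handling. It is only a sketch: the column order and defaults are copied from the cell above (the defaults are unwrapped and renamed `PLAIN_DEFAULTS` so they don't overwrite `DEFAULTS`), `parse_line` is a hypothetical helper, and the sample line is made up.

```python
import csv
import io

# Copied from CSV_COLUMNS / DEFAULTS above (defaults unwrapped from their one-element lists)
CSV_COLUMNS = 'precipIntensity,temperature,badWeather,hour,dayofweek,bikesused,key,weekend'.split(',')
PLAIN_DEFAULTS = [0.0, 15.0, 'False', 0, 'Monday', 0.5, 'nokey', 'False']
LABEL_COLUMN = 'bikesused'
KEY_COLUMN = 'key'


def parse_line(line):
    """Approximate tf.decode_csv + _input_fn for one CSV line.

    Empty fields fall back to the column default, other fields are cast to the
    default's type (float, int or str), the key is dropped and the label split off.
    """
    fields = next(csv.reader(io.StringIO(line)))
    if len(fields) != len(CSV_COLUMNS):
        raise ValueError('expected {} fields, got {}'.format(len(CSV_COLUMNS), len(fields)))
    row = {name: default if raw == '' else type(default)(raw)
           for name, raw, default in zip(CSV_COLUMNS, fields, PLAIN_DEFAULTS)}
    row.pop(KEY_COLUMN)
    label = row.pop(LABEL_COLUMN)
    return row, label


# Hypothetical record with an empty temperature field
features, label = parse_line('0.0,,False,8,Monday,0.42,abc123,False')
print(features, label)
```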

Next, define the feature columns.

In [6]:
def get_wide_deep():
    # define column types
    badWeather, precipIntensity, temperature, hour, weekend, dayofweek = [
        tf.feature_column.categorical_column_with_vocabulary_list('badWeather', ['True', 'False']),
        tf.feature_column.numeric_column('precipIntensity'),
        tf.feature_column.numeric_column('temperature'),
        tf.feature_column.numeric_column('hour'),
        tf.feature_column.categorical_column_with_vocabulary_list('weekend', ['True', 'False']),
        tf.feature_column.categorical_column_with_vocabulary_list(
            'dayofweek', ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'])
    ]

    # discretize: one bucket per hour of the day
    hour_buckets = tf.feature_column.bucketized_column(hour,
                        boundaries=np.arange(0, 24, 1).tolist())

    # Tried and set aside: bucketizing precipIntensity (boundaries 0..24) and
    # temperature (boundaries -10..29), and a wide/deep split that crossed the sparse
    # columns (crossed_column, hash_bucket_size=20000) and embedded the cross in 4 dimensions.

    # the DNN needs dense inputs, so one-hot encode the categorical columns
    badWeather_indicator = tf.feature_column.indicator_column(badWeather)
    dayofweek_indicator = tf.feature_column.indicator_column(dayofweek)
    weekend_indicator = tf.feature_column.indicator_column(weekend)

    features = [precipIntensity, temperature, hour_buckets,
                badWeather_indicator, dayofweek_indicator, weekend_indicator]
    return features
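The encodings above can be checked by hand. A plain-Python sketch, assuming the default settings of `bucketized_column` and `categorical_column_with_vocabulary_list` (in particular no out-of-vocabulary buckets); `hour_bucket` and `one_hot` are hypothetical helpers that mimic, but do not call, the TensorFlow feature columns:

```python
import bisect

HOUR_BOUNDARIES = list(range(24))  # same values as np.arange(0, 24, 1).tolist()
DAYS = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']


def hour_bucket(hour):
    # bucketized_column uses half-open buckets (-inf, 0), [0, 1), ..., [23, +inf),
    # so 24 boundaries give 25 buckets; bisect_right returns the same bucket index.
    return bisect.bisect_right(HOUR_BOUNDARIES, hour)


def one_hot(value, vocabulary):
    # indicator_column over a vocabulary list: an out-of-vocabulary value
    # (with the default of no OOV buckets) becomes an all-zero vector.
    return [1.0 if value == v else 0.0 for v in vocabulary]


# Width of the dense input vector the DNN sees:
# precipIntensity + temperature + hour buckets + badWeather + dayofweek + weekend
width = 1 + 1 + (len(HOUR_BOUNDARIES) + 1) + 2 + len(DAYS) + 2
print(hour_bucket(8), one_hot('Monday', DAYS), width)
```

Because `hour` only takes the integer values 0 to 23, each hour ends up in its own bucket and the bucket below the first boundary is never used.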

To predict with the TensorFlow model, we also need a serving input function. It declares one placeholder per feature, because the user has to supply all of the inputs at prediction time.

In [7]:
def serving_input_fn():
    # one placeholder per feature; badWeather and weekend are fed as the strings
    # 'True'/'False' to match the string vocabulary lists used in training
    feature_placeholders = {
        'hour': tf.placeholder(tf.float32, [None]),
        'dayofweek': tf.placeholder(tf.string, [None]),
        'weekend': tf.placeholder(tf.string, [None]),
        'badWeather': tf.placeholder(tf.string, [None]),
        'temperature': tf.placeholder(tf.float32, [None]),
        'precipIntensity': tf.placeholder(tf.float32, [None])
    }
    # add a trailing dimension to match the [batch, 1] shape produced by the training input_fn
    features = {
        key: tf.expand_dims(tensor, -1)
        for key, tensor in feature_placeholders.items()
    }
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
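For online prediction after deployment, each request instance must supply every key in `feature_placeholders`, with `badWeather` and `weekend` as the strings `'True'`/`'False'`. The sketch below builds newline-delimited JSON, the format read by `gcloud ml-engine predict --json-instances`. `SERVING_FEATURES` and `make_instance` are hypothetical helpers, and the feature values are made up for illustration.

```python
import json

# Must mirror feature_placeholders in serving_input_fn (name -> Python type)
SERVING_FEATURES = {
    'hour': float,
    'dayofweek': str,
    'weekend': str,
    'badWeather': str,
    'temperature': float,
    'precipIntensity': float,
}


def make_instance(**values):
    # Reject missing or unexpected keys here instead of letting the prediction request fail
    missing = sorted(set(SERVING_FEATURES) - set(values))
    extra = sorted(set(values) - set(SERVING_FEATURES))
    if missing or extra:
        raise ValueError('missing: {}, unexpected: {}'.format(missing, extra))
    return {name: SERVING_FEATURES[name](value) for name, value in values.items()}


# Hypothetical inputs, serialized as one JSON object per line
instances = [
    make_instance(hour=8, dayofweek='Monday', weekend='False', badWeather='False',
                  temperature=12.5, precipIntensity=0.0),
    make_instance(hour=14, dayofweek='Saturday', weekend='True', badWeather='True',
                  temperature=6.0, precipIntensity=1.2),
]
json_lines = '\n'.join(json.dumps(instance) for instance in instances)
print(json_lines)  # save to a file (e.g. instances.json) before calling gcloud
```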

Finally, train!

In [8]:
PATTERN = ""  # empty string: read every shard matching the prefix
OUTDIR = 'tfl_trained_120419_1600'

def train_and_evaluate(output_dir):
    features = get_wide_deep()
    estimator = tf.estimator.DNNRegressor(
                         model_dir=output_dir,
                         feature_columns=features,
                         hidden_units=[64, 32])
    train_spec = tf.estimator.TrainSpec(
                         input_fn=read_dataset('train', PATTERN),
                         max_steps=10000)
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
                         input_fn=read_dataset('eval', PATTERN),
                         steps=None,
                         start_delay_secs=1, # start evaluating after N seconds
                         throttle_secs=10,  # evaluate every N seconds
                         exporters=exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

shutil.rmtree(OUTDIR, ignore_errors=True) # start fresh each time
train_and_evaluate(OUTDIR)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_log_step_count_steps': 100, '_num_ps_replicas': 0, '_evaluation_master': '', '_session_config': None, '_is_chief': True, '_train_distribute': None, '_save_checkpoints_secs': 600, '_service': None, '_model_dir': 'tfl_trained_120419_1600', '_master': '', '_task_type': 'worker', '_tf_random_seed': None, '_save_checkpoints_steps': None, '_save_summary_steps': 100, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f0ffa6e24e0>, '_global_id_in_cluster': 0, '_num_worker_replicas': 1, '_keep_checkpoint_every_n_hours': 10000, '_task_id': 0, '_keep_checkpoint_max': 5}
gs://tflbikesbucketregional/train*
gs://tflbikesbucketregional/eval*
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 10 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Do

INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1632 into tfl_trained_120419_1600/model.ckpt.
INFO:tensorflow:step = 1632, loss = 46.20708
INFO:tensorflow:global_step/sec: 87.4222
INFO:tensorflow:step = 1732, loss = 52.642067 (1.151 sec)
INFO:tensorflow:global_step/sec: 92.7737
INFO:tensorflow:step = 1832, loss = 48.11212 (1.075 sec)
INFO:tensorflow:global_step/sec: 93.7868
INFO:tensorflow:step = 1932, loss = 46.947403 (1.067 sec)
INFO:tensorflow:global_step/sec: 94.5665
INFO:tensorflow:step = 2032, loss = 43.511772 (1.057 sec)
INFO:tensorflow:global_step/sec: 90.1905
INFO:tensorflow:step = 2132, loss = 48.000984 (1.109 sec)
INFO:tensorflow:global_step/sec: 96.3814
INFO:tensorflow:step = 2232, loss = 48.0306 (1.039 sec)
INFO:tensorflow:global_step/sec: 81.542
INFO:tensorflow:step = 2332, loss = 49.65771 (1.225 sec)
INFO:tensorflow:global_step/sec: 92.2175
INFO:tensorflow:step = 2432, loss = 49.38685 (1.087 sec)
INFO:tensorflow:Saving checkpoints for 2

INFO:tensorflow:Loss for final step: 47.904903.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-04-12-15:08:48
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from tfl_trained_120419_1600/model.ckpt-4207
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Finished evaluation at 2019-04-12-15:08:58
INFO:tensorflow:Saving dict for global step 4207: average_loss = 0.09216377, global_step = 4207, loss = 47.159664
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['predict']
INFO:tensorflow:Signatures INCLUDED in export for Classify: None
INFO:tensorflow:Signatures EXCLUDED from export because they cannot be be served via TensorFlow Serving APIs:
INFO:tensorflow:'serving_default' : Regression input must be a

INFO:tensorflow:'regression' : Regression input must be a single string Tensor; got {'precipIntensity': <tf.Tensor 'Placeholder_5:0' shape=(?,) dtype=float32>, 'temperature': <tf.Tensor 'Placeholder_4:0' shape=(?,) dtype=float32>, 'weekend': <tf.Tensor 'Placeholder_2:0' shape=(?,) dtype=string>, 'badWeather': <tf.Tensor 'Placeholder_3:0' shape=(?,) dtype=string>, 'dayofweek': <tf.Tensor 'Placeholder_1:0' shape=(?,) dtype=string>, 'hour': <tf.Tensor 'Placeholder:0' shape=(?,) dtype=float32>}
INFO:tensorflow:Restoring parameters from tfl_trained_120419_1600/model.ckpt-5955
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: b"tfl_trained_120419_1600/export/exporter/temp-b'1555081781'/saved_model.pb"
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from tfl_trained_120419_1600/model.ckpt-

INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 7712 into tfl_trained_120419_1600/model.ckpt.
INFO:tensorflow:step = 7712, loss = 43.58886
INFO:tensorflow:global_step/sec: 90.1854
INFO:tensorflow:step = 7812, loss = 48.03334 (1.114 sec)
INFO:tensorflow:global_step/sec: 95.8177
INFO:tensorflow:step = 7912, loss = 46.93476 (1.046 sec)
INFO:tensorflow:global_step/sec: 94.7508
INFO:tensorflow:step = 8012, loss = 40.14379 (1.053 sec)
INFO:tensorflow:global_step/sec: 96.5349
INFO:tensorflow:step = 8112, loss = 45.30046 (1.038 sec)
INFO:tensorflow:global_step/sec: 94.2857
INFO:tensorflow:step = 8212, loss = 46.92676 (1.061 sec)
INFO:tensorflow:global_step/sec: 98.5955
INFO:tensorflow:step = 8312, loss = 44.25933 (1.011 sec)
INFO:tensorflow:global_step/sec: 97.4357
INFO:tensorflow:step = 8412, loss = 50.48991 (1.026 sec)
INFO:tensorflow:global_step/sec: 95.1432
INFO:tensorflow:step = 8512, loss = 48.472145 (1.050 sec)
IN

INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Finished evaluation at 2019-04-12-15:11:24
INFO:tensorflow:Saving dict for global step 10000: average_loss = 0.09252347, global_step = 10000, loss = 47.34372
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['predict']
INFO:tensorflow:Signatures INCLUDED in export for Classify: None
INFO:tensorflow:Signatures EXCLUDED from export because they cannot be be served via TensorFlow Serving APIs:
INFO:tensorflow:'serving_default' : Regression input must be a single string Tensor; got {'precipIntensity': <tf.Tensor 'Placeholder_5:0' shape=(?,) dtype=float32>, 'temperature': <tf.Tensor 'Placeholder_4:0' shape=(?,) dtype=float32>, 'weekend': <tf.Tensor 'Placeholder_2:0' shape=(?,) dtype=string>, 'badWeather': <tf.Tensor 'Placeholder_3:0' shape=(?,) dtyp
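The evaluation log reports both `loss` (about 47) and `average_loss` (about 0.092). In TF 1.x canned regressors such as `DNNRegressor`, the default loss reduction sums the squared error over each batch, so `loss` is a per-batch sum while `average_loss` is the per-example mean squared error. Dividing the two should roughly recover the batch size of 512 (the last evaluation batch can be smaller), and the square root of `average_loss` gives an RMSE in the same units as `bikesused`. A quick check using the numbers from the log:

```python
import math

# Values copied from the evaluation at global step 4207 in the log above
loss = 47.159664
average_loss = 0.09216377

implied_batch_size = loss / average_loss
rmse = math.sqrt(average_loss)
print('loss / average_loss = {:.1f}'.format(implied_batch_size))  # close to batch_size=512
print('RMSE = {:.3f}'.format(rmse))  # in the units of bikesused
```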

In [18]:
PATTERN = ""  # empty string: read every shard matching the prefix

def get_wide_columns():
    # sparse (categorical) columns for the linear "wide" part of the model
    return [
        tf.feature_column.categorical_column_with_vocabulary_list('badWeather', ['True', 'False']),
        tf.feature_column.categorical_column_with_vocabulary_list('weekend', ['True', 'False']),
        tf.feature_column.categorical_column_with_vocabulary_list(
            'dayofweek', ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']),
        tf.feature_column.bucketized_column(
            tf.feature_column.numeric_column('hour'), boundaries=np.arange(0, 24, 1).tolist())
    ]

def train_and_evaluate(output_dir):
    # get_wide_deep() returns a single list of dense columns, so it feeds the DNN ("deep") part
    wide = get_wide_columns()
    deep = get_wide_deep()
    estimator = tf.estimator.DNNLinearCombinedRegressor(
                         model_dir=output_dir,
                         linear_feature_columns=wide,
                         dnn_feature_columns=deep,
                         dnn_hidden_units=[64, 32])
    train_spec = tf.estimator.TrainSpec(
                         input_fn=read_dataset('train', PATTERN),
                         max_steps=10000)
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
                         input_fn=read_dataset('eval', PATTERN),
                         steps=None,
                         start_delay_secs=1, # start evaluating after N seconds
                         throttle_secs=10,  # evaluate every N seconds
                         exporters=exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

shutil.rmtree('tfl_trained_100419', ignore_errors=True) # start fresh each time
train_and_evaluate('tfl_trained_100419')
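In `DNNLinearCombinedRegressor` the two parts are trained jointly and the prediction is the sum of the linear (wide) model's output and the DNN's output. A toy sketch of that forward pass in plain Python follows. The feature names, weights and layer sizes are hypothetical and unrelated to the trained model; the ReLU hidden layers match the DNN's default activation.

```python
def relu(x):
    return max(0.0, x)


def wide_and_deep_prediction(active_sparse, wide_weights, wide_bias,
                             dense_inputs, hidden_layers, out_weights, out_bias):
    # Wide part: a linear model over the sparse features that are "on"
    wide_output = wide_bias + sum(wide_weights.get(name, 0.0) for name in active_sparse)

    # Deep part: fully connected ReLU layers over the dense inputs
    activations = list(dense_inputs)
    for layer in hidden_layers:  # each layer is a list of (unit_weights, unit_bias)
        activations = [relu(sum(w * a for w, a in zip(unit_weights, activations)) + unit_bias)
                       for unit_weights, unit_bias in layer]
    deep_output = out_bias + sum(w * a for w, a in zip(out_weights, activations))

    # The combined regressor adds the two outputs
    return wide_output + deep_output


# Hypothetical weights: 2 dense inputs -> one hidden layer of 2 units -> 1 output
prediction = wide_and_deep_prediction(
    active_sparse=['dayofweek=Monday', 'weekend=False'],
    wide_weights={'dayofweek=Monday': 0.2, 'weekend=False': 0.1},
    wide_bias=0.05,
    dense_inputs=[1.0, 2.0],
    hidden_layers=[[([0.5, 0.25], 0.0), ([-1.0, 0.0], 0.0)]],
    out_weights=[0.3, 0.7],
    out_bias=0.0,
)
print(prediction)
```

The wide part can memorize effects of specific category values, while the deep part generalizes over the dense inputs; summing them lets either part contribute.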

In [10]:
!ls -lrt tfl_trained_120419_1600
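`LatestExporter('exporter', ...)` writes each SavedModel to a timestamped subdirectory; the training log shows one being staged under `tfl_trained_120419_1600/export/exporter/`. Deploying needs the path of the newest one. The sketch below picks it, assuming the layout `<model_dir>/export/exporter/<timestamp>/saved_model.pb` and skipping `temp-` staging directories. `latest_export_dir` is a hypothetical helper, and the demo builds a throwaway directory tree rather than touching the real one.

```python
import os
import tempfile


def latest_export_dir(model_dir):
    # ASSUMPTION: layout <model_dir>/export/exporter/<unix timestamp>/saved_model.pb
    export_root = os.path.join(model_dir, 'export', 'exporter')
    candidates = [name for name in os.listdir(export_root)
                  if name.isdigit()
                  and os.path.isfile(os.path.join(export_root, name, 'saved_model.pb'))]
    if not candidates:
        raise FileNotFoundError('no SavedModel found under ' + export_root)
    return os.path.join(export_root, max(candidates, key=int))


# Demo on a throwaway tree that mimics the exporter layout
demo_dir = tempfile.mkdtemp()
for name in ['1555081781', '1555081999', "temp-b'1555082000'"]:
    path = os.path.join(demo_dir, 'export', 'exporter', name)
    os.makedirs(path)
    open(os.path.join(path, 'saved_model.pb'), 'w').close()
print(latest_export_dir(demo_dir))
```

Once the chosen directory is copied to Cloud Storage, that path would be the `--origin` argument of `gcloud ml-engine versions create`.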


In [17]:
# to stop TensorBoard
from google.datalab.ml import TensorBoard
for pid in TensorBoard.list()['pid']:
    TensorBoard().stop(pid)
    print('Stopped TensorBoard with pid {}'.format(pid))

Stopped TensorBoard with pid 6085


In [13]:
from google.datalab.ml import TensorBoard
TensorBoard().start('./tfl_trained_100419')
TensorBoard().list()

                 logdir    pid   port
0  ./tfl_trained_100419  48821  51861
