# Distributed Machine Learning using Tensorflow + Cloud ML Engine

A barebones example showing the simplest path from raw data to distributed cloud training with the fewest lines of code

**This notebook is intended to be run on Google Cloud Datalab**: https://cloud.google.com/datalab/docs/quickstarts

Datalab will have the required libraries installed by default for this code to work. If you choose to run this code outside of Datalab you may run in to version and dependency issues which you will need to resolve.

In [4]:
import pandas as pd
import tensorflow as tf

In [5]:
print(tf.__version__)

1.0.0


## Tensorflow APIs
<img src="assets/TFHierarchy.png">

Tensorflow is a heirarchical framework. The further down the heirarchy you go, the more flexibility you have, but that more code you have to write. A best practice is start at the highest level of abstraction. Then if you need additional flexibility for some reason drop down one layer. 

For this tutorial we will be operating at the highest levels of Tensorflow abstraction, using Estimator and Experiments APIs.

## Steps

1. Load raw data

2. Write Tensorflow Code

 1. Define Feature Columns
 
 2. Define Estimator

 3. Define Input Function
 
 4. Define Serving Function

 5. Define Experiment

3. Package Code

4. Train

5. Inspect Results

6. Deploy Model

7. Get Predictions

### 1) Load Raw Data

This is a publically available dataset on boston area housing prices circa 1978. It is hosted in a public GCS bucket.

In [3]:
#downlad data from GCS and store as pandas dataframe 
df = pd.read_csv(
  filepath_or_buffer='https://storage.googleapis.com/vijay-public/boston_housing/housing.data.txt',
  delim_whitespace=True,
  names=["CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","B","LSTAT","MEDV"])

In [4]:
df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,MEDV
0,0.00632,18.0,2.31,0,0.538,6.575,65.2,4.09,1,296.0,15.3,396.9,4.98,24.0
1,0.02731,0.0,7.07,0,0.469,6.421,78.9,4.9671,2,242.0,17.8,396.9,9.14,21.6
2,0.02729,0.0,7.07,0,0.469,7.185,61.1,4.9671,2,242.0,17.8,392.83,4.03,34.7
3,0.03237,0.0,2.18,0,0.458,6.998,45.8,6.0622,3,222.0,18.7,394.63,2.94,33.4
4,0.06905,0.0,2.18,0,0.458,7.147,54.2,6.0622,3,222.0,18.7,396.9,5.33,36.2


### 2) Write Tensorflow Code

#### 2.A Define Feature Columns

Feature columns are your Estimator's data "interface." They tell the estimator in what format they should expect data and how to interpret it (is it one-hot? sparse? dense? categorical? conteninous?).  https://www.tensorflow.org/api_docs/python/tf/contrib/layers/feature_column




In [6]:
FEATURES = ["CRIM", "ZN", "INDUS", "NOX", "RM",
            "AGE", "DIS", "TAX", "PTRATIO"]
LABEL = "MEDV"

feature_cols = [tf.contrib.layers.real_valued_column(k)
                  for k in FEATURES] #list of Feature Columns

#### 2.B Define Estimator

An Estimator is what actually implements your training, eval and prediction loops. Every estimator has the following methods:

- fit() for training
- eval() for evaluation
- predict() for prediction
- export_savedmodel() for writing model state to disk

Tensorflow has several canned estimator that already implement these methods (DNNClassifier, LogisticClassifier etc..) or you can implement a custom estimator. For an example of implementing a custom estimator see [here](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/blogs/timeseries/rnn_cloudmle.ipynb).

For simplicity we will use a canned estimator. To instantiate an estimator simply pass it what Feature Columns to expect.

In [6]:
def generate_estimator(output_dir):
  return tf.contrib.learn.DNNRegressor(feature_columns=feature_cols,
                                            hidden_units=[10, 10],
                                            model_dir=output_dir)

#### 2.C Define Input Function

Now that you have an estimator and it knows what type of data to expect and how to intepret, you need to actually pass the data to it! This is the job of the input function. 

The input function returns a (features, label) tuple
- features: A dictionary. Each key is a feature column name and its value is the Tensor containing the data for that Feature
- label: A Tensor containing the label column

In [7]:
def generate_input_fn(data_set):
    def input_fn():
      features = {k: tf.constant(data_set[k].values) for k in FEATURES}
      labels = tf.constant(data_set[LABEL].values)
      return features, labels
    return input_fn

#### 2.D Define Serving Input Function

To predict with the model, we need to define a serving input function which will be used to read inputs from a user at prediction time. If necessary transforms it also preforms transormations neccessary to get the user data into the format the estimator expects.

The serving input function resturs a (features, labels, inputs) tuple
- features: A dict of features to be passed to the Estimator
- label: Always 'None' for prediction
- inputs: A dictionary of inputs the predictions server should expect from the user

In [26]:
from tensorflow.contrib.learn.python.learn.utils import input_fn_utils

def serving_input_fn():
  feature_placeholders = {
      column.name: tf.placeholder(column.dtype, [None])
      for column in feature_cols
  }
  # DNNCombinedLinearClassifier expects rank 2 Tensors, but inputs should be
  # rank 1, so that we can provide scalars to the server
  features = {
    key: tf.expand_dims(tensor, -1)
    for key, tensor in feature_placeholders.items()
  }
  return input_fn_utils.InputFnOps(
    features,
    None,
    feature_placeholders
  )

#### 2.E Define Experiment

An experiment is ultimatley what you pass to learn_runner.run() for Cloud ML training. It encapsulated the Estimator (which in turn encapsulates the feature column definitions) and the Input Function.

Notice that to instantiate an Experiment you don't pass the Estimator or Input Functions directly. Instead you pass a function that *returns* an Estimator or Input Function. This is why we wrapped the Estimator and Input Function instantiations in generator functions during steps 3 and 4 respectively. It is also why we wrap the Experiment itself in a generator function, because a downstream function will expect it in this format.

In [25]:
from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils

def generate_experiment_fn(output_dir):
  return tf.contrib.learn.Experiment(
    generate_estimator(output_dir),
    train_input_fn=generate_input_fn(df), 
    eval_input_fn=generate_input_fn(df), #Normally you would pass a different eval set
    export_strategies=[saved_model_export_utils.make_export_strategy(
            serving_input_fn,
            default_output_alternative_key=None,
        )],
    train_steps=1000,
    eval_steps=1000
  )
  

### 3) Package Code

You've now written all the tensoflow code you need!

To make it compatible with Cloud ML Engine we'll combine the above tensorflow code into a single python file with two simple changes

1. Add some boilerplate code to parse the command line arguments required for gcloud.
2. Use the learn_runner.run() function to run the experiment

We also add an \__init__\.py file to the foler. This is just python convention for identifying modules

In [3]:
%%bash
mkdir trainer
touch trainer/__init__.py

In [27]:
%%writefile trainer/task.py

import argparse
import pandas as pd
import tensorflow as tf
from tensorflow.contrib.learn.python.learn import learn_runner
from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils
from tensorflow.contrib.learn.python.learn.utils import input_fn_utils

tf.logging.set_verbosity(tf.logging.ERROR)

df = pd.read_csv(
  filepath_or_buffer='https://storage.googleapis.com/vijay-public/boston_housing/housing.data.txt',
  delim_whitespace=True,
  names=["CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","B","LSTAT","MEDV"])

FEATURES = ["CRIM", "ZN", "INDUS", "NOX", "RM",
            "AGE", "DIS", "TAX", "PTRATIO"]
LABEL = "MEDV"

feature_cols = [tf.contrib.layers.real_valued_column(k)
                  for k in FEATURES] #list of Feature Columns

def generate_estimator(output_dir):
  return tf.contrib.learn.DNNRegressor(feature_columns=feature_cols,
                                            hidden_units=[10, 10],
                                            model_dir=output_dir)

def generate_input_fn(data_set):
    def input_fn():
      features = {k: tf.constant(data_set[k].values) for k in FEATURES}
      labels = tf.constant(data_set[LABEL].values)
      return features, labels
    return input_fn

def serving_input_fn():
  feature_placeholders = {
      column.name: tf.placeholder(column.dtype, [None])
      for column in feature_cols
  }
  # DNNCombinedLinearClassifier expects rank 2 Tensors, but inputs should be
  # rank 1, so that we can provide scalars to the server
  features = {
    key: tf.expand_dims(tensor, -1)
    for key, tensor in feature_placeholders.items()
  }
  return input_fn_utils.InputFnOps(
    features,
    None,
    feature_placeholders
  )

def generate_experiment_fn(output_dir):
  return tf.contrib.learn.Experiment(
    generate_estimator(output_dir),
    train_input_fn=generate_input_fn(df), 
    eval_input_fn=generate_input_fn(df), #Normally you would pass a different eval set
    export_strategies=[saved_model_export_utils.make_export_strategy(
            serving_input_fn,
            default_output_alternative_key=None,
        )],
    train_steps=3000,
    eval_steps=1000
  )

######CLOUD ML ENGINE BOILERPLATE CODE BELOW######
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # Input Arguments
  parser.add_argument(
      '--output_dir',
      help='GCS location to write checkpoints and export models',
      required=True
  )
  parser.add_argument(
        '--job-dir',
        help='this model ignores this field, but it is required by gcloud',
        default='junk'
    )
  args = parser.parse_args()
  arguments = args.__dict__
  output_dir = arguments.pop('output_dir')

  learn_runner.run(generate_experiment_fn, output_dir)

Overwriting trainer/task.py


### 4) Train
Now that our code is packaged we can invoke it using the gcloud command line tool to run the training. 

Note: Since our dataset is so small and our model is simple the overhead of provisioning the cluster is longer than the actual training time. Accordingly you'll notice the single VM cloud training takes longer than the local training, and the distributed cloud training takes longer than single VM cloud. For larger datasets and more complex models this will reverse

#### Set Environment Vars
We'll create environment variables for our project name GCS Bucket and reference this in future commands.

In [19]:
GCS_BUCKET = 'gs://vijays-sandbox-ml/housing' #CHANGE THIS TO YOUR BUCKET
PROJECT = 'vijays-sandbox' #CHANGE THIS TO YOUR PROJECT ID
REGION = 'us-central1' #OPTIONALLY CHANGE THIS

In [18]:
import os
os.environ['GCS_BUCKET'] = GCS_BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

#### Run local
It's a best practice to first run locally on a small dataset to check for errors. Note you can ignore the warnings in this case, as long as there are no errors.

In [None]:
%%bash
gcloud ml-engine local train \
   --module-name=trainer.task \
   --package-path=trainer \
   -- \
   --output_dir='./output'

#### Run on cloud (1 cloud ML unit)

First we specify which GCP project to use.

In [9]:
%%bash
gcloud config set project $PROJECT

Updated property [core/project].


Then we specify which GCS bucket to write to and a job name.
Job names submitted to the ml engine must be project unique, so we append the system date/time. Update the cell below to point to a GCS bucket you own.

In [14]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME/ \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output


jobId: housing_170530_210025
state: QUEUED


Job [housing_170530_210025] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170530_210025

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170530_210025


#### Run on cloud (10 cloud ML units)
Because we are using the TF Experiments interface, distributed computing just works! The only change we need to make to run in a distributed fashion is to add the [--scale-tier](https://cloud.google.com/ml/pricing#ml_training_units_by_scale_tier) argument. Cloud ML Engine then takes care of distributing the training across devices for you!


In [15]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --scale-tier=STANDARD_1 \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170530_210046
state: QUEUED


Job [housing_170530_210046] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170530_210046

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170530_210046


#### Run on cloud GPU (3 cloud ML units)

Also works with GPUs!

"BASIC_GPU" corresponds to one Tesla K80 at the time of this writing, hardware subject to change. 1 GPU is charged as 3 cloud ML units.

In [20]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --scale-tier=BASIC_GPU \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170530_213407
state: QUEUED


Job [housing_170530_213407] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170530_213407

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170530_213407


#### Run on 8 cloud GPUs (24 cloud ML units)
To train across multiple GPUs you use a [custom scale tier](https://cloud.google.com/ml/docs/concepts/training-overview#job_configuration_parameters).

You specify the number and types of machines you want to run on in a config.yaml, then reference that config.yaml via the --config config.yaml command line argument.

Here I am specifying a master node with machine type complex_model_m_gpu and one worker node of the same type. Each complex_model_m_gpu has 4 GPUs so this job will run on 2x4=8 GPUs total. 

Note your GCP project needs to have approval to run on that many GPUs or else you will get a quota exceeded error.

In [22]:
%%writefile config.yaml
trainingInput:
  scaleTier: CUSTOM
  masterType: complex_model_m_gpu
  workerType: complex_model_m_gpu
  workerCount: 1

Writing config.yaml


In [23]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --config config.yaml \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170530_213425
state: QUEUED


Job [housing_170530_213425] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170530_213425

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170530_213425


### 5) Inspect Results Using Tensorboard

Tensorboard is a utility that allows you to visualize your model

In [6]:
from google.datalab.ml import TensorBoard
TensorBoard().start('output')

1808

Works with GCS URLs too

In [9]:
!gsutil list $GCS_BUCKET | tail -1

gs://vijays-sandbox-ml/housing/housing_170530_210133/


In [15]:
TensorBoard().start('gs://vijays-sandbox-ml/housing/housing_170518_211430/output') #replace with output from previous cell

3007

Cleanup Tensorboard processes

In [16]:
for pid in TensorBoard.list()['pid']:
  TensorBoard().stop(pid)
  print 'Stopped TensorBoard with pid {}'.format(pid)

Stopped TensorBoard with pid 1808
Stopped TensorBoard with pid 1830
Stopped TensorBoard with pid 2530
Stopped TensorBoard with pid 2687
Stopped TensorBoard with pid 2846
Stopped TensorBoard with pid 3007


### 6) Deploy Model For Predictions

Cloud ML Engine has a prediction service that will wrap our tensorflow model with a REST API and allow remote clients to get predictions.

You can deploy the model from the Google Cloud Console GUI, or you can use the gcloud command line tool. We will use the latter method.

In [None]:
%%bash
MODEL_NAME="housing_prices"
MODEL_VERSION="v1"
MODEL_LOCATION="output/export/Servo/1496167084020" #REPLACE this with the correct timestamp
echo "Deleting and deploying $MODEL_NAME $MODEL_VERSION from $MODEL_LOCATION ... this will take a few minutes"
gcloud ml-engine versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
gcloud ml-engine models delete ${MODEL_NAME}
gcloud ml-engine models create ${MODEL_NAME} --regions us-central1
gcloud ml-engine versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --staging-bucket='gs://vijays-sandbox-ml'

### 7) Get Predictions

There are two flavors of the ML Engine Prediction Service: Batch and online.

Online prediction is more appropriate for latency sensitive requests as results are returned quickly and synchronously. 

Batch prediction is more appropriate for large prediction requests that you only need to run a few times a day.

The prediction services expects prediction requests in standard JSON format so first we will create a JSON file with a couple of housing records.


In [27]:
%%writefile records.json
{"CRIM": 0.00632,"ZN": 18.0,"INDUS": 2.31,"NOX": 0.538, "RM": 6.575, "AGE": 65.2, "DIS": 4.0900, "TAX": 296.0, "PTRATIO": 15.3}
{"CRIM": 0.00332,"ZN": 0.0,"INDUS": 2.31,"NOX": 0.437, "RM": 7.7, "AGE": 40.0, "DIS": 5.0900, "TAX": 250.0, "PTRATIO": 17.3}

Writing records.json


Now we will pass this file to the prediction service using the gcloud command line tool. Results are returned immediatley!

In [28]:
!gcloud ml-engine predict --model housing_prices --version v1 --json-instances records.json

OUTPUTS
27.9056
34.0557
