# H2O & TensorFlow Deep Learning Demo

### Introduction
In this tutorial, we'll build a simple deep artificial neural network with 2 hidden layers to classify the handwritten digits of the [MNIST](http://yann.lecun.com/exdb/mnist/) dataset. If you are not familiar with these terms, please check out our [Deep Learning Booklet](https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/booklets/v2_2015/PDFs/online/DeepLearning_Vignette.pdf).

### Prerequisites
1. Install TensorFlow from [https://www.tensorflow.org](https://www.tensorflow.org)
2. Download Sparkling Water from [http://www.h2o.ai/download/sparkling-water/spark16](http://www.h2o.ai/download/sparkling-water/spark16)
3. Follow the [instructions to set up PySparkling](http://www.h2o.ai/download/sparkling-water/spark16#pysparkling) (especially steps 1 and 2)
4. Launch a Jupyter Notebook that connects to PySparkling:
                                              
```
cd ~/spark-1.6.1-bin-hadoop2.6
export SPARK_HOME=`pwd`
export MASTER="local-cluster[3,2,1024]" 
cd ~/sparkling-water-1.6.5
IPYTHON_OPTS="notebook" bin/pysparkling                                              
```
                                                                              


## Connect to H2O and Import MNIST dataset

We connect to an H2O cluster (here: 3 nodes) and import the MNIST dataset, pre-split into 60k rows for training and 10k rows for testing. Each row holds 28 × 28 = 784 grayscale pixel values from 0 to 255, followed by the digit label in the last column (index 784).
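A minimal sketch of that row layout, using a synthetic row rather than real MNIST data. It assumes, as the `RowData` helper in this notebook does, that columns 0 to 783 hold the pixels and column 784 holds the label. It further assumes the 784 pixels are a 28x28 image flattened row by row (the usual MNIST convention, not verified against this CSV):

```python
import numpy as np

# Synthetic stand-in for one MNIST row: 784 pixel values (0..255) plus a label.
# The values are made up for illustration; real rows come from train.csv.gz.
rng = np.random.default_rng(0)
pixels = rng.integers(0, 256, size=784)
label = 7
row = np.append(pixels, label).astype(np.float32)

# Columns 0..783 are the pixels, column 784 is the digit label
features, target = row[:784], int(row[784])

# Assumed row-major flattening: reshape back into a 28x28 image
image = features.reshape(28, 28)

# Scale pixel intensities to [0, 1], as the training code does with /255
scaled = features / 255.0

print(image.shape, target, float(scaled.min()) >= 0.0, float(scaled.max()) <= 1.0)
```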

In [3]:
# Import H2O and TensorFlow
import h2o
import tensorflow as tf

In [4]:
## Read MNIST data into H2O
from pysparkling import H2OContext
h2o.__version__
hc = H2OContext(sc).start()
print(hc)
DATASET_DIR="http://s3.amazonaws.com/h2o-public-test-data/bigdata/laptop/mnist"
train_frame = h2o.import_file("{}/{}".format(DATASET_DIR, "train.csv.gz"))
test_frame = h2o.import_file("{}/{}".format(DATASET_DIR, "test.csv.gz"))



| Property | Value |
|---|---|
| H2O cluster uptime | 7 seconds 702 milliseconds |
| H2O cluster version | 3.8.2.6 |
| H2O cluster name | sparkling-water-jchow_1479130325 |
| H2O cluster total nodes | 3 |
| H2O cluster total free memory | 2.88 GB |
| H2O cluster total cores | 24 |
| H2O cluster allowed cores | 24 |
| H2O cluster healthy | True |
| H2O Connection ip | 172.16.2.89 |
| H2O Connection port | 54327 |


H2OContext: ip=172.16.2.89, port=54327 (open UI at http://172.16.2.89:54327 )

Parse Progress: [##################################################] 100%

Parse Progress: [##################################################] 100%


In [5]:
## Number of partitions, i.e. of node-local TensorFlow models; increase it to simulate a larger cluster
NODES=3

In [6]:
## Check that TensorFlow works on the Spark workers: each of the NODES partitions evaluates a constant in its own TF session
def map_fun(i):
  import tensorflow as tf
  with tf.Graph().as_default() as g:
    hello = tf.constant('Sparkling, TensorFlow!', name="hello_constant")
    with tf.Session() as sess:
      return sess.run(hello)
sc.parallelize(range(NODES), NODES).map(map_fun).collect()

['Sparkling, TensorFlow!', 'Sparkling, TensorFlow!', 'Sparkling, TensorFlow!']

In [7]:
train_df = hc.as_spark_frame(train_frame).repartition(NODES)
test_df = hc.as_spark_frame(test_frame).repartition(NODES)
#train_df.printSchema()

## Define a TensorFlow Deep Learning model 
Now we define a TensorFlow deep learning model with 2 hidden layers of 50 neurons each, using the rectified linear unit (ReLU) activation function. A softmax over the 10 output neurons turns their activations into 10 class probabilities. We initialize the weights and biases with Gaussian noise (standard deviation 0.1) and train the model with mini-batch gradient descent, using a fixed learning rate (0.01) and no momentum.
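To make the architecture concrete, here is the same forward pass and loss written in plain NumPy. This is an illustrative sketch only, not the notebook's TensorFlow code: the weights are random, the inputs and labels are made up, and no gradients are computed.

```python
import numpy as np

HN = 50  # hidden neurons per layer, as in the notebook
rng = np.random.default_rng(42)

# Weights and biases drawn from N(0, 0.1^2), mirroring tf.random_normal(..., stddev=0.1)
W = [rng.normal(0, 0.1, (784, HN)), rng.normal(0, 0.1, (HN, HN)), rng.normal(0, 0.1, (HN, 10))]
b = [rng.normal(0, 0.1, HN), rng.normal(0, 0.1, HN), rng.normal(0, 0.1, 10)]

def relu(z):
    return np.maximum(z, 0.0)

def softmax(z):
    # Subtract the row-wise max before exponentiating to avoid overflow
    e = np.exp(z - z.max(axis=1, keepdims=True))
    return e / e.sum(axis=1, keepdims=True)

def forward(x):
    # Two ReLU hidden layers followed by a softmax output layer
    h1 = relu(x @ W[0] + b[0])
    h2 = relu(h1 @ W[1] + b[1])
    return softmax(h2 @ W[2] + b[2])

def cross_entropy(y_true, y_prob, eps=1e-12):
    # Summed over the batch, like -tf.reduce_sum(y_ * tf.log(y)); eps guards log(0)
    return -np.sum(y_true * np.log(y_prob + eps))

x = rng.uniform(0, 1, (5, 784))               # 5 fake images scaled to [0, 1]
y_true = np.eye(10)[rng.integers(0, 10, 5)]   # 5 fake one-hot labels
probs = forward(x)
loss = cross_entropy(y_true, probs)
print(probs.shape, np.allclose(probs.sum(axis=1), 1.0), loss)
```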

In [8]:
## Define the number of hidden neurons per layer
HN=50

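# create_nn:
# - builds a TF network with 2 hidden layers of HN neurons each
# - trains it on mini-batches drawn from data_train (numpy arrays, see RowData below)
# - prints the accuracy on one mini-batch drawn from data_test
# - returns an iterator over a single tuple of the trained weights and biases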
def create_nn(data_train, data_test, iterations, batch_size):
    ## input
    x = tf.placeholder(tf.float32, [None, 784])
    ## weights
    W = [tf.Variable(tf.random_normal([784,HN],stddev=0.1))
        ,tf.Variable(tf.random_normal([HN, HN],stddev=0.1))
        ,tf.Variable(tf.random_normal([HN, 10],stddev=0.1))]
    ## biases
    b = [tf.Variable(tf.random_normal([HN],    stddev=0.1))
        ,tf.Variable(tf.random_normal([HN],    stddev=0.1))
        ,tf.Variable(tf.random_normal([10],    stddev=0.1))]
    ## hidden layer activation
    h1 = tf.nn.relu(   tf.matmul(x,  W[0]) + b[0])
    h2 = tf.nn.relu(   tf.matmul(h1, W[1]) + b[1])
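    ## output: logits and softmax class probabilities
    logits = tf.matmul(h2, W[2]) + b[2]
    y = tf.nn.softmax(logits)
    ## placeholder for the one-hot encoded actual labels
    y_ = tf.placeholder(tf.float32, [None, 10])
    ## cost function: summed cross-entropy, computed from the logits for numerical stability
    ## (-sum(y_*log(y)) turns into NaN as soon as a softmax output underflows to 0)
    cross_entropy = tf.reduce_sum(
        tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=logits))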
    ## optimizer
    train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
    
    # Train the model
    init = tf.initialize_all_variables()
    sess = tf.Session()
    sess.run(init)
    print("Training TensorFlow Deep Learning model")
    for i in range(iterations):
        #print("TensorFlow iter: ", i, " session: ", sess)
        batch_xs, batch_ys = data_train.next_batch(batch_size)
        sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

    ## fetch the trained arrays as numpy arrays, in the order (W0, W1, W2, b0, b1, b2)
    model = [tuple(sess.run(W + b))]

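    # Model evaluation on one mini-batch from data_test
    # (train_nn passes the training data as data_test, hence "Training Accuracy")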
    correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
    batch_xs, batch_ys = data_test.next_batch(batch_size)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    print("Training Accuracy:", sess.run(accuracy, feed_dict={x: batch_xs, y_: batch_ys}))
    #print(sess.run(tf.argmax(y,1), feed_dict={x: batch_xs, y_: batch_ys}))
    
    sess.close()
    return iter(model)
    
    # Export the model (disabled; it would have to run before sess.close() and needs FLAGS.export_version)
    #from tensorflow_serving.session_bundle import exporter
    #export_path = "/tmp/xxx/"
    #saver = tf.train.Saver(sharded=True)
    #model_exporter = exporter.Exporter(saver)
    #signature = exporter.classification_signature(input_tensor=x, scores_tensor=y)
    #model_exporter.init(sess.graph.as_graph_def(), default_graph_signature=signature)
    #model_exporter.export(export_path, tf.constant(FLAGS.export_version), sess)
    
## Internal Helpers

# Sample mini-batches without replacement from the node-local data
# Load everything into a numpy data structure
import numpy as np

def expand1hot(response, levels):
    nrows = response.shape[0]
    result = np.zeros((nrows, levels), dtype=np.float32)
    result[np.arange(nrows), response.astype(np.int8)] = 1.0
    return result

class RowData:
    def __init__(self, it):
        self._part_array = np.array([ [a for a in x] for x in it], dtype=np.float32)
        # Definition of input features
        self._x = range(784)
        # Index of response
        self._y = 784

    def next_batch(self, n):
        # Sample from local data without replacement
        dim = self._part_array.shape[0] # number of rows
        sample = np.random.choice(dim, n, replace=False)
        data = self._part_array[sample, :]
        # Data coming from H2O, pixel values are 0..255 -> normalize to 0..1
        # FIXME: this should be done via RDD or H2O API directly !
        train = data[:, self._x]/255
        response = expand1hot(data[:, self._y], 10)
        return (train, response)
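The `expand1hot` helper writes a 1 at each row's label index. A common, equivalent NumPy idiom indexes an identity matrix by the labels. The sketch below (the name `one_hot_eye` is ours, not part of the notebook) copies `expand1hot` so it runs standalone and checks that both produce the same matrix:

```python
import numpy as np

def expand1hot(response, levels):
    # Same logic as the notebook's helper: set column `label` to 1 in each row
    nrows = response.shape[0]
    result = np.zeros((nrows, levels), dtype=np.float32)
    result[np.arange(nrows), response.astype(np.int8)] = 1.0
    return result

def one_hot_eye(response, levels):
    # Alternative: pick rows of a levels x levels identity matrix by label
    return np.eye(levels, dtype=np.float32)[response.astype(np.intp)]

labels = np.array([3.0, 0.0, 9.0, 3.0], dtype=np.float32)  # labels arrive as floats
a = expand1hot(labels, 10)
b = one_hot_eye(labels, 10)
print(np.array_equal(a, b), a.argmax(axis=1))  # True [3 0 9 3]
```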

## Run TensorFlow on each H2O/PySparkling node

We use Spark's map/reduce paradigm to distribute the training across multiple worker nodes: each node trains its own model on its local data, which is stored in H2O and accessed by TensorFlow via the JVM -> Python serialization provided by the PySpark(ling) API.

Here, we train only for a short time for demo purposes. This is certainly not the best quality model we can build.
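The per-node pattern can be imitated without Spark. In the sketch below, `map_partitions` and `toy_model` are hypothetical helpers: `map_partitions` plays the role of `RDD.mapPartitions(...).collect()`, and the toy model just takes column means in place of a TensorFlow network. It shows the contract the notebook relies on: the function receives an iterator over one partition's rows and returns an iterator, and the driver gets back one entry per partition.

```python
import numpy as np

def train_per_partition(make_model):
    # Mirrors train_nn: returns a function that consumes an iterator of rows
    # (one partition) and returns an iterator holding a single model tuple
    def per_partition(rows):
        data = np.array(list(rows), dtype=np.float32)
        return iter([make_model(data)])
    return per_partition

def map_partitions(partitions, fn):
    # Hypothetical stand-in for RDD.mapPartitions(fn).collect(): apply fn to
    # each partition's iterator and concatenate the results on the "driver"
    out = []
    for part in partitions:
        out.extend(fn(iter(part)))
    return out

def toy_model(data):
    # Toy "training": the model is just the column means of the local data
    return (data.mean(axis=0),)

rows = np.arange(12, dtype=np.float32).reshape(6, 2)
partitions = [rows[0:2], rows[2:4], rows[4:6]]   # 3 "nodes", 2 rows each
models = map_partitions(partitions, train_per_partition(toy_model))
print(len(models), len(models[0]))  # 3 1
```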

In [9]:
# Number of batches to iterate
ITERATIONS = 100
# Batch size (per iteration)
BATCH_SIZE = 100
# Use MNIST dataset provided by TensorFlow - for debugging only
USE_TF_MNIST=False

def train_nn(iterations, batch_size, use_tf_mnist=False):
    def perPartition(it):
        if not use_tf_mnist:
            train_data = RowData(it)
            test_data = train_data
        else:
            from tensorflow.examples.tutorials.mnist import input_data
            mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
            train_data = mnist.train
            test_data = mnist.train
            
        return create_nn(train_data, test_data, iterations, batch_size)
        
    return perPartition

In [10]:
coeffs_per_node = train_df.mapPartitions(train_nn(ITERATIONS, BATCH_SIZE, USE_TF_MNIST)).collect()

In [11]:
# Now, we have the weights and biases for each node
print(len(coeffs_per_node))    ## Number of nodes
print(len(coeffs_per_node[0])) ## Number of weight and bias arrays 

3
6


## Convert the TensorFlow model into a H2O Deep Learning model

In [12]:
# Average the weights and biases across all node-local models
avg_coeffs = [c for c in coeffs_per_node[0]]
for i in range(0,len(avg_coeffs)):
    for node in range(1,NODES):
        avg_coeffs[i] = avg_coeffs[i] + coeffs_per_node[node][i]
avg_coeffs = [c/NODES for c in avg_coeffs]

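## The first half of the arrays are the weight matrices, the second half the biases
## (integer division, so the slice bounds below are ints in Python 3 as well)
num_weights = len(coeffs_per_node[0]) // 2

## Convert the model coefficients (weights/biases) to H2O Frames
## TF stores each W as [inputs, outputs]; the transpose gives the [outputs, inputs] layout passed to H2O
H2O_w = [h2o.H2OFrame(np.transpose(c)) for c in avg_coeffs[0:num_weights]]
H2O_b = [h2o.H2OFrame(np.transpose(np.matrix(c))) for c in avg_coeffs[num_weights:2*num_weights]]

print([c.dim for c in H2O_w])
print([c.dim for c in H2O_b])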


Parse Progress: [##################################################] 100%

Parse Progress: [##################################################] 100%

Parse Progress: [##################################################] 100%

Parse Progress: [##################################################] 100%

Parse Progress: [##################################################] 100%

Parse Progress: [##################################################] 100%
[[50, 784], [50, 50], [10, 50]]
[[50, 1], [50, 1], [10, 1]]
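The averaging and layout conversion can be checked without a cluster. This sketch substitutes random arrays for the collected TensorFlow coefficients (`fake_node_model` is a hypothetical helper), uses a `zip`-based element-wise mean as a compact equivalent of the loop above, and uses `reshape(-1, 1)` instead of `np.matrix` to build column vectors. Keep in mind that plain averaging is a heuristic here: each node starts from its own random initialization, so hidden unit j need not play the same role in every node's network.

```python
import numpy as np

HN = 50
rng = np.random.default_rng(1)

def fake_node_model():
    # Random stand-in for one node's (W0, W1, W2, b0, b1, b2); W uses the TF layout [inputs, outputs]
    shapes = [(784, HN), (HN, HN), (HN, 10), (HN,), (HN,), (10,)]
    return tuple(rng.normal(0, 0.1, s) for s in shapes)

coeffs_per_node = [fake_node_model() for _ in range(3)]

# Element-wise mean of each of the 6 arrays across nodes (same result as the explicit loop)
avg_coeffs = [np.mean(arrays, axis=0) for arrays in zip(*coeffs_per_node)]

num_weights = len(avg_coeffs) // 2
weights = [np.transpose(w) for w in avg_coeffs[:num_weights]]   # [outputs, inputs]
biases = [c.reshape(-1, 1) for c in avg_coeffs[num_weights:]]   # column vectors

print([w.shape for w in weights])  # [(50, 784), (50, 50), (10, 50)]
print([c.shape for c in biases])   # [(50, 1), (50, 1), (10, 1)]
```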


In [13]:
#Initialize an H2O Model with those weights/biases
from h2o.estimators.deeplearning import H2ODeepLearningEstimator

## Create an H2O Deep Learning model from the TensorFlow model
dlmodel = H2ODeepLearningEstimator(
    model_id="model_from_TF", ## we want to be able to find the model in Flow later
    hidden=[HN,HN],           ## same Network layout as TF - two hidden layers
    epochs=0,                 ## no training done in H2O - just copy over the model from TF
    ignore_const_cols=False,  ## keep all input features (unless we also drop const cols in TF)
    sparse=True,              ## faster as 0 input remains 0 -> sparse activation -> sparse updates
    variable_importances=True
    ### Initialize the H2O model with the TensorFlow model state
    ### Requires H2O 3.8.2.1 or later
    ,initial_weights=[H2O_w[0],H2O_w[1],H2O_w[2]]
    ,initial_biases =[H2O_b[0],H2O_b[1],H2O_b[2]]
)
train_frame[784] = train_frame[784].asfactor()
dlmodel.train(x=list(range(784)),y=784,training_frame=train_frame)


deeplearning Model Build Progress: [                                                  ] 00%


## Score the H2O Deep Learning model in H2O (with the TensorFlow state)

In [14]:
## We can let H2O evaluate the performance of the TensorFlow model on the test set
dlmodel.model_performance(test_frame)


ModelMetricsMultinomial: deeplearning
** Reported on test data. **

MSE: 0.81
R^2: 0.903401189728
LogLoss: 2.30258509299

Confusion Matrix: vertical: actual; across: predicted



| Actual | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | Error | Rate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 96 | 111 | 92 | 98 | 95 | 77 | 97 | 117 | 89 | 108 | 0.9020408 | 884 / 980 |
| 1 | 117 | 113 | 123 | 121 | 109 | 104 | 113 | 117 | 118 | 100 | 0.9004405 | 1,022 / 1,135 |
| 2 | 107 | 95 | 98 | 92 | 102 | 90 | 103 | 126 | 104 | 115 | 0.9050388 | 934 / 1,032 |
| 3 | 115 | 114 | 111 | 98 | 108 | 85 | 101 | 97 | 93 | 88 | 0.9029703 | 912 / 1,010 |
| 4 | 99 | 94 | 111 | 119 | 91 | 100 | 101 | 84 | 92 | 91 | 0.9073320 | 891 / 982 |
| 5 | 94 | 78 | 97 | 108 | 84 | 82 | 89 | 84 | 91 | 85 | 0.9080717 | 810 / 892 |
| 6 | 101 | 102 | 98 | 85 | 95 | 73 | 86 | 98 | 111 | 109 | 0.9102296 | 872 / 958 |
| 7 | 107 | 115 | 113 | 101 | 114 | 88 | 85 | 108 | 113 | 84 | 0.8949416 | 920 / 1,028 |
| 8 | 88 | 92 | 104 | 109 | 96 | 91 | 117 | 91 | 86 | 100 | 0.9117043 | 888 / 974 |



Top-10 Hit Ratios: 


| k | hit_ratio |
|---|---|
| 1 | 0.0935 |
| 2 | 0.194 |
| 3 | 0.2907000 |
| 4 | 0.3891000 |
| 5 | 0.4841 |
| 6 | 0.5832000 |
| 7 | 0.6896001 |
| 8 | 0.7780001 |
| 9 | 0.8741001 |
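The reported LogLoss of 2.30258509299 is ln(10), exactly the log loss of a classifier that assigns probability 1/10 to every digit, and the top-k hit ratios grow roughly as k/10. Together with the roughly 90% per-class error rates, this suggests the imported network outputs near-uniform class probabilities. The arithmetic, assuming H2O's multinomial MSE is the mean of (1 - p_true)^2 (consistent with the reported MSE of 0.81):

```python
import math

# A model that assigns probability 1/10 to each of the 10 digits on every row
p_uniform = 1.0 / 10

# Multinomial log loss: mean of -log(probability assigned to the true class)
logloss_uniform = -math.log(p_uniform)

# Assumed per-row squared error: (1 - probability of the true class)^2
sq_err_uniform = (1.0 - p_uniform) ** 2

print(round(logloss_uniform, 11), round(sq_err_uniform, 2))  # 2.30258509299 0.81
```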




In [15]:
## Overall classification error of the TF model (in H2O form) on the test set - roughly chance level (about 90% error for 10 classes) - needs more training
dlmodel.model_performance(test_frame).confusion_matrix()['Error'][-1]

0.9065

## Extract the Java scoring code (POJO) for the TensorFlow model

In [16]:
#dlmodel.download_pojo()  ## too large for Github

## Continue Training the Deep Learning model in H2O

In [17]:
## Train in H2O for 1 more epoch (one full pass over the training data)
dlmodel.epochs=1
dlmodel.train(x=list(range(784)),y=784,training_frame=train_frame)


deeplearning Model Build Progress: [##################################################] 100%


In [18]:
## Check the classification error of the H2O model after one epoch of training in H2O - better, but still far from a good MNIST model
p=dlmodel.model_performance(test_frame)
p.confusion_matrix()['Error'][-1]

0.7801

## Inspect the model in Flow
Since the model is now in H2O, we can inspect it in [Flow](http://localhost:54321), H2O's web UI. Run `print(hc)` to see the actual Flow URL; in this session it is http://172.16.2.89:54327.

For example, we can graphically inspect the variable importances or the confusion matrix. We can also score the model on the test set in Flow, continue training the model from this checkpoint, or inspect the Java scoring code (POJO). If you're not yet familiar with Flow, we highly recommend getting to know it.