# Estimators and multi-GPUs with tf.keras, tf.data, and eager execution

The [Estimators](https://www.tensorflow.org/programmers_guide/estimators) API is used for training models in distributed environments, such as a single node with multiple GPUs or many nodes each with GPUs. This is particularly useful when training on huge datasets. In the interest of time, however, we will demonstrate the API on the tiny Fashion-MNIST dataset as usual.

Essentially, what we want to remember is that a `tf.keras.Model` can be trained with the `tf.estimator` API by converting it to a `tf.estimator.Estimator` object via the `tf.keras.estimator.model_to_estimator` function. Once converted, we can apply the machinery of the `Estimator` to train on different hardware configurations.

In [1]:
!pip install -q -U tf-nightly-gpu
import tensorflow as tf

import numpy as np

# Enable Eager mode. Re-running this cell will fail. Restart the Runtime to re-enable Eager.
tf.enable_eager_execution()

In [2]:
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()

In [3]:
TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)

train_images = np.asarray(train_images, dtype=np.float32) / 255

# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))

test_images = np.asarray(test_images, dtype=np.float32) / 255
# Convert the test images and add channels
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))

In [4]:
# Number of categories we are predicting (class labels 0-9)
LABEL_DIMENSIONS = 10

train_labels  = tf.keras.utils.to_categorical(train_labels, LABEL_DIMENSIONS)
test_labels = tf.keras.utils.to_categorical(test_labels, LABEL_DIMENSIONS)

# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)
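
To make the label conversion above concrete, here is a minimal pure-Python sketch of one-hot encoding. The helper `one_hot` is a hypothetical name used only for illustration; it is not how `tf.keras.utils.to_categorical` is implemented, it just produces the same kind of rows.

```python
def one_hot(labels, num_classes):
    """Return one float row per label, with 1.0 at the label's index."""
    encoded = []
    for label in labels:
        row = [0.0] * num_classes
        row[label] = 1.0
        encoded.append(row)
    return encoded


# Hypothetical labels for illustration; the notebook uses 10 classes.
for row in one_hot([0, 2, 1], num_classes=3):
    print(row)
```

Each row sums to 1, which is the target format that the `categorical_crossentropy` loss used later expects.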

In [5]:
inputs = tf.keras.Input(shape=(28,28,1))  # Returns a placeholder tensor
x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation=tf.nn.relu)(inputs)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS, activation=tf.nn.softmax)(x)
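
As a sanity check on the architecture above, the following sketch traces the spatial size of the feature maps through the convolution and pooling layers. It assumes the Keras defaults of `'valid'` padding for both layer types and a stride of 1 for `Conv2D`; the helper name `valid_output_size` is hypothetical.

```python
def valid_output_size(size, window, stride):
    """Output length along one axis for 'valid' (no) padding."""
    return (size - window) // stride + 1


# (layer kind, window, stride, output channels), mirroring the model above.
layers = [
    ("conv", 3, 1, 32),
    ("pool", 2, 2, 32),
    ("conv", 3, 1, 64),
    ("pool", 2, 2, 64),
    ("conv", 3, 1, 64),
]

size = 28  # Fashion-MNIST images are 28x28
shapes = []
for kind, window, stride, channels in layers:
    size = valid_output_size(size, window, stride)
    shapes.append((size, size, channels))
    print(kind, shapes[-1])

# Number of features that Flatten() hands to the first Dense layer.
flattened = size * size * layers[-1][3]
print("flattened:", flattened)
```

Under these assumptions the last convolution produces a 3x3x64 volume, so the `Flatten` layer outputs 576 features.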

In [6]:
model = tf.keras.Model(inputs=inputs, outputs=predictions)

In [7]:
optimizer = tf.train.RMSPropOptimizer(learning_rate=0.001)

model.compile(loss='categorical_crossentropy',
              optimizer=optimizer,
              metrics=['accuracy'])

Now, to create an Estimator from the compiled Keras model, we call the `model_to_estimator` function. Note that the initial state of the Keras model is preserved in the created Estimator.

So what's so good about Estimators? Well to start off with:

* You can run Estimator-based models on a local host or on a distributed multi-GPU environment without changing your model
* Estimators simplify sharing implementations between model developers
* Estimators build the graph for you


So how do we go about training our simple `tf.keras` model on multiple GPUs? We can use the `tf.contrib.distribute.MirroredStrategy` paradigm, which does in-graph replication with synchronous training. To use it, we first create an estimator from the `tf.keras` model and give it the `MirroredStrategy` configuration via the `RunConfig`:

In [19]:
strategy = tf.contrib.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model, config=config)

INFO:tensorflow:Using the Keras model provided.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmplushljrq', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f161212e710>, '_device_fn': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f161212e780>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
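
To give some intuition for what "synchronous training" with replicated weights means, here is a toy pure-Python sketch. It is not TensorFlow's implementation: it assumes a one-parameter model `y = w * x`, a mean-squared-error loss, and a batch sharded evenly across replicas. The names `gradient` and `mirrored_step` are hypothetical.

```python
def gradient(w, batch):
    """Gradient of the mean squared error for the toy model y = w * x."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


def mirrored_step(w, global_batch, num_replicas, learning_rate):
    """One synchronous step: every replica holds the same w, computes a
    gradient on its own shard, and all replicas apply the averaged gradient."""
    shard_size = len(global_batch) // num_replicas
    shards = [global_batch[i * shard_size:(i + 1) * shard_size]
              for i in range(num_replicas)]
    replica_grads = [gradient(w, shard) for shard in shards]
    mean_grad = sum(replica_grads) / num_replicas  # stands in for an all-reduce
    return w - learning_rate * mean_grad


# Hypothetical data generated from y = 2 * x.
batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
w = 0.0
for _ in range(50):
    w = mirrored_step(w, batch, num_replicas=2, learning_rate=0.01)
print(w)
```

With equal shard sizes, the averaged per-replica gradient equals the gradient over the whole batch, so every replica stays in lockstep with what a single device would have computed.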


To pipe data into Estimators we need to define a data importing function, like the one shown below, which returns a `tf.data.Dataset` of `(images, labels)` batches:

In [20]:
def input_fn(images, labels, repeat, batch_size):
    # Convert the inputs to a Dataset.
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))

    # Shuffle, repeat, and batch the examples.
    SHUFFLE_SIZE = 10000
    dataset = dataset.shuffle(SHUFFLE_SIZE).repeat(repeat).batch(batch_size)

    # Return the dataset.
    return dataset
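
The order of the `shuffle`, `repeat`, and `batch` calls matters. The following pure-Python sketch mimics that chain with generators so the semantics can be inspected without TensorFlow. It is only an approximation of `tf.data` behaviour, and the name `shuffle_repeat_batch` and its `seed` parameter are hypothetical.

```python
import random


def shuffle_repeat_batch(examples, shuffle_size, repeat, batch_size, seed=0):
    """Yield batches from a buffered shuffle applied to each of `repeat` passes."""
    rng = random.Random(seed)

    def shuffled_epoch():
        # Keep up to `shuffle_size` items in a buffer and emit a random one.
        buffer = []
        for example in examples:
            buffer.append(example)
            if len(buffer) == shuffle_size:
                yield buffer.pop(rng.randrange(len(buffer)))
        while buffer:  # drain what is left at the end of the pass
            yield buffer.pop(rng.randrange(len(buffer)))

    batch = []
    for _ in range(repeat):
        for example in shuffled_epoch():
            batch.append(example)
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:  # final partial batch is kept, not dropped
        yield batch


batches = list(shuffle_repeat_batch(list(range(10)), shuffle_size=10,
                                    repeat=3, batch_size=4))
for b in batches:
    print(b)
```

Because shuffling happens before repeating, each pass over the data is a complete permutation. Because batching happens after repeating, a batch can straddle two passes, and only the very last batch can be short.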

Now the good part! We can call the `train` method on our estimator, giving it the `input_fn` we defined as well as the number of optimizer steps we wish to run:

In [21]:
BATCH_SIZE = 128
EPOCHS = 5
STEPS = 1000

estimator.train(input_fn=lambda:input_fn(train_images,
                                         train_labels,
                                         repeat=EPOCHS,
                                         batch_size=BATCH_SIZE), 
                steps=STEPS)

INFO:tensorflow:Device is available but not used by distribute strategy: /device:CPU:0
INFO:tensorflow:Device is available but not used by distribute strategy: /device:GPU:0
INFO:tensorflow:Device is available but not used by distribute strategy: /device:GPU:1
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmplushljrq/keras_model.ckpt
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmplushljrq/model.ckpt.
INFO:tensorflow:loss = 2.310762, step = 0
INFO:tensorflow:global_step/sec: 153.118
INFO:tensorflow:loss = 2.023529, step = 100 (0.654 sec)
INFO:tensorflow:global_step/sec: 239.715
INFO:tensorflow:loss = 0.5813081, step = 200 (0.418 sec)
INFO:tensorflow:global_step/sec: 229.502


<tensorflow.python.estimator.estimator.Estimator at 0x7f1620247b00>
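
It can help to relate `STEPS` to `EPOCHS`. The sketch below does the arithmetic for the values used above, assuming the standard 60,000 Fashion-MNIST training images and ignoring how a distribution strategy feeds batches to individual replicas. The function name `training_budget` is hypothetical.

```python
def training_budget(num_examples, epochs, batch_size, steps):
    """Return (batches the input can supply, steps actually run, epochs covered)."""
    total_examples = num_examples * epochs
    batches_available = -(-total_examples // batch_size)  # ceiling division
    # Training stops at whichever comes first: `steps` or the end of the input.
    steps_run = min(steps, batches_available)
    epochs_covered = steps_run * batch_size / num_examples
    return batches_available, steps_run, epochs_covered


available, run, covered = training_budget(
    num_examples=60000, epochs=5, batch_size=128, steps=1000)
print(available, run, round(covered, 2))
```

Under these assumptions the input function can supply more batches than `STEPS` asks for, so training stops after 1000 steps, a little over two passes through the data rather than the full five.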

To check the performance of our model, we can then run the `evaluate` method on our Estimator. Note that with `steps=1` only a single batch of `BATCH_SIZE` test images is evaluated:

In [22]:
estimator.evaluate(input_fn=lambda:input_fn(test_images, test_labels, repeat=1, batch_size=BATCH_SIZE), steps=1)

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2018-07-12-09:19:24
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmplushljrq/model.ckpt-1000
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/1]
INFO:tensorflow:Finished evaluation at 2018-07-12-09:19:24
INFO:tensorflow:Saving dict for global step 1000: accuracy = 0.84375, global_step = 1000, loss = 0.3817102
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 1000: /tmp/tmplushljrq/model.ckpt-1000


{'accuracy': 0.84375, 'loss': 0.3817102, 'global_step': 1000}
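
The reported accuracy is measured on one batch of 128 images. The short sketch below shows how many of those were classified correctly and how many steps a full pass over the test set would take, assuming the standard 10,000 Fashion-MNIST test images. The helper name `eval_steps_for_full_pass` is hypothetical.

```python
def eval_steps_for_full_pass(num_examples, batch_size):
    """Number of batches needed to see every example once (ceiling division)."""
    return -(-num_examples // batch_size)


batch_size = 128
reported_accuracy = 0.84375

# Accuracy on a single batch is (correct predictions) / batch_size.
correct_in_batch = reported_accuracy * batch_size
print(correct_in_batch)

full_pass_steps = eval_steps_for_full_pass(10000, batch_size)
print(full_pass_steps)
```

Passing that step count, or leaving `steps` unset so that evaluation runs until the input is exhausted, should give an estimate based on the whole test set rather than on a single batch.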