##### Copyright 2019 The TensorFlow Authors.

In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# tf.distribute.Strategy with Training Loops

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/alpha/tutorials/distribute/training_loops"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/r2/tutorials/distribute/training_loops.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/docs/blob/master/site/en/r2/tutorials/distribute/training_loops.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
</table>

This tutorial demonstrates how to use [`tf.distribute.Strategy`](https://www.tensorflow.org/guide/distribute_strategy) with custom training loops. We will train a simple CNN model on the fashion MNIST dataset. The fashion MNIST dataset contains 60,000 training images of size 28 x 28 and 10,000 test images of size 28 x 28.

We use custom training loops to train our model because the loops give us greater flexibility and control training. Moreover, it is easier to debug the model and the training loop.

In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals

# Import TensorFlow
!pip install tensorflow-gpu==2.0.0-alpha0
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)

## Download the fashion MNIST dataset

In [0]:
fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

## Create a strategy to distribute the variables and the graph

How does `tf.distribute.MirroredStrategy` strategy work?

*   All the variables and the model graph is replicated on the replicas.
*   Input is evenly distributed across the replicas.
*   Each replica calculates the loss and gradients for the input it received.
*   The gradients are synced across all the replicas by summing them.
*   After the sync, the same update is made to the copies of the variables on each replica.

Note: You can put all the code below inside a single scope. For illustration purposes, the code here appears as divided into several code cells.


In [0]:
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()

In [0]:
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))

## Setup input pipeline

If you train a model on multiple GPUs, increase the batch size and tune the learning rate to best use the extra computing power.

In [0]:
BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10
train_steps_per_epoch = len(train_images) // GLOBAL_BATCH_SIZE
test_steps_per_epoch = len(test_images) // GLOBAL_BATCH_SIZE

Create the iterators inside a `strategy.scope`:

In [0]:
with strategy.scope():

  train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
  train_iterator = strategy.make_dataset_iterator(train_dataset)
  
  test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 
  test_iterator = strategy.make_dataset_iterator(test_dataset)

## Model Creation

Create a model with either `tf.keras.Sequential` or the Model Subclassing API.

In [0]:
def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

  return model

In [0]:
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

## Define the loss function

Normally, on a single machine with 1 GPU/CPU, you divide loss by the number of examples in the batch of input.

*How should you calculate loss when using a `tf.distribute.Strategy`?*

* For example, let's say you have 4 GPUs and a batch size of 64. One batch of input is distributed across the replicas (4 GPUs), and each replica receives an input of size 16.

* The model on each replica does a forward pass with its respective input and calculates the loss. Now, instead of dividing the loss by the number of examples in its respective input (BATCH_SIZE_PER_REPLICA = 16), divide the loss by the GLOBAL_BATCH_SIZE (64).

*Why do you divide this way?*

* Because after you calculate the gradients on each replica, they are synced across the replicas by **summing** them.

*How do you do this in TensorFlow?*

* If you write a custom training loop, as in this tutorial, you should sum the per example losses and divide the sum by the GLOBAL_BATCH_SIZE: 
`scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)`

* Do not use `tf.reduce_mean`. Doing so divides the loss by actual per replica batch size, and that may vary step to step.

* This reduction and scaling is done automatically in keras `model.compile` and `model.fit`

* If you use `tf.keras.losses` classes (as in the example below), explicitly specify either `NONE` or `SUM` as the loss reduction. 

* `AUTO` and `SUM_OVER_BATCH_SIZE`  are disallowed when used with `tf.distribute.Strategy`. 

* `AUTO` is disallowed because the user should explicitly think about what reduction they want to make sure it is correct in the distributed case. 

* `SUM_OVER_BATCH_SIZE` is disallowed because currently it would only divide by per replica batch size. This leaves the dividing by number of replicas to the user, and it might be easy to miss. Instead, the user should do the reduction themselves.

In [0]:
with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.reduce_sum(per_example_loss) * (1. / GLOBAL_BATCH_SIZE)

## Define the metrics to track loss and accuracy

These metrics track loss and accuracy. Use `.result()` to get the accumulated statistics at any time.

In [0]:
with strategy.scope():
  train_loss = tf.keras.metrics.Mean(name='train_loss')
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

## Training loop

In [0]:
# model and optimizer must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

In [0]:
with strategy.scope():
  # Train step
  def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
      predictions = model(images, training=True)
      loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss(loss)
    train_accuracy(labels, predictions)

  # Test step
  def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss(t_loss)
    test_accuracy(labels, predictions)

In [0]:
with strategy.scope():
  # `experimental_run` replicates the provided computation and runs it
  # with the distributed input.

  @tf.function
  def distributed_train():
    return strategy.experimental_run(train_step, train_iterator)

  @tf.function
  def distributed_test():
    return strategy.experimental_run(test_step, test_iterator)

  for epoch in range(EPOCHS):
    # Note: This code is expected to change in the near future.

    # TRAIN LOOP
    # Initialize the iterator
    train_iterator.initialize()
    for _ in range(train_steps_per_epoch):
      distributed_train()

    # TEST LOOP
    test_iterator.initialize()
    for _ in range(test_steps_per_epoch):
      distributed_test()

    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)

    template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
                "Test Accuracy: {}")
    print (template.format(epoch+1, train_loss.result(),
                           train_accuracy.result()*100, test_loss.result(),
                           test_accuracy.result()*100))

    train_loss.reset_states()
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

## Restore the latest checkpoint and test

A model checkpointed with a `tf.distribute.Strategy` can be restored with or without a strategy.

In [0]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

In [0]:
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)

In [0]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))

## Next Steps

Try out the new `tf.distribute.Strategy` API on your models.