# Why Distributed Training?
Deep learning is time-consuming because models must process huge amounts of data, so speeding up training is essential for getting results faster. Distributed training is one technique that reduces training time. Instead of training the model serially on a single device, it trains on different slices of the data in parallel across multiple devices or machines.

# Distributed training with TensorFlow
`tf.distribute.Strategy` is a TensorFlow API used to distribute training across multiple GPUs, multiple machines or TPUs.\
`tf.distribute.Strategy` can be used with high-level APIs such as Keras. It can also be used with custom training loops (in general, with any computation written in TensorFlow).





## Types of strategies
`tf.distribute.Strategy` covers a number of use cases along different axes. Some of these are:


*   *Synchronous vs asynchronous training:* In synchronous training, all workers train on different slices of the input data in lockstep and aggregate their gradients at each step. In asynchronous training, the workers train on the input data independently and update the variables asynchronously. Synchronous training is typically implemented with all-reduce, and asynchronous training with a parameter server architecture.

*   *Hardware platform:* Training can be scaled onto multiple GPUs on one machine, onto multiple machines in a network (each with zero or more GPUs), or onto Cloud TPUs.



### Parameter Server
Parameter servers store the parameters of a machine learning model, such as its weights. They serve these parameters to workers (clients), which process the data and send back updates to the parameters.\
Many machine learning problems rely on large amounts of data for training and inference. For such large models and datasets, training and inference on a single machine may not be feasible. It therefore helps to have a framework that supports distributed training as well as distributed inference.
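The toy sketch below makes the parameter-server pattern concrete in a single Python process. It assumes the following, all for illustration only:

* `ParameterServer` is a hypothetical class that holds the single weight of a linear model `y = w * x`.
* Each simulated worker pulls a snapshot of the weight, computes a gradient on its own data shard, and pushes it back.
* The server applies every push immediately, so later pushes within a round are computed from stale weights. This mimics asynchronous training.
* The data, shard split, and learning rate are made up.

Real parameter servers run on separate machines and communicate over the network.

```python
class ParameterServer:
    """Hypothetical toy parameter server for a one-weight model y = w * x."""

    def __init__(self, w=0.0, lr=0.02):
        self.w = w
        self.lr = lr

    def pull(self):
        # A worker fetches a copy of the current weight.
        return self.w

    def push(self, grad):
        # The server applies each gradient as soon as it arrives,
        # without waiting for the other workers.
        self.w -= self.lr * grad


def shard_gradient(w, shard):
    # d/dw of the mean squared error (w*x - y)^2 over one worker's shard.
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


# Toy data generated from y = 3x, split across 3 hypothetical workers.
data = [(x, 3.0 * x) for x in [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]]
shards = [data[0::3], data[1::3], data[2::3]]

ps = ParameterServer()
for _ in range(50):
    # Every worker pulls the same snapshot of w ...
    snapshots = [ps.pull() for _ in shards]
    # ... but the pushes arrive one after another, so the later ones were
    # computed from weights that are already stale (the asynchronous case).
    for w_snapshot, shard in zip(snapshots, shards):
        ps.push(shard_gradient(w_snapshot, shard))

print(f"learned w = {ps.w:.4f}")
```

Even with stale gradients, this small problem converges to `w = 3`. With larger learning rates or longer delays, staleness can slow or destabilize training. This is one of the trade-offs of asynchronous updates.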

### AllReduce
An operation that reduces the target arrays in all processes to a single array and returns the resultant array to all the processes.
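A well-known way to implement all-reduce efficiently is the ring algorithm. It runs in two phases:

1. **Reduce-scatter.** Each process repeatedly sends one chunk of its array to its neighbour and adds the chunk it receives.
2. **All-gather.** The fully reduced chunks are passed around the ring until every process holds the whole result.

The plain-Python sketch below simulates this in a single process for a sum reduction. It illustrates the idea only; it is not TensorFlow's implementation, and the function name `ring_all_reduce` is hypothetical.

```python
def ring_all_reduce(arrays):
    """Sum-all-reduce a list of equal-length lists, one per simulated process."""
    n = len(arrays)
    length = len(arrays[0])
    # Split the indices into n contiguous chunks (some may be empty).
    bounds = [(k * length // n, (k + 1) * length // n) for k in range(n)]
    data = [list(a) for a in arrays]  # each process works on its own copy

    # Phase 1: reduce-scatter. After n-1 steps, process i holds the
    # complete sum for chunk (i + 1) % n.
    for step in range(n - 1):
        # Collect all outgoing messages first so the sends are "simultaneous".
        messages = []
        for i in range(n):
            c = (i - step) % n
            lo, hi = bounds[c]
            messages.append(((i + 1) % n, c, data[i][lo:hi]))
        for dst, c, values in messages:
            lo, hi = bounds[c]
            for k, v in zip(range(lo, hi), values):
                data[dst][k] += v

    # Phase 2: all-gather. Each process forwards a fully reduced chunk,
    # which overwrites the receiver's partial copy of that chunk.
    for step in range(n - 1):
        messages = []
        for i in range(n):
            c = (i + 1 - step) % n
            lo, hi = bounds[c]
            messages.append(((i + 1) % n, c, data[i][lo:hi]))
        for dst, c, values in messages:
            lo, hi = bounds[c]
            data[dst][lo:hi] = values
    return data


# Three simulated processes, each holding its own gradient array.
grads = [[1.0, 2.0, 3.0, 4.0], [10.0, 20.0, 30.0, 40.0], [100.0, 200.0, 300.0, 400.0]]
for rank, reduced in enumerate(ring_all_reduce(grads)):
    print(rank, reduced)
```

Every process ends up with the same element-wise sum. Each process only ever talks to its neighbour and sends about `2 * (n - 1) / n` of its array in total. This is why the ring approach scales well as more devices are added.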

## Scenarios





### Mirrored Strategy
`tf.distribute.MirroredStrategy` supports synchronous distributed training on multiple GPUs on one machine. It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. Together they form a single `MirroredVariable`. These variables are kept in sync with each other by applying identical updates.\
Efficient all-reduce algorithms are used to communicate the variable updates across the devices. All-reduce aggregates tensors across all the devices by adding them up, and makes them available on each device.
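The plain-Python simulation below shows what the paragraph above describes, for a single weight. The steps are:

1. Each replica holds an identical copy of the variable.
2. Each replica computes a gradient on its own slice of the global batch.
3. The gradients are summed (the all-reduce).
4. Every copy receives the same update, so the copies never diverge.

The one-weight linear model, the data, and the learning rate are assumptions for illustration. `MirroredStrategy` does the same with real tensors and device-to-device communication.

```python
def replica_gradient(w, xs, ys, global_batch_size):
    # Gradient of sum((w*x - y)^2) / GLOBAL_BATCH_SIZE over this replica's slice.
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / global_batch_size


num_replicas = 4
lr = 0.05
# One mirrored copy of the weight per replica, all initialized identically.
mirrored_w = [0.0] * num_replicas

# A global batch of 8 examples from y = 2x, split evenly across the replicas.
xs = [0.5, 1.0, 1.5, 2.0, 0.5, 1.0, 1.5, 2.0]
ys = [2.0 * x for x in xs]
per_replica = len(xs) // num_replicas

for step in range(100):
    grads = []
    for r in range(num_replicas):
        lo, hi = r * per_replica, (r + 1) * per_replica
        grads.append(replica_gradient(mirrored_w[r], xs[lo:hi], ys[lo:hi], len(xs)))
    # All-reduce: every replica receives the same summed gradient.
    total_grad = sum(grads)
    # Applying the identical update to every copy keeps the replicas in sync.
    mirrored_w = [w - lr * total_grad for w in mirrored_w]

print(mirrored_w)
```

All four copies hold the same value after every step and converge to `w = 2`.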

### CentralStorageStrategy
`tf.distribute.experimental.CentralStorageStrategy` also performs synchronous training. Its variables are not mirrored. Instead, they are placed on the CPU, and operations are replicated across all local GPUs. Updates computed on the replicas are aggregated before being applied to the variables.

### MultiWorkerMirroredStrategy
`tf.distribute.experimental.MultiWorkerMirroredStrategy` is very similar to `MirroredStrategy`. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs. Similar to `MirroredStrategy`, it creates copies of all variables in the model on each device across all workers. 
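Each worker in a multi-worker setup needs to know the cluster layout. In TensorFlow this is usually supplied through the `TF_CONFIG` environment variable. It holds a JSON string with two sections:

* `cluster`: the addresses of all the workers.
* `task`: this process's own role and index.

The sketch below builds that value in plain Python. The host names and ports are hypothetical placeholders, and `make_tf_config` is an illustrative helper. The strategy line is left as a comment so the sketch runs without TensorFlow or a real cluster.

```python
import json
import os

# Hypothetical two-worker cluster; replace with the real host:port pairs.
cluster = {"worker": ["host1.example.com:12345", "host2.example.com:23456"]}


def make_tf_config(task_index):
    # Every worker gets the same "cluster" spec but its own "task" entry.
    return json.dumps({
        "cluster": cluster,
        "task": {"type": "worker", "index": task_index},
    })


# On the first worker (index 0), set TF_CONFIG before creating the strategy.
os.environ["TF_CONFIG"] = make_tf_config(0)
print(os.environ["TF_CONFIG"])

# Then, in the training script (requires TensorFlow and the other workers):
# strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
```

The second worker would run the same script with `make_tf_config(1)`.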

### TPUStrategy
`tf.distribute.experimental.TPUStrategy` lets you run TensorFlow training on Tensor Processing Units (TPUs). TPUs are Google's application-specific integrated circuits (ASICs), designed to accelerate machine learning workloads.\
In terms of distributed training architecture, `TPUStrategy` is the same as `MirroredStrategy`: it implements synchronous distributed training. `TPUStrategy` uses the TPUs' own implementation of efficient all-reduce and other collective operations across multiple TPU cores.

### ParameterServerStrategy
`tf.distribute.experimental.ParameterServerStrategy` supports parameter server training on multiple machines. In this setup, some machines are designated as workers and others as parameter servers. Each variable of the model is placed on one parameter server, and computation is replicated across all GPUs of all the workers.
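Each variable lives on exactly one parameter server. One simple policy for choosing that server is round-robin assignment: the i-th variable goes to server `i mod num_servers`.

The plain-Python sketch below illustrates the idea. TensorFlow's own placement logic may differ. The variable names, the `/job:ps/task:N` device strings, and the helper `round_robin_placement` are hypothetical. The sizes were chosen to match the CNN in the custom training example later in this document.

```python
from collections import defaultdict

# Hypothetical model variables and their sizes (number of parameters).
variables = {
    "conv1/kernel": 288, "conv1/bias": 32,
    "conv2/kernel": 18432, "conv2/bias": 64,
    "dense1/kernel": 102400, "dense1/bias": 64,
    "dense2/kernel": 640, "dense2/bias": 10,
}
ps_tasks = ["/job:ps/task:0", "/job:ps/task:1"]


def round_robin_placement(var_names, servers):
    # The i-th variable created goes to server i mod len(servers).
    return {name: servers[i % len(servers)] for i, name in enumerate(var_names)}


placement = round_robin_placement(list(variables), ps_tasks)

# Each variable lives on exactly one server; total the parameters per server.
load = defaultdict(int)
for name, server in placement.items():
    load[server] += variables[name]
for server in ps_tasks:
    print(server, load[server])
```

Here every kernel lands on task 0 and every bias on task 1. Round-robin balances the *number* of variables, not their size. This is one reason very large variables are sometimes split (sharded) across several parameter servers.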


### OneDeviceStrategy
`tf.distribute.OneDeviceStrategy` runs on a single device. It places all variables created in its scope on the specified device, and input distributed through this strategy is prefetched to that device.\
This strategy is useful for testing code before switching to other strategies that distribute work across multiple devices or machines.
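A minimal sketch of `OneDeviceStrategy` pinned to the CPU follows. It assumes TensorFlow 2.2 or later, where `strategy.run` is available. The trivial `step` function is illustrative only. The same `scope()` / `run()` calls would later work unchanged with a multi-device strategy.

```python
import tensorflow as tf

# Pin everything to the CPU; "/gpu:0" would target the first GPU instead.
strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")

with strategy.scope():
    # Variables created inside the scope are placed on the chosen device.
    v = tf.Variable(1.0)

print(v.device)
print(strategy.num_replicas_in_sync)


def step(x):
    # A trivial per-replica computation that reads the variable.
    return x * v


# The same `strategy.run` call used with multi-device strategies works here.
result = strategy.run(step, args=(tf.constant(3.0),))
print(strategy.experimental_local_results(result))
```

`experimental_local_results` returns one value per local replica. With `OneDeviceStrategy` there is exactly one.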

# Defining Loss
On a single machine with one GPU or CPU, the loss is divided by the number of examples in the input batch. With distributed training, however, several such batches are processed in parallel.\
Calculating loss using `tf.distribute.Strategy`:


*   Say there are 4 GPUs and a global batch size of 64. Each batch is split across the replicas (the 4 GPUs), so each replica receives 16 examples.
*   The model on each replica does a forward pass on its input and computes the loss. Each replica should divide its loss by the total number of examples in the global batch (GLOBAL_BATCH_SIZE=64), not by the number of examples in its own input (BATCH_SIZE_PER_REPLICA=16).\
This is necessary because the per-replica gradients are synced across the replicas by summing them. Dividing by the global batch size makes that sum equal to the gradient of the mean loss over the whole batch.
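A quick numeric check of the reasoning above, in plain Python with made-up per-example losses (the random values are an assumption):

* If each of the 4 replicas divides the sum of its 16 losses by the global batch size of 64, summing the 4 results gives exactly the mean over all 64 examples.
* If each replica divides by its own batch size instead, the sum is too large by a factor equal to the number of replicas.

The same argument applies to gradients, because the gradient of a sum is the sum of the gradients.

```python
import math
import random

NUM_REPLICAS = 4
BATCH_SIZE_PER_REPLICA = 16
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * NUM_REPLICAS  # 64

# Made-up per-example losses for one global batch, split evenly across replicas.
rng = random.Random(42)
losses = [rng.uniform(0.0, 2.0) for _ in range(GLOBAL_BATCH_SIZE)]
shards = [losses[r * BATCH_SIZE_PER_REPLICA:(r + 1) * BATCH_SIZE_PER_REPLICA]
          for r in range(NUM_REPLICAS)]

# Correct: each replica divides by the GLOBAL batch size, then results are summed.
scaled_by_global = sum(sum(s) / GLOBAL_BATCH_SIZE for s in shards)

# Incorrect: each replica divides by its own batch size, then results are summed.
scaled_by_local = sum(sum(s) / BATCH_SIZE_PER_REPLICA for s in shards)

true_mean = sum(losses) / GLOBAL_BATCH_SIZE
print(math.isclose(scaled_by_global, true_mean))                 # True
print(math.isclose(scaled_by_local, NUM_REPLICAS * true_mean))   # True
```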






## Procedure in TensorFlow


*   For a custom training loop, sum the per-example losses and divide by GLOBAL_BATCH_SIZE: `scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)`. Alternatively, use `tf.nn.compute_average_loss`, which takes the per-example loss, optional sample weights and GLOBAL_BATCH_SIZE as arguments and returns the scaled loss.
*   If the model has regularization losses, the regularization loss has to be divided by the number of replicas. This can be done with the `tf.nn.scale_regularization_loss` function.
* Using `tf.reduce_mean` is not recommended, because it divides the loss by the actual per-replica batch size, which may vary from step to step.
* With Keras `model.compile` and `model.fit`, the reduction and scaling are done automatically.
* If using `tf.keras.losses` classes in a custom training loop, the loss reduction must be explicitly set to `NONE` or `SUM`.
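The sketch below checks that manual scaling and `tf.nn.compute_average_loss` agree, and shows `tf.nn.scale_regularization_loss`. It assumes the following:

* The values and a GLOBAL_BATCH_SIZE of 8 are made up.
* The replica that received these 4 examples is hypothetical.
* The code runs outside any strategy scope. In that case there is a single replica, so the regularization loss is divided by 1 and stays unchanged.

```python
import tensorflow as tf

GLOBAL_BATCH_SIZE = 8  # assumed global batch size for this example

# Per-example losses for the 4 examples this (hypothetical) replica received.
per_example_loss = tf.constant([0.5, 1.0, 1.5, 2.0])

# Manual scaling: sum, then divide by the global (not per-replica) batch size.
manual = tf.reduce_sum(per_example_loss) * (1.0 / GLOBAL_BATCH_SIZE)

# Built-in helper that performs the same scaling.
helper = tf.nn.compute_average_loss(per_example_loss,
                                    global_batch_size=GLOBAL_BATCH_SIZE)

# Regularization loss divided by the number of replicas (1 outside a strategy).
reg = tf.nn.scale_regularization_loss(tf.constant(0.4))

print(manual.numpy(), helper.numpy(), reg.numpy())
```

Both scaled losses equal 5.0 / 8 = 0.625. Inside a `MirroredStrategy` with 4 replicas, the same `scale_regularization_loss` call would return 0.1.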


# Examples

## Custom training with tf.distribute.Strategy
This tutorial demonstrates how to use `tf.distribute.Strategy` with custom training loops. A simple CNN is trained on the Fashion-MNIST dataset, which contains 60,000 training images and 10,000 test images, each of size 28 x 28.

Custom training loops are used here because they give more flexibility and greater control over training. They also make the model and the training loop easier to debug.

```python
from __future__ import absolute_import, division, print_function, unicode_literals

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
```


### Download the fashion MNIST dataset

```python
fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Add a channel dimension so that each image has shape (28, 28, 1).
# This is needed because the first layer in the model is a convolutional
# layer, which expects 4D input (batch_size, height, width, channels).
# The batch dimension is added later, when the dataset is batched.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Scale the pixel values to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
```


### Create a strategy to distribute the variables and the graph

How `tf.distribute.MirroredStrategy` works:

*   All the variables and the model graph are replicated on the replicas.
*   Input is evenly distributed across the replicas.
* Each replica calculates the loss and gradients for the input it received.
* The gradients are synced across all replicas by summing them.
* After the sync, the same update is made to the copies of the variables on each replica.



```python
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
```


```python
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
```


### Setting up input pipeline

```python
BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10
```


Create the datasets and distribute them:

```python
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
```


### Model creation

```python
def create_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

  return model
```


```python
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
```


### Loss function

```python
with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      reduction=tf.keras.losses.Reduction.NONE)
  # or loss_fn = tf.keras.losses.sparse_categorical_crossentropy
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
```


### Metrics to track loss and accuracy

These metrics track the test loss and the training and test accuracy. Call `.result()` at any time to get the accumulated statistics.

```python
with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
```


### Training loop

```python
# model and optimizer must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
```


```python
with strategy.scope():
  def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
      predictions = model(images, training=True)
      loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss

  def test_step(inputs):
    images, labels = inputs

    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss.update_state(t_loss)
    test_accuracy.update_state(labels, predictions)
```


```python
with strategy.scope():
  # `strategy.run` replicates the provided computation and runs it with the
  # distributed input. (Older TensorFlow 2.x releases named this method
  # `experimental_run_v2`.)
  @tf.function
  def distributed_train_step(dataset_inputs):
    per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
    # Each replica's loss is already divided by GLOBAL_BATCH_SIZE, so summing
    # the per-replica values gives the mean loss over a full global batch.
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                           axis=None)

  @tf.function
  def distributed_test_step(dataset_inputs):
    return strategy.run(test_step, args=(dataset_inputs,))

  for epoch in range(EPOCHS):
    # TRAIN LOOP
    total_loss = 0.0
    num_batches = 0
    for x in train_dist_dataset:
      total_loss += distributed_train_step(x)
      num_batches += 1
    train_loss = total_loss / num_batches

    # TEST LOOP
    for x in test_dist_dataset:
      distributed_test_step(x)

    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)

    template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
                "Test Accuracy: {}")
    print(template.format(epoch + 1, train_loss,
                          train_accuracy.result() * 100, test_loss.result(),
                          test_accuracy.result() * 100))

    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()
```


### Restoring the latest checkpoint and test
A model checkpointed with `tf.distribute.Strategy` can be restored with or without a strategy.

```python
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
```


```python
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
```


```python
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result() * 100))
```


## Distributed training with Keras
This example uses the `tf.keras` API to build and train the model.

### Import

```python
from __future__ import absolute_import, division, print_function, unicode_literals

# Import TensorFlow and TensorFlow Datasets
# (`!pip` is notebook shell syntax; outside a notebook, install from a terminal.)
try:
  !pip install -q tf-nightly
except Exception:
  pass

import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os
```


### Download dataset

Setting `with_info` to `True` also returns metadata for the entire dataset, which is saved here in `info`. Among other things, this metadata object includes the number of training and test examples.

```python
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets['train'], datasets['test']
```


### Define distribution strategy
Create a `MirroredStrategy` object. This will handle distribution, and provides a context manager (`tf.distribute.MirroredStrategy.scope`) to build the model inside.

```python
strategy = tf.distribute.MirroredStrategy()
```


### Set up the input pipeline
When training with multiple GPUs, you can make effective use of the extra computing power by increasing the batch size. In general, use the largest batch size that fits in GPU memory and tune the learning rate accordingly.
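The paragraph above leaves "tune the learning rate accordingly" open. One widely used starting heuristic is the linear scaling rule: when the global batch size grows by a factor k, multiply the base learning rate by k. This is an assumption here, not something this guide prescribes.

The helper `scaled_learning_rate` below is hypothetical and only does the arithmetic. Treat its result as a starting point to tune from, not a guaranteed setting.

```python
def scaled_learning_rate(base_lr, base_batch_size, batch_size_per_replica, num_replicas):
    # Linear scaling rule: the learning rate grows in proportion to the global batch size.
    global_batch_size = batch_size_per_replica * num_replicas
    return base_lr * global_batch_size / base_batch_size


# Example: a rate of 1e-3 tuned for a batch of 64 on one device,
# now with 64 examples per replica on 4 replicas (global batch of 256).
lr = scaled_learning_rate(1e-3, 64, 64, 4)
print(lr)
```

In the Keras example, `num_replicas` would come from `strategy.num_replicas_in_sync`. Very large batch sizes often also need a warm-up period, so validate the scaled rate empirically.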

```python
# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
```


Pixel values, which are 0-255, have to be normalized to the 0-1 range. Define this scale in a function.

```python
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label
```


Apply this function to the training and test data, shuffle the training data, and batch it for training. Notice we are also keeping an in-memory cache of the training data to improve performance.

```python
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
```


### Model creation
Create and compile the Keras model inside `strategy.scope`.

```python
with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])

  model.compile(loss='sparse_categorical_crossentropy',
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])
```


### Defining callbacks
The callbacks used here are:
* *TensorBoard*: writes a log for TensorBoard, which allows the graphs to be visualized.
* *Model Checkpoint*: saves the model weights after every epoch.
* *Learning Rate Scheduler*: changes the learning rate at every epoch according to a schedule.
* *PrintLR*: a custom callback that prints the learning rate at the end of each epoch.


```python
# Define the checkpoint directory to store the checkpoints
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
```


```python
# Function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch < 7:
    return 1e-4
  else:
    return 1e-5
```


```python
# Callback for printing the learning rate at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    # `self.model` is the model being trained; Keras sets it on the callback.
    print('\nLearning rate for epoch {} is {}'.format(
        epoch + 1, self.model.optimizer.learning_rate.numpy()))
```


```python
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]
```


### Train and evaluate
Train the model with `model.fit` in the usual way, passing in the dataset created earlier. This step is the same whether or not the training is distributed.

```python
model.fit(train_dataset, epochs=12, callbacks=callbacks)
```


To see how the model performs, load the latest checkpoint and call `evaluate` on the test data.

```python
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

eval_loss, eval_acc = model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
```


To view the results, download the TensorBoard logs and launch TensorBoard from a terminal:

```shell
tensorboard --logdir=path/to/log-directory
```


```shell
ls -sh ./logs
```


### Export to SavedModel
Export the graph and the variables to the platform-agnostic SavedModel format. After the model is saved, it can be loaded with or without the scope.

```python
path = 'saved_model/'

model.save(path, save_format='tf')
```

Load the model without `strategy.scope`.

```python
unreplicated_model = tf.keras.models.load_model(path)

unreplicated_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
```


Load the model with `strategy.scope`.

```python
with strategy.scope():
  replicated_model = tf.keras.models.load_model(path)
  replicated_model.compile(loss='sparse_categorical_crossentropy',
                           optimizer=tf.keras.optimizers.Adam(),
                           metrics=['accuracy'])

  eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
  print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
```
