<a href="https://colab.research.google.com/github/nigoda/machine_learning/blob/main/33_TensorFlow_Distributed_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Type of strategies**

https://www.tensorflow.org/guide/distributed_training

https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/tutorials/distribute/keras.ipynb#scrollTo=Z2XI9Zp9ohCJ

`tf.distribute.Strategy` intends to cover a number of use cases along different axes. Some of these combinations are currently supported and others will be added in the future. Some of these are:

*  Synchronous vs asynchronous training: These are two common ways of distributing training with data parallelism. In sync training, all workers train over different silces of input data in sync, and aggregating gradients at each step. In async training, all workers are independently training over the input data and updating variables asynchronously. Typically sync training is supported via all-reduce and async through parameter server architecture.

*  Hardware platform: Users may want to scale their training onto multiple GPUs on one machine, or multiple machines in a network (with 0 or more GPUs each), or on Cloud TPUs.

In order to support these use cases, there are 6 strategies available.

| Training API          	| MirroredStrategy  	| TPUStrategy         	| MultiWorkerMirroredStrategy     	| CentralStorageStrategy          	| ParameterServerStrategy  	|
|:-----------------------	|:-------------------	|:---------------------	|:---------------------------------	|:---------------------------------	|:--------------------------	|
| **Keras API**             	| Supported         	|  Supported 	| Supported 	| Experimental support 	| Supported planned post 2.4 	|
| **Custom training loop**  	| Supported 	| Supported   	| Supported            	| Experimental support           	| Experimental support         	|
| **Estimator API**         	| Limited Support         	| Not supported           	| Limited Support                       	| Limited Support                       	| Limited Support                	|

Note: [Experimental support](https://www.tensorflow.org/guide/versions#what_is_not_covered) means the APIs are not covered by any compatibilities guarantees.

Note: Estimator support is limited. Basic training and evaluation are experimental, and advanced features—such as scaffold—are not implemented. You should be using Keras or custom training loops if a use case is not covered.

## **Overview**
The `tf.distribute.Strategy` API provides an abstraction for distributing your training across multiple processing units. The goal is to allow user to enable distributed training using existing models and training code, with minimal changes.

This uses the `tf.distribute.MirroredStrategy`, which does in-graph replication with synchronous training on many GPUs on one machine. Essentially, it copies all of the model's variables to each processor. Then, it uses all-reduce to combine the gradients from all processor and applies the combined values to all copies of the model.

`MirroredStategy` is one of several distribution strategy available in TensorFow core. You can read about more strategies at distribution strategy guide.

### **Keras API**
This example uses the tf.keras API to build the model and training loops, see the tf.distribute Strategy with training loops 

## **Import dependencies**


In [2]:
# Import TensorFlow and TensorFlow Datasets

import tensorflow_datasets as tfds
import tensorflow as tf

import os

In [3]:
print(tf.__version__)

2.5.0


## **Download the dataset**
Download the MNIST dataset and load it from TensorFlow Datasets. This return a dataset in `tf.data` format.

Setting `with_info` to `True` includes the metadata for the entire dataset, which is being saved here to info. Among other things, this metadata object includes the number of train ans test examples.

In [4]:
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets['train'], datasets['test']

[1mDownloading and preparing dataset mnist/3.0.1 (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to /root/tensorflow_datasets/mnist/3.0.1...[0m


local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead pass
`try_gcs=True` to `tfds.load` or set `data_dir=gs://tfds-data/datasets`.



HBox(children=(FloatProgress(value=0.0, description='Dl Completed...', max=4.0, style=ProgressStyle(descriptio…



[1mDataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.[0m


## **Define distibution strategy**
Create a `MirroredStrategy` object. This will handle distribution, and provides a context manager (`tf.distribute.MirroredStrategy.scope`) to build the model inside.


In [5]:
strategy = tf.distribute.MirroredStrategy()





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


In [6]:
print('Number of device: {}'.format(strategy.num_replicas_in_sync))

Number of device: 1


## **Setup input pipeline**
When training a model with multiple GPUs, you can use the extra computing power effectively by increasing the batch size. In general, use the largest batch size that fits the GPU memory, and tune the learning rate accordingly.

In [7]:
# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

Pixel values, which are 0-255 have to be normalized to the 0-1 range. Define this scale in a function.

In [8]:
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label

Apply this function to the training and test data, shuffle the training data, and batch it for training.

In [9]:
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

## **Create the model**
Create and compile the Keras model in the context of strategy.scope.

In [10]:
with strategy.scope():
  model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
  ])

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


## **Define the callbacks**
The callbacks used here are:
*  *TensorBoard*: This callback writes a log for TensorBoard which allows you to visualize the graphs.
*  *Model Checckpoint*: This callback saves the model after every epoch.
* *Learning Rate Scheduler:* Using this callback, you can schedule the learning rate to change after every epoch/batch.

For illustrative purposes, add a print callback to display the *learning rate* in the notebook.

In [11]:
# Define the checkpoint directory to store the checkpoints

checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

In [12]:
# Function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch >=3 and epoch < 7:
    return 1e-4
  else:
    return 1e-5


In [13]:
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                      model.optimizer.lr.numpy()))

In [14]:
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

## **Train and evaluate**
Now, train the model in the usual way calling fit on the model and passing in the dataset created at the begining of the tutorial. This step is the same whether you are distributing the training or not.

In [15]:
model.fit(train_dataset, epochs=12, callbacks=callbacks)

Epoch 1/12
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).











Learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12

Learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12

Learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12

Learning rate for epoch 4 is 9.999999747378752e-05
Epoch 5/12

Learning rate for epoch 5 is 9.999999747378752e-05
Epoch 6/12

Learning rate for epoch 6 is 9.999999747378752e-05
Epoch 7/12

Learning rate for epoch 7 is 9.999999747378752e-05
Epoch 8/12

Learning rate for epoch 8 is 9.999999747378752e-06
Epoch 9/12

Learning rate for epoch 9 is 9.999999747378752e-06
Epoch 10/12

Learning rate for epoch 10 is 9.999999747378752e-06
Epoch 11/12

Learning rate for epoch 11 is 9.999999747378752e-06
Epoch 12/12

Learning rate for epoch 12 is 9.999999747378752e-06


<tensorflow.python.keras.callbacks.History at 0x7f49204577d0>

As you can see below, the checkpoint are getting saved.

In [16]:
# check the checkpoint directory
!ls {checkpoint_dir}

checkpoint		     ckpt_4.data-00000-of-00001
ckpt_10.data-00000-of-00001  ckpt_4.index
ckpt_10.index		     ckpt_5.data-00000-of-00001
ckpt_11.data-00000-of-00001  ckpt_5.index
ckpt_11.index		     ckpt_6.data-00000-of-00001
ckpt_12.data-00000-of-00001  ckpt_6.index
ckpt_12.index		     ckpt_7.data-00000-of-00001
ckpt_1.data-00000-of-00001   ckpt_7.index
ckpt_1.index		     ckpt_8.data-00000-of-00001
ckpt_2.data-00000-of-00001   ckpt_8.index
ckpt_2.index		     ckpt_9.data-00000-of-00001
ckpt_3.data-00000-of-00001   ckpt_9.index
ckpt_3.index


To see how the model perform, load the latest checkpoint and call *evaluate* on the test data.

Call *evaluate* as before using appropriate datasets.

In [17]:
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

eval_loss, eval_acc = model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '


Eval loss: 0.03980964422225952, Eval Accuracy: 0.9861999750137329


To see the output, you can download and view the TensorBoard logs at the terminal.

__$ tensorboard --logdir=path/to/log-directory__

In [18]:
!ls -sh ./logs

total 4.0K
4.0K train


## **Export to SavedModel**
Export the graph and the variables to the platform-agnostic SaveModel format. After your model is saved, you can load it with or without the scope.

In [19]:
path = 'saved_model/'

In [20]:
model.save(path, save_format='tf')

INFO:tensorflow:Assets written to: saved_model/assets


INFO:tensorflow:Assets written to: saved_model/assets


load the model without `strategy.scope.`

In [22]:
unreplicated_model = tf.keras.models.load_model(path)

unreplicated_model.compile(
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

  1/157 [..............................] - ETA: 32s - loss: 0.0985 - accuracy: 0.9688

  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '


Eval loss: 0.03980964422225952, Eval Accuracy: 0.9861999750137329


Load the model with `strategy.scope.`

In [24]:
with strategy.scope():
  replicated_model = tf.keras.models.load_model(path)
  replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                           optimizer=tf.keras.optimizers.Adam(),
                           metrics=['accuracy'])
  
  eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
  print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))

  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '


Eval loss: 0.03980964422225952, Eval Accuracy: 0.9861999750137329


### Examples and Tutorials
Here are some examples for using distribution strategy with keras fit/compile:
1. [Transformer](https://github.com/tensorflow/models/blob/master/official/nlp/transformer/transformer_main.py) example trained using `tf.distribute.MirroredStrategy`
2. [NCF](https://github.com/tensorflow/models/blob/master/official/recommendation/ncf_keras_main.py) example trained using `tf.distribute.MirroredStrategy`.

More examples listed in the [Distribution strategy guide](../../guide/distributed_training.ipynb#examples_and_tutorials)

## Next steps

* Read the [distribution strategy guide](../../guide/distributed_training.ipynb).
* Read the [Distributed Training with Custom Training Loops](custom_training.ipynb) tutorial.
* Visit the [Performance section](../../guide/function.ipynb) in the guide to learn more about other strategies and [tools](../../guide/profiler.md) you can use to optimize the performance of your TensorFlow models.

Note: `tf.distribute.Strategy` is actively under development and we will be adding more examples and tutorials in the near future. Please give it a try. We welcome your feedback via [issues on GitHub](https://github.com/tensorflow/tensorflow/issues/new).