<a href="https://colab.research.google.com/github/salim-hbk/ai-ml/blob/main/DistributedTrainingGPU.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Parallel distributed training**

* In this architecture, we create multiple instances of same model, place it in different devices, here device means CPU or accelerators like GPU, TPU, these device can dwell in same computer or distributed across network
* Then we create different slices of datasets to feed into different devices, note that different models in different devices might be initialized with different weights so we need to have aggregate these weights during training
* Mirrored variables are those variables that have common stored values that it contains or updated by the 2 or more models.
* workers is a software where training takes place


In [1]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np

In [10]:
import os
# Note that it generally has a minimum of 8 cores, but if your GPU has
# less, you need to set this. In this case one of my GPUs has 4 cores
os.environ["TF_MIN_GPU_MULTIPROCESSOR_COUNT"] = "4"

# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
# If you have *different* GPUs in your system, you probably have to set up cross_device_ops like this
strategy = tf.distribute.MirroredStrategy(cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
print(f'Number of devices {strategy.num_replicas_in_sync}')

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Number of devices 1


In [7]:
(train_x, train_y), (test_x, test_y) = tf.keras.datasets.fashion_mnist.load_data()

train_x = np.reshape(train_x, [-1, 28,28,1])
test_x =  np.reshape(test_x, [-1, 28,28,1])
train_x = train_x/255.
test_x = test_x/255.

In [11]:
BUFFER_SIZE = len(train_x)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

In [13]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y)).batch(GLOBAL_BATCH_SIZE)

Next we need to create distributed dataset to run on parallel devices

In [14]:
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

In [20]:
def base_model():
  input = tf.keras.layers.Input(shape=[28,28,1,])
  x = tf.keras.layers.Conv2D(32, 3, activation='relu')(input)
  x = tf.keras.layers.MaxPooling2D()(x)
  x = tf.keras.layers.Conv2D(64, 3, activation='relu')(x)
  x = tf.keras.layers.MaxPooling2D()(x)
  x = tf.keras.layers.Flatten()(x)
  x = tf.keras.layers.Dense(64, activation='relu')(x)
  output = tf.keras.layers.Dense(10, activation='softmax')(x)
  model = tf.keras.models.Model(inputs=input, outputs=output)
  return model  

* Instead of model.compile, we're going to do custom training, so let's do that
* within a strategy scope
* We will use sparse categorical crossentropy as always. But, instead of having the loss function
* manage the map reduce across GPUs for us, we'll do it ourselves with a simple algorithm.
* the map reduce is how the losses get aggregated
* Set reduction to `none` so we can do the reduction afterwards and divide byglobal batch size.

In [21]:
with strategy.scope():
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)

  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    print(per_example_loss)
    return tf.nn.compute_average_loss(per_example_loss=per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

  test_loss = tf.keras.metrics.Mean(name='test_loss')

  # Accuracy on train and test will be SparseCategoricalAccuracy
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

  optimizer = tf.keras.optimizers.Adam()
  model = base_model()

###########################
# Training Steps Functions
###########################
* `run` replicates the provided computation and runs it
* with the distributed input.

In [30]:
@tf.function
def distributed_train_step(dataset):
  per_replica_losses = strategy.run(train_step, args=(dataset,) )
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)


def train_step(dataset):
  inputs, labels = dataset
  with tf.GradientTape() as tape:
    prediction = model(inputs, training=True)
    loss = compute_loss(labels, prediction)

  gradient =  tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradient, model.trainable_variables))

  train_accuracy.update_state(labels, prediction)
  return loss

@tf.function
def distributed_test_step(test_inputs):
  return strategy.run(test_step, args=(test_inputs,))


def test_step(data_input):
  data, label = data_input
  logits =  model(data)
  loss = loss_object(label, logits)

  test_loss.update_state(loss)
  test_accuracy.update_state(label, logits)


In [31]:
epochs = 10

for epoch in range(epochs):
  total_loss =0.
  num_batches = 0

  for batch in train_dist_dataset:
    total_loss += distributed_train_step(batch)
    num_batches +=1
  train_loss = total_loss/num_batches

  for batch in test_dist_dataset:
    distributed_test_step(batch)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, " "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100, test_loss.result(), test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()

Tensor("sparse_categorical_crossentropy/weighted_loss/Mul:0", shape=(64,), dtype=float32, device=/job:localhost/replica:0/task:0/device:GPU:0)
Tensor("sparse_categorical_crossentropy/weighted_loss/Mul:0", shape=(32,), dtype=float32, device=/job:localhost/replica:0/task:0/device:GPU:0)
Epoch 1, Loss: 0.33434057235717773, Accuracy: 84.86582946777344, Test Loss: 0.33170926570892334, Test Accuracy: 87.93000030517578
Epoch 2, Loss: 0.28858351707458496, Accuracy: 89.30332946777344, Test Loss: 0.303122878074646, Test Accuracy: 88.76000213623047
Epoch 3, Loss: 0.2555476725101471, Accuracy: 90.63500213623047, Test Loss: 0.2796458601951599, Test Accuracy: 89.80000305175781
Epoch 4, Loss: 0.2331286072731018, Accuracy: 91.34333038330078, Test Loss: 0.2780916392803192, Test Accuracy: 89.81000518798828
Epoch 5, Loss: 0.2133655548095703, Accuracy: 92.1816635131836, Test Loss: 0.2587366998195648, Test Accuracy: 90.5999984741211
Epoch 6, Loss: 0.19528432190418243, Accuracy: 92.69667053222656, Test Loss