<a href="https://colab.research.google.com/github/tataxverse/tensorflow-specialisation-deeplearning.ai/blob/main/distributed-training/multi_gpu_mirrored_strategy_w4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import tensorflow as tf
import numpy as np
import os

**1. Setup Distribution Strategy**

In [3]:
os.environ["TF_MIN_GPU_MULTIPROCESSOR_COUNT"] = "4" #number of gpu cores

strategy=tf.distribute.MirroredStrategy(cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Number of devices: 1


**2. Prepare the Data**

In [4]:
fashion_mnist=tf.keras.datasets.fashion_mnist
(train_images,train_labels),(test_images,test_labels)=fashion_mnist.load_data()

#Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).

train_images=train_images[...,None]
test_images=test_images[...,None]

train_images=train_images/np.float32(255)
test_images=test_images/np.float32(255)

# Batch the input data
BUFFER_SIZE=len(train_images)
BATCH_SIZE_PER_REPLICA=64
GLOBAL_BATCH_SIZE=BATCH_SIZE_PER_REPLICA*strategy.num_replicas_in_sync

# Create Datasets from the batches
train_dataset=tf.data.Dataset.from_tensor_slices((train_images,train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset=tf.data.Dataset.from_tensor_slices((test_images,test_labels)).batch(GLOBAL_BATCH_SIZE)

# Create Distributed Datasets from the datasets
train_dist_dataset=strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset=strategy.experimental_distribute_dataset(test_dataset)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
[1m29515/29515[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
[1m26421880/26421880[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
[1m5148/5148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
[1m4422102/4422102[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step


**3. Define the Model**

In [5]:
def create_model():
  model=tf.keras.Sequential([
      tf.keras.layers.Conv2D(32,3,activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64,3,activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64,activation='relu'),
      tf.keras.layers.Dense(10,activation='softmax')
  ])
  return model

**4. Configure custom training**

Instead of model.compile(), we're going to do custom training, so let's do that within a strategy scope.

In [6]:
with strategy.scope():

  # Remember -- the map reduce is how the losses get aggregated
  # Set reduction to `none` so we can do the reduction afterwards and divide byglobal batch size.

  loss_object=tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE
  )

  def compute_loss(labels,predictions):

    # Notice that per_example_loss will have an entry per GPU
    # so in this case there'll be 2 -- i.e. the loss for each replica

    per_example_loss=loss_object(labels,predictions)

    print(per_example_loss)
    return tf.nn.compute_average_loss(per_example_loss,global_batch_size=GLOBAL_BATCH_SIZE)

  #We'll just reduce by getting the average of the losses
  test_loss=tf.keras.metrics.Mean(name='test_loss')

  # Accuracy on train and test will be SparseCategorical
  train_accuracy=tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
  test_accuracy=tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

  optimizer=tf.keras.optimizers.Adam()

  # Create the model within the scope
  model=create_model()

**5. Train and Test Steps Functions**

In [7]:

@tf.function #for better performance

def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,)) #Distributes the train_step function to each replica (GPU/CPU). Runs it in parallel on each device.
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) #Aggregates the loss values across replicas (e.g., from all GPUs).

def train_step(inputs):
  images, labels = inputs
  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss

#######################
# Test Steps Functions
#######################
@tf.function #Same as training, just distributes test_step across replicas.
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)

#distributed_train_step  →  strategy.run(train_step)  →  loss computed and gradients applied
#distributed_test_step   →  strategy.run(test_step)   →  metrics updated


In [11]:
epochs=10
for epoch in range(epochs):
  total_loss=0.0 #total_loss is used to sum all batch losses in an epoch
  num_batches=0
  for batch in train_dist_dataset:
    total_loss+=distributed_train_step(batch)
    num_batches+=1
  train_loss=total_loss/num_batches #Loss is accumulated and averaged later as train_loss.

  for batch in test_dist_dataset:
    distributed_test_step(batch)

  template=("[Epoch {}, Batch {}] Loss: {:.2f}, Accuracy: {:.2f}, Test Loss: {:.2f}, Test Accuracy: {:.2f}")
  print(template.format(epoch+1,batch,train_loss,train_accuracy.result()*100,test_loss.result(),test_accuracy.result()*100))
  test_loss.reset_state()
  train_accuracy.reset_state()
  test_accuracy.reset_state()


[Epoch 1, Batch (<tf.Tensor: shape=(16, 28, 28, 1), dtype=float32, numpy=
array([[[[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        [[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        [[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        ...,

        [[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        [[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        [[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]]],


       [[[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        [[0.],
         [0.],
         [0.],
         ...,
         [0.],
         [0.],
         [0.]],

        [[0.],
         [0.],
         [0.],
         ...,
   