In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
from time import perf_counter
from model_toy import get_toy_ResNet

# Relative path 'logs/custom'; not referenced by any cell visible in this notebook section.
root_logs = os.path.join('logs', 'custom')

# Uncomment to have TensorFlow log the device each op is placed on.
# tf.debugging.set_log_device_placement(True)

# Keep at most the first two currently visible GPUs, then display the
# resulting list as the cell output.
physical_devices = tf.config.get_visible_devices(device_type='GPU')
tf.config.set_visible_devices(devices=physical_devices[:2], device_type='GPU')
tf.config.get_visible_devices(device_type='GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
 PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]

In [2]:
from sklearn.model_selection import train_test_split

# Download (or read from the local Keras cache) the CIFAR-10 train/test arrays.
(x_train_full, y_train_full), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

# Scale the images by 1/255 (assumes 8-bit pixel values - TODO confirm).
# Casting to float32 *before* dividing avoids materialising a temporary
# float64 copy of the whole image array.
x_train_full = x_train_full.astype(np.float32) / 255.
x_test = x_test.astype(np.float32) / 255.

# Hold out a validation set using train_test_split's default proportions.
# A fixed random_state makes the split identical on every
# "Restart Kernel -> Run All", so runs are comparable.
x_train, x_val, y_train, y_val = train_test_split(
    x_train_full, y_train_full, random_state=42)

# Training

In [3]:
# Hyper-parameters shared by every training run in this notebook.
batch_size = 1024
epochs = 10

# Training pipeline: slice the in-memory arrays, shuffle with a
# 2048-element buffer, then group into batches of `batch_size`.
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_ds = train_ds.shuffle(buffer_size=2048).batch(batch_size)

# Validation pipeline: same construction, but with fixed batches of 32.
val_ds = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_ds = val_ds.shuffle(buffer_size=2048).batch(32)

## Keras built-in methods
First, let's train on a single device as a baseline.

In [4]:
# Baseline model, created outside of any distribution strategy scope.
toy_res = get_toy_ResNet()
toy_res.compile(
    optimizer="RMSProp",
    loss='sparse_categorical_crossentropy',
    metrics=["accuracy"],
)

In [5]:
# `train_ds` is already batched (see `.batch(batch_size)` where it is built),
# so `batch_size` is not passed here: Keras documents that it must not be
# specified for tf.data.Dataset inputs, and depending on the TensorFlow
# version it is either silently ignored or rejected with an error.
history = toy_res.fit(train_ds, epochs=epochs, verbose=1,
                      validation_data=(x_val, y_val))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Now with all the devices available.

In [6]:
# MirroredStrategy with default arguments (no explicit device list or
# cross-device ops).
mirrored_strategy = tf.distribute.MirroredStrategy()

# The model is built and compiled inside the strategy scope.
with mirrored_strategy.scope():
    dist_res = get_toy_ResNet()
    dist_res.compile(
        optimizer="RMSProp",
        loss='sparse_categorical_crossentropy',
        metrics=["accuracy"],
    )

# Train directly from the in-memory NumPy arrays.
history = dist_res.fit(
    x=x_train,
    y=y_train,
    validation_data=(x_val, y_val),
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Epoch 1/10
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/

# Custom training

In [7]:
# New default MirroredStrategy for the hand-written training loop.
mirrored_strategy = tf.distribute.MirroredStrategy()

# Model and optimizer are both created inside the strategy scope, mirroring
# the Keras `fit` example; here the optimizer is an explicit object because
# the custom step applies gradients itself.
with mirrored_strategy.scope():
    dist_res = get_toy_ResNet()
    optimizer = tf.keras.optimizers.RMSprop()

# Wrap the already-batched training dataset for use with the strategy.
dist_train_ds = mirrored_strategy.experimental_distribute_dataset(dataset=train_ds)

@tf.function
def train_step(x_dist, y_dist):
    """Run one training step through `mirrored_strategy` and return the mean loss.

    Args:
        x_dist: Input batch taken from the distributed training dataset.
        y_dist: Matching label batch from the same dataset element.

    Returns:
        The per-example cross-entropy values returned by the step function,
        reduced with `ReduceOp.MEAN` along axis 0.

    Relies on the notebook globals `dist_res`, `optimizer`,
    `mirrored_strategy` and `batch_size`.
    """
    def replica_step(features, labels):
        # Forward pass recorded on the tape so gradients can be taken.
        with tf.GradientTape() as tape:
            predictions = dist_res(features, training=True)
            per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(
                y_true=labels, y_pred=predictions)
            # Sum, then divide by `batch_size` (the batch size of the
            # undistributed dataset) rather than by the number of examples
            # this call actually received.
            scaled_loss = tf.reduce_sum(per_example_loss) * (1.0 / batch_size)

        variables = dist_res.trainable_variables
        gradients = tape.gradient(scaled_loss, variables)
        optimizer.apply_gradients(list(zip(gradients, variables)))
        # Return the unscaled per-example values; they are averaged below.
        return per_example_loss

    # Execute the step function through the strategy.
    replica_losses = mirrored_strategy.run(replica_step, args=(x_dist, y_dist))
    # Collapse the strategy's result into a single mean value.
    return mirrored_strategy.reduce(
        tf.distribute.ReduceOp.MEAN, replica_losses, axis=0)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


In [8]:
# Hand-written epoch loop: one `train_step` call per distributed batch,
# with wall-clock time and loss statistics printed after every epoch.
with mirrored_strategy.scope():
    for epoch in range(epochs):
        start = perf_counter()
        # Scalar loss of every batch in this epoch, in iteration order.
        batch_losses = []
        for x_dist_batch, y_dist_batch in dist_train_ds:
            step_loss = train_step(x_dist_batch, y_dist_batch)
            batch_losses.append(step_loss.numpy())

        elapsed = perf_counter() - start
        report = (epoch, epochs, elapsed, batch_losses[-1], np.mean(batch_losses))
        print("Epoch %d/%d. %.3fs\tLast batch loss: %.3f\t Mean batch loss: %.3f" % report)

Epoch 0/10. 11.515s	Last batch loss: 2.018	 Mean batch loss: 2.190
Epoch 1/10. 1.214s	Last batch loss: 1.695	 Mean batch loss: 1.848
Epoch 2/10. 1.212s	Last batch loss: 1.565	 Mean batch loss: 1.642
Epoch 3/10. 1.223s	Last batch loss: 1.402	 Mean batch loss: 1.504
Epoch 4/10. 1.197s	Last batch loss: 1.366	 Mean batch loss: 1.380
Epoch 5/10. 1.192s	Last batch loss: 1.265	 Mean batch loss: 1.284
Epoch 6/10. 1.217s	Last batch loss: 1.162	 Mean batch loss: 1.200
Epoch 7/10. 1.182s	Last batch loss: 1.089	 Mean batch loss: 1.136
Epoch 8/10. 1.193s	Last batch loss: 1.057	 Mean batch loss: 1.091
Epoch 9/10. 1.184s	Last batch loss: 1.041	 Mean batch loss: 1.036


# AllReduce ops

You can change the AllReduce algorithm by setting the `cross_device_ops` argument of `tf.distribute.MirroredStrategy` to one of:
- `tf.distribute.NcclAllReduce` (the default one)
- `tf.distribute.HierarchicalCopyAllReduce`
- `tf.distribute.ReductionToOneDevice` 

In [12]:
# Same Keras training run as before, but with hierarchical-copy all-reduce
# selected explicitly through `cross_device_ops`.
hierarchical_copy = tf.distribute.HierarchicalCopyAllReduce()
mirrored_strategy = tf.distribute.MirroredStrategy(cross_device_ops=hierarchical_copy)

# Build and compile the model inside the strategy scope.
with mirrored_strategy.scope():
    dist_res = get_toy_ResNet()
    dist_res.compile(
        optimizer="RMSProp",
        loss='sparse_categorical_crossentropy',
        metrics=["accuracy"],
    )

history = dist_res.fit(
    x=x_train,
    y=y_train,
    validation_data=(x_val, y_val),
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Epoch 1/10
INFO:tensorflow:batch_all_reduce: 27 all-reduces with algorithm = hierarchical_copy, num_packs = 1
INFO:tensorflow:batch_all_reduce: 27 all-reduces with algorithm = hierarchical_copy, num_packs = 1
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [13]:
# Same Keras training run again, this time with `ReductionToOneDevice`
# passed as `cross_device_ops`.
reduce_to_one = tf.distribute.ReductionToOneDevice()
mirrored_strategy = tf.distribute.MirroredStrategy(cross_device_ops=reduce_to_one)

# Build and compile the model inside the strategy scope.
with mirrored_strategy.scope():
    dist_res = get_toy_ResNet()
    dist_res.compile(
        optimizer="RMSProp",
        loss='sparse_categorical_crossentropy',
        metrics=["accuracy"],
    )

history = dist_res.fit(
    x=x_train,
    y=y_train,
    validation_data=(x_val, y_val),
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Epoch 1/10
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1').
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
