# Distributing Tensorflow Across Devices and Servers

### Distributed Training with Keras

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds
import numpy as np
import os
print("Nums GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

Nums GPUs Available:  0


In [2]:
#Downloading the dataset where with_info being TRUE states the metadata for the entire dataset.
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

In [3]:
#Define Distribution strategy object which will be used to create tf.distribute.MirroredStrategy.scope when building a model.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


Number of devices: 1


In [4]:
#Setting input pipeline

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

buffer_size = 10000
batch_size_per_replica = 64
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync

In [5]:
# Normalizing to the 0 - 1 range given the pixel values = 0 - 255
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image / 255
    return image, label

In [6]:
# Shuffle training data and batch it for training
# Keeping in-memory cache of the training data to improve performance

train_dataset = mnist_train.map(scale).cache().shuffle(buffer_size).batch(batch_size)
eval_dataset = mnist_test.map(scale).batch(batch_size)

In [7]:
with strategy.scope():
    model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

In [8]:
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 26, 26, 32)        320       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 13, 13, 32)        0         
_________________________________________________________________
flatten (Flatten)            (None, 5408)              0         
_________________________________________________________________
dense (Dense)                (None, 64)                346176    
_________________________________________________________________
dense_1 (Dense)              (None, 10)                650       
Total params: 347,146
Trainable params: 347,146
Non-trainable params: 0
_________________________________________________________________


In [9]:
#Tensorboard writes a log which makes it possible to visualize the graphs.
#Model Checkpoint saves the model after every epoch
#Learning rate keeps changing after every epoch/batch.

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'checkpoints_{epoch}')

In [10]:
# Function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
    if epoch < 3:
        return 0.001
    elif epoch >= 3 and epoch < 7:
        return 0.0001
    else:
    return 0.00001

In [11]:
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy()))

In [12]:

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='.\\logs_1',profile_batch = 100000000),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

In [13]:
#Train and Evaluate
model.fit(train_dataset, epochs=12, callbacks=callbacks)

Epoch 1/12
    938/Unknown - 44s 47ms/step - loss: 0.6543 - accuracy: 0.9089
Learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12
Learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12
Learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12
Learning rate for epoch 4 is 9.999999747378752e-05
Epoch 5/12
Learning rate for epoch 5 is 9.999999747378752e-05
Epoch 6/12
Learning rate for epoch 6 is 9.999999747378752e-05
Epoch 7/12
Learning rate for epoch 7 is 9.999999747378752e-05
Epoch 8/12
Learning rate for epoch 8 is 9.999999747378752e-06
Epoch 9/12
Learning rate for epoch 9 is 9.999999747378752e-06
Epoch 10/12
Learning rate for epoch 10 is 9.999999747378752e-06
Epoch 11/12
Learning rate for epoch 11 is 9.999999747378752e-06
Epoch 12/12
Learning rate for epoch 12 is 9.999999747378752e-06


<tensorflow.python.keras.callbacks.History at 0x187beb2a608>

In [14]:
# Check the checkpoint directory
# !dir {checkpoint_dir}
!dir {'training_checkpoints'}

 Volume in drive C is Windows 10
 Volume Serial Number is E4B6-0ECA

 Directory of C:\Users\User\TensorFlow_1\training_checkpoints

02/22/2020  08:38 PM    <DIR>          .
02/22/2020  08:38 PM    <DIR>          ..
02/22/2020  08:23 PM    <DIR>          assets
02/22/2020  08:38 PM                85 checkpoint
02/22/2020  08:32 PM         4,168,224 checkpoints_1.data-00000-of-00001
02/22/2020  08:32 PM             1,654 checkpoints_1.index
02/22/2020  08:37 PM         4,168,224 checkpoints_10.data-00000-of-00001
02/22/2020  08:37 PM             1,654 checkpoints_10.index
02/22/2020  08:37 PM         4,168,224 checkpoints_11.data-00000-of-00001
02/22/2020  08:37 PM             1,654 checkpoints_11.index
02/22/2020  08:38 PM         4,168,224 checkpoints_12.data-00000-of-00001
02/22/2020  08:38 PM             1,654 checkpoints_12.index
02/22/2020  08:33 PM         4,168,224 checkpoints_2.data-00000-of-00001
02/22/2020  08:33 PM             1,654 checkpoints_2.index
02/22/2020  08:33 PM   

In [15]:
# Model performance is defined by the lastest checkpoint then evaluate

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
eval_loss, eval_acc = model.evaluate(eval_dataset)

print('\nEval loss: {}, \nEval Accuracy: {}'.format(eval_loss, eval_acc))

    157/Unknown - 5s 30ms/step - loss: 0.0673 - accuracy: 0.9833 5s 31ms/step - loss: 0.0686 - ac
Eval loss: 0.0672861734400711, 
Eval Accuracy: 0.983299970626831


In [16]:
from keras.models import load_model

model.save('./training_checkpoints', save_format='tf')
del model

Using TensorFlow backend.


Instructions for updating:
If using Keras pass *_constraint arguments to layers.


Instructions for updating:
If using Keras pass *_constraint arguments to layers.


INFO:tensorflow:Assets written to: ./training_checkpoints\assets


INFO:tensorflow:Assets written to: ./training_checkpoints\assets


In [17]:
import tensorflow as tf
# returns a compiled model identical to the previous one
new_model = tf.keras.models.load_model('./training_checkpoints')

In [18]:
new_model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 26, 26, 32)        320       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 13, 13, 32)        0         
_________________________________________________________________
flatten (Flatten)            (None, 5408)              0         
_________________________________________________________________
dense (Dense)                (None, 64)                346176    
_________________________________________________________________
dense_1 (Dense)              (None, 10)                650       
Total params: 347,146
Trainable params: 347,146
Non-trainable params: 0
_________________________________________________________________


In [19]:
#Loading without Strategy.scope
unreplicated_model = tf.keras.models.load_model('./training_checkpoints')

unreplicated_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)

print('\nEval loss: {}, \nEval Accuracy: {}'.format(eval_loss, eval_acc))


    157/Unknown - 4s 28ms/step - loss: 0.0673 - accuracy: 0.9833
Eval loss: 0.0672861734400711, 
Eval Accuracy: 0.983299970626831


In [20]:
#Using strategy.scope function.
with strategy.scope():
    replicated_model = tf.keras.models.load_model('./training_checkpoints')
    replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                           optimizer=tf.keras.optimizers.Adam(),
                           metrics=['accuracy'])

    eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
    print ('\nEval loss: {}, \nEval Accuracy: {}'.format(eval_loss, eval_acc))


    157/Unknown - 6s 36ms/step - loss: 0.0673 - accuracy: 0.9833 6s 36ms/step - loss: 0.0680 - accuracy: 
Eval loss: 0.0672861734400711, 
Eval Accuracy: 0.983299970626831


# Custom Training with tf.distribute.strategy

In [21]:
fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images.shape, test_images.shape

((60000, 28, 28), (10000, 28, 28))

In [22]:
#Getting the image in range between 0 - 1
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

In [23]:
train_images = train_images.reshape(60000,28,28,1) 
test_images = test_images.reshape(10000,28,28,1)

train_images.shape, test_images.shape

((60000, 28, 28, 1), (10000, 28, 28, 1))

In [24]:
# This method auto-detects devices used
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


Number of devices: 1


In [25]:
#Setting-up an input pipeline
buffer_size = len(train_images)
global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync
Epochs = 10


#Create the datasets and distribute them
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size).batch(global_batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

In [26]:
# Create a model
def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation = 'relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(64, 3, activation = 'relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation = 'relu'),
        tf.keras.layers.Dense(10)
    ])
    return model

In [27]:
# Create a checkpoint directory to store the checkpoints
checkpoint_dir = './custom_training_checkpoint'
checkpoint_prefix = os.path.join(checkpoint_dir, 'custom_training_checkpoint_{epoch}')
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),]


In [28]:
#Define a Loss function
#tf.nn.compute_average_loss worked better with many GPU involved
#AUTO and SUM_OVER_BATCH_SIZE can give different values when aggregating the loss
# Reduction = NONE helps when you're using global batch size later.

with strategy.scope():
    loss_object = tf.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction = tf.keras.losses.Reduction.NONE)
    
    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

In [29]:
# Define the metrics to track loss and accuracy
# .result() can be used to get accumulated statistics at any time.

with strategy.scope():
    test_loss = tf.keras.metrics.Mean(name = 'test_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'train_accuracy')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'test_accuracy')

In [30]:
# Training Loop
# model and optimizer must be created under strategy scope.

with strategy.scope():
    model = create_model()
    optimizer = tf.keras.optimizers.Adam()
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

In [31]:
with strategy.scope():
    def train_step(inputs):
        images, labels = inputs
        
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)
            
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        
        train_accuracy.update_state(labels, predictions)
        return loss
    
    def test_step(inputs):
        images, labels = inputs
        
        predictions = model(images, training=False)
        t_loss = loss_object(labels, predictions)
        
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)    

In [32]:
#Experimental_run_v2 replicates the provided computation and runs it
# with the distributed input.

with strategy.scope():
    @tf.function
    def distributed_train_step(dataset_inputs):
        per_replica_losses = strategy.experimental_run_v2(train_step, args=(dataset_inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    
    @tf.function
    def distributed_test_step(dataset_inputs):
        return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))
    
    
    #Training Loop
    for epoch in range(Epochs):
        total_loss = 0.0
        num_batches = 0
        for x in train_dist_dataset:
            total_loss += distributed_train_step(x)
            num_batches += 1
        train_loss = total_loss / num_batches
        
    #Testing Loop
    for x in test_dist_dataset:
        distributed_test_step(x)
        
    if epoch %2 == 0:
        checkpoint.save(checkpoint_prefix)
        
    template = ('\nEpoch {}, Loss: {}, Accuracy: {}, \nTest Loss: {}, Test Accuracy: {}')
    print(template.format(epoch + 1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))
    
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()


Epoch 10, Loss: 0.15721701085567474, Accuracy: 90.46133422851562, 
Test Loss: 0.26926979422569275, Test Accuracy: 90.27000427246094


In [33]:
#Restore the latest checkpoint and test
#A model checkpointed with a tf.distribute.strategy can be restored with or with a strategy.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name= 'eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)

In [34]:
@tf.function
def eval_step(images, labels):
    predictions = new_model(images, training=False)
    eval_accuracy(labels, predictions)

In [35]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
    eval_step(images, labels)

print('Accuracy after model restoration without strategy: {}'.format(eval_accuracy.result()*100))

Accuracy after model restoration without strategy: 11.079999923706055


In [36]:
#Using iterators to go through a number of steps but not the entire dataset.
with strategy.scope():
    for _ in range(Epochs):
        total_loss = 0.0
        num_batches = 0
        train_iter = iter(train_dist_dataset)
        
    for _ in range(10):
        total_loss += distributed_train_step(next(train_iter))
        num_batches += 1
        average_train_loss = total_loss / num_batches
        
    template = ('\nEpoch {}, Loss: {}, \nAccuracy: {}')
    print(template.format(epoch + 1, average_train_loss, train_accuracy.result()*100))
    train_accuracy.reset_states()


Epoch 10, Loss: 0.12422867119312286, 
Accuracy: 96.09375


In [37]:
#Iterating inside a tf.function using the entire input train_dist_dataset.
with strategy.scope():
    @tf.function
    def distributed_train_epoch(dataset):
        total_loss = 0.0
        num_batches = 0
        for x in dataset:
            per_replica_losses = strategy.experimental_run_v2(train_step, args=(x,))
            total_loss += strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
            num_batches += 1
        return total_loss / tf.cast(num_batches, dtype=tf.float32)
        
    for epoch in range(Epochs):
        train_loss = distributed_train_epoch(train_dist_dataset)
        
        template = ('Epoch {}, \nLoss: {}, Accuracy: {}')
        print(template.format(epoch+1, train_loss, train_accuracy.result()*100))
            
        train_accuracy.reset_states()

Epoch 1, 
Loss: 0.1418384462594986, Accuracy: 94.71333312988281
Epoch 2, 
Loss: 0.13096915185451508, Accuracy: 95.17833709716797
Epoch 3, 
Loss: 0.12016792595386505, Accuracy: 95.44166564941406
Epoch 4, 
Loss: 0.10770862549543381, Accuracy: 95.99166870117188
Epoch 5, 
Loss: 0.10154162347316742, Accuracy: 96.22666931152344
Epoch 6, 
Loss: 0.09043881297111511, Accuracy: 96.61166381835938
Epoch 7, 
Loss: 0.08601397275924683, Accuracy: 96.7633285522461
Epoch 8, 
Loss: 0.0738043338060379, Accuracy: 97.25
Epoch 9, 
Loss: 0.07035011053085327, Accuracy: 97.40999603271484
Epoch 10, 
Loss: 0.06455264985561371, Accuracy: 97.57666778564453


In [38]:
# def model_fn(features, labels, mode):
#     layer = tf.keras.layers.Dense(1)
#     logits = layer(features)
    
#     if mode == tf.estimator.ModeKeys.PREDICT:
#         predictions = {'logits' : logits}
#         return tf.estimator.EstimatorSpec(mode, predictions=predictions)
    
#     loss = tf.keras.losses.mean_squared_error(labels=labels,
#                                              predictions=tf.reshape(logits,[]))
#     if mode == tf.estimator.ModeKeys.Eval:
#         return tf.estimator.EstimatorSpec(mode, loss=loss)
    
#     if mode == tf.estimator.ModeKeys.TRAIN:
#         train_op = tf.keras.optimizers.SGD(0.2).minimize(loss_loss_fn())
#         return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

In [39]:
# def input_fn():
#     features = tf.data.Dataset.from_tensor_slices([[1.]]).repeat(100)
#     labels = tf.data.Dataset.from_tensor_slices(1.).repeat(100)
#     return dataset_ops.Dataset.zip((features, labels))

In [40]:
# distribution = tf.distribute.MirroredStrategy()
# config = tf.estimator.RunConfig(train_distribute=distribution)
# classifier = tf.estimator.Estimator(model_fn=model_fn,config=config)
# classsifier.train(input_fn=input_fn)