In [1]:
import numpy as np
import tensorflow as tf
import time

tf.__version__

'1.14.0'

Get a list of physical devices visible to the runtime

In [2]:
cpus = tf.config.experimental.list_physical_devices('CPU') # get a list of cpus
print(cpus)

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]


Splits the CPU into 2 virtual devices

In [3]:
try:
    tf.config.experimental.set_virtual_device_configuration(cpus[0],
                                                            [tf.config.experimental.VirtualDeviceConfiguration(),
                                                             tf.config.experimental.VirtualDeviceConfiguration()])
    virtual_cpus = tf.config.experimental.list_logical_devices('CPU')
except RuntimeError as e:
    print(e)
print(virtual_cpus)

[LogicalDevice(name='/job:localhost/replica:0/task:0/device:CPU:0', device_type='CPU'), LogicalDevice(name='/job:localhost/replica:0/task:0/device:CPU:1', device_type='CPU')]


Create a strategy to distribute the variables and the graph

In [4]:
strategy = tf.distribute.MirroredStrategy(devices=[cpu.name for cpu in virtual_cpus])

W1219 14:51:47.002589 19160 cross_device_ops.py:1177] Not all devices in `tf.distribute.Strategy` are visible to TensorFlow.


In [5]:
BATCH_SIZE_PER_REPLICA = 128
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync # 128 * 2 = 256
N_CLASSES = 10
EPOCHS = 10

Load dataset

In [6]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

In [7]:
def preprocess_fn(x, y):
    '''
    Normalize input images and one-hot encode the labels 
    '''
    return (tf.cast(x, tf.float32) / 255., tf.one_hot(y, N_CLASSES))

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).batch(GLOBAL_BATCH_SIZE).map(preprocess_fn)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

In [8]:
with strategy.scope():
    iterator = train_dist_dataset.make_initializable_iterator()
    iterator_init = iterator.initialize()
    next_element = iterator.get_next()

A simple neural network for demonstration

In [9]:
def mnist_model():
    input_layer = tf.keras.Input(shape=(28, 28)) # (None, 28, 28)
    x = tf.keras.layers.Flatten()(input_layer) # (None, 28*28)
    x = tf.keras.layers.Dense(1024, activation='relu')(x) # (None, 1024)
    output_layer = tf.keras.layers.Dense(N_CLASSES, activation='softmax')(x) # (None, N_CLASSES)
    
    return tf.keras.Model(input_layer, output_layer)

Create the model and optimizer inside the strategy's scope

In [10]:
with strategy.scope():
    model = mnist_model()
    optimizer = tf.train.GradientDescentOptimizer(0.001)

W1219 14:51:48.413801 19160 deprecation.py:506] From f:\anaconda3\envs\tensorflow1.14\lib\site-packages\tensorflow\python\ops\init_ops.py:1251: calling VarianceScaling.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


Training loop

In [11]:
with strategy.scope():
    def train_step(inputs):
        '''
        inputs: "per-replica" values, such as those produced by a "distributed Dataset"
        '''
        images, labels = inputs
        preds = model(images)
        per_sample_loss = tf.keras.losses.CategoricalCrossentropy(reduction='none')(labels, preds) # (batch_size,)
        loss = tf.reduce_sum(per_sample_loss) / tf.cast(GLOBAL_BATCH_SIZE, per_sample_loss.dtype) # scalar
        train_op = optimizer.minimize(loss)
        
        # make sure `loss` will only be returned after `train_op` have executed
        with tf.control_dependencies([train_op]): 
            return tf.identity(loss)

<b>Note</b>: `tf.control_dependencies` documentation explicitly stated that "The control dependencies context applies only to ops that are constructed within the context. Merely using an op or tensor in the context does not add a control dependency". So we have to use `tf.identity` as a workaround (to make an extra op within the control dependency context).

In [12]:
with strategy.scope():
    def distributed_train_step(dataset_inputs):
        per_replica_losses = strategy.experimental_run_v2(train_step, args=(dataset_inputs,))
        mean_loss = strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses)
        return mean_loss

In [13]:
with strategy.scope():
    global_init = tf.global_variables_initializer()
    
    loss_op = distributed_train_step(next_element)
    with tf.Session() as sess:
        sess.run(global_init)
        for epoch in range(EPOCHS):
            sess.run(iterator_init)
            start = time.time()
            while True:
                try:
                    loss_result = sess.run(loss_op)
                except tf.errors.OutOfRangeError:
                    print('Epoch {} finished! - Loss: {:.2f} - Time elapsed: {:.2f}'.format(epoch, loss_result, time.time()-start))
                    break

Epoch 0 finished! - Loss: 2.07 - Time elapsed: 3.65
Epoch 1 finished! - Loss: 1.76 - Time elapsed: 3.26
Epoch 2 finished! - Loss: 1.60 - Time elapsed: 3.27
Epoch 3 finished! - Loss: 1.40 - Time elapsed: 3.22
Epoch 4 finished! - Loss: 1.26 - Time elapsed: 3.37
Epoch 5 finished! - Loss: 1.16 - Time elapsed: 3.39
Epoch 6 finished! - Loss: 1.07 - Time elapsed: 3.29
Epoch 7 finished! - Loss: 1.01 - Time elapsed: 3.46
Epoch 8 finished! - Loss: 0.98 - Time elapsed: 3.37
Epoch 9 finished! - Loss: 0.88 - Time elapsed: 3.35
