## Training a Model Using Multiple GPU Cards

Our multi-GPU training design is inspired by [the official CIFAR-10
tutorial with its multi-GPU option](https://www.tensorflow.org/tutorials/deep_cnn#training_a_model_using_multiple_gpu_cards).

More precisely, we do the following:

1. We pin all the model variables onto the CPU device.
2. All operations are pinned onto the GPUs. Each GPU has a replica of all the operations.
3. On each iteration, the model variables (weights) are transferred to each GPU device, and each device processes a separate batch in parallel.
4. After the gradients for each batch have been computed, they are transferred back to main memory, where they are averaged and the model variables are updated.
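
The four steps above can be sketched without TensorFlow. The toy example below is only an illustration under simplifying assumptions: a one-parameter linear model `y = w * x` with a mean squared error loss stands in for the network, plain Python lists stand in for devices, and `tower_gradient` and `synchronous_step` are hypothetical helpers that do not appear in the training code of this post.

```python
def tower_gradient(w, batch):
    """Gradient of the mean squared error of y = w * x on one tower's batch."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


def synchronous_step(w, tower_batches, learning_rate):
    """One synchronous data-parallel update.

    `w` plays the role of the variable kept on the CPU; each entry of
    `tower_batches` is the batch processed by one (simulated) GPU.
    """
    # Steps 2-3: every tower receives the same copy of w and computes
    # the gradient for its own batch.
    tower_grads = [tower_gradient(w, batch) for batch in tower_batches]
    # Step 4: the gradients come back to the "CPU", are averaged, and the
    # shared variable is updated once.
    averaged_grad = sum(tower_grads) / len(tower_grads)
    return w - learning_rate * averaged_grad


# Data generated from y = 3 * x, split into 4 equally sized batches.
data = [(float(x), 3.0 * x) for x in range(1, 9)]
tower_batches = [data[i:i + 2] for i in range(0, len(data), 2)]

w = 0.0
for _ in range(200):
    w = synchronous_step(w, tower_batches, learning_rate=0.01)
print(round(w, 3))
```

Because the four batches have equal size, the averaged tower gradient equals the gradient computed on all the data at once, so the toy model converges to the same weight as single-device training would.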

We store the model parameters on the CPU because transferring data between devices is [costly](http://stackoverflow.com/questions/34428850/variables-on-cpu-training-gradients-on-gpu). The parameters are moved with the help of DMA (direct memory access). According to this [resource](http://timdettmers.com/2015/03/09/deep-learning-hardware-guide/), the transfer speed is about 12 GB/s, and the network we build on, [VGG-16](http://stackoverflow.com/questions/28232235/how-to-calculate-the-number-of-parameters-of-convolutional-neural-networkscnns), takes about 553 MB, so every copy of the parameters has a noticeable cost. If we store the model in main memory, we have to pass $2 \cdot gpuNumber$ messages per iteration: each GPU receives the weights and sends its gradients back. If we instead kept a copy of the parameters on each GPU, every GPU would have to send its gradients to every other GPU, which takes $gpuNumber \cdot (gpuNumber - 1)$ messages. We opt for the first option, where the number of messages is linear in the number of available GPU cards.
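
To make these numbers concrete, the short calculation below estimates how long one copy of the parameters takes and counts the messages for the CPU-hosted setup. It is back-of-the-envelope arithmetic only: it assumes the quoted 12 GB/s is sustained, treats 1 GB as 1000 MB, and ignores latency and overlapping transfers. The variable names are made up for this illustration.

```python
model_size_mb = 553.0          # VGG-16 parameters, as quoted above
bandwidth_mb_per_s = 12000.0   # 12 GB/s, taking 1 GB = 1000 MB

# Time to move one full copy of the parameters (or gradients) once.
seconds_per_copy = model_size_mb / bandwidth_mb_per_s
print("one copy: {:.1f} ms".format(seconds_per_copy * 1000))

# Parameters on the CPU: each GPU receives the weights and sends back
# its gradients, so there are 2 messages per GPU and per iteration.
for gpu_number in (1, 2, 3, 4):
    messages = 2 * gpu_number
    print("{} GPU(s): {} messages per iteration".format(gpu_number, messages))
```

Under these assumptions a single copy takes a few tens of milliseconds, which is why the number of copies per iteration matters.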

For segmentation, we minimize the pixel-wise cross-entropy loss, which we also sum over the whole batch. This loss is separable: the gradient of a sum is the sum of the gradients. We can therefore split a batch into equal parts and compute their gradients in parallel on separate GPU devices. Afterwards, we sum (or, as in the code below, average) these gradients and apply them to update our variables. TensorFlow performs the synchronization implicitly: the system waits for all GPUs to finish computing gradients before proceeding.
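
The separability argument can be written out explicitly. The notation here is introduced only for this illustration: $\ell_i(\theta)$ is the loss of the $i$-th sample for parameters $\theta$, $B$ is the full batch, and $B_1, \dots, B_K$ are the disjoint parts assigned to the $K$ GPUs.

$$
\nabla_\theta \sum_{i \in B} \ell_i(\theta)
= \sum_{k=1}^{K} \sum_{i \in B_k} \nabla_\theta \ell_i(\theta)
= \sum_{k=1}^{K} \nabla_\theta \Big( \sum_{i \in B_k} \ell_i(\theta) \Big)
$$

Each term of the outer sum depends only on the samples in $B_k$, so it can be computed on its own GPU. Averaging the $K$ per-GPU gradients instead of summing them changes the result only by the constant factor $1/K$.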

Below is the example code that we used to measure the speedup of training with four GPU cards over one GPU card. For simplicity, the experiments neither save models nor log errors. We also provide the speedup plots for our experiments. All experiments use the FCN-32s model, which is based on the VGG-16 architecture.

We ran all experiments with a batch size of 10 per GPU, meaning that each GPU processed its own batch of 10 images in parallel, on 1, 2, 3 and 4 GPU cards. To compare processing times, each setup had to process a fixed number of batches (160 in our experiment, for example 40 iterations with 4 GPUs or 160 iterations with 1 GPU), and we measured how long each setup took to complete the task. We achieved a speedup of $3.3$ with $4$ GPUs. Longer runs would have given better estimates of the speedup. Overall, our results are similar to the [official TensorFlow multi-GPU benchmark](https://www.tensorflow.org/performance/benchmarks). Below you can find the plots, followed by the code for the 4-GPU and 1-GPU training setups. Each code cell is followed by its measured time in seconds.

![Multi-GPU training architecture](multiple_gpu_arch.png)

![Training time for different numbers of GPUs](multiple_gpu_time.png)

![Speedup for different numbers of GPUs](multiple_gpu_speedup.png)

In [1]:
%matplotlib inline

import tensorflow as tf
import numpy as np
import skimage.io as io
import os, sys
from matplotlib import pyplot as plt


# Add a path to a custom fork of TF-Slim
# Get it from here:
# https://github.com/warmspringwinds/models/tree/fully_conv_vgg
sys.path.append("my_models/slim/")

# Add path to the cloned library
sys.path.append("tf-image-segmentation/")

checkpoints_dir = 'checkpoints'


slim = tf.contrib.slim
vgg_checkpoint_path = os.path.join(checkpoints_dir, 'vgg_16.ckpt')

from tf_image_segmentation.utils.tf_records import read_tfrecord_and_decode_into_image_annotation_pair_tensors
from tf_image_segmentation.models.fcn_32s import FCN_32s, extract_vgg_16_mapping_without_fc8

from tf_image_segmentation.utils.pascal_voc import pascal_segmentation_lut

from tf_image_segmentation.utils.training import get_valid_logits_and_labels

from tf_image_segmentation.utils.augmentation import (distort_randomly_image_color,
                                                      flip_randomly_left_right_image_with_annotation,
                                                      scale_randomly_image_with_annotation_with_fixed_size_output)

image_train_size = [384, 384]
number_of_classes = 21
tfrecord_filename = 'pascal_augmented_train.tfrecords'
pascal_voc_lut = pascal_segmentation_lut()
class_labels = pascal_voc_lut.keys()

# Number of model clones, one per GPU -- the current workstation
# must have at least this many GPUs.
num_clones = 4

import time


def average_gradients(tower_grads):
    """Calculate the average gradient for each shared variable across all towers.

    Note that this function provides a synchronization point across all towers.

    Args:
      tower_grads: List of lists of (gradient, variable) tuples. The outer list
        is over towers. The inner list is over the (gradient, variable) pairs
        computed on that tower.
    Returns:
      List of pairs of (gradient, variable) where the gradient has been averaged
      across all towers.
    """
    average_grads = []
    for grad_and_vars in zip(*tower_grads):
        # Note that each grad_and_vars looks like the following:
        #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
        grads = []
        for g, _ in grad_and_vars:
            # Add a leading dimension to the gradient to represent the tower.
            expanded_g = tf.expand_dims(g, 0)

            # Append on a 'tower' dimension which we will average over below.
            grads.append(expanded_g)

        # Average over the 'tower' dimension.
        # Note: this is the pre-1.0 argument order; TensorFlow >= 1.0
        # expects tf.concat(values, axis).
        grad = tf.concat(0, grads)
        grad = tf.reduce_mean(grad, 0)

        # Keep in mind that the Variables are redundant because they are shared
        # across towers. So .. we will just return the first tower's pointer to
        # the Variable.
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads


filename_queue = tf.train.string_input_producer(
    [tfrecord_filename], num_epochs=1)

image, annotation = read_tfrecord_and_decode_into_image_annotation_pair_tensors(filename_queue)

# Various data augmentation stages
image, annotation = flip_randomly_left_right_image_with_annotation(image, annotation)

# image = distort_randomly_image_color(image)

resized_image, resized_annotation = scale_randomly_image_with_annotation_with_fixed_size_output(image, annotation, image_train_size)


resized_annotation = tf.squeeze(resized_annotation)

image_batch, annotation_batch = tf.train.shuffle_batch( [resized_image, resized_annotation],
                                             batch_size=10,
                                             capacity=3000,
                                             num_threads=4,
                                             min_after_dequeue=1000)


batch_queue = slim.prefetch_queue.prefetch_queue(
                  [image_batch, annotation_batch], capacity= 2 * num_clones)

variables_device = '/cpu:0'

with tf.device(variables_device):
    
    opt = tf.train.AdamOptimizer(learning_rate=0.000001)

    # Puts weights onto cpu
    with slim.arg_scope([slim.model_variable, slim.variable],
                         device=variables_device):
        
        tower_grads = []
        losses = []
        
        for gpu_number in xrange(num_clones):
            # Get different name for each copy which is being placed
            # on separate gpus.
            with tf.name_scope('gpu_number_{}_ops'.format(gpu_number)):

                # Actually place all operations on gpu
                with tf.device('/gpu:{}'.format(gpu_number)):

                    images_batch, annotation_batch = batch_queue.dequeue()

                    # here we actually want to also define loss
                    upsampled_logits_batch, vgg_16_variables_mapping = FCN_32s(image_batch_tensor=images_batch,
                                                                               number_of_classes=number_of_classes,
                                                                               is_training=True)
                    
                    # Reuse variables for the next tower.
                    tf.get_variable_scope().reuse_variables()

                    valid_labels_batch_tensor, valid_logits_batch_tensor = get_valid_logits_and_labels(annotation_batch_tensor=annotation_batch,
                                                                                         logits_batch_tensor=upsampled_logits_batch,
                                                                                        class_labels=class_labels)

                    cross_entropies = tf.nn.softmax_cross_entropy_with_logits(logits=valid_logits_batch_tensor,
                                                                              labels=valid_labels_batch_tensor)

                    # Average the cross entropy over the valid pixels -- their
                    # number differs between steps because of masked-out regions.
                    # Despite its name, cross_entropy_sum holds a mean.
                    cross_entropy_sum = tf.reduce_mean(cross_entropies)

                    grad_vars = opt.compute_gradients(cross_entropy_sum)
                    
                    tower_grads.append(grad_vars)
                    losses.append(cross_entropy_sum)
                    
    averaged_grads_vars = average_gradients(tower_grads)
    
    averaged_loss = tf.reduce_mean(losses)
                
    # Here we apply our gradient on the cpu, by
    # collecting the gradients from each gpu and averaging them
    # and finally applying them at the end
    # todo: add global step to count the actual step number
    apply_grad_op = opt.apply_gradients(averaged_grads_vars)
    
    
# Variable's initialization functions
vgg_16_without_fc8_variables_mapping = extract_vgg_16_mapping_without_fc8(vgg_16_variables_mapping)


init_fn = slim.assign_from_checkpoint_fn(model_path=vgg_checkpoint_path,
                                         var_list=vgg_16_without_fc8_variables_mapping)

global_vars_init_op = tf.global_variables_initializer()

#The op for initializing the variables.
local_vars_init_op = tf.local_variables_initializer()

combined_op = tf.group(local_vars_init_op, global_vars_init_op)


with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True))  as sess:
    
    sess.run(combined_op)
    init_fn(sess)

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    
    
    start = time.time()    

    for i in xrange(40):
    
        cross_entropy, _ = sess.run([ averaged_loss,
                                      apply_grad_op
                                    ])
        # The loss was checked to go down;
        # uncomment the line below to verify.
        # print( str(i) + " Current loss: "  + str(cross_entropy))
    
    end = time.time()
    print(end - start)
    
    coord.request_stop()
    coord.join(threads)   

79.6332910061


In [1]:
%matplotlib inline

import tensorflow as tf
import numpy as np
import skimage.io as io
import os, sys
from matplotlib import pyplot as plt


# Add a path to a custom fork of TF-Slim
# Get it from here:
# https://github.com/warmspringwinds/models/tree/fully_conv_vgg
sys.path.append("my_models/slim/")

# Add path to the cloned library
sys.path.append("tf-image-segmentation/")

checkpoints_dir = 'checkpoints'


slim = tf.contrib.slim
vgg_checkpoint_path = os.path.join(checkpoints_dir, 'vgg_16.ckpt')

from tf_image_segmentation.utils.tf_records import read_tfrecord_and_decode_into_image_annotation_pair_tensors
from tf_image_segmentation.models.fcn_32s import FCN_32s, extract_vgg_16_mapping_without_fc8

from tf_image_segmentation.utils.pascal_voc import pascal_segmentation_lut

from tf_image_segmentation.utils.training import get_valid_logits_and_labels

from tf_image_segmentation.utils.augmentation import (distort_randomly_image_color,
                                                      flip_randomly_left_right_image_with_annotation,
                                                      scale_randomly_image_with_annotation_with_fixed_size_output)

image_train_size = [384, 384]
number_of_classes = 21
tfrecord_filename = 'pascal_augmented_train.tfrecords'
pascal_voc_lut = pascal_segmentation_lut()
class_labels = pascal_voc_lut.keys()

# Number of model clones, one per GPU -- the current workstation
# must have at least this many GPUs.
num_clones = 1

import time


def average_gradients(tower_grads):
    """Calculate the average gradient for each shared variable across all towers.

    Note that this function provides a synchronization point across all towers.

    Args:
      tower_grads: List of lists of (gradient, variable) tuples. The outer list
        is over towers. The inner list is over the (gradient, variable) pairs
        computed on that tower.
    Returns:
      List of pairs of (gradient, variable) where the gradient has been averaged
      across all towers.
    """
    average_grads = []
    for grad_and_vars in zip(*tower_grads):
        # Note that each grad_and_vars looks like the following:
        #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
        grads = []
        for g, _ in grad_and_vars:
            # Add a leading dimension to the gradient to represent the tower.
            expanded_g = tf.expand_dims(g, 0)

            # Append on a 'tower' dimension which we will average over below.
            grads.append(expanded_g)

        # Average over the 'tower' dimension.
        # Note: this is the pre-1.0 argument order; TensorFlow >= 1.0
        # expects tf.concat(values, axis).
        grad = tf.concat(0, grads)
        grad = tf.reduce_mean(grad, 0)

        # Keep in mind that the Variables are redundant because they are shared
        # across towers. So .. we will just return the first tower's pointer to
        # the Variable.
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads


filename_queue = tf.train.string_input_producer(
    [tfrecord_filename], num_epochs=1)

image, annotation = read_tfrecord_and_decode_into_image_annotation_pair_tensors(filename_queue)

# Various data augmentation stages
image, annotation = flip_randomly_left_right_image_with_annotation(image, annotation)

# image = distort_randomly_image_color(image)

resized_image, resized_annotation = scale_randomly_image_with_annotation_with_fixed_size_output(image, annotation, image_train_size)


resized_annotation = tf.squeeze(resized_annotation)

image_batch, annotation_batch = tf.train.shuffle_batch( [resized_image, resized_annotation],
                                             batch_size=10,
                                             capacity=3000,
                                             num_threads=4,
                                             min_after_dequeue=1000)


batch_queue = slim.prefetch_queue.prefetch_queue(
                  [image_batch, annotation_batch], capacity= 2 * num_clones)

variables_device = '/cpu:0'

with tf.device(variables_device):
    
    opt = tf.train.AdamOptimizer(learning_rate=0.000001)

    # Puts weights onto cpu
    with slim.arg_scope([slim.model_variable, slim.variable],
                         device=variables_device):
        
        tower_grads = []
        losses = []
        
        for gpu_number in xrange(num_clones):
            # Get different name for each copy which is being placed
            # on separate gpus.
            with tf.name_scope('gpu_number_{}_ops'.format(gpu_number)):

                # Actually place all operations on gpu
                with tf.device('/gpu:{}'.format(gpu_number)):

                    images_batch, annotation_batch = batch_queue.dequeue()

                    # here we actually want to also define loss
                    upsampled_logits_batch, vgg_16_variables_mapping = FCN_32s(image_batch_tensor=images_batch,
                                                                               number_of_classes=number_of_classes,
                                                                               is_training=True)
                    
                    # Reuse variables for the next tower.
                    tf.get_variable_scope().reuse_variables()

                    valid_labels_batch_tensor, valid_logits_batch_tensor = get_valid_logits_and_labels(annotation_batch_tensor=annotation_batch,
                                                                                         logits_batch_tensor=upsampled_logits_batch,
                                                                                        class_labels=class_labels)

                    cross_entropies = tf.nn.softmax_cross_entropy_with_logits(logits=valid_logits_batch_tensor,
                                                                              labels=valid_labels_batch_tensor)

                    # Average the cross entropy over the valid pixels -- their
                    # number differs between steps because of masked-out regions.
                    # Despite its name, cross_entropy_sum holds a mean.
                    cross_entropy_sum = tf.reduce_mean(cross_entropies)

                    grad_vars = opt.compute_gradients(cross_entropy_sum)
                    
                    tower_grads.append(grad_vars)
                    losses.append(cross_entropy_sum)
                    
    averaged_grads_vars = average_gradients(tower_grads)
    
    averaged_loss = tf.reduce_mean(losses)
                
    # Here we apply our gradient on the cpu, by
    # collecting the gradients from each gpu and averaging them
    # and finally applying them at the end
    # todo: add global step to count the actual step number
    apply_grad_op = opt.apply_gradients(averaged_grads_vars)
    
    
# Variable's initialization functions
vgg_16_without_fc8_variables_mapping = extract_vgg_16_mapping_without_fc8(vgg_16_variables_mapping)


init_fn = slim.assign_from_checkpoint_fn(model_path=vgg_checkpoint_path,
                                         var_list=vgg_16_without_fc8_variables_mapping)

global_vars_init_op = tf.global_variables_initializer()

#The op for initializing the variables.
local_vars_init_op = tf.local_variables_initializer()

combined_op = tf.group(local_vars_init_op, global_vars_init_op)


with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True))  as sess:
    
    sess.run(combined_op)
    init_fn(sess)

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    
    
    start = time.time()    

    for i in xrange(160):
    
        cross_entropy, _ = sess.run([ averaged_loss,
                                      apply_grad_op
                                    ])
        # The loss was checked to go down;
        # uncomment the line below to verify.
        # print( str(i) + " Current loss: "  + str(cross_entropy))
    
    end = time.time()
    print(end - start)
    
    coord.request_stop()
    coord.join(threads)   

263.757616043
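
The two timings printed above are enough to reproduce the reported speedup. The snippet below is a small sketch of that arithmetic. `parallel_efficiency` is a name introduced here for illustration, and the timings are simply copied from the cell outputs.

```python
time_4_gpus = 79.6332910061   # seconds for 40 iterations x 4 GPUs = 160 batches
time_1_gpu = 263.757616043    # seconds for 160 iterations x 1 GPU = 160 batches

speedup = time_1_gpu / time_4_gpus
parallel_efficiency = speedup / 4  # fraction of the ideal 4x speedup

print("speedup: {:.2f}".format(speedup))
print("efficiency: {:.2f}".format(parallel_efficiency))
```

Both setups process the same 160 batches, so the ratio of the wall-clock times is a direct estimate of the speedup.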
