In [1]:
#!/usr/bin/env python3

In [2]:
# Input data files are available in the "../data/" directory.
import os
print(os.listdir("../data"))

[]


In [3]:
'''Trains a simple convnet on the MNIST dataset.
   Convert centralized training to distributed training using Horovod
'''

from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K


Horovod is a framework for distributed neural network training that supports TensorFlow, PyTorch, MXNet, Keras, and Spark. Its main advantage over the native TensorFlow and PyTorch distribution schemes is the ring all-reduce algorithm, which makes communication between worker nodes more efficient. Converting a centralized training script into a distributed one takes only a few additional lines of code. The same training script is executed by every worker, and Horovod manages the host-specific configuration (number of workers/GPUs on each host, data distributed to each worker, communication, training coordination, etc.).
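
To make the ring all-reduce idea more concrete, here is a minimal single-process simulation in plain Python. It is only a sketch of the general algorithm, not Horovod's implementation: the function name ring_allreduce is hypothetical, workers are modeled as lists, and a "send" is a list copy. Each worker's vector is split into as many chunks as there are workers. In the first phase (scatter-reduce) partial sums travel around the ring, and in the second phase (all-gather) the finished chunks are passed around until every worker holds the full sum.

```python
def ring_allreduce(worker_data):
    """Sum equal-length vectors across simulated workers arranged in a ring.

    worker_data: one list of numbers per worker. The vector length must be
    divisible by the number of workers (an assumption made for simplicity).
    Returns a new list of per-worker vectors, all equal to the element-wise sum.
    """
    n = len(worker_data)
    length = len(worker_data[0])
    assert length % n == 0, "vector length must be divisible by worker count"
    chunk = length // n
    data = [list(vector) for vector in worker_data]  # do not mutate the input

    def span(c):
        # Index range covered by chunk c.
        return range(c * chunk, (c + 1) * chunk)

    # Phase 1: scatter-reduce. In step s, worker r sends chunk (r - s) % n
    # to its right neighbour, which adds it to its own copy of that chunk.
    for step in range(n - 1):
        messages = []  # collect first so all sends in a step are simultaneous
        for r in range(n):
            c = (r - step) % n
            messages.append(((r + 1) % n, c, [data[r][i] for i in span(c)]))
        for dst, c, payload in messages:
            for i, value in zip(span(c), payload):
                data[dst][i] += value

    # Now worker r holds the fully reduced chunk (r + 1) % n.
    # Phase 2: all-gather. In step s, worker r forwards chunk (r + 1 - s) % n
    # to its right neighbour, which overwrites its own copy.
    for step in range(n - 1):
        messages = []
        for r in range(n):
            c = (r + 1 - step) % n
            messages.append(((r + 1) % n, c, [data[r][i] for i in span(c)]))
        for dst, c, payload in messages:
            for i, value in zip(span(c), payload):
                data[dst][i] = value

    return data


workers = [
    [1, 2, 3, 4, 5, 6],
    [10, 20, 30, 40, 50, 60],
    [100, 200, 300, 400, 500, 600],
]
for rank, vector in enumerate(ring_allreduce(workers)):
    print(rank, vector)
```

In this scheme each worker sends 2 * (n - 1) chunks in total, so the amount of data sent per worker stays roughly constant as workers are added. To average gradients instead of summing them, the result would additionally be divided by the number of workers.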

In this particular example, we use a Keras implementation of MNIST classification training. Keras provides a high-level API on top of TensorFlow. TensorFlow is not used directly in centralized training, but it is required when we migrate to distributed training with Horovod.

We start by importing TensorFlow and Horovod, followed by initialization:

In [None]:
import tensorflow as tf
import horovod.keras as hvd

# Initialize Horovod
hvd.init()

Next, we must query the available hardware and pin GPU devices to Horovod workers. This is where TensorFlow comes into play. While Horovod supports both TensorFlow v1 and v2, we use v2 in this example.

First, get the list of all GPUs on the machine and iterate through them to enable the memory growth option. If memory growth is enabled for a PhysicalDevice, runtime initialization does not allocate all memory on the device. Next, each Horovod worker must see only the GPU assigned to it. hvd.local_rank() returns the index of the current worker among the workers on the same host (from 0 to hvd.local_size() - 1), so we use it to select exactly one GPU per worker. The local rank is assigned to each process when the job is launched.

In [None]:
#  Get the list of all GPUs on a machine
gpus = tf.config.experimental.list_physical_devices('GPU')

# Set memory growth option to keep GPUs from initializing all memory at once
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Pin this worker to a single GPU, selected by its local rank on the host
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
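
The relationship between the global rank, the local rank, and the GPU index can be illustrated without any GPUs. The sketch below is an assumption-laden model, not Horovod code: the helper assign_ranks and the host names node-a and node-b are hypothetical, and it assumes that ranks are handed out host by host in order, whereas the real assignment is decided by the launcher.

```python
def assign_ranks(slots_per_host):
    """Model how processes could be numbered across hosts.

    slots_per_host: ordered list of (hostname, number_of_workers) pairs.
    Returns one dict per process with its global rank, its local rank on
    the host, and the number of workers on that host (local size).
    """
    layout = []
    rank = 0
    for host, slots in slots_per_host:
        for local_rank in range(slots):
            layout.append({
                "host": host,
                "rank": rank,              # unique across all hosts
                "local_rank": local_rank,  # unique only within one host
                "local_size": slots,
            })
            rank += 1
    return layout


# Hypothetical cluster: two hosts with two GPUs each.
for process in assign_ranks([("node-a", 2), ("node-b", 2)]):
    # Each process would select gpus[local_rank] on its own host.
    print(process)
```

Because the local rank restarts at 0 on every host, indexing the GPU list with it gives each worker on a host a different device, regardless of how many hosts take part.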
    

The training parameters, batch size and number of classes, stay the same for both the centralized and the distributed scheme.

In [3]:
batch_size = 128
num_classes = 10

As this training script runs on multiple workers at the same time, we need to choose the number of epochs executed by each worker. Since the number of workers may vary, and to scale the training process efficiently, we divide the total number of epochs by the number of Horovod workers across all hosts and round up.

In [3]:
import math
# Horovod: adjust number of epochs based on number of GPUs.
epochs = int(math.ceil(12.0 / hvd.size()))
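
As a quick check of the arithmetic, the following sketch evaluates the same formula for a few worker counts. The helper name epochs_per_worker is hypothetical, and the total of 12 epochs is taken from the cell above.

```python
import math


def epochs_per_worker(total_epochs, num_workers):
    # Same formula as in the training script: divide and round up.
    return int(math.ceil(float(total_epochs) / num_workers))


for workers in (1, 2, 4, 5, 8):
    print(workers, "workers ->", epochs_per_worker(12, workers), "epochs each")
```

Rounding up means that the combined number of epochs can slightly exceed the original total when the worker count does not divide it evenly, for example 5 workers running 3 epochs each.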

Next, we load the MNIST dataset and prepare it for training: we set the correct order of dimensions in the input, normalize pixel values to the [0, 1] range, and convert training and testing labels to binary class matrices (one-hot encoding).

In [3]:
# input image dimensions
img_rows, img_cols = 28, 28

# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

# Normalize training data
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
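
For readers unfamiliar with these two preprocessing steps, here is a small plain-Python sketch of what they do to the values. The helpers scale_pixels and to_one_hot are hypothetical stand-ins written for illustration. They are not the Keras implementation, which operates on NumPy arrays.

```python
def scale_pixels(pixels):
    """Map 8-bit pixel intensities (0..255) to floats in [0, 1]."""
    return [p / 255.0 for p in pixels]


def to_one_hot(labels, num_classes):
    """Turn integer class labels into one-hot rows."""
    rows = []
    for label in labels:
        row = [0.0] * num_classes
        row[label] = 1.0  # a single 1 at the position of the class index
        rows.append(row)
    return rows


print(scale_pixels([0, 51, 255]))
print(to_one_hot([3, 0], num_classes=4))
```

The one-hot rows match the output layout expected by the categorical cross-entropy loss used later: one column per class.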

Next, we compose our classification neural network model. Two 2D convolutional layers are followed by max pooling. The output of this feature extraction is then flattened into a vector (of size 12 x 12 x 64 = 9216 for 28 x 28 inputs) and passed through a fully connected layer of 128 units with ReLU activation. Dropout is applied after the pooling layer and after the fully connected layer. The final layer has one unit per class and uses the softmax activation function.

In [3]:
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))
model.summary()
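
The shapes and parameter counts reported by model.summary() can be reproduced by hand. The sketch below does the arithmetic in plain Python, assuming the channels_last data format and the Conv2D defaults used above (stride 1 and 'valid' padding). The helper names are hypothetical.

```python
def conv2d_valid(shape, filters, kernel=3):
    """Output shape and parameter count of a stride-1 'valid' convolution."""
    height, width, channels = shape
    out_shape = (height - kernel + 1, width - kernel + 1, filters)
    params = kernel * kernel * channels * filters + filters  # weights + biases
    return out_shape, params


def max_pool(shape, pool=2):
    """Output shape of non-overlapping max pooling (no parameters)."""
    height, width, channels = shape
    return (height // pool, width // pool, channels)


def dense(units_in, units_out):
    """Output size and parameter count of a fully connected layer."""
    return units_out, units_in * units_out + units_out


shape = (28, 28, 1)
shape, conv1_params = conv2d_valid(shape, 32)
shape, conv2_params = conv2d_valid(shape, 64)
shape = max_pool(shape)
flat_size = shape[0] * shape[1] * shape[2]
hidden_size, dense1_params = dense(flat_size, 128)
output_size, dense2_params = dense(hidden_size, 10)
total_params = conv1_params + conv2_params + dense1_params + dense2_params

print("flattened size:", flat_size)
print("total parameters:", total_params)
```

Dropout and Flatten layers have no trainable parameters, so they do not appear in the total. Almost all of the parameters sit in the first fully connected layer.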

Data and model preparation are core steps of the training process, whether it is centralized or distributed. In centralized training, the next steps would be to compile and fit the model. To make the process distributed, we need to add a distributed optimizer and some callbacks.

First, we take a regular optimizer such as Adadelta, a variant of stochastic gradient descent (SGD) with adaptive per-parameter learning rates, and give it a learning rate (step size) scaled by the number of workers. In synchronous distributed training, the effective batch size grows with the number of workers, and the increase in learning rate compensates for the larger batch.

In [3]:
# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.Adadelta(0.001 * hvd.size())
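
The scaling rule can be written out for a few worker counts. This is a minimal sketch: the helper scaled_hyperparameters is hypothetical, and the base learning rate of 0.001 and batch size of 128 are the values used in this notebook.

```python
def scaled_hyperparameters(base_lr, batch_size, num_workers):
    """Linear scaling: both quantities grow with the number of workers."""
    return {
        "learning_rate": base_lr * num_workers,
        # Every worker processes its own batch in each synchronous step.
        "effective_batch_size": batch_size * num_workers,
    }


for workers in (1, 2, 4):
    print(workers, scaled_hyperparameters(0.001, 128, workers))
```

Linear scaling is a common heuristic rather than a guarantee, so the learning rate may still need tuning for large worker counts.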

Then, we wrap the resulting optimizer into a horovod DistributedOptimizer.

In [3]:
# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)
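
The distributed optimizer lets the wrapped optimizer compute gradients on each worker, averages them across workers, and then applies the averaged gradients. The toy example below shows why this is equivalent to training on one larger batch. It is a sketch under simplifying assumptions: a one-parameter linear model with mean squared error, equally sized per-worker batches, and a hypothetical helper mse_gradient. No Horovod calls are involved.

```python
def mse_gradient(w, xs, ys):
    """Gradient of mean((w * x - y)^2) with respect to w."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 0.5

# Two simulated workers, each holding half of the combined batch.
worker_batches = [(xs[:2], ys[:2]), (xs[2:], ys[2:])]
worker_grads = [mse_gradient(w, bx, by) for bx, by in worker_batches]

# What the all-reduce step contributes: the mean of the per-worker gradients.
averaged_grad = sum(worker_grads) / len(worker_grads)

# Reference: gradient computed on the combined batch by a single worker.
full_batch_grad = mse_gradient(w, xs, ys)

print("per-worker gradients:", worker_grads)
print("averaged:", averaged_grad, "full batch:", full_batch_grad)
```

The equality holds exactly only when all workers use the same batch size, which is the case here because every worker runs the same script with the same batch_size.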

Next, we compile the model with the Horovod distributed optimizer and pass the experimental_run_tf_function=False option to make sure the TensorFlow backend uses the Horovod distributed optimizer to compute gradients.

In [3]:
model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=opt,
              metrics=['accuracy'],
              experimental_run_tf_function=False
             )

After compiling the model, we must specify the Horovod callbacks to be used during training. In particular, we want the main worker to broadcast the model weights to all other processes so that all workers start from the same state, and we want to average training metrics among workers after each epoch.

In [3]:
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    
    # Horovod: average metrics among workers at the end of every epoch.
    hvd.callbacks.MetricAverageCallback(),
]
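
Conceptually, the two callbacks do something like the following. This is a plain-Python simulation with hypothetical helper names and made-up numbers, intended only to show the effect. The real callbacks perform these steps with collective communication between processes.

```python
def broadcast_from_root(worker_weights, root_rank=0):
    """Give every worker a copy of the root worker's weights."""
    root = worker_weights[root_rank]
    return [list(root) for _ in worker_weights]


def average_metrics(worker_metrics):
    """Average each named metric over all workers."""
    count = len(worker_metrics)
    return {
        name: sum(metrics[name] for metrics in worker_metrics) / count
        for name in worker_metrics[0]
    }


# Illustrative values only: three workers with different random initial weights.
initial = [[0.1, -0.3], [0.7, 0.2], [-0.5, 0.9]]
print(broadcast_from_root(initial))

# Illustrative values only: per-worker metrics at the end of an epoch.
epoch_metrics = [{"loss": 0.5, "accuracy": 0.75}, {"loss": 0.25, "accuracy": 1.0}]
print(average_metrics(epoch_metrics))
```

Without the broadcast, each worker would start from different random weights, and averaging gradients across such workers would not correspond to training a single model.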

With a large dataset and a complex network model, training may take a long time, and we often want to save intermediate results in case anything goes wrong or simply to reproduce the training results. In distributed training with Horovod, only the main worker should save the model, which can be done as follows:

In [None]:
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint('output/checkpoint-{epoch}.h5'))

As the final step, we pass the Horovod callbacks to model.fit and enable verbose output only for the main worker.

In [3]:
model.fit(x_train, y_train,
          batch_size=batch_size,
          callbacks=callbacks, # Horovod callbacks
          epochs=epochs,
          verbose=1 if hvd.rank() == 0 else 0, # Make only the main worker verbose 
          validation_data=(x_test, y_test))
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

Distributed Training:
With these last modifications to the model.fit call, we are ready to run the MNIST classification training in a distributed fashion. Even though the code is written in Python, we cannot simply start training with python distributed_train.py. Instead, we must use Horovod to start the distributed processes. As an example, you can run the distributed_train.py script on two local GPUs from the console with the following command.

In [None]:
horovodrun -np 2 -H localhost:2 python distributed_train.py |& grep -v "Read -1"

horovodrun options:
    -np - total number of processes to run
    -H - hosts to run on, formatted as ip_address:worker# (worker# <= GPU#); separate multiple hosts with commas
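
When several hosts are involved, the -np and -H arguments have to agree with each other. The sketch below builds the command from a host list so that the process count always equals the sum of the per-host workers. The helper horovodrun_command and the host names server1 and server2 are hypothetical. Only the -np and -H options described above are used.

```python
def horovodrun_command(hosts, script):
    """Build a horovodrun argument list.

    hosts: list of (hostname_or_ip, workers_on_that_host) pairs.
    script: path of the training script to launch on every worker.
    """
    total_processes = sum(slots for _, slots in hosts)
    host_argument = ",".join("{}:{}".format(host, slots) for host, slots in hosts)
    return ["horovodrun", "-np", str(total_processes),
            "-H", host_argument, "python", script]


# Single machine with two GPUs, as in the command above.
print(" ".join(horovodrun_command([("localhost", 2)], "distributed_train.py")))

# Hypothetical two-host setup with two GPUs per host.
print(" ".join(horovodrun_command([("server1", 2), ("server2", 2)],
                                  "distributed_train.py")))
```

The returned list can be passed to subprocess.run as is, which avoids shell quoting issues.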
    

You can also run this example automatically as a Job; see the mlsteam.yml file.

Centralized Training: 
Simply run the centralized training script with python in the terminal. 

In [None]:
python centralized_train.py

You can also start training automatically with a new Job by using the mlsteam.yml file. Paste the above command there.