# Horovod

[Horovod](https://github.com/horovod/horovod) is an open source tool originally developed by Uber to support their need for faster deep learning model training. It is a distributed deep learning training framework that works with TensorFlow, Keras, PyTorch, and Apache MXNet. Uber decided to develop a solution that uses [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface) for distributed process communication, and the [NVIDIA Collective Communications Library (NCCL)](https://developer.nvidia.com/nccl) for its highly optimized implementation of reductions across distributed processes and nodes.

In this session we will take a well-known deep learning image classification model, distribute the training data (Fashion-MNIST) across multiple GPUs on multiple nodes, and train the model.

### Horovod's MPI Roots

Horovod assigns a unique numerical ID or **rank** (an MPI concept) to each process executing the program. This rank can be accessed programmatically. As you will see below when writing Horovod code, by identifying a process's rank programmatically in the code we can take steps such as:

- Pin that process to its own exclusive GPU.
- Utilize a single rank for broadcasting values that need to be used uniformly by all ranks.
- Utilize a single rank for collecting and/or reducing values produced by all ranks.
- Utilize a single rank for logging or writing to disk.

Before we go into modifications required to turn our serial training implementation into a parallel implementation, let's take a look at the fashion_mnist.py file to familiarize ourselves with the task at hand.

### The Fashion-MNIST Dataset

The [Fashion-MNIST dataset](https://github.com/zalandoresearch/fashion-mnist) is a response to the traditional MNIST dataset, which is often referred to as the "hello world" of machine learning. The original MNIST dataset consists of 60,000 pictures of handwritten digits, 0-9. One of the downsides of this dataset is its simplicity. Good performance of a model on the dataset does not indicate that the model will perform well on a more complicated set of images.

The Fashion-MNIST dataset was created to be a moderately more complex image classification challenge. It follows the same format as the original MNIST set, with 10 categories and 60,000 training images, each 28x28 pixels (plus 10,000 testing images). We'll be training on this dataset for this lab. 

<img src="./images/Fashion MNIST.png"> 

## Modify the Training Script

You are going to start making modifications to the training script. In case you want to go back to the original Python file, I've provided a copy for you named `fashion_mnist_original.py`.

### 1. Initialize Horovod and Select the GPU to Run On

Let's begin by importing Horovod by its common alias `hvd`. We will be using TensorFlow with Keras. Horovod has a backend for each framework it supports, including TensorFlow and Keras.

**Exercise**: Add `import horovod.tensorflow.keras as hvd` to the training script and initialize Horovod before the argument parsing:

```python
# Horovod: initialize Horovod.
hvd.init()
```

(look for the `TODO: Step 1` lines).

With Horovod, which can run multiple processes across multiple GPUs, you typically use a single GPU per training process. Horovod uses much of the MPI nomenclature: in MPI, a **rank** is a unique process ID.
Schematically, let's look at how MPI can run multiple GPU processes across multiple nodes. Note how each process, or rank, is pinned to a specific GPU:

<img src="https://user-images.githubusercontent.com/16640218/53518255-7d5fc300-3a85-11e9-8bf3-5d0e8913c14f.png" width="400"></img>

With this method we do not have to deal with placing specific data on specific GPUs. Instead, you just specify which GPU you would like to use in the beginning of your script. 

On NVIDIA's CUDA platform, if we have N GPUs they are uniquely numbered from 0 to N-1.

```python
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
```

`list_physical_devices` returns a list of the GPUs (GPUs are "devices" in TensorFlow) that this TensorFlow process can see and from which we must select one to use.

In this example we are using the `set_memory_growth` option. This tells TensorFlow to start with the minimum amount of GPU memory needed to start, and to allocate more on demand (like when the network is initialized). This is not strictly related to Horovod, but is a commonly used option when working with GPUs in TensorFlow. Since we need to request a fixed amount of resources on the VSC, this does not have an effect (it doesn't hurt though and is general good practice).

Now let's modify the above code such that Horovod can automatically do the right thing for any number of training processes. Programmatically, we can arbitrarily select the GPU that corresponds to the Horovod rank and use that one. Since we might be using multiple nodes, and the Horovod rank is a unique identifier across all ranks in the training process, we want to identify our rank locally on the node, which is provided by the `local_rank` specifier. Then we provide the `local_rank` to the function `set_visible_devices` which controls the set of GPUs that are available to that rank:

```python
# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
```
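To make the difference between the global rank and the node-local rank concrete, here is a minimal pure-Python sketch that does not need Horovod. It assumes that ranks are assigned node by node in contiguous blocks and that every node has the same number of GPUs; the actual placement depends on how the job is launched. The function `rank_layout` and its parameters are hypothetical and exist only for illustration.

```python
def rank_layout(num_nodes, gpus_per_node):
    """Return (rank, node, local_rank) for every process.

    Assumption: one process per GPU, ranks assigned node by node.
    """
    layout = []
    for rank in range(num_nodes * gpus_per_node):
        node = rank // gpus_per_node       # node that hosts this process
        local_rank = rank % gpus_per_node  # index of the process on that node
        layout.append((rank, node, local_rank))
    return layout


# Two nodes with two GPUs each: four ranks in total.
for rank, node, local_rank in rank_layout(num_nodes=2, gpus_per_node=2):
    print(f"rank {rank}: node {node}, local rank {local_rank} -> GPU {local_rank}")
```

Under these assumptions the global rank keeps growing across nodes, while the local rank always stays within the range of GPU indices on a single node, which is why the local rank is the one used to index the list of GPUs.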

You can have a look at the documentation for `hvd.rank` and `hvd.local_rank`:

In [None]:
import horovod.tensorflow.keras as hvd

In [None]:
?hvd.rank

In [None]:
?hvd.local_rank

**Exercise**: with this knowledge in hand, edit `fashion_mnist.py` to pin one GPU to each rank using its local rank ID, immediately after where you have already initialized Horovod.

Look for `TODO: Step 1` lines in the code. If you get stuck, refer to `solutions/fashion_mnist_after_step_01.py`.

### 2. Print Verbose Logs Only on the First Worker

We do not want all N TensorFlow processes to print their progress to the output; we only want to see the training state once at any given time. To accomplish this, we can arbitrarily select a single rank to display the training progress. By convention, we typically call rank 0 the "root" rank and use it for logistical work such as I/O when only one rank is required.
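The idea of gating output on the rank can be sketched without Horovod. The helper `verbosity_for_rank` below is hypothetical; in the training script the rank would come from `hvd.rank()` rather than from a loop over simulated ranks.

```python
def verbosity_for_rank(rank, root_rank=0):
    """Return a Keras-style verbosity level: 1 on the root rank, 0 elsewhere."""
    return 1 if rank == root_rank else 0


# Simulate four workers; a real process only ever sees its own rank.
levels = [verbosity_for_rank(rank) for rank in range(4)]
print(levels)
```

Only the first entry is non-zero, so only one of the four simulated workers would report progress.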

**Exercise**: Edit `fashion_mnist.py` so that you only set `verbose = 1` if it is the first worker (with rank equal to 0) executing the code.

Look for `TODO: Step 2` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_02.py`.

### 3. Add Distributed Optimizer

If we left it at that, each process would be running completely independently, which is not data parallel training, but multiple processes running serial training at the same time. The key step to make the training data parallel is to average out gradients across all workers, so that all workers are updating with the same gradients and thus moving in the same direction. Horovod implements an operation that averages gradients across workers. Deploying this in your code is very straightforward and just requires wrapping an existing optimizer (`tensorflow.keras.optimizers.Optimizer`) with a Horovod distributed optimizer (`horovod.tensorflow.keras.DistributedOptimizer`).
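The effect of gradient averaging can be illustrated with a small pure-Python simulation. This is only a sketch of the arithmetic, not of how Horovod performs the reduction internally; the functions `allreduce_average` and `sgd_step` are hypothetical names, and the numbers are made up.

```python
def allreduce_average(per_worker_gradients):
    """Average gradients element-wise across workers.

    per_worker_gradients: one list of floats per worker, all the same length.
    """
    num_workers = len(per_worker_gradients)
    return [sum(values) / num_workers for values in zip(*per_worker_gradients)]


def sgd_step(weights, gradients, learning_rate):
    """Apply one plain SGD update."""
    return [w - learning_rate * g for w, g in zip(weights, gradients)]


# Two workers start from identical weights but see different mini-batches,
# so their local gradients differ.
weights = [1.0, 2.0]
local_gradients = [[0.5, -1.0], [1.5, 3.0]]

averaged = allreduce_average(local_gradients)
# Every worker applies the same averaged gradient, so the weights stay in sync.
updated = [sgd_step(weights, averaged, learning_rate=0.5) for _ in local_gradients]
print(averaged, updated)
```

Without the averaging step, each worker would apply its own gradient and the two copies of the weights would drift apart after the first update.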

In [None]:
?hvd.DistributedOptimizer

**Exercise**: wrap the optimizer (`opt` in `fashion_mnist.py`) with a Horovod distributed optimizer.

Look for `TODO: Step 3` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_03.py`.

### 4. Initialize Random Weights on Only One Processor

Data parallel stochastic gradient descent, at least in its traditionally defined sequential algorithm, requires weights to be synchronized between all processors. We already know that this is accomplished for backpropagation by averaging out the gradients among all processors prior to the weight updates. Then the only other required step is for the weights to be synchronized initially. Assuming we start from the beginning of the training, this means that every processor needs to have the same random weights.
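To see why the initial synchronization matters, consider the following pure-Python sketch. It simulates four ranks that each draw their own random weights and then copies the root's weights to everyone. The helpers `init_weights` and `broadcast` are hypothetical stand-ins for illustration and are not Horovod functions.

```python
import random


def init_weights(seed, size=3):
    """Draw random initial weights; a different seed gives different weights."""
    rng = random.Random(seed)
    return [rng.uniform(-1.0, 1.0) for _ in range(size)]


def broadcast(values_per_rank, root_rank=0):
    """Give every rank a copy of the root rank's values."""
    root_values = values_per_rank[root_rank]
    return [list(root_values) for _ in values_per_rank]


# Each simulated rank initializes independently, so the weights differ.
before = [init_weights(seed=rank) for rank in range(4)]
# After the broadcast, every rank holds rank 0's weights.
after = broadcast(before, root_rank=0)
print(before[0] == before[1], after[0] == after[1])
```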

The first worker needs to broadcast its parameters to the rest of the workers. We will use `horovod.tensorflow.keras.callbacks.BroadcastGlobalVariablesCallback` to make this happen. Execute the following cell to get more information about the callback:

In [None]:
?hvd.callbacks.BroadcastGlobalVariablesCallback

**Exercise**: append this callback to our list of callbacks. Note the argument required for this callback, the rank of the root worker. Introducing this callback causes a TensorFlow warning, which you can disregard.

Look for `TODO: Step 4` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_04.py`.

### 5. Modify Training Loop to Execute Fewer Steps Per Epoch

As it stands, each worker is running the same number of steps per epoch as the serial training implementation. But since we have increased the number of workers by a factor of N, that means we're doing N times more work (when we sum the amount of work done over all processes). Our target was to get the *same* answer in less time (that is, to speed up the training), so we want to keep the total amount of work done the same (that is, to process the same number of examples in the dataset). This means we need to do a factor of N *fewer* steps per epoch, so the number of steps goes to `steps_per_epoch / number_of_workers`.

We will also speed up validation by validating `3 * num_test_iterations / number_of_workers` steps on each worker. While we could just do `num_test_iterations / number_of_workers` on each worker to get a linear speedup in the validation, the multiplier **3** provides over-sampling of the validation data and helps to increase the probability that every validation example will be evaluated.
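The arithmetic above can be checked with a short sketch. The function `per_worker_steps` is hypothetical, and the step counts (468 training steps, 78 validation steps) are made-up example values; in the training script the number of workers would come from `hvd.size()`.

```python
def per_worker_steps(steps_per_epoch, num_test_iterations, num_workers, oversample=3):
    """Split training and validation steps across workers, rounding down."""
    train_steps = steps_per_epoch // num_workers
    validation_steps = oversample * num_test_iterations // num_workers
    return train_steps, validation_steps


# Hypothetical serial run: 468 training steps and 78 validation steps per epoch.
for num_workers in (1, 2, 4, 8):
    print(num_workers, per_worker_steps(468, 78, num_workers))
```

Integer division (`//`) rounds down, so the results are always integers even when the number of workers does not divide the step count evenly.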

In [None]:
?hvd.size

**Exercise**: modify the `steps_per_epoch` and `validation_steps` arguments for `model.fit_generator` to follow the plan just outlined. This environment uses Python 3, and each of these arguments expects an integer, so take care to round any potential floating point values down to the nearest integer.

Look for `TODO: Step 5` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_05.py`.

### 6. Average Validation Results Among Workers

Since we are not validating the full dataset on each worker anymore, each worker will have different validation results. To improve validation metric quality and reduce variance, we will average validation results among all workers.
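As a rough illustration of what averaging the validation results means, the following pure-Python sketch averages each metric over four simulated workers. The function `average_metrics` and the metric values are hypothetical; the sketch only shows the arithmetic, not how the values are exchanged between processes.

```python
def average_metrics(per_worker_logs):
    """Average each named metric across workers.

    per_worker_logs: one dict per worker, all with the same metric names.
    """
    num_workers = len(per_worker_logs)
    return {
        name: sum(logs[name] for logs in per_worker_logs) / num_workers
        for name in per_worker_logs[0]
    }


# Hypothetical end-of-epoch validation results from four workers.
logs = [
    {"val_loss": 0.5, "val_accuracy": 0.75},
    {"val_loss": 0.25, "val_accuracy": 1.0},
    {"val_loss": 0.75, "val_accuracy": 0.5},
    {"val_loss": 0.5, "val_accuracy": 0.75},
]
print(average_metrics(logs))
```

Each worker's individual result depends on the subset of validation data it happened to see; the average is less sensitive to that choice.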

To do so, we can use `horovod.tensorflow.keras.callbacks.MetricAverageCallback`. Execute the following cell to get more information:

In [None]:
?hvd.callbacks.MetricAverageCallback

**Exercise**: average the metrics among workers at the end of every epoch by injecting `MetricAverageCallback` after `BroadcastGlobalVariablesCallback`. Please note that this callback must be in the list before other metrics-based callbacks, `ReduceLROnPlateau`, `TensorBoard`, etc.

Look for `TODO: Step 6` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_06.py`.

### 7. Do Checkpointing Logic Only Using the Root Worker

Checkpointing is a common activity in production DL training (for simplicity we are not relying on it in this lab, however). This is extremely important for "defensive I/O" purposes (so that if a process or node fails, we don't lose all our work), and is also useful for being able to chart the progress of our training run. Let's think about how that works in the multi-process context.

The most important issue is that there can be a race condition while writing the checkpoint to a file. If every rank finishes the epoch at the same time, they might be writing to the same filename, and this could result in corrupted data. But more to the point, we don't even need to do this: by construction in synchronous data parallel SGD, every rank has the same copy of the weights at all times, so only one worker needs to write the checkpoint. As usual, our convention will be that the root worker (rank 0) handles this.

For the same reason, if we are restarting from a checkpoint later on, we don't need every rank to read in the checkpoint -- only one rank needs to do so, and then it can broadcast the data to all the other workers. (In that case, all the workers do still need to instantiate the same model as the one in the checkpoint.) We also often don't want every rank to read in the checkpoint -- at large enough scale, having thousands of processes all read from the same file on disk can be inefficient. You might also be in a situation where only one server node has the data available, so the broadcast is necessary in that case.

We already encountered broadcasting in the form of a callback in Step 4. But with Horovod we can take direct control and broadcast (that is, send some data from one processor to every other processor) a specific scalar or tensor.

In [None]:
?hvd.broadcast

The training script has a function `restart_epoch()` which looks for the latest checkpoint we have created and returns the epoch number corresponding to that checkpoint. By default, every rank is resuming from epoch 0 (look for the `resume_from_epoch` variable), which means start from the beginning without loading a checkpoint. We can run the `restart_epoch()` function to actively look for checkpoints, but we only need to do this on one rank, and then we can have that same rank broadcast that epoch number to all other ranks, and also read in the checkpoint and broadcast the checkpointed weights (this is already handled by the `BroadcastGlobalVariablesCallback` we implemented in Step 4).
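The flow just described (the root looks for checkpoints, then everyone receives the same epoch number) can be simulated in plain Python. This is a sketch under stated assumptions: the file-name pattern `checkpoint-<epoch>.h5` and both helper functions are hypothetical and are not taken from the training script, whose `restart_epoch()` may work differently.

```python
import re


def latest_checkpoint_epoch(filenames):
    """Return the highest epoch in names like 'checkpoint-7.h5', or 0 if none match."""
    epochs = []
    for name in filenames:
        match = re.fullmatch(r"checkpoint-(\d+)\.h5", name)
        if match:
            epochs.append(int(match.group(1)))
    return max(epochs, default=0)


def resume_epoch_per_rank(num_workers, filenames, root_rank=0):
    """Simulate: only the root inspects the checkpoints, then broadcasts the result."""
    resume_from_epoch = [0] * num_workers  # default value on every rank
    # Only the root rank looks at the checkpoint files.
    resume_from_epoch[root_rank] = latest_checkpoint_epoch(filenames)
    # Broadcast: every rank receives the root's value.
    root_value = resume_from_epoch[root_rank]
    return [root_value for _ in range(num_workers)]


print(resume_epoch_per_rank(4, ["checkpoint-1.h5", "checkpoint-3.h5", "events.out"]))
```

After the simulated broadcast, all four ranks agree on the epoch to resume from, even though only one of them looked at the files.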

**Exercise**: edit `fashion_mnist.py` so that after

```python
resume_from_epoch = 0
```

you:

(1) update the `resume_from_epoch` variable on rank 0 (the root process) using the `restart_epoch()` function;

(2) broadcast the value of this data to all other processes; and,

(3) uncomment the checkpointing callbacks, and make sure they're only appended on rank 0.

Use the docstring printed above to assist your work in getting the syntax right.

Look for `TODO: Step 7` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_07.py`.

If you make any mistakes, you can simply delete the logs directory and try again.

In [None]:
!rm -rf logs

In this short introduction, we will leave it at that and try to train our model in parallel. For that we will use the `fashion_mnist_solution.py` file. We just need to write or complete our job script.
However, should you be interested in the rest of the adjustments, feel free to read through the remaining notebook in your own time.