<img src="./images/DLI_Header.png">

# Scaling Deep Learning Workflows with Horovod

[Horovod](https://github.com/horovod/horovod) is a distributed deep learning training framework. It is available for TensorFlow, Keras, PyTorch, and Apache MXNet. In this lab you will learn what Horovod is and how to use it by distributing the training of an image classification model across multiple GPUs.

## Lab Outline

The progression of this lab is as follows:

- A high-level introduction to Horovod, including its ties to the parallel computing protocol MPI, and the additional details that must be taken into account when using a parallel computing framework like Horovod.
- An overview and initial run of the existing code base that you will be refactoring with Horovod, which is a classification model using Keras and the Fashion-MNIST dataset, currently built to run on a single GPU.
- A multi-step refactor of the existing code base so that it uses Horovod to run distributed across this environment's available GPUs, introducing Horovod concepts and techniques throughout.
- A final run of the refactored and distributed code base, with discussion of its speedup.

This lab draws heavily on content provided in the [Horovod tutorials](https://github.com/horovod/tutorials).

## Learning Objectives

By the time you complete this lab you will be able to:

- Discuss what Horovod is, how it works, and why it is an effective tool for distributed training.
- Use Horovod to refactor or build deep learning models that train distributed across multiple GPUs.

## Introduction to Horovod

[Horovod](https://github.com/horovod/horovod) is an open source tool originally [developed by Uber](https://eng.uber.com/horovod/) to support their need for faster deep learning model training across their many engineering teams. It is part of a growing ecosystem of approaches to distributed training, including for example [Distributed TensorFlow](https://www.tensorflow.org/deploy/distributed). Uber decided to develop a solution that utilized [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface) for distributed process communication, and the [NVIDIA Collective Communications Library (NCCL)](https://developer.nvidia.com/nccl) for its highly optimized implementation of reductions across distributed processes and nodes. The resulting Horovod package delivers on its promise to scale deep learning model training across multiple GPUs and multiple nodes, with only minor code modification and intuitive debugging.

Since its inception in 2017 Horovod has matured significantly, extending its support from just TensorFlow to Keras, PyTorch, and Apache MXNet. Horovod is extensively tested and has been used on some of the largest DL training runs done to date, for example, supporting **exascale** deep learning on the [Summit system, scaling to over **27,000 V100 GPUs**](https://arxiv.org/pdf/1810.01993.pdf):

![horovod scaling](./images/horovod_scaling.jpg)

Let's import Horovod now so that we can query it later on. The convention is to import it as `hvd`. In this lab we will be using Keras, and Horovod has a backend for each implementation it supports, including Keras.

In [2]:
import horovod.tensorflow.keras as hvd

### Horovod's MPI Roots

Horovod's connection to MPI runs deep, and for programmers familiar with MPI programming, much of what you program to distribute model training with Horovod will feel very familiar. For those unfamiliar with MPI programming, a brief discussion of some of the conventions and considerations required when distributing processes with Horovod, or MPI, is worthwhile.

Horovod, as with MPI, strictly follows the [Single-Program Multiple-Data (SPMD) paradigm](https://en.wikipedia.org/wiki/SPMD) where we implement the instruction flow of multiple processes in the same file/program. Because multiple processes execute code in parallel, we have to guard against [race conditions](https://en.wikipedia.org/wiki/Race_condition) and keep the participating processes synchronized.

Horovod assigns a unique numerical ID or **rank** (an MPI concept) to each process executing the program. This rank can be accessed programmatically. As you will see below when writing Horovod code, by identifying a process's rank programmatically in the code we can take steps such as:

- Pin that process to its own exclusive GPU.
- Utilize a single rank for broadcasting values that need to be used uniformly by all ranks.
- Utilize a single rank for collecting and/or reducing values produced by all ranks.
- Utilize a single rank for logging or writing to disk.

As you work through this course, keep these concepts in mind, especially the fact that Horovod will send your single program to be executed in parallel by multiple processes. Although you will only be editing a single program, this perspective will support your intuition about why we do what we do with Horovod.
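To make the SPMD idea concrete, here is a minimal sketch in plain Python that does not use Horovod at all. The function `program` and its `rank` and `size` arguments are hypothetical stand-ins for the single script that every process executes. In a real run the values would come from `hvd.rank()` and `hvd.size()`, and the processes would run in parallel rather than in a loop.

```python
# A toy illustration of SPMD: every "process" runs the same function,
# and behavior differs only through the rank it is given.
# Assumption: ranks are simulated one after another; Horovod runs them in parallel.

def program(rank, size):
    """Return the list of actions a process with this rank would take."""
    # Every rank does the main work.
    actions = [f"rank {rank} of {size}: compute gradients"]
    if rank == 0:
        # By convention, the root rank handles one-off logistics.
        actions.append("print progress")
        actions.append("write checkpoint")
    return actions

size = 4
plan = {rank: program(rank, size) for rank in range(size)}
for rank, actions in plan.items():
    print(rank, actions)
```

Only rank 0 picks up the extra logging and checkpointing duties, even though all four ranks execute identical code.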

## Baseline: Train the Model

Before we go into modifications required to turn our serial training implementation into a parallel implementation, please make sure you can train the single GPU version of the model. We'll just run a few epochs with a relatively large batch size. This will take a few minutes, so go ahead and start the training, then read ahead to understand what model and dataset we are using. Take note of how long the training took when it is done.

In [3]:
!python fashion_mnist.py --epochs 5 --batch-size 512

Epoch 1/5
Images/sec: 1117.51
Cumulative training time after epoch 1: 53.72
Epoch 2/5
Images/sec: 1614.36
Cumulative training time after epoch 2: 90.89
Epoch 3/5
Images/sec: 1603.92
Cumulative training time after epoch 3: 128.3
Epoch 4/5
Images/sec: 1601.78
Cumulative training time after epoch 4: 165.76
Epoch 5/5
Images/sec: 1599.71
Cumulative training time after epoch 5: 203.28
Cumulative training time: 203.28
Test loss: 12.923389720916749
Test accuracy: 0.1991


## Overview of Existing Model Files

On the left hand side of this lab environment, you will see a file directory with this notebook, a Python file and a `solutions` directory.

The file `fashion_mnist.py` contains the Keras model without any Horovod code, while `solutions/fashion_mnist_solution.py` has all the Horovod features added. In this tutorial, we will guide you to transform `fashion_mnist.py` into `solutions/fashion_mnist_solution.py` step-by-step. As you work through the exercises to complete this task, you can, if needed, compare your code with the `solutions/fashion_mnist_after_step_N.py` file that corresponds to the step you are at.

Take a couple of minutes to read through `fashion_mnist.py`, familiarizing yourself with all that is happening in this original implementation. We assume your prerequisite understanding of deep learning is sufficient to understand what this initial code is doing; however, if anything is unfamiliar to you, consider taking the time to look up unfamiliar terms or methods.

### The Fashion-MNIST Dataset

The [Fashion-MNIST dataset](https://github.com/zalandoresearch/fashion-mnist) is a response to the traditional MNIST dataset, which is often referred to as the "hello world" of machine learning. The original MNIST dataset consists of 60,000 training images (plus 10,000 test images) of handwritten digits, 0-9. One of the downsides of this dataset is its simplicity. Good performance of a model on the dataset does not indicate that the model will perform well on a more complicated set of images.

The Fashion-MNIST dataset was created to be a moderately more complex image classification challenge. It follows the same format as the original MNIST set, with 10 categories and 60,000 training images, each 28x28 pixels (plus 10,000 testing images). We'll be training on this dataset for this lab. 

<img src="./images/Fashion MNIST.png"> 

### The Wide ResNet Model

We'll be using a Wide Residual Network to train on this dataset, which is a convolutional neural network proven to perform very well in image classification challenges. Feel free to take some time to learn more about [wide residual networks](https://arxiv.org/abs/1605.07146), the original [residual networks](https://arxiv.org/abs/1512.03385) they are based on, or about [convolutional neural networks](https://developer.nvidia.com/discover/convolutional-neural-network) in general.

<img src="./images/wideresnet.png"> 

In the early days of CNNs, the community drove towards very deep models (many tens or hundreds of layers). As computing power advanced and algorithms improved, in particular after the idea of the residual block was demonstrated, it became more desirable to swing back towards shallower networks with wider layers, which was the primary innovation of the WideResNet family of models. With O(10 million) parameters, the WideResNet-16-10 we will use below can achieve accuracy that is competitive with much deeper networks that have more parameters.

## Modify the Training Script

We are going to start making modifications to the training script. Before we do, let's make a copy of it on disk. That way, if you make a mistake and want to go back to the beginning, you have a reference copy.

In [4]:
!cp fashion_mnist.py fashion_mnist_original.py

Double-click `fashion_mnist.py` in the left pane to open it in the editor.

### 1. Initialize Horovod and Select the GPU to Run On

Naturally we need to start by importing Horovod. 

**Exercise**: Add `import horovod.tensorflow.keras as hvd` (the same import used earlier in this notebook) to the training script and initialize Horovod before the argument parsing:

```python
# Horovod: initialize Horovod.
hvd.init()
```

(look for the `TODO: Step 1` lines).

With Horovod, which can run multiple processes across multiple GPUs, you typically use a single GPU per training process. Part of what makes Horovod simple to use is that it utilizes MPI, and as such, uses much of the MPI nomenclature. In MPI, a **rank** is a unique process ID. In this lab we will be using the term "rank" extensively. If you would like to know more about MPI concepts that are utilized heavily in Horovod, please refer to [the Horovod documentation](https://github.com/horovod/horovod/blob/master/docs/concepts.rst).

Schematically, let's look at how MPI can run multiple GPU processes across multiple nodes. Note how each process, or rank, is pinned to a specific GPU:

<img src="https://user-images.githubusercontent.com/16640218/53518255-7d5fc300-3a85-11e9-8bf3-5d0e8913c14f.png" width="400"></img>

With this method we do not have to deal with placing specific data on specific GPUs. Instead, you just specify which GPU you would like to use in the beginning of your script. 

Before we get there, let's refresh ourselves on how to work with multiple GPUs on a node. In CUDA, NVIDIA's GPU computing platform, the N GPUs on a node are uniquely numbered from 0 to N-1. In this lab we won't worry about how the numbering is selected or whether the order matters. Here is an example of working with multiple GPUs:

```python
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
```

`list_physical_devices` returns a list of the GPUs (GPUs are "devices", in TensorFlow parlance) that this TensorFlow process is allowed to know about, from which we must select one to use. (CUDA enables you to restrict which GPUs a process can see, if you desire, but by default all GPUs on the server are visible to TensorFlow.) If you don't specify a GPU, then TensorFlow will use the first one in the list, which won't work in our data parallel use case where every process needs to use its own GPU.

If you want to use multiple GPUs from this list, you can do so by manually controlling which device to use (see TensorFlow's [documentation](https://www.tensorflow.org/guide/gpu) for more information). Typically one would do so when manual control over the distribution of data is needed, and a common use case is model parallelism (which Horovod [does not natively support](https://github.com/horovod/horovod/issues/96)). In this lab we will only be using one GPU per rank.

In this example we are using the `set_memory_growth` option. This tells TensorFlow to start with the minimum amount of GPU memory needed to start, and to allocate more on demand (like when the network is initialized). This is not strictly related to Horovod, but is a commonly used option when working with GPUs in TensorFlow.

Now let's test your understanding of how this works. First, let's identify how many GPUs are on our node:

In [1]:
!nvidia-smi

Thu Nov 11 19:06:07 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.33.01    Driver Version: 440.33.01    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla V100-SXM2...  On   | 00000000:00:1B.0 Off |                    0 |
| N/A   35C    P0    36W / 300W |      0MiB / 16160MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  On   | 00000000:00:1C.0 Off |                    0 |
| N/A   35C    P0    36W / 300W |      0MiB / 16160MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla V100-SXM2...  On   | 00000000:00:1D.0 Off |                    0 |
| N/A   

The "!" prefix means that the command above is executed in a shell; now let's do the same thing in an actual terminal. Open a new launcher (File > New Launcher in the menu bar), select the "Terminal" option, execute the `nvidia-smi` command there, and verify that it provides the same output.

Notice the "GPU-Util" column, which measures the GPU's utilization. It tells you what fraction of the last second the GPU was in use, so we can easily monitor GPU activity by regularly checking this output. One way to do that is with the Linux utility [watch](https://en.wikipedia.org/wiki/Watch_(Unix)): `watch -n 5 nvidia-smi` sets up a loop that refreshes the `nvidia-smi` output every 5 seconds. (You can also use the option `nvidia-smi --loop=5` to do this directly in the tool.) Make sure you run that in the separate terminal window, not here in the notebook, because the notebook can only run one process at a time. You can type Ctrl+C in the terminal to end the loop later.
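If you would rather collect these numbers programmatically, `nvidia-smi` can also emit CSV through its `--query-gpu` and `--format` options. The sketch below parses that kind of output in Python. The helper name `parse_gpu_stats` and the sample text are illustrative assumptions, and the live query is left commented out because it requires a machine with NVIDIA GPUs.

```python
import csv
import io

def parse_gpu_stats(csv_text):
    """Parse `index, utilization.gpu, memory.used` rows into dictionaries."""
    stats = []
    for row in csv.reader(io.StringIO(csv_text), skipinitialspace=True):
        if not row:
            continue  # skip blank lines
        index, util, mem = (int(field) for field in row)
        stats.append({"gpu": index, "util_percent": util, "memory_mib": mem})
    return stats

# Made-up sample of what the query below might print during single-GPU training.
sample = "0, 97, 15210\n1, 0, 0\n2, 0, 0\n3, 0, 0\n"
busy = [s["gpu"] for s in parse_gpu_stats(sample) if s["util_percent"] > 0]
print(busy)

# On a GPU machine, the live text could be obtained with:
# import subprocess
# live = subprocess.check_output(
#     ["nvidia-smi", "--query-gpu=index,utilization.gpu,memory.used",
#      "--format=csv,noheader,nounits"],
#     universal_newlines=True)
```

A list such as `busy` gives a quick answer to the question of which GPUs a training run is actually using.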

**Exercise**: set up `nvidia-smi` to regularly monitor the GPU activity in a terminal as above, and then here in the notebook start a training process. Then switch back to the terminal and watch the GPU activity. Can you verify that only one GPU is used? Does it match the GPU ID you asked for in the training script? Also, keep an eye out for other utilization metrics like power consumption and memory usage.

In [8]:
!horovodrun -np 1 python fashion_mnist.py --epochs 1 --batch-size 512

Images/sec: 1083.22
Cumulative training time after epoch 1: 55.4
Cumulative training time: 55.4
Test loss: 11.354074382781983
Test accuracy: 0.2848


Now let's modify the above code such that Horovod can automatically do the right thing for any number of training processes. Programmatically, we can arbitrarily select the GPU that corresponds to the Horovod rank and use that one. Since we might be using multiple nodes, and the Horovod rank is a unique identifier across all ranks in the training process, we want to identify our rank locally on the node, which is provided by the `local_rank` specifier. Then we provide the `local_rank` to the function `set_visible_devices` which controls the set of GPUs that are available to that rank:

```python
# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
```
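To see why `local_rank` rather than `rank` is the right index into the GPU list, consider a plain-Python sketch of a multi-node job. It assumes that ranks are placed node by node, with the same number of processes on every node. That is a common launcher layout, but it is not something this lab guarantees. The helper `placement` is hypothetical and is not part of Horovod.

```python
def placement(rank, procs_per_node):
    """Return (node index, local rank) under an assumed node-by-node layout."""
    return divmod(rank, procs_per_node)

gpus_per_node = 4
for rank in range(8):  # for example, 2 nodes with 4 GPUs each
    node, local_rank = placement(rank, gpus_per_node)
    # Indexing a 4-GPU list with the global rank would fail for ranks 4..7;
    # the local rank always stays within 0..gpus_per_node-1.
    print(f"rank {rank}: node {node}, GPU {local_rank}")
```

On a single node, as in this lab, the rank and the local rank coincide, so the distinction only matters once the job spans several nodes.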

Check out the documentation for both functions:

In [9]:
?hvd.rank

Signature: hvd.rank()
Docstring:
A function that returns the Horovod rank of the calling process.

Returns:
  An integer scalar with the Horovod rank of the calling process.
File:      /usr/local/lib/python3.6/dist-packages/horovod/common/basics.py
Type:      method


In [10]:
?hvd.local_rank

Signature: hvd.local_rank()
Docstring:
A function that returns the local Horovod rank of the calling process, within the
node that it is running on. For example, if there are seven processes running
on a node, their local ranks will be zero through six, inclusive.

Returns:
  An integer scalar with the local Horovod rank of the calling process.
File:      /usr/local/lib/python3.6/dist-packages/horovod/common/basics.py
Type:      method


**Exercise**: with this knowledge in hand, edit `fashion_mnist.py` to pin one GPU to each rank using its local rank ID, immediately after where you have already initialized Horovod.

Again, look for `TODO: Step 1` lines in the code. If you get stuck, refer to `solutions/fashion_mnist_after_step_01.py`.

**Exercise**: Now let's test that you got this right too. Run the training script for just one epoch to make sure everything works. We'll get into the habit of launching with the Horovod job launcher, `horovodrun`, even though for a single process run this is unnecessary. Using `nvidia-smi` in the terminal, verify that only one training process is running, and note which GPU is being used. Is it the one you expect?

In [11]:
!horovodrun -np 1 python fashion_mnist.py --epochs 1 --batch-size 512

Images/sec: 1081.44
Cumulative training time after epoch 1: 55.49
Cumulative training time: 55.49
Test loss: 2.6879443407058714
Test accuracy: 0.4592


`horovodrun` is a script that launches N copies of the training script, where N is the argument to `-np`. (For those familiar with MPI, it is a thin wrapper around `mpirun`, and in fact it is straightforward to distribute the training using `mpirun` with the right flags.) We'll be using it later to coordinate the training process. Because the processes are launched in the MPI environment, they can communicate with each other through a standardized API that Horovod handles for us. However, we haven't instructed the training script to actually coordinate yet, so at present we will just launch N independent copies of the same training script. We can try this out now by running as many processes (ranks) as there are GPUs. Watch the output and see if the training process appears to be coordinated -- does the training actually work? Do all of the processes run on the GPU you expect them to, and does that match the output of `nvidia-smi`?

In [12]:
num_gpus = 4

In [13]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 1 --batch-size 512

Images/sec: 1090.81
Cumulative training time after epoch 1: 55.05
Cumulative training time: 55.05

Images/sec: 1088.01
Cumulative training time after epoch 1: 55.18
Cumulative training time: 55.18

Images/sec: 1082.48
Cumulative training time after epoch 1: 55.47
Cumulative training time: 55.47

Images/sec: 1080.73
Cumulative training time after epoch 1: 55.58
Cumulative training time: 55.58
Test loss: 2.70180059671402
Test accuracy: 0.4339
Test loss: 12.90794243812561
Test accuracy: 0.2
Test loss: 1.208978646993637
Test accuracy: 0.6021
Test loss: 12.972663974761963
Test accuracy: 0.1871


### 2. Print Verbose Logs Only on the First Worker

You probably noticed that all N TensorFlow processes printed their progress to stdout (standard output). This results in confusing output -- we only want to see the state of the output once at any given time. To accomplish this, we can arbitrarily select a single rank to display the training progress. By convention, we typically call rank 0 the "root" rank and use it for logistical work such as I/O when only one rank is required.

**Exercise**: Edit `fashion_mnist.py` so that you only set `verbose = 1` if it is the first worker (with rank equal to 0) executing the code.

Look for `TODO: Step 2` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_02.py`.
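The pattern behind this step is a one-line conditional on the rank. The sketch below shows it with a hypothetical `rank` argument standing in for the value of `hvd.rank()`, so that it runs without Horovod. How you wire it into `fashion_mnist.py` is up to you.

```python
def verbosity_for(rank):
    """Keras-style verbosity: progress output on the root rank only."""
    return 1 if rank == 0 else 0

# Simulate four ranks; in the training script the rank would come from hvd.rank().
levels = [verbosity_for(rank) for rank in range(4)]
print(levels)  # [1, 0, 0, 0]
```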

Re-run the training session to make sure that you now see the expected output. We'll run for 3 epochs this time for comparison to the next exercise. While it's running, you can start working on Step 3.

In [14]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 1081.7
Cumulative training time after epoch 1: 55.51
Epoch 2/3
Images/sec: 1608.47
Cumulative training time after epoch 2: 92.82
Epoch 3/3
Images/sec: 1609.67
Cumulative training time after epoch 3: 130.1
Cumulative training time: 130.1
Test loss: 12.929070806503296
Test accuracy: 0.1991


### 3. Add Distributed Optimizer

In the previous two sections we ran with multiple processes, but each process was running completely independently -- this is not data parallel training, it is just multiple processes running serial training at the same time. The key step to make the training data parallel is to average gradients across all workers, so that all workers update with the same gradients and thus move in the same direction. Horovod implements an operation that averages gradients across workers. Deploying this in your code is very straightforward and just requires wrapping an existing optimizer (`keras.optimizers.Optimizer`) with a Horovod distributed optimizer (`horovod.tensorflow.keras.DistributedOptimizer`, which is `hvd.DistributedOptimizer` under the import used in this notebook).
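The effect of gradient averaging can be illustrated with a few lines of plain Python. This is a toy simulation, not Horovod's implementation: the helpers `allreduce_average` and `sgd_step` are hypothetical, the gradient values are made up, and the "workers" are simply entries in a list.

```python
def allreduce_average(per_worker_grads):
    """Element-wise mean across workers, as an averaging allreduce would return."""
    num_workers = len(per_worker_grads)
    return [sum(values) / num_workers for values in zip(*per_worker_grads)]

def sgd_step(weights, grads, lr):
    """One plain SGD update."""
    return [w - lr * g for w, g in zip(weights, grads)]

# Four workers start from identical weights but see different batches,
# so their local gradients differ (values are made up).
weights = [[1.0, -2.0] for _ in range(4)]
local_grads = [[0.5, 0.0], [0.0, 1.0], [0.25, 0.25], [0.25, -0.25]]

# Every worker receives the same averaged gradient and applies the same update.
avg = allreduce_average(local_grads)
weights = [sgd_step(w, avg, lr=0.5) for w in weights]
print(avg, weights[0])
```

Because every worker applies the same averaged gradient, all four copies of the weights remain identical after the update.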

In [15]:
?hvd.DistributedOptimizer

Signature:
hvd.DistributedOptimizer(
    optimizer,
    name=None,
    device_dense='',
    device_sparse='',
    compression=<class 'horovod.tensorflow.compression.NoneCompressor'>,
    sparse_as_dense=False,
)
Docstring:
An optimizer that wraps another keras.optimizers.Optimizer, using an allreduce to
average gradient values before applying gradients to model weights.

Args:
    optimizer: Optimizer to use for computing gradients and applying updates.
    name: Optional name prefix for

**Exercise**: wrap the optimizer (`opt` in `fashion_mnist.py`) with a Horovod distributed optimizer.

Look for `TODO: Step 3` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_03.py`.

Re-run the training now and see if you get a reasonable answer. Is the accuracy any better? Note that we are only training for a few epochs, and the results will depend on the initial random weights, so do not draw any strong conclusions here.

In [16]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 1032.74
Cumulative training time after epoch 1: 58.13
Epoch 2/3
Images/sec: 1573.41
Cumulative training time after epoch 2: 96.27
Epoch 3/3
Images/sec: 1556.52
Cumulative training time after epoch 3: 134.83
Cumulative training time: 134.83
Test loss: 0.7015880823135376
Test accuracy: 0.767


### 4. Initialize Random Weights on Only One Processor

Data parallel stochastic gradient descent, at least in its traditionally defined sequential algorithm, requires weights to be synchronized between all processors. We already know that this is accomplished for backpropagation by averaging out the gradients among all processors prior to the weight updates. Then the only other required step is for the weights to be synchronized initially. Assuming we start from the beginning of the training (we'll handle checkpoint/restart later), this means that every processor needs to have the same random weights.
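A small simulation shows why the initial synchronization is needed in addition to gradient averaging. The sketch below uses only the Python standard library. The helpers `init_weights` and `broadcast` are hypothetical stand-ins, and the seeds and gradient values are made up.

```python
import random

def init_weights(seed, n=3):
    """Random initial weights; each rank uses its own seed (assumed)."""
    rng = random.Random(seed)
    return [rng.uniform(-1, 1) for _ in range(n)]

def broadcast(values_by_rank, root_rank=0):
    """Every rank receives a copy of the root rank's values."""
    return [list(values_by_rank[root_rank]) for _ in values_by_rank]

# Each rank draws its own random initial weights.
weights = [init_weights(seed=rank) for rank in range(4)]
same_before = all(w == weights[0] for w in weights)

# Applying an identical (averaged) gradient preserves whatever offset already exists...
shared_grad = [0.5, 0.5, 0.5]
drifted = [[w - 0.5 * g for w, g in zip(ws, shared_grad)] for ws in weights]
still_different = any(w != drifted[0] for w in drifted)

# ...so the weights must be made identical once, up front.
weights = broadcast(weights, root_rank=0)
same_after = all(w == weights[0] for w in weights)
print(same_before, still_different, same_after)
```

Averaged gradients keep synchronized weights synchronized, but they cannot repair weights that started out different.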

In a previous section, we mentioned that the first worker would broadcast parameters to the rest of the workers. We will use `horovod.tensorflow.keras.callbacks.BroadcastGlobalVariablesCallback` to make this happen. Execute the following cell to get more information about the method:

In [17]:
?hvd.callbacks.BroadcastGlobalVariablesCallback

Init signature: hvd.callbacks.BroadcastGlobalVariablesCallback(root_rank, device='')
Docstring:
Keras Callback that will broadcast all global variables from root rank
to all other processes during initialization.

This is necessary to ensure consistent initialization of all workers when
training is started with random weights or restored from a checkpoint.
Init docstring:
Construct a new BroadcastGlobalVariablesCallback that will broadcast all
global variables from root rank to all other processes during initialization.

Args:
    root_rank: Rank that will send data, other ranks will receive data.
    device: Device to be used for broadcasting. Uses GPU by default
            if Horovod was build with HOROVOD_GPU_BROADCAST.
File:           /usr/local/lib/python3.6/dist-packages/horovod/tensorflow/

**Exercise**: append this callback to our list of callbacks. Note the argument required for this callback, the rank of the root worker. Note that introducing this callback causes a TensorFlow warning, which you can disregard.

Look for `TODO: Step 4` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_04.py`.

Again, run the training session for just a few epochs to make sure things work, and notice if it affected the outcome.

In [18]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 1004.99
Cumulative training time after epoch 1: 59.74
Epoch 2/3
Images/sec: 1562.34
Cumulative training time after epoch 2: 98.15
Epoch 3/3
Images/sec: 1573.49
Cumulative training time after epoch 3: 136.29
Cumulative training time: 136.29
Test loss: 0.6424222737550735
Test accuracy: 0.7811


### 5. Modify Training Loop to Execute Fewer Steps Per Epoch

As it stands, we are running the same number of steps per epoch as in the serial training implementation. But since we have increased the number of workers by a factor of N, that means we're doing N times more work (when we sum the amount of work done over all processes). Our target was to get the *same* answer in less time (that is, to speed up the training), so we want to keep the total amount of work done the same (that is, to process the same number of examples in the dataset). This means we need to do a factor of N *fewer* steps per epoch, so the number of steps goes to `steps_per_epoch / number_of_workers`.

We will also speed up validation by validating `3 * num_test_iterations / number_of_workers` steps on each worker. While we could just do `num_test_iterations / number_of_workers` on each worker to get a linear speedup in the validation, the multiplier **3** provides over-sampling of the validation data and helps to increase the probability that every validation example will be evaluated.
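As a worked example of this arithmetic, the sketch below computes per-worker step counts in plain Python. The helper names are hypothetical, and the step counts are derived from the Fashion-MNIST sizes quoted earlier (60,000 training and 10,000 test images) with a batch size of 512. How `fashion_mnist.py` itself derives its step counts is an assumption here.

```python
import math

def steps_for_worker(total_steps, num_workers):
    """Training steps per worker, rounded down to an integer."""
    return total_steps // num_workers

def validation_steps_for_worker(total_val_steps, num_workers, oversample=3):
    """Validation steps per worker, with the over-sampling multiplier applied."""
    return oversample * total_val_steps // num_workers

batch_size = 512
train_steps = math.ceil(60000 / batch_size)  # 118 steps cover the training set once
val_steps = math.ceil(10000 / batch_size)    # 20 steps cover the test set once
num_workers = 4                              # would be hvd.size() in the script

print(steps_for_worker(train_steps, num_workers))           # 29
print(validation_steps_for_worker(val_steps, num_workers))  # 15
```

Integer division (`//`) does the rounding down, so the results can be passed directly as integer arguments.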

In [19]:
?hvd.size

Signature: hvd.size()
Docstring:
A function that returns the number of Horovod processes.

Returns:
  An integer scalar containing the number of Horovod processes.
File:      /usr/local/lib/python3.6/dist-packages/horovod/common/basics.py
Type:      method


**Exercise**: modify the `steps_per_epoch` and `validation_steps` arguments for `model.fit_generator` to follow the plan just outlined. This environment uses Python 3, and each of these arguments expects an integer, so take care to round any potential floating point values down to the nearest integer.

Look for `TODO: Step 5` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_05.py`.

In [20]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 2067.52
Cumulative training time after epoch 1: 29.07
Epoch 2/3
Images/sec: 5680.95
Cumulative training time after epoch 2: 39.63
Epoch 3/3
Images/sec: 5631.59
Cumulative training time after epoch 3: 50.29
Cumulative training time: 50.29
Test loss: 0.974322971701622
Test accuracy: 0.6612


### 6. Average Validation Results Among Workers

Since we are not validating the full dataset on each worker anymore, each worker will have different validation results. To improve validation metric quality and reduce variance, we will average validation results among all workers.

To do so, we can use `horovod.tensorflow.keras.callbacks.MetricAverageCallback`. Execute the following cell to get more information:

In [21]:
?hvd.callbacks.MetricAverageCallback

Init signature: hvd.callbacks.MetricAverageCallback(device='')
Docstring:
Keras Callback that will average metrics across all processes at the
end of the epoch. Useful in conjuction with ReduceLROnPlateau,
TensorBoard and other metrics-based callbacks.

Note: This callback must be added to the callback list before the
ReduceLROnPlateau, TensorBoard or other metrics-based callbacks.
Init docstring:
Construct a new MetricAverageCallback that will average metrics
across all processes at the end of the epoch.

Args:
    device: Device to be used for allreduce. Uses GPU by default
            if Horovod was build with HOROVOD_GPU_ALLREDUCE.
File:           /usr/local/lib/python3.6/dist-packages/horovod/tensorflow/keras/callbacks.py
Type:           type
Subclasses:


**Exercise**: average the metrics among workers at the end of every epoch by injecting `MetricAverageCallback` after `BroadcastGlobalVariablesCallback`. Please note that this callback must be in the list before other metrics-based callbacks, `ReduceLROnPlateau`, `TensorBoard`, etc.

Look for `TODO: Step 6` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_06.py`.
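Conceptually, the averaging performed at the end of each epoch amounts to taking the mean of every named metric across workers. The plain-Python sketch below illustrates that idea. The helper `average_metrics` is hypothetical, the metric values are made up, and Horovod performs the equivalent reduction with an allreduce rather than a Python loop.

```python
def average_metrics(per_worker_metrics):
    """Average each named metric across workers."""
    num_workers = len(per_worker_metrics)
    names = per_worker_metrics[0].keys()
    return {
        name: sum(m[name] for m in per_worker_metrics) / num_workers
        for name in names
    }

# Each worker validated a different subset, so its metrics differ (made-up values).
per_worker = [
    {"val_loss": 1.0, "val_accuracy": 0.5},
    {"val_loss": 0.5, "val_accuracy": 0.75},
    {"val_loss": 1.5, "val_accuracy": 0.25},
    {"val_loss": 1.0, "val_accuracy": 0.5},
]
averaged = average_metrics(per_worker)
print(averaged)
```

After averaging, every worker sees the same metric values, which is why metrics-based callbacks should run after the averaging step.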

In [22]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 2109.68
Cumulative training time after epoch 1: 28.48
Epoch 2/3
Images/sec: 5885.99
Cumulative training time after epoch 2: 38.74
Epoch 3/3
Images/sec: 5713.81
Cumulative training time after epoch 3: 49.31
Cumulative training time: 49.35
Test loss: 1.2392199099063874
Test accuracy: 0.5809


### 7. Do Checkpointing Logic Only Using the Root Worker

Checkpointing is a common activity in production DL training (for simplicity we are not relying on it in this lab, however). This is extremely important for "defensive I/O" purposes (so that if a process or node fails, we don't lose all our work), and is also useful for being able to chart the progress of our training run. Let's think about how that works in the multi-process context.

The most important issue is that there can be a race condition while writing the checkpoint to a file. If every rank finishes the epoch at the same time, they might be writing to the same filename, and this could result in corrupted data. But more to the point, we don't even need to do this: by construction in synchronous data parallel SGD, every rank has the same copy of the weights at all times, so only one worker needs to write the checkpoint. As usual, our convention will be that the root worker (rank 0) handles this.
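The "only the root worker writes" convention can be sketched without Horovod. In the example below, the helper `maybe_save_checkpoint`, the JSON file format, and the file name pattern are all illustrative assumptions; a real Keras checkpoint would be written by a checkpoint callback instead.

```python
import json
import os
import tempfile

def maybe_save_checkpoint(rank, epoch, weights, directory):
    """Write a checkpoint only on the root rank; return the path or None."""
    if rank != 0:
        return None  # other ranks hold identical weights, so they skip the write
    path = os.path.join(directory, f"checkpoint-{epoch}.json")
    with open(path, "w") as f:
        json.dump({"epoch": epoch, "weights": weights}, f)
    return path

with tempfile.TemporaryDirectory() as tmp:
    # Simulate four ranks finishing epoch 1 with the same weights.
    results = [maybe_save_checkpoint(r, 1, [0.875, -2.125], tmp) for r in range(4)]
    files_written = sorted(os.listdir(tmp))

print(files_written)
```

Only one file is produced, so no two processes ever write to the same path.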

For the same reason, if we are restarting from a checkpoint later on, we don't need every rank to read in the checkpoint -- only one rank needs to do so, and then it can broadcast the data to all the other workers. (In that case, all the workers do still need to instantiate the same model as the one in the checkpoint.) We also often don't want every rank to read in the checkpoint -- at large enough scale, having thousands of processes all read from the same file on disk can be inefficient. You might also be in a situation where only one server node has the data available, so the broadcast is necessary in that case.
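
The root-only convention can be illustrated without Horovod at all. The following is a minimal sketch, not the lab's solution: the helper `save_checkpoint_if_root`, the JSON file format, and the file name are hypothetical, and the rank is passed in explicitly, whereas a real script would obtain it from `hvd.rank()`.

```python
import json
import os
import tempfile


def save_checkpoint_if_root(rank, weights, path):
    """Write `weights` to `path` only when called by the root worker (rank 0)."""
    if rank != 0:
        return False  # non-root ranks skip the write entirely
    # Write to a temporary file first and then rename it, so a crash
    # mid-write never leaves a half-written checkpoint behind.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(weights, f)
    os.replace(tmp_path, path)
    return True


# Simulate four ranks that all hold identical weights after a synchronous step.
checkpoint_dir = tempfile.mkdtemp()
checkpoint_path = os.path.join(checkpoint_dir, "checkpoint-1.json")
weights = {"dense/kernel": [0.1, -0.2], "dense/bias": [0.0]}
writers = [rank for rank in range(4)
           if save_checkpoint_if_root(rank, weights, checkpoint_path)]
print("ranks that wrote the checkpoint:", writers)
```

Because every rank holds the same weights, nothing is lost by skipping the write on ranks 1 through 3, and no two processes ever touch the same file.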

We already encountered broadcasting in the form of a callback in Step 4. But with Horovod we can take direct control and broadcast (that is, send some data from one process to every other process) a specific scalar or tensor.
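
To make the semantics concrete, here is a small pure-Python model of what a broadcast does. It is only a sketch: the `broadcast` function below is a stand-in that operates on a list with one entry per rank, and it does not call Horovod.

```python
def broadcast(values_by_rank, root_rank):
    """Return the value each rank holds after a broadcast from `root_rank`."""
    root_value = values_by_rank[root_rank]
    # Every rank, including the root, ends up with the root's value.
    return [root_value for _ in values_by_rank]


# Before: only rank 0 knows that training should resume at epoch 3.
resume_from_epoch = [3, 0, 0, 0]
after = broadcast(resume_from_epoch, root_rank=0)
print(after)
```

After the call, all four simulated ranks agree on the root's value, which is the property the checkpoint-restart logic relies on.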

In [23]:
?hvd.broadcast

Signature: hvd.broadcast(value, root_rank, name=None)
Docstring:
Perform a broadcast on a tensor-compatible value.

Arguments:
    value: A tensor-compatible value to reduce.
           The shape of the input must be identical across all ranks.
    root_rank: Rank of the process from which global variables will be
               broadcasted to all other processes.
    name: Optional name for the constants created by this operation.
File:      /usr/local/lib/python3.6/dist-packages/horovod/tensorflow/keras/__init__.py
Type:      function


The training script has a function `restart_epoch()` that looks for the latest checkpoint we have created and returns the epoch number corresponding to that checkpoint. By default, every rank resumes from epoch 0 (look for the `resume_from_epoch` variable), which means starting from the beginning without loading a checkpoint. We can call `restart_epoch()` to actively look for checkpoints, but we only need to do this on one rank. That rank can then broadcast the epoch number to all other ranks. The same rank also reads in the checkpoint and broadcasts the checkpointed weights, which is already handled by the `BroadcastGlobalVariablesCallback` we implemented in Step 4.
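
For intuition about what a function like `restart_epoch()` has to do, the sketch below scans a directory for the highest-numbered checkpoint. It is an illustration only and not the code in `fashion_mnist.py`: the function name `latest_checkpoint_epoch` and the `checkpoint-<epoch>.h5` naming pattern are assumptions made for this example.

```python
import os
import re
import tempfile


def latest_checkpoint_epoch(checkpoint_dir, pattern=r"checkpoint-(\d+)\.h5$"):
    """Return the highest epoch that has a checkpoint file, or 0 if none exist."""
    if not os.path.isdir(checkpoint_dir):
        return 0
    epochs = []
    for name in os.listdir(checkpoint_dir):
        match = re.match(pattern, name)
        if match:
            epochs.append(int(match.group(1)))
    return max(epochs, default=0)


demo_dir = tempfile.mkdtemp()
empty_result = latest_checkpoint_epoch(demo_dir)  # no checkpoints yet

# Create empty placeholder files standing in for three saved epochs.
for epoch in (1, 2, 3):
    open(os.path.join(demo_dir, "checkpoint-%d.h5" % epoch), "w").close()
found = latest_checkpoint_epoch(demo_dir)
print(empty_result, found)
```

Returning 0 when nothing is found matches the default described above, where a run without checkpoints simply starts from the beginning.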

**Exercise**: edit `fashion_mnist.py` so that after

```python
resume_from_epoch = 0
```

you:

(1) update the `resume_from_epoch` variable on rank 0 (the root process) using the `restart_epoch()` function;

(2) broadcast the value of this data to all other processes; and,

(3) uncomment the checkpointing callbacks, and make sure they're only appended on rank 0.

Use the docstring printed above to assist your work in getting the syntax right.

Look for `TODO: Step 7` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_07.py`.

In [24]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512 --use-checkpointing

Epoch 1/3
2021-11-11 19:31:27.974364: E tensorflow/core/platform/default/device_tracer.cc:70] CUPTI error: CUPTI could not be loaded or symbol could not be found.
Images/sec: 2108.65
Cumulative training time after epoch 1: 28.52
Epoch 2/3
Images/sec: 5708.0
Cumulative training time after epoch 2: 39.38
Epoch 3/3
Images/sec: 5924.9
Cumulative training time after epoch 3: 49.89
Cumulative training time: 50.18
Test loss: 2.9833685874938967
Test accuracy: 0.3482


Now let's check that checkpointing works by training for another 2 epochs. With `--epochs 5` and the checkpoints from the previous 3 epochs on disk, the run should start at epoch 4.

In [25]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 5 --batch-size 512 --use-checkpointing

Epoch 4/5
2021-11-11 19:32:23.424203: E tensorflow/core/platform/default/device_tracer.cc:70] CUPTI error: CUPTI could not be loaded or symbol could not be found.
Images/sec: 1802.19
Cumulative training time after epoch 4: 33.3
Epoch 5/5
Images/sec: 2168.59
Cumulative training time after epoch 5: 62.06
Cumulative training time: 62.39
Test loss: 1.1462138891220093
Test accuracy: 0.5784


If you make any mistakes, you can simply delete the `logs` directory and try again.

In [None]:
!rm -rf logs

## Make necessary algorithmic adjustments

So far we've just gone through the mechanics of how to do distributed training. But we haven't discussed what algorithm adjustments need to be made when you are training at larger scale.

### 8. Increase the learning rate

Given a fixed batch size per GPU, the effective batch size for training increases when you use more GPUs, since we average out the gradients among all processors. [Standard practice](https://arxiv.org/abs/1404.5997) is to scale the learning rate by the same factor that you have scaled the batch size -- that is, by the number of workers present. This can be done so that the training script does not change for single-process runs, since in that case you just multiply by 1.

The reason we do this is that the error of a mean of *n* samples (random variables) with finite standard deviation *sigma* is approximately sigma/sqrt(n) when *n* is large (see the [central limit theorem](https://en.wikipedia.org/wiki/Central_limit_theorem)). Hence, learning rates should be scaled at least with sqrt(k) when using *k* times bigger batch sizes in order to preserve the variance of the batch-averaged gradient. In practice we use linear scaling, often out of convenience, although depending on the circumstances either rule may work better.
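
The two scaling rules are easy to compare numerically. The helper below is a hypothetical illustration (the name `scaled_lr` and the `rule` argument are not part of the lab script); the base learning rate of 0.01 is chosen only as an example value.

```python
import math


def scaled_lr(base_lr, num_workers, rule="linear"):
    """Scale a single-worker learning rate for `num_workers` workers."""
    if rule == "linear":
        return base_lr * num_workers
    if rule == "sqrt":
        return base_lr * math.sqrt(num_workers)
    raise ValueError("rule must be 'linear' or 'sqrt'")


base_lr = 0.01
for workers in (1, 4, 16):
    linear = scaled_lr(base_lr, workers, "linear")
    sqrt = scaled_lr(base_lr, workers, "sqrt")
    print("workers=%2d  linear=%.4f  sqrt=%.4f" % (workers, linear, sqrt))
```

With a single worker both rules leave the learning rate unchanged, which is why the same script can be used for single-process runs.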

**Exercise**: Scale the learning rate by the number of workers, and look at the effect on the training accuracy, if any.

Look for `TODO: Step 8` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_08.py`.

In [26]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 2037.53
Cumulative training time after epoch 1: 29.48
Epoch 2/3
Images/sec: 5910.99
Cumulative training time after epoch 2: 39.7
Epoch 3/3
Images/sec: 5537.51
Cumulative training time after epoch 3: 50.57
Cumulative training time: 50.73
Test loss: 1.5546909868717194
Test accuracy: 0.4402


### 9. Add learning rate warmup

As it stands in `fashion_mnist.py`, we are using `keras.callbacks.LearningRateScheduler` along with the user-defined `lr_schedule` function, to reduce the learning rate (LR) by a factor of 10 on the 15th, 25th and 35th epochs:

```python
def lr_schedule(epoch):
    # Step the learning rate down by 10x at epochs 15, 25 and 35.
    if epoch < 15:
        return args.base_lr
    if epoch < 25:
        return 1e-1 * args.base_lr
    if epoch < 35:
        return 1e-2 * args.base_lr
    return 1e-3 * args.base_lr

# Keras calls lr_schedule at the start of every epoch.
callbacks.append(keras.callbacks.LearningRateScheduler(lr_schedule))
```

Many models are sensitive to using a large learning rate immediately after initialization and can benefit from learning rate warmup. We saw earlier that we typically scale the learning rate linearly with the batch size. But if the batch size gets large enough, the learning rate becomes very high and the network tends to diverge, especially in the first few iterations. We counteract this by gently ramping the learning rate up to the target learning rate.

In practice, the idea is to start training with a lower learning rate and [gradually raise it to a target learning rate](https://arxiv.org/abs/1706.02677) over a few epochs. Horovod has the convenient `horovod.keras.callbacks.LearningRateWarmupCallback` for the Keras API that implements that logic. By default it will, over the first 5 epochs, gradually increase the learning rate from *initial learning rate* / *number of workers* up to *initial learning rate*. Execute the following cell to get more information:

In [27]:
?hvd.callbacks.LearningRateWarmupCallback

Init signature:
hvd.callbacks.LearningRateWarmupCallback(
    warmup_epochs=5,
    momentum_correction=True,
    steps_per_epoch=None,
    verbose=0,
)
Docstring:     
Implements gradual learning rate warmup:

    `lr = initial_lr / hvd.size()` ---> `lr = initial_lr`

`initial_lr` is the learning rate of the model optimizer at the start of the training.

This technique was described in the paper "Accurate, Large Minibatch SGD: Training
ImageNet in 1 Hour". See https://arxiv.org/pdf/1706.02677.pdf for details.

Math recap:

.. math::

    epoch &= full\_epochs + \frac{batch}{steps\_per\_epoch}

    lr'(e
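
The warmup described above, from *initial learning rate* / *number of workers* up to *initial learning rate*, can be sketched as a simple ramp. The function below assumes the ramp is linear in the (possibly fractional) epoch; it is an illustration written for this lab text, not Horovod's implementation, and the name `warmup_lr` is hypothetical.

```python
def warmup_lr(epoch, initial_lr, num_workers, warmup_epochs=5):
    """Ramp linearly from initial_lr / num_workers to initial_lr."""
    if epoch >= warmup_epochs:
        return initial_lr  # warmup finished, use the target rate
    start_lr = initial_lr / num_workers
    return start_lr + (initial_lr - start_lr) * epoch / warmup_epochs


# Example values: a target rate of 0.04 on 4 workers starts at 0.01.
schedule = [warmup_lr(e, initial_lr=0.04, num_workers=4) for e in range(7)]
for e, lr in enumerate(schedule):
    print("epoch %d: lr=%.4f" % (e, lr))
```

Note that `initial_lr` here is the already-scaled target rate, so with a single worker the ramp is flat and the warmup has no effect.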

We can also swap out `keras.callbacks.LearningRateScheduler` for `horovod.keras.callbacks.LearningRateScheduleCallback`. Execute the following cell to get more information:

In [28]:
?hvd.callbacks.LearningRateScheduleCallback

Init signature:
hvd.callbacks.LearningRateScheduleCallback(
    multiplier,
    start_epoch=0,
    end_epoch=None,
    staircase=True,
    momentum_correction=True,
    steps_per_epoch=None,
)
Docstring:     
LearningRateScheduleCallback sets learning rate between epochs `start_epoch` and
`end_epoch` to be `initial_lr * multiplier`.  `multiplier` can be a constant or
a function `f(epoch) = lr'`.

If `multiplier` is a function and `staircase=True`, learning rate adjustment will
happen at the beginning 

We can still pass `lr_schedule` as the `multiplier` argument to `horovod.keras.callbacks.LearningRateScheduleCallback`. However this callback is invoked every epoch and we do not want it to conflict with `horovod.keras.callbacks.LearningRateWarmupCallback`. So we will also need to set its `start_epoch` argument such that it is only invoked after the warmup period.
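
The role of `start_epoch` can be modelled in a few lines of plain Python. This is a sketch under stated assumptions rather than Horovod's code: `apply_schedule` and `multiplier` are hypothetical names, the end of the range is treated as exclusive, and the schedule is written as a pure multiplier of the initial learning rate.

```python
def multiplier(epoch):
    """Staircase factor: 1x, then 10x smaller at epochs 15, 25 and 35."""
    if epoch < 15:
        return 1.0
    if epoch < 25:
        return 1e-1
    if epoch < 35:
        return 1e-2
    return 1e-3


def apply_schedule(epoch, current_lr, initial_lr, multiplier,
                   start_epoch=0, end_epoch=None):
    """Set lr to initial_lr * multiplier(epoch) only inside the active range."""
    in_range = epoch >= start_epoch and (end_epoch is None or epoch < end_epoch)
    if not in_range:
        return current_lr  # leave whatever another callback (warmup) has set
    return initial_lr * multiplier(epoch)


warmup_epochs = 5
during_warmup = apply_schedule(2, current_lr=0.02, initial_lr=0.04,
                               multiplier=multiplier, start_epoch=warmup_epochs)
after_warmup = apply_schedule(20, current_lr=0.04, initial_lr=0.04,
                              multiplier=multiplier, start_epoch=warmup_epochs)
print(during_warmup, after_warmup)
```

Setting `start_epoch` to the number of warmup epochs means the schedule never overwrites the ramping learning rate, and takes over only once the warmup is done.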

**Exercise**: Add learning rate warmup to our training script.

First, register a new `warmup-epochs` argument using the following code:
```python
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
```

Second, using `args.warmup_epochs` as the `warmup_epochs` argument, implement a learning rate warmup. Please also set the callback's `verbose` argument to the script's `verbose` variable.

Third, replace `keras.callbacks.LearningRateScheduler` with `horovod.keras.callbacks.LearningRateScheduleCallback`, using `lr_schedule` as the `multiplier` argument, and taking care to not start the callback until after the warmup epochs have completed.

Look for `TODO: Step 9` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_09.py`.

In [29]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 1658.04
Cumulative training time after epoch 1: 36.19
Epoch 2/3
Images/sec: 5484.47
Cumulative training time after epoch 2: 47.19
Epoch 3/3
Images/sec: 5270.21
Cumulative training time after epoch 3: 58.67
Cumulative training time: 58.7
Test loss: 0.6973326444625855
Test accuracy: 0.7489


### 10. Change the optimizer

You will likely find that as you scale to multiple GPUs and the resulting overall batch size increases, accuracy of the network will suffer. A series of optimizers have been created to address this problem, and allow for scaling to very large batch sizes and learning rates. In this exercise we'll be using the [NovoGrad optimizer](https://arxiv.org/abs/1905.11286). NovoGrad has the standard form of an update to the weights,

\begin{equation*}
  \large
  \Delta \mathbf{w} = -\lambda\, \mathbf{m}
\end{equation*}

but the $\mathbf{m}$ term appropriately normalizes the gradients to avoid the [vanishing gradient (or exploding gradient) problem](https://en.wikipedia.org/wiki/Vanishing_gradient_problem), using a gradient-averaging scheme similar to the momentum term in SGD. NovoGrad ensures that the learning rate is scaled appropriately on each layer, which empirically is [important in the large batch regime](https://arxiv.org/abs/1708.03888). If you are interested in continuing this exploration after this course, the [LAMB optimizer](https://arxiv.org/abs/1904.00962) is another extremely promising recent method worth exploring. It is very similar to NovoGrad in that it combines [Adam](https://arxiv.org/abs/1412.6980), a popular variant of SGD, with layer-wise learning rates.
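
To see how the $\mathbf{m}$ term normalizes the gradient, the sketch below applies one NovoGrad-style update to a single layer in plain Python. It paraphrases the update rule from the NovoGrad paper as understood here and is not the optimizer used by the lab script; the function name `novograd_step`, the `state` dictionary, and the hyperparameter values are illustrative assumptions.

```python
import math


def novograd_step(w, g, state=None, lr=0.01, beta1=0.95, beta2=0.98,
                  weight_decay=0.0, eps=1e-8):
    """Apply one NovoGrad-style update to the weights `w` of a single layer."""
    grad_sq_norm = sum(x * x for x in g)
    if state is None:
        # First step: the second moment starts at the squared gradient norm.
        v = grad_sq_norm
        m_prev = [0.0] * len(w)
    else:
        v = beta2 * state["v"] + (1.0 - beta2) * grad_sq_norm
        m_prev = state["m"]
    denom = math.sqrt(v) + eps
    # The gradient is divided by a per-layer norm estimate before averaging.
    m = [beta1 * m_i + (g_i / denom + weight_decay * w_i)
         for m_i, g_i, w_i in zip(m_prev, g, w)]
    w_new = [w_i - lr * m_i for w_i, m_i in zip(w, m)]  # delta w = -lr * m
    return w_new, {"v": v, "m": m}


w0 = [1.0, -1.0]
w_small, _ = novograd_step(w0, [3.0, 4.0])
w_large, _ = novograd_step(w0, [3000.0, 4000.0])
print(w_small, w_large)
```

The two calls use gradients that differ by a factor of 1000 yet produce almost the same first update, which illustrates why the layer-wise normalization makes the step size insensitive to the raw gradient scale.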

**Exercise**: Use the NovoGrad optimizer.

Replace the SGD optimizer with the NovoGrad optimizer and pass in the learning rate multiplied by the number of ranks. 

Look for `TODO: Step 10` in `fashion_mnist.py`. If you get stuck, refer to `solutions/fashion_mnist_after_step_10.py`.

In [30]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 3 --batch-size 512

Epoch 1/3
Images/sec: 1647.81
Cumulative training time after epoch 1: 36.41
Epoch 2/3
Images/sec: 5338.6
Cumulative training time after epoch 2: 47.7
Epoch 3/3
Images/sec: 5605.01
Cumulative training time after epoch 3: 58.58
Cumulative training time: 58.63
Test loss: 0.7952311754226684
Test accuracy: 0.7349


## Check your work

Congratulations! If you made it this far, your `fashion_mnist.py` should now be fully distributed. To verify, compare `fashion_mnist.py` to `fashion_mnist_solution.py`, and make any changes you might have missed.

If you haven't already, run a 1 GPU training session and compare it to a training session using all of the GPUs you have. Train for a decent number of epochs and compare both accuracy and time to solution. Ideally you'd get through the epochs faster by a factor of the number of GPUs you have, but for a model this small you probably won't get perfectly linear scaling like that. Still, you probably got a nice speedup.

In [31]:
!horovodrun -np 1 python fashion_mnist.py --epochs 20 --batch-size 512

Epoch 1/20
Images/sec: 883.84
Cumulative training time after epoch 1: 67.89
Epoch 2/20
Images/sec: 1393.82
Cumulative training time after epoch 2: 110.96
Epoch 3/20
Images/sec: 1403.09
Cumulative training time after epoch 3: 153.75
Epoch 4/20
Images/sec: 1396.81
Cumulative training time after epoch 4: 196.73
Epoch 5/20
Epoch 5: finished gradual learning rate warmup to 0.01.

Images/sec: 1398.63
Cumulative training time after epoch 5: 239.66
Epoch 6/20
Images/sec: 1406.01
Cumulative training time after epoch 6: 282.36
Epoch 7/20
Images/sec: 1404.09
Cumulative training time after epoch 7: 325.11
Early stopping after epoch 7
Cumulative training time: 325.14
Test loss: 0.3724208980798721
Test accuracy: 0.8776


In [32]:
!horovodrun -np $num_gpus python fashion_mnist.py --epochs 20 --batch-size 512

Epoch 1/20
Images/sec: 1594.34
Cumulative training time after epoch 1: 37.64
Epoch 2/20
Images/sec: 5722.18
Cumulative training time after epoch 2: 48.17
Epoch 3/20
Images/sec: 5102.01
Cumulative training time after epoch 3: 60.08
Epoch 4/20
Images/sec: 5492.39
Cumulative training time after epoch 4: 71.05
Epoch 5/20
Epoch 5: finished gradual learning rate warmup to 0.04.

Images/sec: 5282.69
Cumulative training time after epoch 5: 82.5
Epoch 6/20
Images/sec: 5634.26
Cumulative training time after epoch 6: 93.32
Epoch 7/20
Images/sec: 5630.12
Cumulative training time after epoch 7: 104.01
Epoch 8/20
Images/sec: 5701.54
Cumulative training time after epoch 8: 114.62
Epoch 9/20
Images/sec: 5283.42
Cumulative training time after epoch 9: 126.19
Epoch 10/20
Images/sec: 5626.82
Cumulative training time after epoch 10: 136.92
Epoch 11/20
Images/sec: 5649.98
Cumulative training time after epoch 11: 147.56
Early stopping after epoch 11
Early stopping after epoch 11
Early stopping after epoch 1
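
As a rough check on the speedup discussed above, the per-epoch time of each run can be estimated from the cumulative times logged after epochs 1 and 7, which skips the slower first epoch. The GPU count of 4 is an assumption, inferred from the warmup target of 0.04 in the multi-GPU run versus 0.01 in the single-GPU run; substitute your own `num_gpus` and timings.

```python
# Cumulative training times (seconds) copied from the two runs above.
single_gpu_times = {1: 67.89, 7: 325.11}
multi_gpu_times = {1: 37.64, 7: 104.01}


def seconds_per_epoch(times, first, last):
    """Average epoch duration between two logged cumulative times."""
    return (times[last] - times[first]) / (last - first)


single = seconds_per_epoch(single_gpu_times, 1, 7)
multi = seconds_per_epoch(multi_gpu_times, 1, 7)
speedup = single / multi

assumed_num_gpus = 4  # assumption: not stated explicitly in the logs
efficiency = speedup / assumed_num_gpus

print("seconds per epoch: %.2f (1 GPU) vs %.2f (multi-GPU)" % (single, multi))
print("speedup: %.2fx, scaling efficiency: %.0f%%" % (speedup, 100 * efficiency))
```

A scaling efficiency below 100% is expected for a model this small, since communication and per-epoch overheads take up a larger share of each step.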

Now you're fully prepared to take your own training model and distribute it across many GPUs!
