# Distributed Gradient Descent with Horovod 
### Explained using Multivariate Linear Regression in PyTorch and Python

by <a href="mailto:carl.osipov@gmail.com"><b>Carl Osipov</b></a>, based on the materials from his <a href="https://bit.ly/cnml-ebook"><b>"Cloud Native Machine Learning"</b></a>.

In this notebook, you can learn about:

* gradient descent with gradient accumulation 
* deep learning with training datasets that do not fit in node memory
* gradient accumulation in parameter server-based vs ring-based (for example Horovod) gradient descent
* reduce-scatter and all-gather phases in Horovod
* Horovod as an implementation for distributed data parallel training in deep learning

Why should you care? Are you:

* informing a deep learning platform buy vs. build decision for your project, team, or organization
* scaling up your deep learning models to out-of-memory training datasets
* understanding or troubleshooting distributed deep learning training
* researching the direction of high performance distributed deep learning 
* working on federated deep learning or bandwidth-constrained (IoT) training at the cloud's edge

This notebook builds on the gradient descent, automatic differentiation, tensors, and other concepts introduced in:

* <a href="https://colab.research.google.com/github/osipov/manning/blob/master/autodiff/Solution_Autodiff_Algorithm.ipynb">Automatic Differentiation Explained</a>
* <a href="https://web.archive.org/web/20200925082637/https://www.cs.fsu.edu/~xyuan/paper/09jpdc.pdf">Bandwidth Optimal All-reduce Algorithms</a>
* <a href="https://web.archive.org/web/20200413112232/https://andrew.gibiansky.com/blog/machine-learning/baidu-allreduce/">Bringing HPC Techniques to Deep Learning</a>

### Import a standard set of libraries and initialize the pseudo-random number generator

In [None]:
import torch as pt
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)

pt.__version__, np.__version__, pd.__version__

### Create a training dataset of features for multivariate linear regression

* `TRAINING_DATASET_SIZE` is pre-set to a default of 1,000
* Assume that `FEATURES` is set to 4

In [None]:
TRAINING_DATASET_SIZE = 1000
FEATURES = 4

Create a NumPy nd-array named `X_numpy` with a shape of `(TRAINING_DATASET_SIZE, FEATURES)`:

* the array should contain feature values such that the values in each column are from a normal (bell-shaped) distribution. 
* the means of the feature values should be `0.0` for the first column, `1.0` for the second column, and so on.
* the feature values across columns should be uncorrelated.

**Hint:** You can use `np.random.multivariate_normal` to create `X_numpy`
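A minimal sketch of one way to build `X_numpy`, assuming an identity covariance matrix (unit variance in every column and zero covariance between columns). Any diagonal covariance would also keep the columns uncorrelated:

```python
import numpy as np

SEED = 42
np.random.seed(SEED)

TRAINING_DATASET_SIZE = 1000
FEATURES = 4

# column means 0.0, 1.0, 2.0, 3.0: one mean per feature
means = np.arange(FEATURES, dtype=np.float64)
# identity covariance (assumed): unit variance per column, zero covariance across columns
covariance = np.eye(FEATURES)

X_numpy = np.random.multivariate_normal(means, covariance, size=TRAINING_DATASET_SIZE)
print(X_numpy.shape)         # (1000, 4)
print(X_numpy.mean(axis=0))  # approximately [0. 1. 2. 3.]
```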

### Check that a column's feature values are normal

**Hint:** You can use the `matplotlib.pyplot.hist` method.
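One way to eyeball normality, using the second column (`x1`, mean `1.0`) as an example. The dataset is rebuilt with the same seed and the assumed identity covariance so the block runs on its own:

```python
import numpy as np
import matplotlib.pyplot as plt

# rebuild X_numpy as in the previous cell (assumed identity covariance)
np.random.seed(42)
X_numpy = np.random.multivariate_normal(np.arange(4, dtype=np.float64), np.eye(4), size=1000)

# a roughly symmetric, bell-shaped histogram centered near 1.0 suggests a normal column
counts, bin_edges, _ = plt.hist(X_numpy[:, 1], bins=30)
plt.xlabel('x1')
plt.ylabel('count')
plt.show()
```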

### Confirm that the feature values are uncorrelated across columns

**Hint:** You can use the `pandas.DataFrame.corr` method
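A sketch of the correlation check; the column names `x0` to `x3` are illustrative. Off-diagonal values close to `0.0` suggest that the columns are uncorrelated:

```python
import numpy as np
import pandas as pd

# rebuild X_numpy as in the previous cells (assumed identity covariance)
np.random.seed(42)
X_numpy = np.random.multivariate_normal(np.arange(4, dtype=np.float64), np.eye(4), size=1000)

df = pd.DataFrame(X_numpy, columns=['x0', 'x1', 'x2', 'x3'])
corr = df.corr()  # pairwise Pearson correlation by default
print(corr.round(2))

# the diagonal is always 1.0; the off-diagonal entries should be near 0.0
off_diagonal = corr.values[~np.eye(4, dtype=bool)]
print('largest |correlation| between different columns:', np.abs(off_diagonal).max())
```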

### Create a PyTorch tensor `X_train` from `X_numpy`
* You can use PyTorch's `from_numpy` function to re-use the in-memory data allocated by a NumPy nd-array.
* Ensure that the resulting PyTorch tensor has a shape of `(TRAINING_DATASET_SIZE, FEATURES)`
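A minimal sketch. `from_numpy` wraps the nd-array's existing buffer rather than copying it, which the last line checks:

```python
import numpy as np
import torch as pt

# rebuild X_numpy as in the previous cells (assumed identity covariance)
np.random.seed(42)
TRAINING_DATASET_SIZE, FEATURES = 1000, 4
X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)

# from_numpy wraps the existing buffer: no copy, and the float64 dtype is kept
X_train = pt.from_numpy(X_numpy)
print(X_train.shape, X_train.dtype)  # torch.Size([1000, 4]) torch.float64

# the tensor and the nd-array share the same memory
print(np.shares_memory(X_numpy, X_train.numpy()))  # True
```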

Note that the `COEFFICIENTS` tensor is pre-defined as `[5, 3, 2, 1]` just to make the linear regression easier to visualize in the upcoming cells.


In [None]:
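# float64 to match X_train: PyTorch matrix multiplication does not mix float64 and int64 operands
COEFFICIENTS = pt.tensor([5, 3, 2, 1], dtype=pt.float64)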
COEFFICIENTS

### Create a `y_train` tensor using `COEFFICIENTS` and `X_train`


Gradient descent will attempt to recover the `COEFFICIENTS` using just the `y_train` and `X_train` values.

* Ensure that `y_train` has a shape of `(TRAINING_DATASET_SIZE,)`, that is, one label per training example
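A sketch assuming noise-free labels (the section does not add a noise term) and a `float64` `COEFFICIENTS` tensor, since PyTorch's matrix multiplication does not accept a `float64` and an `int64` operand together:

```python
import numpy as np
import torch as pt

# rebuild X_train as in the previous cells (assumed identity covariance)
np.random.seed(42)
TRAINING_DATASET_SIZE, FEATURES = 1000, 4
X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)

# float64 to match X_train
COEFFICIENTS = pt.tensor([5, 3, 2, 1], dtype=pt.float64)

# (1000, 4) @ (4,) -> (1000,): one noise-free label per training example (assumed)
y_train = X_train @ COEFFICIENTS
print(y_train.shape)  # torch.Size([1000])
```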

### Visualize the 5 dimensions of the linear regression problem

Use a scatter plot to visualize the 4 dimensions of `X_train` as 3 spatial dimensions and 1 marker-size dimension. The single dimension of `y_train` is visualized as the color of each point on the scatter plot.

* the `SCALE` setting is configured to 1,000 for a better-looking visualization and does not change the original multivariate linear regression problem.

* the `STRIDE` setting is configured to 25 by default. You can choose more data points out of `TRAINING_DATASET_SIZE` to visualize by setting `STRIDE` to be closer to 1, or fewer data points to visualize by using a `STRIDE` value closer to `TRAINING_DATASET_SIZE`.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # registers the '3d' projection on older matplotlib versions

figure = plt.figure(figsize = (16, 9))
# recent matplotlib versions no longer attach Axes3D(figure, ...) to the figure automatically,
# so create the 3D axes through the figure and set the viewing angle separately
axis = figure.add_axes([0, 0, .95, 1], projection='3d')
axis.view_init(elev=48, azim=134)

SCALE = 1_000
y_view, X_view = SCALE * y_train, SCALE * X_train

STRIDE = 25
pc = axis.scatter(X_view[::STRIDE, 0], X_view[::STRIDE, 1], X_view[::STRIDE, 2], s = X_view[::STRIDE, 3], c = y_view[::STRIDE], cmap='autumn', alpha=0.4)
figure.colorbar(pc, fraction=0.01)
axis.set_xlabel('x0'), axis.set_ylabel('x1'), axis.set_zlabel('x2');

### Create random values for the initial model parameters
* create a `w_numpy` nd-array shaped `(FEATURES, 1)` with the model parameter values
* you can use normally distributed `randn` values or try a more complex initialization scheme
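A one-line initialization with standard normal values, shown with a fixed seed for reproducibility:

```python
import numpy as np

np.random.seed(42)
FEATURES = 4

# one parameter per feature as a (FEATURES, 1) column; standard normal starting values
w_numpy = np.random.randn(FEATURES, 1)
print(w_numpy.shape, w_numpy.dtype)  # (4, 1) float64
```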

### Instantiate a differentiable tensor `w` as the model


* the `w` tensor should use `w_numpy` and use `pt.float64` as the `dtype`
* recall that `requires_grad` must be `True` for a PyTorch tensor to be differentiable
* you can use `torch.randn` to initialize the values or try a more complex scheme like `torch.nn.init.kaiming_uniform_`
* implement the `forward` and `mse` functions for the model
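A sketch of `w`, `forward`, and `mse`. The `squeeze` in `forward` is a design choice assumed here so that predictions have the same `(N,)` shape as `y_train`; without it, subtracting an `(N, 1)` tensor from an `(N,)` tensor would broadcast to an `(N, N)` matrix and silently produce the wrong loss:

```python
import numpy as np
import torch as pt

pt.manual_seed(42)
np.random.seed(42)
FEATURES = 4
w_numpy = np.random.randn(FEATURES, 1)

# the model: a differentiable (FEATURES, 1) float64 tensor
w = pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64)

def forward(w, X):
  # (N, FEATURES) @ (FEATURES, 1) -> (N, 1), squeezed to (N,) to match the labels
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  # mean squared error over the examples passed in
  return ((y - y_est) ** 2).mean()

# quick shape check on a small random batch
X = pt.randn(8, FEATURES, dtype=pt.float64)
y = X @ pt.tensor([5, 3, 2, 1], dtype=pt.float64)
y_est = forward(w, X)
loss = mse(y, y_est)
loss.backward()
print(y_est.shape, w.grad.shape)  # torch.Size([8]) torch.Size([4, 1])
```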

### Solve for the coefficients using ordinary <a href="https://en.wikipedia.org/wiki/Gradient_descent">gradient descent</a>

For consistency with the upcoming parts of the notebook, use

* `EPOCHS = 400`
* `LEARNING_RATE = 0.01`

In [None]:
EPOCHS = 400
LEARNING_RATE = 0.01

Recover the coefficients from the data using ordinary gradient descent
* the entire training dataset is used to compute the gradients per iteration of gradient descent
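A sketch of full-batch gradient descent under the assumptions of the earlier sketches (identity covariance, noise-free labels, a `squeeze`-based `forward`). It rebuilds the dataset so it runs on its own; the recovered parameters should approach `[5, 3, 2, 1]`:

```python
import numpy as np
import torch as pt

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES = 1000, 4
EPOCHS, LEARNING_RATE = 400, 0.01

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
COEFFICIENTS = pt.tensor([5, 3, 2, 1], dtype=pt.float64)
y_train = X_train @ COEFFICIENTS
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

w = pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64)
for epoch in range(EPOCHS):
  # every gradient step uses the entire training dataset
  loss = mse(y_train, forward(w, X_train))
  loss.backward()
  w.data -= LEARNING_RATE * w.grad
  w.grad = None

print(w.data.squeeze())  # should be close to COEFFICIENTS
```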

**What if the training dataset doesn't fit in the node's memory?**

### Gradient Descent using Gradient Accumulation

* Single node, in-memory implementation
* For example, an in-memory **shard** (a part of the training dataset) can consist of 250 training examples

<img src="https://i.imgur.com/cBgNUL2.png"/>

* Use an `IN_MEMORY_SHARD_SIZE` of `250` to simulate the node's memory limit


In [None]:
IN_MEMORY_SHARD_SIZE = 250

w = pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64)

from torch.utils.data import TensorDataset, DataLoader

### Single node gradient descent with gradient accumulation

* Use `TensorDataset` and `DataLoader` from the `torch.utils.data` package
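* Scale each shard's loss by `IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE` so that the gradients accumulated across all shards equal the gradient of the mean loss over the entire training dataset
* Contrast gradient accumulation with mini-batch gradient descent, explaining the difference between shards and training batches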
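A sketch of single-node gradient accumulation, assuming `shuffle=False` and the same helper definitions as the earlier sketches. Each `DataLoader` batch stands in for one in-memory shard, `backward()` adds every shard's gradient into `w.grad`, and the parameters change only once per pass over all shards:

```python
import numpy as np
import torch as pt
from torch.utils.data import TensorDataset, DataLoader

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
EPOCHS, LEARNING_RATE = 400, 0.01

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
COEFFICIENTS = pt.tensor([5, 3, 2, 1], dtype=pt.float64)
y_train = X_train @ COEFFICIENTS
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

# each batch yielded by the DataLoader plays the role of one in-memory shard
train_dl = DataLoader(TensorDataset(y_train, X_train), batch_size=IN_MEMORY_SHARD_SIZE, shuffle=False)

w = pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64)
for epoch in range(EPOCHS):
  for y_shard, X_shard in train_dl:
    y_est = forward(w, X_shard)
    # scale so that the shard losses sum to the mean loss over the whole dataset
    loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, y_est)
    # backward() adds into w.grad, accumulating gradients across shards
    loss.backward()
  # one gradient descent step per pass over all the shards
  w.data -= LEARNING_RATE * w.grad
  w.grad = None

print(w.data.squeeze())  # should be close to COEFFICIENTS
```

By contrast, mini-batch gradient descent would update `w` after every batch, so each step would use a noisy estimate of the full-dataset gradient; here the accumulated gradient equals the full-dataset gradient, so the result tracks ordinary gradient descent.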

**Can gradient accumulation help distribute gradient descent?**

# Parameter server-based distributed (multi-node) gradient descent

<img src="https://i.imgur.com/Y1wDyRs.png"/>

* straightforward distributed architecture, popularized by TensorFlow 1.x and Google Cloud AI Platform
* **Distributed Data Parallel** machine learning training approach where the same model parameters are used across all nodes during training
* model **MUST** fit in the memory of the worker and parameter nodes
* parallel processing (each worker computes the gradients for its own shard) reduces the latency of a gradient descent step compared to sequential gradient accumulation
* bandwidth-inefficient (bottlenecked) due to the many-to-one (worker nodes to parameter nodes) gradient transfer and the one-to-many (parameter nodes to worker nodes) model parameter transfer
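A hypothetical single-process simulation of one parameter server and `NODES` workers, assuming each worker holds exactly one shard. Workers run one after another in a loop and message passing is modeled by plain Python assignments, so the sketch illustrates the data flow only, not the parallelism or the bandwidth cost:

```python
import numpy as np
import torch as pt

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE  # one worker per shard
EPOCHS, LEARNING_RATE = 400, 0.01

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
COEFFICIENTS = pt.tensor([5, 3, 2, 1], dtype=pt.float64)
y_train = X_train @ COEFFICIENTS
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

# each worker holds one shard of the training dataset
X_shards = pt.split(X_train, IN_MEMORY_SHARD_SIZE)
y_shards = pt.split(y_train, IN_MEMORY_SHARD_SIZE)

# the parameter server owns the model parameters
server_w = pt.tensor(w_numpy, dtype=pt.float64)

for epoch in range(EPOCHS):
  worker_grads = []
  for node in range(NODES):
    # one-to-many: the server sends a copy of the current parameters to the worker
    w = server_w.clone().requires_grad_(True)
    loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shards[node], forward(w, X_shards[node]))
    loss.backward()
    # many-to-one: the worker sends its entire gradient back to the server
    worker_grads.append(w.grad)
  # the server sums the gradients and takes one gradient descent step
  server_w -= LEARNING_RATE * pt.stack(worker_grads).sum(dim=0)

print(server_w.squeeze())  # should be close to COEFFICIENTS
```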


**Can we do better than the parameter server-based approach?**

### Intuition behind ring-based distributed gradient descent

* Introduce `NODES` as `TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE` for distributed gradient descent

In [None]:
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE

GRADIENTS = [5, 3, 2, 1]
GRADIENTS
node_to_gradients = dict(zip(range(NODES), GRADIENTS))
node_to_gradients

#### All-Gather Operation Example (not Horovod)

<img src="https://i.imgur.com/vHfxzQt.png"/>

### Demonstrate 3 iterations with 4 nodes

* Assuming that node 1 starts the communication in iteration 0, after 3 iterations the sum of the gradients should be accumulated (reduced) on node 0.
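A sketch of the three iterations, assuming each node forwards its running sum to the next node in the ring and the receiver adds it to its own gradient:

```python
NODES = 4
GRADIENTS = [5, 3, 2, 1]
node_to_gradients = dict(zip(range(NODES), GRADIENTS))

START_NODE = 1  # node 1 starts the communication in iteration 0
for iteration in range(NODES - 1):
  sender = (START_NODE + iteration) % NODES
  receiver = (sender + 1) % NODES
  # the receiver adds the running sum it got from its ring neighbor
  node_to_gradients[receiver] += node_to_gradients[sender]
  print(iteration, sender, '->', receiver, node_to_gradients)

print(node_to_gradients[0])  # 11, the sum of GRADIENTS
```

Only node 0 ends up holding the total; the other nodes would need additional passes around the ring to receive it.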

# Horovod: A ring-based distributed gradient descent

* **Distributed Data Parallel** approach using (1) reduce-scatter and (2) all-gather phases
* more bandwidth-efficient than parameter server-based approaches and plain all-gather
* not to be confused with <a href="https://github.com/horovod/horovod">Horovod</a> the machine learning framework
* also not to be confused with <a href="https://i.imgur.com/TVDynOb.jpeg">Horovod the Slavic folk dance</a> which inspired the name

### Partitioning gradients into "segments"

* partitioning the gradients into **segments** enables Horovod to decrease the amount of bandwidth needed to complete the all-reduce operation, in other words, to deliver the sum of the gradients from all shards to every node in the ring.

* by default there are as many segments as nodes

<img src="https://i.imgur.com/dyj2LMU.png"/>
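A sketch of the segmenting step and a rough communication count, using a hypothetical 4-element gradient. The ring figure follows the analysis in the "Bringing HPC Techniques to Deep Learning" post listed at the top of the notebook: each node sends `2 * (NODES - 1)` messages of `K / NODES` elements, which stays below `2 * K` no matter how many nodes join. The parameter server figure assumes a single server, whose traffic grows with `NODES`:

```python
import torch as pt

NODES = 4
# hypothetical gradient for a (FEATURES, 1) parameter tensor, with FEATURES == 4
grad = pt.tensor([[0.5], [0.3], [0.2], [0.1]], dtype=pt.float64)

# by default, as many segments as nodes: here each segment is a single row
segments = pt.chunk(grad, NODES, dim=0)
for seg, values in enumerate(segments):
  print(seg, values.squeeze(-1).tolist())

# elements sent per all-reduce of K gradient values
K = grad.numel()
# ring: each node sends 2 * (NODES - 1) messages of K / NODES elements (reduce-scatter + all-gather)
ring_sent_per_node = 2 * (NODES - 1) * K / NODES
# single parameter server (assumed): receives NODES gradients and sends NODES parameter copies
ps_server_traffic = 2 * NODES * K
print(ring_sent_per_node, ps_server_traffic)  # 6.0 32
```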


# Gradient descent with Horovod

* create a list of tensors `W` with `NODES` tensors based on `w_numpy` values
* recall that each of the tensors in the list `W` must have `requires_grad` set to `True`
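A sketch of the per-node replicas. `pt.tensor` copies `w_numpy`, so every replica starts from the same values yet is an independent tensor:

```python
import numpy as np
import torch as pt

np.random.seed(42)
NODES, FEATURES = 4, 4
w_numpy = np.random.randn(FEATURES, 1)

# one model replica per node, all starting from the same w_numpy values
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]

print(len(W), all(w.requires_grad for w in W))  # 4 True
# pt.tensor copies the data, so the replicas do not share memory
print(len({w.data_ptr() for w in W}))           # 4
```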

## Perform a single forward pass of gradient descent for every node
* you can use `zip` on `range(NODES)` and `train_dl`
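A sketch of a single forward and backward pass on every node, rebuilt so it runs standalone and assuming `shuffle=False` so that node `i` always processes shard `i`. Each node ends up with a different gradient because each sees a different shard:

```python
import numpy as np
import torch as pt
from torch.utils.data import TensorDataset, DataLoader

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
y_train = X_train @ pt.tensor([5, 3, 2, 1], dtype=pt.float64)
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

# shuffle=False (assumed) so that batch i is always shard i
train_dl = DataLoader(TensorDataset(y_train, X_train), batch_size=IN_MEMORY_SHARD_SIZE, shuffle=False)
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]

# node i runs one forward and one backward pass on shard i
for node, (y_shard, X_shard) in zip(range(NODES), train_dl):
  y_est = forward(W[node], X_shard)
  loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, y_est)
  loss.backward()

# FEATURES == NODES here, so each row of a gradient is one segment
for node in range(NODES):
  print(node, W[node].grad.squeeze(-1).tolist())
```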

### Output the gradient segments on each node

### Output the target sum of the gradients as the target for Horovod
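A sketch of the target: the element-wise sum of the per-node gradients. Because each shard's loss is scaled by `IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE`, this sum should match the gradient of the mean loss over the whole training dataset, which the last lines check under the same assumed setup:

```python
import numpy as np
import torch as pt
from torch.utils.data import TensorDataset, DataLoader

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
y_train = X_train @ pt.tensor([5, 3, 2, 1], dtype=pt.float64)
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

train_dl = DataLoader(TensorDataset(y_train, X_train), batch_size=IN_MEMORY_SHARD_SIZE, shuffle=False)
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]
for node, (y_shard, X_shard) in zip(range(NODES), train_dl):
  loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, forward(W[node], X_shard))
  loss.backward()

# the target for Horovod: the element-wise sum of the gradients from all nodes
target = pt.stack([W[node].grad for node in range(NODES)]).sum(dim=0)
print(target.squeeze(-1).tolist())

# check: the target equals the full-dataset gradient at the same starting parameters
w_full = pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64)
mse(y_train, forward(w_full, X_train)).backward()
print(pt.allclose(target, w_full.grad))  # True
```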

## Horovod Phase 1: Reduce Scatter

In [None]:
%%html
<video  width="960" height="720" autoplay loop muted playsinline controls>
    <source src="https://i.imgur.com/IV6jBwL.mp4" type="video/mp4">
</video>

In [None]:
#horovod phase 1: reduce scatter


### Output the gradient segments on each node

## Output just the reduced (accumulated) segment on each node
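A standalone sketch of the reduce-scatter phase, using the same segment indexing as the final cell of this notebook. In each step every node adds one segment into its ring neighbor's copy; after `NODES - 1` steps, node `k` should hold the fully reduced segment `k`, which the last loop compares against the target sum:

```python
import numpy as np
import torch as pt
from torch.utils.data import TensorDataset, DataLoader

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
y_train = X_train @ pt.tensor([5, 3, 2, 1], dtype=pt.float64)
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

train_dl = DataLoader(TensorDataset(y_train, X_train), batch_size=IN_MEMORY_SHARD_SIZE, shuffle=False)
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]
for node, (y_shard, X_shard) in zip(range(NODES), train_dl):
  loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, forward(W[node], X_shard))
  loss.backward()
target = pt.stack([W[node].grad for node in range(NODES)]).sum(dim=0)

# Horovod phase 1: reduce-scatter
for step in range(NODES - 1):
  for node in range(NODES):
    seg = (node - step - 1) % NODES
    next_node = (node + 1) % NODES
    # add this node's partial sum of segment seg into the neighbor's copy
    W[next_node].grad[seg] += W[node].grad[seg]

# gradient segments on each node: a mix of partial and complete sums
for node in range(NODES):
  print(node, W[node].grad.squeeze(-1).tolist())

# the reduced (accumulated) segment on each node: node k holds segment k
for node in range(NODES):
  print(node, W[node].grad[node].item(), target[node].item())
```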

## Horovod Phase 2: All Gather

In [None]:
%%html
<video  width="960" height="720" autoplay loop muted playsinline controls>
    <source src="https://i.imgur.com/VU87GmZ.mp4" type="video/mp4">
</video>

In [None]:
#horovod phase 2: all gather

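A standalone sketch of the all-gather phase, which first repeats the reduce-scatter steps under the same assumed setup. Unlike reduce-scatter, each node overwrites its neighbor's copy of a segment instead of adding to it; afterwards every node should hold the full target sum:

```python
import numpy as np
import torch as pt
from torch.utils.data import TensorDataset, DataLoader

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
y_train = X_train @ pt.tensor([5, 3, 2, 1], dtype=pt.float64)
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

train_dl = DataLoader(TensorDataset(y_train, X_train), batch_size=IN_MEMORY_SHARD_SIZE, shuffle=False)
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]
for node, (y_shard, X_shard) in zip(range(NODES), train_dl):
  loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, forward(W[node], X_shard))
  loss.backward()
target = pt.stack([W[node].grad for node in range(NODES)]).sum(dim=0)

# Horovod phase 1: reduce-scatter (node k ends up with the reduced segment k)
for step in range(NODES - 1):
  for node in range(NODES):
    seg = (node - step - 1) % NODES
    W[(node + 1) % NODES].grad[seg] += W[node].grad[seg]

# Horovod phase 2: all-gather (node k starts by forwarding its reduced segment k)
for step in range(NODES - 1):
  for node in range(NODES):
    seg = (node - step) % NODES
    # overwrite, do not add: the segment is already fully reduced
    W[(node + 1) % NODES].grad[seg] = W[node].grad[seg]

for node in range(NODES):
  print(node, pt.allclose(W[node].grad, target))  # True on every node
```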

# Bringing it all together

In [None]:
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]

EPOCHS = 400
for epoch in range(EPOCHS):

  #compute per-shard gradients on each node (node i processes shard i)
  for node, (y_shard, X_shard) in zip(range(NODES), train_dl):
    y_est = forward(W[node], X_shard)
    loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, y_est)
    loss.backward()

  #horovod phase 1: reduce-scatter
  #in each step, every node adds one segment into its ring neighbor's copy;
  #afterwards, node i holds the fully reduced segment i
  for step in range(NODES - 1):
    for node in range(NODES):
      seg = (node - step - 1) % NODES
      grad = W[node].grad[seg]

      next_node = (node + 1) % NODES
      W[next_node].grad[seg] += grad

  #horovod phase 2: all-gather
  #each node forwards a fully reduced segment, overwriting the neighbor's copy
  for step in range(NODES - 1):
    for node in range(NODES):
      seg = (node - step) % NODES
      grad = W[node].grad[seg]

      next_node = (node + 1) % NODES
      W[next_node].grad[seg] = grad

  #perform a step of gradient descent: every node now has the same summed gradient
  for node in range(NODES):
    W[node].data -= LEARNING_RATE * W[node].grad
    W[node].grad = None

for node in range(NODES):
  print(W[node].data)

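A standalone sketch that checks the loop above under the assumed setup of the earlier sketches: it trains the `NODES` replicas with the two ring phases, wrapped in a hypothetical `ring_all_reduce` helper, and alongside them a single-node baseline with ordinary gradient descent. The replicas should agree exactly with one another and closely with the baseline:

```python
import numpy as np
import torch as pt
from torch.utils.data import TensorDataset, DataLoader

# assumed setup, mirroring the earlier cells so this block runs on its own
SEED = 42
pt.manual_seed(SEED)
np.random.seed(SEED)
TRAINING_DATASET_SIZE, FEATURES, IN_MEMORY_SHARD_SIZE = 1000, 4, 250
NODES = TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE
EPOCHS, LEARNING_RATE = 400, 0.01

X_numpy = np.random.multivariate_normal(np.arange(FEATURES, dtype=np.float64), np.eye(FEATURES), size=TRAINING_DATASET_SIZE)
X_train = pt.from_numpy(X_numpy)
COEFFICIENTS = pt.tensor([5, 3, 2, 1], dtype=pt.float64)
y_train = X_train @ COEFFICIENTS
w_numpy = np.random.randn(FEATURES, 1)

def forward(w, X):
  return (X @ w).squeeze(-1)

def mse(y, y_est):
  return ((y - y_est) ** 2).mean()

def ring_all_reduce(W):
  # hypothetical helper wrapping the two Horovod phases from the loop above
  for step in range(NODES - 1):  # phase 1: reduce-scatter
    for node in range(NODES):
      seg = (node - step - 1) % NODES
      W[(node + 1) % NODES].grad[seg] += W[node].grad[seg]
  for step in range(NODES - 1):  # phase 2: all-gather
    for node in range(NODES):
      seg = (node - step) % NODES
      W[(node + 1) % NODES].grad[seg] = W[node].grad[seg]

train_dl = DataLoader(TensorDataset(y_train, X_train), batch_size=IN_MEMORY_SHARD_SIZE, shuffle=False)
W = [pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64) for _ in range(NODES)]
w = pt.tensor(w_numpy, requires_grad=True, dtype=pt.float64)  # single-node baseline

for epoch in range(EPOCHS):
  for node, (y_shard, X_shard) in zip(range(NODES), train_dl):
    loss = (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_shard, forward(W[node], X_shard))
    loss.backward()
  ring_all_reduce(W)
  for node in range(NODES):
    W[node].data -= LEARNING_RATE * W[node].grad
    W[node].grad = None

  # baseline: ordinary gradient descent on the entire training dataset
  mse(y_train, forward(w, X_train)).backward()
  w.data -= LEARNING_RATE * w.grad
  w.grad = None

print(all(pt.equal(W[0], W[node]) for node in range(NODES)))  # True: replicas stay identical
print(pt.allclose(W[0], w))                                    # True: matches the baseline
print(W[0].data.squeeze())
```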
Copyright 2020 CounterFactual.AI LLC. All Rights Reserved.

Licensed under the GNU General Public License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 

You may obtain a copy of the License at

https://github.com/osipov/smlbook/blob/master/LICENSE

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.