# distributed-training

This tutorial covers distributed training on Spell using our integration with [horovod](https://github.com/horovod/horovod), with a code sample in PyTorch.

**Distributed training** is a set of techniques for using GPUs scattered across many different machines to train a machine learning model. `horovod` provides an easy-to-use, performant, cross-platform way of performing distributed model training, making it an essential tool for training very large models.

In this tutorial we will demonstrate an MNIST PyTorch model adapted for usage with `horovod`. We will then execute these training scripts on Spell, using our built-in Horovod integration: `spell run --distributed`.

Note: for code samples using other frameworks, check out the [examples directory](https://github.com/horovod/horovod/tree/master/examples) in the Horovod GitHub repo.

## prerequisites

* An account on [Spell for Teams](https://spell.run/pricing).
* The `spell` Python package installed in your local environment. Alternatively, you can launch this notebook from a Spell workspace by running the following CLI command:

    ```bash
    spell jupyter \
        --lab \
        --github-url https://github.com/spellrun/spell-examples.git \
        distributed-workspace
    ```

## how it works

The simplest practical distributed training strategy is the **parameter server model**. In this model there are typically a number of worker processes and one parameter server (using multiple parameter servers is also possible).

Each worker process is sent a unique batch of data by the parameter server, performs forward and back propagation on that data, and accumulates gradients. The parameter server blocks until it receives the gradient updates from all of the worker processes, calculates an average gradient, multiplies that by the learning rate, and sends the result back to the worker processes. The worker processes apply the update before proceeding to the next batch of training.
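
The synchronous step described above can be sketched in plain Python. This is a toy, single-process illustration for a one-parameter model `y = w * x`. The function names and data are made up for this example, and no real networking is involved.

```python
# Toy parameter-server step for the model y = w * x with squared-error loss.
# All names are illustrative; real systems exchange tensors over the network.

def local_gradient(w, batch):
    """Mean gradient of (w * x - y)^2 with respect to w over one worker's batch."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


def parameter_server_step(w, batches, lr):
    """One synchronous step: one unique batch per worker, averaged by the server."""
    # Each worker computes a gradient on its own batch.
    grads = [local_gradient(w, batch) for batch in batches]
    # The server waits for every gradient, then averages them.
    avg_grad = sum(grads) / len(grads)
    # The update is scaled by the learning rate and applied by every worker.
    return w - lr * avg_grad


# Data generated from the target w = 3, split into one batch per worker.
batches = [
    [(1.0, 3.0), (2.0, 6.0)],
    [(3.0, 9.0), (4.0, 12.0)],
]

w = 0.0
for _ in range(50):
    w = parameter_server_step(w, batches, lr=0.01)
print(w)  # approaches the target value 3
```

Every gradient passes through the server in this loop, which is the source of the fan-out bottleneck at scale.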

![](https://i.imgur.com/luwOpug.png)

Though conceptually simple, this design proved to have significant limitations. The [blog post accompanying the open sourcing of Horovod](https://eng.uber.com/horovod/) does a good job of summarizing the issues. The TL;DR is that the network overhead cost of having so many fan-out connections between worker processes and parameter servers pushes real GPU utilization at scale (64+ GPUs) to 50% or less.

Horovod borrows instead from the high performance computing world, using the **ring all-reduce algorithm** to handle gradient updates. This scheme does away with parameter servers, using instead a well-defined sequence of cyclical read and write operations to communicate gradient updates between GPUs directly:

![](https://i.imgur.com/vXpNPqC.png)

In this **all-reduce model** every GPU still has a local copy of the model parameters, and these parameters still need to be kept in sync as training proceeds. But since there are no more parameter servers, network throughput (and API complexity) is much improved.
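
To make the ring pattern concrete, here is a single-process simulation of a textbook ring all-reduce: a reduce-scatter phase followed by an all-gather phase. It is a sketch only. Horovod delegates the real communication to optimized libraries, and the function name and chunking scheme below are assumptions made for illustration.

```python
def ring_allreduce(vectors):
    """Simulate a ring all-reduce (sum) over equally sized per-worker vectors."""
    n = len(vectors)
    length = len(vectors[0])
    assert length % n == 0, "sketch assumes the vector splits evenly into n chunks"
    size = length // n
    # data[r][c] is worker r's copy of chunk c.
    data = [[list(v[c * size:(c + 1) * size]) for c in range(n)] for v in vectors]

    # Phase 1, reduce-scatter: after n - 1 steps worker r holds the full sum
    # of chunk (r + 1) % n.
    for step in range(n - 1):
        # All sends in a step happen at once, so snapshot outgoing chunks first.
        outgoing = [(r, (r - step) % n, list(data[r][(r - step) % n]))
                    for r in range(n)]
        for r, c, chunk in outgoing:
            dest = (r + 1) % n  # each worker only talks to its right neighbor
            data[dest][c] = [a + b for a, b in zip(data[dest][c], chunk)]

    # Phase 2, all-gather: pass the completed chunks around the ring.
    for step in range(n - 1):
        outgoing = [(r, (r + 1 - step) % n, list(data[r][(r + 1 - step) % n]))
                    for r in range(n)]
        for r, c, chunk in outgoing:
            data[(r + 1) % n][c] = chunk

    return [[x for chunk in worker for x in chunk] for worker in data]


# Three workers, each holding a six-element "gradient".
gradients = [
    [1, 2, 3, 4, 5, 6],
    [10, 20, 30, 40, 50, 60],
    [100, 200, 300, 400, 500, 600],
]
summed = ring_allreduce(gradients)
averaged = [[x / len(gradients) for x in worker] for worker in summed]
print(summed[0])
```

Each worker sends to one neighbor per step, and each message carries one chunk (a `1/n` slice of the vector) rather than the whole vector.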

To learn more refer to the [Concepts](https://horovod.readthedocs.io/en/latest/concepts_include.html) page in the Horovod docs.

## code sample

Using `curl` to download [sample code](https://raw.githubusercontent.com/horovod/horovod/master/examples/pytorch_mnist.py) for training MNIST using Horovod in PyTorch:

In [2]:
!curl https://raw.githubusercontent.com/horovod/horovod/master/examples/pytorch_mnist.py > pytorch_mnist.py


In [4]:
# %load pytorch_mnist.py
import argparse
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import torch.utils.data.distributed
import horovod.torch as hvd

# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                    help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                    help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
                    help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                    help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42, metavar='S',
                    help='random seed (default: 42)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='how many batches to wait before logging training status')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


def train(epoch):
    model.train()
    # Horovod: set epoch to sampler for shuffling.
    train_sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            # Horovod: use train_sampler to determine the number of examples in
            # this worker's partition.
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_sampler),
                100. * batch_idx / len(train_loader), loss.item()))


def metric_average(val, name):
    tensor = torch.tensor(val)
    avg_tensor = hvd.allreduce(tensor, name=name)
    return avg_tensor.item()


def test():
    model.eval()
    test_loss = 0.
    test_accuracy = 0.
    for data, target in test_loader:
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        output = model(data)
        # sum up batch loss
        test_loss += F.nll_loss(output, target, size_average=False).item()
        # get the index of the max log-probability
        pred = output.data.max(1, keepdim=True)[1]
        test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()

    # Horovod: use test_sampler to determine the number of examples in
    # this worker's partition.
    test_loss /= len(test_sampler)
    test_accuracy /= len(test_sampler)

    # Horovod: average metric values across workers.
    test_loss = metric_average(test_loss, 'avg_loss')
    test_accuracy = metric_average(test_accuracy, 'avg_accuracy')

    # Horovod: print output only on first rank.
    if hvd.rank() == 0:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
            test_loss, 100. * test_accuracy))


if __name__ == '__main__':
    args = parser.parse_args()
    args.cuda = not args.no_cuda and torch.cuda.is_available()

    # Horovod: initialize library.
    hvd.init()
    torch.manual_seed(args.seed)

    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(args.seed)


    # Horovod: limit # of CPU threads to be used per worker.
    torch.set_num_threads(1)

    kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
    # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
    # issues with Infiniband implementations that are not fork-safe
    if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
            mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
        kwargs['multiprocessing_context'] = 'forkserver'

    train_dataset = \
        datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
    # Horovod: use DistributedSampler to partition the training data.
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)

    test_dataset = \
        datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    # Horovod: use DistributedSampler to partition the test data.
    test_sampler = torch.utils.data.distributed.DistributedSampler(
        test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
                                              sampler=test_sampler, **kwargs)

    model = Net()

    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not args.use_adasum else 1

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Horovod: scale learning rate by lr_scaler.
    optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
                          momentum=args.momentum)

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(optimizer,
                                         named_parameters=model.named_parameters(),
                                         compression=compression,
                                         op=hvd.Adasum if args.use_adasum else hvd.Average)

    for epoch in range(1, args.epochs + 1):
        train(epoch)
        test()

To understand the differences between this "horovod-ified" training script and a normal one, let's step through the changes one at a time.

```python
# Horovod: initialize library.
hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # Horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)
```

After importing the Horovod PyTorch binding using `import horovod.torch as hvd` we need to call `hvd.init()` to initialize it. All of the state that `horovod` manages is exposed to this script through the `hvd` module.

In this first bit of initialization we see the first of these values: `hvd.local_rank()`.

The **local rank** is an ID number assigned to each GPU device on a machine, and it ranges from 0 to `n - 1`, where `n` is the number of GPU devices on the machine. Horovod launches one copy of this training script for each GPU on the machine, so we use `torch.cuda.set_device` to instruct PyTorch to run this code on the specific GPU Horovod has attached this script to.
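
The relationship between the global rank, the machine, and the local rank can be pictured with a small sketch. It assumes every machine has the same number of GPUs and that global ranks are handed out machine by machine. Both are simplifications for illustration, and `rank_layout` is a hypothetical helper, not part of Horovod.

```python
def rank_layout(num_machines, gpus_per_machine):
    """List (rank, machine, local_rank) for every training process.

    Assumption: identical machines, global ranks assigned machine by machine.
    """
    layout = []
    for machine in range(num_machines):
        for local_rank in range(gpus_per_machine):
            rank = machine * gpus_per_machine + local_rank
            layout.append((rank, machine, local_rank))
    return layout


# Two machines with four GPUs each: eight processes in total.
for rank, machine, local_rank in rank_layout(2, 4):
    print(f"rank={rank} machine={machine} local_rank={local_rank}")
```

Under these assumptions the local rank restarts at 0 on each machine, which is why it, rather than the global rank, is the value passed to `torch.cuda.set_device`.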

----

```python
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(1)
```

PyTorch performs a large number of CPU operations processing training data and moving it to GPU over the course of a training run. This is parallelized across some number of worker processes, each of which uses some number of threads to do its work.

`set_num_threads` controls the thread count per worker. It's recommended to set this value to 1 initially to prevent memory saturation (you can relax this restriction later). Cf. [GH#1314](https://github.com/horovod/horovod/pull/1314).

----

```python
kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
# When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
# issues with Infiniband implementations that are not fork-safe
if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
        mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
    kwargs['multiprocessing_context'] = 'forkserver'
```

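This next bit of code is included due to a leaky abstraction. `kwargs` is a set of keyword arguments to be passed to the PyTorch `DataLoader`. When the `forkserver` start method is available, the script uses it instead of `fork` to spawn data loader workers, because some InfiniBand implementations are not fork-safe. See [GH#1824](https://github.com/horovod/horovod/pull/1824) for details. If you're not using InfiniBand, you can safely omit this workaround.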

----

```python
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
```

The `sampler` component in `DataLoader` returns an iterable of indices from the dataset to be drawn. The default sampler in PyTorch is sequential, returning the sequence 0, 1, 2, ..., n - 1. This script replaces it with PyTorch's `DistributedSampler`, which handles partitioning the dataset across workers. Besides the dataset, `DistributedSampler` takes two parameters as input: `num_replicas`, set to `hvd.size()` (the total number of GPUs, e.g. 16), and `rank`, set to `hvd.rank()` (the ID assigned to this device from the overall list, e.g. 0...15).

Note that the sampler also needs to know the current epoch so that it can shuffle the data differently from one epoch to the next. `train` calls `train_sampler.set_epoch(epoch)` at the start of every epoch to achieve this.
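
The partitioning idea can be imitated in a few lines of plain Python: shuffle with an epoch-dependent seed, pad so the indices divide evenly, then give each worker every `num_replicas`-th index. This is a rough imitation of the general scheme, not PyTorch's implementation. It uses Python's `random` module, so the actual indices will differ from what `DistributedSampler` produces, and `partition_indices` is a made-up name.

```python
import math
import random


def partition_indices(dataset_len, num_replicas, rank, epoch, seed=0):
    """Return the dataset indices one worker would draw in one epoch."""
    indices = list(range(dataset_len))
    # Seeding with the epoch gives every worker the same shuffle within an
    # epoch and a different shuffle from one epoch to the next.
    random.Random(seed + epoch).shuffle(indices)
    # Pad by repeating leading indices so every worker gets the same count.
    per_worker = math.ceil(dataset_len / num_replicas)
    total = per_worker * num_replicas
    indices += indices[:total - dataset_len]
    # Worker `rank` takes every num_replicas-th index, starting at `rank`.
    return indices[rank::num_replicas]


# Ten samples split across four workers for epoch 0.
parts = [partition_indices(10, 4, rank, epoch=0) for rank in range(4)]
for rank, part in enumerate(parts):
    print(rank, part)
```

Because the seed depends on the epoch, skipping the `set_epoch` call would leave every epoch with the same ordering in this sketch.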

----

```python
# By default, Adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not args.use_adasum else 1

if args.cuda:
    # Move model to GPU.
    model.cuda()
    # If using GPU Adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = hvd.local_size()

# Horovod: scale learning rate by lr_scaler.
optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
                      momentum=args.momentum)
```

Horovod trains on as many batches simultaneously as you have GPUs, and the update applied at each step is computed from the average of these per-batch gradients. Because each step effectively covers a batch `hvd.size()` times larger, we can speed up training by multiplying our base learning rate by the number of devices, `hvd.size()`.

If you are using the Adasum reduction algorithm, which this script supports, there are some special rules to follow. See the [Adasum user guide](https://horovod.readthedocs.io/en/latest/adasum_user_guide_include.html) in the Horovod docs.
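
As a worked example of the default (non-Adasum) scaling rule, the arithmetic below uses the script's default batch size and learning rate with a hypothetical 16-GPU cluster. `scaled_hyperparameters` is an illustrative helper, not a Horovod function.

```python
def scaled_hyperparameters(base_lr, batch_size, num_workers):
    """Effective global batch size and linearly scaled learning rate."""
    global_batch = batch_size * num_workers  # samples consumed per step, cluster-wide
    scaled_lr = base_lr * num_workers        # mirrors lr=args.lr * hvd.size()
    return global_batch, scaled_lr


# Script defaults (--batch-size 64, --lr 0.01) on an assumed 16 GPUs.
global_batch, lr = scaled_hyperparameters(base_lr=0.01, batch_size=64, num_workers=16)
print(global_batch, lr)
```

Linear scaling is a rule of thumb. With very large worker counts it may need to be paired with a warmup period or a smaller multiplier.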

----

```python
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
```

This training script uses default random initialization for the model weights. Each GPU initializes these random weights separately, so unless we synchronize the initialized weights between machines the training will diverge.

The device with the rank of 0 typically has special significance in Horovod: it is the device responsible for this synchronization. These two API calls broadcast the model and optimizer state from this "root" machine to the other machines in the list, ensuring that they are in sync. Non-root device training scripts will block on this operation until Horovod has performed the sync.
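
A small in-process simulation shows why the broadcast matters. Applying the same averaged update to workers that started from different weights leaves them different, while broadcasting first keeps them identical. The weights and the `broadcast` helper below are invented for illustration and only mimic the effect of the Horovod calls.

```python
def broadcast(worker_states, root_rank=0):
    """Copy the root worker's state to every worker (simulated in-process)."""
    root = worker_states[root_rank]
    return [list(root) for _ in worker_states]


def apply_update(weights, update):
    """Apply one identical (already averaged) update to a worker's weights."""
    return [w - u for w, u in zip(weights, update)]


# Three workers that each initialized their weights differently.
workers = [[0.1, -0.2], [0.4, 0.3], [-0.5, 0.0]]
update = [0.05, 0.05]  # the same averaged update arrives on every worker

without_sync = [apply_update(w, update) for w in workers]
with_sync = [apply_update(w, update) for w in broadcast(workers, root_rank=0)]

print(without_sync)  # three different models
print(with_sync)     # three identical copies of one model
```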

----

```python
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
```

Horovod has built-in cross-platform support for communicating gradient updates in 16-bit floating point. Reduced precision can greatly reduce network overhead: FP16 halves the payload size relative to FP32. Since arithmetic precision is not typically a limiting factor in model performance, it's almost always worth using some sort of compression. See [the compression module](https://github.com/horovod/horovod/blob/master/horovod/torch/compression.py) for the implementation.
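
The payload saving and the precision cost can both be seen with the standard library's `struct` module, which supports half-precision floats through the `e` format code. This only illustrates the size and rounding trade-off and is not how Horovod performs compression. The gradient values are synthetic.

```python
import struct


def pack_gradients(values, fmt):
    """Serialize floats with struct format 'f' (FP32) or 'e' (FP16)."""
    return struct.pack(f"<{len(values)}{fmt}", *values)


def unpack_gradients(payload, fmt):
    """Inverse of pack_gradients for the same format code."""
    count = len(payload) // struct.calcsize(f"<{fmt}")
    return list(struct.unpack(f"<{count}{fmt}", payload))


# Synthetic "gradient" values in the range [0, 1).
gradients = [0.001 * i for i in range(1000)]

fp32_payload = pack_gradients(gradients, "f")
fp16_payload = pack_gradients(gradients, "e")

# Round-trip through FP16 to measure how much precision was lost.
restored = unpack_gradients(fp16_payload, "e")
max_error = max(abs(a - b) for a, b in zip(gradients, restored))

print(len(fp32_payload), len(fp16_payload), max_error)
```

The FP16 payload is half the size of the FP32 one, at the cost of a small rounding error per value.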

----

```python
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     compression=compression,
                                     op=hvd.Adasum if args.use_adasum else hvd.Average)
```

This is where the magic happens. The Horovod `DistributedOptimizer` wrapper takes the optimizer (SGD in this case) as input, delegates gradient computation to it, averages gradients using all-reduce or all-gather, and then applies those averaged gradients across all devices.

----

```python
def metric_average(val, name):
    tensor = torch.tensor(val)
    avg_tensor = hvd.allreduce(tensor, name=name)
    return avg_tensor.item()

# ...later...
def test():
    # Horovod: average metric values across workers.
    test_loss = metric_average(test_loss, 'avg_loss')
    test_accuracy = metric_average(test_accuracy, 'avg_accuracy')

    # Horovod: print output only on first rank.
    if hvd.rank() == 0:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
            test_loss, 100. * test_accuracy))
```

In the single-machine case, to log the value of a metric we would simply compute it from a tensor and `print` it.

In the multi-machine case things are more complicated. We need each script's local copy of the value of the metric, and then we need to average these values to get the cluster mean. `hvd.allreduce` returns the average of the local copies of a named tensor. If an average is not appropriate, you can use the similar `hvd.allgather` method to collect every worker's values locally instead, so that you can reduce them however you need.

For clarity in the logs, we log the results in `test()` on the root machine (`hvd.rank() == 0`) only.
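
The difference between the two collectives can be sketched without Horovod. The functions below are stand-ins that compute what every worker would end up holding, and the accuracy values are hypothetical.

```python
def allreduce_average(local_values):
    """What every worker receives from an averaging all-reduce."""
    return sum(local_values) / len(local_values)


def allgather(local_values):
    """What every worker receives from an all-gather: all workers' values."""
    return list(local_values)


# Hypothetical per-worker test accuracies, indexed by rank.
local_accuracy = [0.90, 0.92, 0.88, 0.94]

mean_accuracy = allreduce_average(local_accuracy)  # cluster mean
worst_accuracy = min(allgather(local_accuracy))    # a custom reduction

print(mean_accuracy, worst_accuracy)
```

An all-reduce is enough when the mean is the quantity of interest. Gathering is the fallback for reductions such as a minimum, maximum, or median.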

## running in a spell run

Spell runs provide a native integration with Horovod. Use the `--distributed n` flag, where `n` is the number of machines you want to use, to enable Horovod in a run.

For testing purposes we recommend starting off with a single-GPU run, i.e. a run with Horovod enabled but no actual model distribution present.

The Spell workspace will not log you in by default. If you are running this notebook from inside of a Spell workspace you will need to run the following command, replacing `YOUR_EMAIL` with your Spell email and `YOUR_PASSWORD` with your Spell password:

In [1]:
!spell login --identity YOUR_EMAIL --password YOUR_PASSWORD

Hello, Aleksey Bilogur!

In [15]:
!spell run --machine-type K80 \
    --github-url https://github.com/spellrun/examples.git \
    --distributed 1 \
    "python distributed/pytorch_mnist.py"

💫 Casting spell #91…
✨ Stop viewing logs with ^C
⭐ Machine_Requested… Run created -- waiting for a k80 machine.^C

✨ Your run is still running remotely.
✨ Use 'spell kill 91' to terminate your run
✨ Use 'spell logs 91' to view logs again

And now for the distributed experience, a run with `--distributed 2` set:

In [18]:
!spell run --machine-type K80 \
    --github-url https://github.com/spellrun/examples.git \
    --distributed 2 \
    "python distributed/pytorch_mnist.py"

💫 Casting spell #93…
✨ Stop viewing logs with ^C
🌟 Machine_Requested… Run created -- waiting for a k80 machine.^C

✨ Your run is still running remotely.
✨ Use 'spell kill 93' to terminate your run
✨ Use 'spell logs 93' to view logs again

Visiting the run page in the web console we can see a couple of ways in which distributed runs are treated differently from regular ones:

![](https://i.imgur.com/huQzvq2.png)

![](https://i.imgur.com/sWODmav.png)

When executing a distributed run on Spell, the machine with the rank 0 GPU is the *distributed primary* and all other machines are *distributed secondaries*. **Only resources from the distributed primary machine will be saved by the run**. This is in conformance with the Horovod way of doing things: all checkpointing should be handled by the host process for the root GPU.

On the metrics side, Spell's automatic hardware metrics logging is extended to every machine in the run, and menu options in the metrics viewer let you choose which GPUs are shown. The same view extends to model metrics as well.

One other important difference is the automatic inclusion of the Horovod timeline in the run output. The timeline, saved as `horovod_timeline.json`, is a neat feature allowing you to inspect and debug GPU utilization during the training run. You can load this file into `chrome://tracing` to view the timing history of the run:

![](https://i.imgur.com/tkB221X.png)

## running in a single spell run

Note that you can also use Horovod to distribute the training job across GPUs on the same machine, for example, getting a model training on both GPUs on a K80x2 instance:

In [16]:
!spell run --machine-type K80x2 \
    --github-url https://github.com/spellrun/examples.git \
    --distributed 1 \
    "python distributed/pytorch_mnist.py"

💫 Casting spell #92…
✨ Stop viewing logs with ^C
⭐ Machine_Requested… Run created -- waiting for a k80x2 machine.^C

✨ Your run is still running remotely.
✨ Use 'spell kill 92' to terminate your run
✨ Use 'spell logs 92' to view logs again

This has limited utility, simply because the major deep learning libraries have built-in support for data parallelization across the GPUs of a single machine, which works well and is much simpler to use than the Horovod API.

## running in a spell workspace

Since the Horovod API is baked into our container images, it's possible to use the Horovod toolchain (specifically, the `horovodrun` CLI command) to execute a distributed training job from inside of a Spell workspace:

```bash
$ horovodrun -np 4 -H localhost:4 python pytorch_mnist.py
```

This command will launch a training job on your local machine spanning four GPUs (an appropriate choice for a `k80x4` or `v100x4` instance). Adjust the values you pass to `-np` and `-H` as needed based on the number of GPUs you have in your instance.

This feature is extremely useful for testing and/or debugging your `horovod`-based training scripts.
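
When experimenting with different instance sizes, it can help to derive the `-np` and `-H` values from one description of the hosts. The helper below is a hypothetical convenience wrapper that only assembles the command. It assumes the `host:slots` syntax shown above, with multiple hosts separated by commas.

```python
import shlex


def horovodrun_command(hosts, script):
    """Build a horovodrun argv list from a {hostname: gpu_count} mapping."""
    total = sum(hosts.values())  # -np is the total number of processes
    host_spec = ",".join(f"{name}:{slots}" for name, slots in hosts.items())
    return ["horovodrun", "-np", str(total), "-H", host_spec, "python", script]


# Reproduces the single-machine, four-GPU command from above.
command = horovodrun_command({"localhost": 4}, "pytorch_mnist.py")
print(shlex.join(command))
```

The resulting list can be passed to `subprocess.run` inside the workspace, or printed and pasted into a terminal.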