# Distributed

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lukeconibear/intro_ml/blob/main/docs/05_distributed.ipynb)

In [None]:
# if you're using colab, then install the required modules
import sys

IN_COLAB = "google.colab" in sys.modules
if IN_COLAB:
    pass

```{note}
If you’re in COLAB or have a local CUDA GPU, you can follow along with the more computationally intensive training in this lesson.

For those in COLAB, ensure the session is using a GPU by going to: Runtime > Change runtime type > Hardware accelerator = GPU.
```

Distributing training over multiple devices generally uses either:

- [Data parallelism](https://developers.google.com/machine-learning/glossary/#data-parallelism)
    - Single model copied to multiple devices.
    - Split data over multiple devices.
    - Useful for big data.
- [Model parallelism](https://developers.google.com/machine-learning/glossary/#model-parallelism)
    - Split model over multiple devices.
    - Single data copied to multiple devices.
    - Useful for big models (for some architectures).
    
This lesson focuses on data parallelism.

## [Ray Train](https://docs.ray.io/en/latest/train/train.html)

Ray Train simplifies distributed deep learning for TensorFlow and PyTorch.

It handles the set up for you (e.g., [`TF_CONFIG`](https://www.tensorflow.org/guide/distributed_training#setting_up_the_tf_config_environment_variable) in TensorFlow).

There are a range of examples [here](https://docs.ray.io/en/latest/train/examples.html).

### [TensorFlow (Keras)](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras)

Here is an [MNIST example](https://docs.ray.io/en/latest/train/examples/tensorflow_mnist_example.html):

In [100]:
import argparse
import json
import os

import numpy as np
import ray
import tensorflow as tf
from ray.train import Trainer
from tensorflow.keras.callbacks import Callback

#### [Define callback for reporting](https://docs.ray.io/en/latest/train/user_guide.html#logging-monitoring-and-callbacks)

In [137]:
class TrainReportCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        ray.train.report(**logs)

#### Set up the dataset and model

The dataset will be split (sharded) across the workers.

```{tip}
The default [auto-sharding](https://www.tensorflow.org/tutorials/distribute/input#sharding) by `FILE` can cause warning messages if the data is in one file. Instead, auto-shard by data using: `tf.data.experimental.AutoShardPolicy.DATA`
```

In [115]:
def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    ds_train = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )

    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    ds_train = ds_train.with_options(options)

    return ds_train

In [116]:
def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"],
    )
    return model

#### Set up the training function for a _single_ worker

You can [configure training](https://docs.ray.io/en/latest/train/user_guide.html#configuring-training) using the `config` parameter.

In [117]:
def train_func(config):
    batch_size = 64
    single_worker_dataset = mnist_dataset(batch_size)
    single_worker_model = build_and_compile_cnn_model(config)
    single_worker_model.fit(
        single_worker_dataset,
        epochs=config["epochs"],
        steps_per_epoch=70,
        verbose=False,
    )

In [118]:
config = {"epochs": 3}

In [119]:
train_func(config)

Epoch 1/3
Epoch 2/3
Epoch 3/3


#### [Update training function](https://docs.ray.io/en/latest/train/user_guide.html#update-training-function)

1. Set the _global_ batch size
    - Each worker will process the same size batch as in the single-worker code.
2. Choose your TensorFlow distributed training strategy.
    - In this example we use the [MultiWorkerMirroredStrategy](https://www.tensorflow.org/guide/distributed_training#multiworkermirroredstrategy) for synchronous training of multiple workers across many machines.
        - For multiple workers on _one_ machine, use [MirroredStrategy](https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy).
        - In general, the mirrored strategy mirrors the parameters across the workers, ensuring replicas are identical.
    - Within the strategy scope context manager, you build and compile the model.

In [120]:
def train_func(config):
    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_workers = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model(config)

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[TrainReportCallback()],
        verbose=False,
    )
    results = history.history
    return results

#### [Create Ray Train Trainer](https://docs.ray.io/en/latest/train/user_guide.html#create-ray-train-trainer)

The `Trainer` manages state and training.

In [121]:
def train_tensorflow_mnist(num_workers=1, use_gpu=False, epochs=4):
    trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
    trainer.start()
    results = trainer.run(
        train_func=train_func, config={"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    )
    trainer.shutdown()
    print(f"Results: {results[0]}")

#### [Run the training](https://docs.ray.io/en/latest/train/user_guide.html#run-training-function)

Initialise and shutdown the Ray client:

In [139]:
# ray.init()

In [140]:
# cpu
# train_tensorflow_mnist()

# gpu
# train_tensorflow_mnist(use_gpu=True)

In [141]:
# ray.shutdown()

This Python script is in full [here](https://github.com/lukeconibear/intro_ml/blob/main/docs/distributed/tensorflow_ray_train_mnist_example.py).

The job submission script is (also [here](https://github.com/lukeconibear/intro_ml/blob/main/docs/distributed/distributed_ml_on_arc4_cpu.bash)):

```bash
#!/bin/bash
#$ -cwd
#$ -l h_rt=00:30:00
#$ -pe smp 12
#$ -l h_vmem=6G

conda activate intro_ml
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib  # (sometimes needed)

python tensorflow_ray_train_mnist_example.py --num-workers 12 --epochs 100
```

In this simple example using 12 CPUs, the job efficiency (using `qacct -j <JOBID>`):

```
Efficiency = 100 * cpu / (ru_wallclock * slots)
Efficiency = 100 * 10214 / (928 * 12)
Efficiency = 92 %
```

92% is good.

To run on the GPU ([submission script](https://github.com/lukeconibear/intro_ml/blob/main/docs/distributed/distributed_ml_on_arc4_gpu.bash)):
- Replace `#$ -pe smp 4` with `#$ -l coproc_v100=1`.
- Add `--use-gpu=True`.

### [PyTorch](https://pytorch.org/tutorials/beginner/dist_overview.html)

Can also distribute with [Ray Train](https://docs.ray.io/en/latest/train/examples/train_fashion_mnist_example.html).

To share data on a single filesystem, download the dataset once:

```python
Trainer(prepare_data_per_node=False)
```

The default behaviour is to download the data _once per node_. 

#### [DDP Strategy](https://pytorch-lightning.readthedocs.io/en/stable/advanced/model_parallel.html#ddp-optimizations)


```python
# train on 8 GPUs, using the DDP strategy
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp")
```
...


```python
# train on multiple GPUs across nodes (uses 8 GPUs in total)
trainer = Trainer(accelerator="gpu", devices=2, num_nodes=4)
```

`num_workers`

https://pytorch-lightning.readthedocs.io/en/stable/guides/speed.html#num-workers

In [126]:
import argparse
from typing import Dict

import ray
import torch
from ray.train.callbacks import JsonLoggerCallback
from ray.train.trainer import Trainer
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

#### Set up the dataset and model

In [1]:
# training_data = datasets.FashionMNIST(
#     root="~/data",
#     train=True,
#     download=True,
#     transform=ToTensor(),
# )

# test_data = datasets.FashionMNIST(
#     root="~/data",
#     train=False,
#     download=True,
#     transform=ToTensor(),
# )

In [146]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

#### Define training and validation per epoch

In [147]:
def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // ray.train.world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [148]:
def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // ray.train.world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss

#### [Setup distributed training function](https://docs.ray.io/en/latest/train/user_guide.html#update-training-function)

Use `ray.train.torch.prepare_model` to automatically move your model to the right device.

Use `ray.train.torch.prepare_data_loader` utility functions to setup your data for distributed training.

In [149]:
def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // ray.train.world_size()

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = ray.train.torch.prepare_data_loader(test_dataloader)

    # Create model.
    model = NeuralNetwork()
    model = ray.train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    loss_results = []

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        ray.train.report(loss=loss)
        loss_results.append(loss)

    return loss_results

#### [Create Ray Train Trainer](https://docs.ray.io/en/latest/train/user_guide.html#create-ray-train-trainer)

In [150]:
def train_fashion_mnist(num_workers=1, use_gpu=False):
    trainer = Trainer(backend="torch", num_workers=num_workers, use_gpu=use_gpu)
    trainer.start()
    result = trainer.run(
        train_func=train_func,
        config={"lr": 1e-3, "batch_size": 64, "epochs": 4},
        callbacks=[JsonLoggerCallback()],
    )
    trainer.shutdown()
    print(f"Loss results: {result}")

#### [Run the training](https://docs.ray.io/en/latest/train/user_guide.html#run-training-function)

In [142]:
# ray.init()

In [143]:
# cpu
# train_fashion_mnist()

# gpu
# train_fashion_mnist(use_gpu=True)

In [144]:
# ray.shutdown()

This Python script is in full [here](https://github.com/lukeconibear/intro_ml/blob/main/docs/distributed/pytorch_ray_train_fashion_mnist_example.py).

The job submission script is the same as before ([here](https://github.com/lukeconibear/intro_ml/blob/main/docs/distributed/distributed_ml_on_arc4_cpu.bash)), except you use the line:

```bash
python tensorflow_ray_train_mnist_example.py --num-workers 12 --epochs 100
```

In this simple example using 12 CPUs, the job efficiency (using `qacct -j <JOBID>`):

```
Efficiency = 100 * cpu / (ru_wallclock * slots)
Efficiency = 100 * X / (X * 12)
Efficiency = X %
```

...

To run on the GPU ([submission script](https://github.com/lukeconibear/intro_ml/blob/main/docs/distributed/distributed_ml_on_arc4_gpu.bash)):
- Replace `#$ -pe smp 4` with `#$ -l coproc_v100=1`.
- Add `--use-gpu=True`.

## [PyTorch (Lightning)](https://pytorch-lightning.readthedocs.io/en/stable/clouds/cluster.html)


[SLURM](https://pytorch-lightning.readthedocs.io/en/stable/clouds/cluster.html#slurm-managed-cluster)


In [None]:
# train.py
def main(hparams):
    model = LightningTemplateModel(hparams)

    trainer = Trainer(accelerator="gpu", devices=8, num_nodes=4, strategy="ddp")

    trainer.fit(model)


if __name__ == "__main__":
    root_dir = os.path.dirname(os.path.realpath(__file__))
    parent_parser = ArgumentParser(add_help=False)
    hyperparams = parser.parse_args()

    # TRAIN
    main(hyperparams)

```bash
# (submit.sh)
#!/bin/bash -l

# SLURM SUBMIT SCRIPT
#SBATCH --nodes=4
#SBATCH --gres=gpu:8
#SBATCH --ntasks-per-node=8
#SBATCH --mem=0
#SBATCH --time=0-02:00:00

# activate conda env
source activate $1

# debugging flags (optional)
export NCCL_DEBUG=INFO
export PYTHONFAULTHANDLER=1

# on your cluster you might need these:
# set the network interface
# export NCCL_SOCKET_IFNAME=^docker0,lo

# might need the latest CUDA
# module load NCCL/2.4.7-1-cuda.10.0

# run script from above
srun python3 train.py
```

```
sbatch submit.sh
```

## Jupyter Notebook to HPC

It's preferable to use a static job on the HPC. To do this, you could test out different ideas locally in a Jupyter Notebook, then when ready convert this to an executable script (`.py`) and move it over. 

...

## Exercises

```{admonition} Exercise 1

...

```

## {ref}`Solutions <distributed>`

## Key Points

```{important}

- [x] _..._

```

## Further information

### Good practices

- Ensure works on a single workers first, _before_ going distributed.
- Ensure that you need the overhead of distributing over multiple GPUs e.g., could you instead use 1 GPU and model checkpointing?
- Ensure that the problem is complex enough to use multiple GPUs efficiently.
- Batch the dataset with the global batch size e.g., for 8 devices each capable of a btach of 64 use the global batch size of 512 (= 8 * 64).  
- ...

### Other options

- [Horovod](https://horovod.ai/)
- [DeepSpeed](https://www.deepspeed.ai/)
 
### Resources

- ...