# Introduction to distributed machine learning
Author: Tomasz Kanas

In this introductory class we will learn the basic components of distributed machine learning:

- Distributed Data Parallel (DDP) learning
- Distributed messaging
- Running distributed jobs

## Distributed Data Parallel

DDP is an algorithm that uses data parallelism to distribute learning: every GPU stores a copy of all model parameters, while the data is split among the GPUs. After each batch, the gradients are averaged across all GPUs and the result is distributed, so that all GPUs can perform the optimizer step in parallel and end up with the same state.
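
To make the averaging step concrete, here is a minimal plain-Python sketch (no PyTorch, no real communication). It assumes a toy one-parameter model `y = w * x` with squared-error loss, plain SGD instead of Adam, and a batch that splits evenly among the workers; the function names are hypothetical and chosen only for illustration. Under these assumptions, averaging the per-worker gradients gives the same update as one worker processing the whole batch.

```python
import math


def local_gradient(w, shard):
    """Mean gradient of the squared error (w*x - y)^2 w.r.t. w over one shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def split_evenly(batch, world_size):
    """Give every worker an equally sized, disjoint slice of the batch.

    Assumes len(batch) is divisible by world_size (any remainder is dropped).
    """
    per_worker = len(batch) // world_size
    return [batch[r * per_worker:(r + 1) * per_worker] for r in range(world_size)]


def data_parallel_step(w, batch, world_size, lr):
    """One SGD step where the gradient is averaged over all (simulated) workers."""
    shards = split_evenly(batch, world_size)
    grads = [local_gradient(w, shard) for shard in shards]  # in parallel in real DDP
    avg_grad = sum(grads) / world_size                      # the averaging step
    return w - lr * avg_grad                                # same update on every worker


batch = [(1.0, 2.0), (2.0, 4.1), (3.0, 5.9), (4.0, 8.2)]
w, lr = 0.5, 0.01

distributed = data_parallel_step(w, batch, world_size=2, lr=lr)
single = w - lr * local_gradient(w, batch)
print(math.isclose(distributed, single))
```

The equality relies on the shards having equal size; with unequal shards a plain average of the local means would weight samples differently.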

Fortunately, DDP is already implemented in PyTorch, so using it is very simple. Consider this simple MNIST example:

In [None]:
import os
import time
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.nn.parallel import DistributedDataParallel as DDP

class NN(nn.Module):  # inherits nn.Module

    def __init__(self, input_size, num_classes):  # input size = 28x28 = 784 for mnist
        super(NN, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

batch_size = 64
input_size = 784
num_classes = 10
learning_rate = 0.001
num_epochs = 5

train_dataset = datasets.MNIST(root='dataset/', train=True, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = datasets.MNIST(root='dataset/', train=False, transform=transforms.ToTensor(), download=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)

device = torch.device('cpu') # For this example cpu will be enough

# The only changes needed to distribute learning
# To initialize process group we need to specify: communication backend, rank (id of the process), world size (number of processes)
# For colab:
WORLD_SIZE = 1
RANK = 0
# Pytorch expects two environment variables, let's just set them here for now
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("gloo", rank=RANK, world_size=WORLD_SIZE)

model = DDP(NN(input_size=input_size, num_classes=num_classes).to(device))  # move the model to the device first, then wrap it

# End of changes

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

start_time = time.time()

for epoch in range(num_epochs):
    print(f'Epoch: {epoch}')
    for batch_idx, (data, targets) in enumerate(train_loader):
        data = data.to(device=device)
        targets = targets.to(device=device)
        # print(data.shape)  # => [64 , 1, 28, 28] => 64 : num_images, 1 -> num_channels, (28,28): (height, width)
        data = data.reshape(data.shape[0], -1)  # Flatten
        #if epoch == 0 and batch_idx == 0:
        #    print(data.shape)

        scores = model(data)
        loss = criterion(scores, targets)

        optimizer.zero_grad()
        loss.backward()

        optimizer.step()

end_time = time.time()
print(f"Learning took {end_time - start_time}s")

def check_accuracy(loader, model):

    if loader.dataset.train:
        print("Accuracy on training data")
    else:
        print("Accuracy on testing data")

    num_correct = 0
    num_samples = 0
    model.eval()
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device)
            y = y.to(device=device)
            x = x.reshape(x.shape[0],-1)

            scores = model(x)
            _, predictions = scores.max(1)

            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)

        print(f'Got {num_correct}/{num_samples} with accuracy {float(num_correct)/float(num_samples)*100: .2f}')
    model.train()

check_accuracy(train_loader, model)
check_accuracy(test_loader, model)


Epoch: 0
Epoch: 1
Epoch: 2
Epoch: 3
Epoch: 4
Learning took 55.39989709854126
Accuracy on training data
Got 58230/60000 with accuracy  97.05
Accuracy on testing data
Got 9640/10000 with accuracy  96.40


To distribute learning, PyTorch requires several additional values:
- `WORLD_SIZE` and `RANK` are the number of workers and the index of the current worker. These values are usually set as environment variables by the launching script.
- `MASTER_ADDR` and `MASTER_PORT` need to be set to the IP/hostname of one of the workers and a free port on that worker. The other workers use them to exchange the metadata (e.g. IPs) needed to form the process group.
- Communication backend (here `gloo`): PyTorch supports 3 backends. `gloo` and `mpi` are mostly used for CPU-CPU communication, while `nccl` is for GPU-GPU communication. The most popular setting is `cpu:gloo,cuda:nccl`.

  For more information see https://pytorch.org/docs/stable/distributed.html, and for performance comparison: https://mlbench.github.io/2020/09/08/communication-backend-comparison/.

## Running distributed tasks

### 1. Locally using torchrun
As you might have noticed, running this code on Colab makes little sense, as we are running it on only a single CPU.

The simplest way to run a real distributed task is `torchrun`. You can try running the script above on many cores locally on your computer. To do this, perform the following steps:
  
- Copy the example above to a local file. Let's say you have named it `ddp.py`.

- Set the `WORLD_SIZE` and `RANK` values from the environment variables with the same names (`WORLD_SIZE` and `RANK` respectively).

- Remove setting the `MASTER_ADDR` and `MASTER_PORT` environment variables - torchrun will set them for us.

Finally (assuming you have pytorch already installed) you can run the command:
```
torchrun --nnodes 1 --nproc-per-node 4 ddp.py
```

The first run will most likely fail, because every process tries to download the dataset into the same file, so all but one will fail. This can be prevented by asking only one worker to download the dataset and blocking the remaining ones until it finishes. But we can also just let it fail the first time and run the script again...

### 2. On computers in the labs using MPI

MPI (Message Passing Interface) is a message-passing standard widely used in distributed computing. There are several libraries implementing MPI, and they usually come with a script for running applications in a distributed environment. We can use it to run our learning script on several computers in the labs. To do this you need to:

- If you are working on your personal computer, you need to establish an ssh connection to any computer in the lab: first connect to the `students.mimuw.edu.pl` machine, then find any computer in the lab that is turned on using the `lk_booted_pcs` command, and ssh to it from the students machine.

- Create a virtual environment and install pytorch on it (if you don't have it already):
```
python -m venv venv
source venv/bin/activate
pip install torch
```

- Copy the code above to a file, remove the `MASTER_ADDR` and `MASTER_PORT` setting and set the `WORLD_SIZE` and `RANK` using respectively `OMPI_COMM_WORLD_SIZE` and `OMPI_COMM_WORLD_RANK` environment variables.

- Create a shell script `run_training.sh` that first sources your virtual environment and then runs the Python script. It should look similar to this (there should be no space between # and !):
```
#!/bin/bash
source venv/bin/activate
python3 ddp.py
```

- Find a few (2-3 should be enough) other lab computers (using `lk_booted_pcs`). Try to ssh to them; this sometimes doesn't work, and then you need to find another computer. Moreover, MPI requires passwordless ssh access to all hosts, and the lab computers will remember your password for some time after a successful ssh attempt. Save their hostnames (along with the hostname of your computer) to the `hosts` file. It should look like:
```
red01
orange07
purple11
```
If you want to, you can add the `max_slots = x` option after each hostname, where `x` is the maximum number of workers you want to spawn on this computer, e.g.:
```
red01 max_slots = 2
orange07 max_slots = 1
purple11 max_slots = 3
```

- Finally, run the following command, where `-np 4` is the total number of workers (don't use too many - other people may be working on the same computers), `MASTER_ADDR=pink00` should be substituted by one of the hostnames in your hosts file and `MASTER_PORT=1234` needs to be set to any free port on that computer.
```
mpirun --hostfile hosts -np 4 -x MASTER_ADDR=pink00 -x MASTER_PORT=1234 ./run_training.sh
```

Again, you may need to restart after downloading the data...

### Exercise 1

Run the training for several (at least 3) different configurations of the number of hosts and workers. Write down the processing times of those runs. Can you explain those numbers?

### 3. On Entropy using Slurm

Slurm is a scheduler that is often used to manage resources in clusters and supercomputers, and therefore also to run distributed machine learning jobs. You have most likely worked with it, as it is used in our faculty cluster - entropy. Let's run through how you can schedule a distributed learning task using Slurm.

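- As previously, copy the example code above and save it on entropy as `ddp.py`. Remove the `MASTER_ADDR` and `MASTER_PORT` settings and read `WORLD_SIZE` from the environment variable of the same name (the batch script in the next step exports all three), and read `RANK` from the `SLURM_PROCID` environment variable, which Slurm sets for every task. Moreover, you need to create a virtual environment and install PyTorch.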

- There are a few ways to schedule tasks using Slurm. Here we will use a batch file. Create a file `slurm-ddp.sh` with the following contents (there are no spaces after #):

```sh
#!/usr/bin/env bash
#
#SBATCH --job-name=bml_lab1
#SBATCH --partition=common
#SBATCH --qos=your_quos
#SBATCH --time=5
#SBATCH --output=output.txt
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=2

export MASTER_PORT=12340
export WORLD_SIZE=${SLURM_NPROCS}

echo "NODELIST="${SLURM_NODELIST}
echo "WORLD_SIZE="${SLURM_NPROCS}

master_addr=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)
export MASTER_ADDR=$master_addr
echo "MASTER_ADDR="$MASTER_ADDR

source ~/venv/bin/activate

srun python3 ddp.py
```
  
  In `--qos=your_quos`, substitute `your_quos` with (any of) your QoS; you can find it using the `entropy_account_info` command.

  You can change `--nodes` and `--ntasks-per-node` (be reasonable - we don't want to overcrowd the cluster).

- Finally run `sbatch slurm-ddp.sh` (and retry after it fails on downloading dataset...).
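
Since each launcher exposes the rank and the world size under different variable names, it can be convenient to keep a single `ddp.py` for all of them. Below is a minimal sketch of such a helper. The function names are hypothetical, and the lookup order is an assumption: `RANK`/`WORLD_SIZE` (torchrun, or exported by a batch script), then `OMPI_COMM_WORLD_RANK`/`OMPI_COMM_WORLD_SIZE` (Open MPI), then `SLURM_PROCID`/`SLURM_NTASKS` (Slurm). When nothing is set it falls back to a single process.

```python
import os

# Candidate variable names, checked in this order (an assumption of this sketch).
RANK_VARS = ("RANK", "OMPI_COMM_WORLD_RANK", "SLURM_PROCID")
WORLD_SIZE_VARS = ("WORLD_SIZE", "OMPI_COMM_WORLD_SIZE", "SLURM_NTASKS")


def first_int(env, names, default):
    """Return the first variable from `names` that is present in `env`, as an int."""
    for name in names:
        if name in env:
            return int(env[name])
    return default


def detect_rank_and_world_size(env=None):
    """Hypothetical helper: return (rank, world_size) for the current process."""
    env = os.environ if env is None else env
    rank = first_int(env, RANK_VARS, default=0)
    world_size = first_int(env, WORLD_SIZE_VARS, default=1)
    if not 0 <= rank < world_size:
        raise ValueError(f"inconsistent rank {rank} for world size {world_size}")
    return rank, world_size
```

In the training script one could then write `RANK, WORLD_SIZE = detect_rank_and_world_size()` before calling `dist.init_process_group`.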


NOTE: When working on a cluster, it is good practice to store input data in local storage. On most clusters (including entropy) the `/home` partition is shared among all nodes, so every time you access a file stored on another node, the file has to be sent over the network, which causes delays and uses precious network bandwidth. So, to cultivate good practices, change `root='dataset/'` on both dataset loading lines to `root='/local_storage_1/your_login/dataset/'`. And again, restart after dataset downloading fails (unless you use a single task per node).

Let us now explain a bit what we have done to run a task on entropy, and some other features of Slurm that may come in handy.

There are 3 commands that allow you to schedule jobs in Slurm:
- `srun` takes command as a parameter and executes this command on each worker after receiving the allocation.
- `salloc` will spawn console after receiving the allocation. User can run tasks from this console using the `srun` command.
- `sbatch` takes a shell script as an argument and executes this script (once - on the first allocated node) after receiving the allocation. The tasks can be run from this script using the `srun` command.

Every one of those commands has numerous flags that allow user to specify required resources. Some of the most often used ones are:
- `-t, --time`: the amount of time after which the task will be killed.
- `--partition`: partitions in slurm are different job queues that may have access to different resources. For example entropy has separate partitions for A6000 and A100 GPUs.
- `--qos`: QoS for the job - QoS limits amount of certain resources available to the job.
- `-N, --nodes`: the number of nodes to run the job on. Can specify either: minimal number of nodes, range minimal-maximal number of nodes, or comma separated list of different node numbers.
- `-w, --nodelist`: a comma-separated list of the names of requested nodes
- `-n, --ntasks`: the number of tasks to run
- `--ntasks-per-node`, `--ntasks-per-core`, `--ntasks-per-gpu`, `--ntasks-per-socket`: pretty self explanatory
- `-G, --gpus`: the number of GPUs required for the job. Can also specify in a format: "gpu_type1:number,gpu_type2:number,...". NOTE: The allocation has to contain at least one GPU per node, or one of each GPU type per node if types are used. Use [heterogeneous jobs](https://slurm.schedmd.com/heterogeneous_jobs.html) if you need different nodes with different GPUs
- `--gpus-per-node`, `--gpus-per-socket`, `--gpus-per-task`: as above
- `--gres`: comma-delimited list of *generic consumable resources*. The format for each entry in the list is "name[[:type]:count]", for example "gpu:a100:7". The available generic consumable resources are configured by the system administrator. If the option argument is "help", a list of available generic consumable resources is printed and the command exits.

The full list of flags can be found in the [documentation](https://slurm.schedmd.com/sbatch.html).

Note that if you are using `srun` inside an existing allocation (created either by `salloc` or `sbatch`), then you don't need to specify the parameters again - by default `srun` will execute the command once for every requested task.

Also, as you might have noticed, if you are using `sbatch`, the flags can be specified inside the batch script. They then have to be specified at the beginning of the file (there can't be any uncommented line before them), and each specification must start with `#SBATCH` followed by the flag and its value. The flags specified as command-line parameters have priority over those specified in the batch script.
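
The placement rule can be illustrated with a small plain-Python model. This is only a sketch of the rule as described above, not Slurm's actual parser, and `parse_sbatch_directives` is a hypothetical name: directives are collected from the top of the script until the first line that is neither blank nor a comment, and `# SBATCH` (with a space after `#`) is treated as an ordinary comment.

```python
def parse_sbatch_directives(script_text):
    """Collect the options of #SBATCH lines that appear before the first command."""
    directives = []
    for line in script_text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue  # blank lines do not end the header
        if not stripped.startswith("#"):
            break  # first command reached: later #SBATCH lines are ignored
        parts = stripped.split(None, 1)
        if parts[0] == "#SBATCH" and len(parts) == 2:
            directives.append(parts[1].strip())
    return directives


example_script = """#!/usr/bin/env bash
#
#SBATCH --job-name=bml_lab1
# SBATCH --time=5

#SBATCH --nodes=2
export MASTER_PORT=12340
#SBATCH --ntasks-per-node=2
srun python3 ddp.py
"""

print(parse_sbatch_directives(example_script))
# ['--job-name=bml_lab1', '--nodes=2']
```

Here `--time=5` is skipped because of the space after `#`, and `--ntasks-per-node=2` is skipped because it comes after the `export` command.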

Some other useful Slurm commands include:
- `scancel` cancels submitted job
- `sattach` attaches standard output of a job to the console
- `sinfo` reports the state of partitions and nodes managed by Slurm
- `squeue` prints the cluster queue

More commands and some useful examples can be found here: https://slurm.schedmd.com/quickstart.html

### Exercise 2

Schedule a job such that every task will invoke `/bin/hostname` where:

a) There is exactly one task on each of 3 nodes.

b) There are 9 tasks and 3 nodes.

c) Run 3 tasks on arnold and 3 on bruce

d) Ask `sbatch` or `salloc` for 3 nodes and 6 tasks. Invoke srun 2 times: one with 2 nodes and 2 tasks per node, second without any parameters.

Have you noticed something unusual in the results? Do you understand why they look this way?

### Exercise 3

Run the training for several (at least 4) different allocations on entropy. Write down the processing times of those runs. Can you explain those numbers? How do they compare to the runs in the labs?

## Distributed communication in pytorch

To implement distributed algorithms, one needs a way to communicate between processes. For this reason, PyTorch contains the `torch.distributed` library, which is similar to MPI, but a bit simpler. In the example above you have already seen how it is initialized (`dist.init_process_group`). Let's now do a quick overview of the basic features of this library.
- Point-to-point communication:
  - `send(tensor, destination)`: Sends the tensor to the destination rank and blocks until the tensor is received.
  - `recv(tensor, source=None)`: Receives tensor from the source rank (or any rank if unspecified). Blocks until the tensor is received.
  - `isend(tensor, destination)`, `irecv(tensor, source=None)`: Send and receive asynchronously. Return an object with `is_completed()` and `wait()` methods.

- Collective communication:
  - `broadcast(tensor, source)`: If my rank is equal to source, then broadcasts tensor, otherwise receives broadcasted tensor.
  - `reduce(tensor, destination, operation)`: Performs operation (one of values of the `ReduceOp` enum, `SUM` by default) on all tensors. The result will be available (in the `tensor` variable) only in destination.
  - `all_reduce(tensor, operation)`: As in `reduce`, but result is available in all the workers.
  - `gather(tensor, gather_list, destination)`: gathers tensors from all ranks into `gather_list` on the destination rank (`gather_list` needs to be allocated with enough length to accommodate all tensors).
  - `all_gather(tensor_list, tensor)`: like `gather`, but every worker receives the result in `tensor_list` (which also needs to be preallocated).
  - `scatter(tensor, scatter_list, source)`: distributes exactly one tensor from the `scatter_list` on the source rank into the (pre-allocated) `tensor` variable on every worker (including source).
  - `barrier()`: blocks the process until all workers reach the barrier.

There are more methods available, and the methods presented here also have other, optional parameters (e.g. all collective methods have an `async_op` argument that makes them asynchronous if set to `True`). The full list of methods and parameters can be found in the [documentation](https://pytorch.org/docs/stable/distributed.html).
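
Before using the real calls, it may help to see what each collective leaves on every rank. The sketch below is a plain-Python model, not `torch.distributed` code: a list indexed by rank stands for the value each worker holds, and the hypothetical `sim_*` functions return the list of values the workers hold afterwards. Real collectives operate on tensors in place and involve actual communication.

```python
def sim_broadcast(values, src):
    """After broadcast, every rank holds the value that `src` held."""
    return [values[src] for _ in values]


def sim_reduce(values, dst, op=sum):
    """Only `dst` is guaranteed to hold the reduced result. The other ranks are
    left unchanged in this model; do not rely on their contents in practice."""
    result = list(values)
    result[dst] = op(values)
    return result


def sim_all_reduce(values, op=sum):
    """Like reduce, but every rank ends up with the result."""
    return [op(values) for _ in values]


def sim_gather(values, dst):
    """Only `dst` receives the list of all values (None means: nothing received)."""
    return [list(values) if rank == dst else None for rank in range(len(values))]


def sim_all_gather(values):
    """Every rank receives the list of all values."""
    return [list(values) for _ in values]


def sim_scatter(scatter_list):
    """Rank r receives the r-th element of the list held by the source rank."""
    return list(scatter_list)


# Four workers, each starting with its own rank as the value.
world = [0, 1, 2, 3]
print(sim_broadcast(world, src=2))     # [2, 2, 2, 2]
print(sim_reduce(world, dst=0))        # [6, 1, 2, 3]
print(sim_all_reduce(world))           # [6, 6, 6, 6]
print(sim_all_reduce(world, op=max))   # [3, 3, 3, 3]
print(sim_gather(world, dst=1))        # [None, [0, 1, 2, 3], None, None]
print(sim_all_gather(world)[3])        # [0, 1, 2, 3]
print(sim_scatter([10, 20, 30, 40]))   # [10, 20, 30, 40]
```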

Below there are two simple usage examples. You can test them on either lab computers or entropy.

In [None]:
import os
import time
import torch
import torch.distributed as dist

WORLD_SIZE = int(os.environ['WORLD_SIZE'])
RANK = int(os.environ['RANK']) # int(os.environ['SLURM_PROCID'])
dist.init_process_group("gloo", rank=RANK, world_size=WORLD_SIZE)

if RANK == 0:
  print(f"Rank 0: Sending 42 to rank 1")
  dist.send(torch.tensor([42.0]), 1)  # shape [1], matching the receive buffer on rank 1
elif RANK == 1:
  message = torch.zeros(1)
  dist.recv(message, 0)
  print(f"Rank 1: Received {message.item()} from rank 0")


In [None]:
import os
import time
import torch
import torch.distributed as dist

WORLD_SIZE = int(os.environ['WORLD_SIZE'])
RANK = int(os.environ['RANK']) # int(os.environ['SLURM_PROCID'])
dist.init_process_group("gloo", rank=RANK, world_size=WORLD_SIZE)

message = torch.tensor(RANK)
print(f"Rank {RANK}: Sending {message.item()}")
dist.reduce(message, 0, dist.ReduceOp.SUM)

if RANK == 0:
  print(f"Rank 0: Sum of all messages is {message.item()}")


### Exercise 4

a) Measure the time it takes to send the message between two ranks when they are:
  - on the same node
  - on different nodes
  
  You can choose to do it in the lab or on entropy. Repeat the measurement many times to decrease variability.

b) Implement `reduce` using only point-to-point communication.

c) Implement `all_reduce` using only point-to-point communication.

d) Measure the time of your implementations against the library ones (you can choose to do it in the lab or on entropy).

e) (Optional) Fix the problem with downloading the data in the example - we want the run to succeed also when there is no already downloaded data. Bonus: fix it also in the case when we are using local storage on entropy (and there is more than 1 worker on a node). Tip: `SLURM_LOCALID` environment variable may be handy.