# 1. Introduction

We implement distributed training based on the Needle framework in our final project. In distributed training, the workload to train a model is split up and shared among multiple devices like GPUs, called nodes. These nodes work in parallel to speed up model training. The two main types of distributed training are data parallelism and model parallelism. In short, data parallelism divides the training data into partitions; model parallelism segments the model into different parts that can run concurrently in different nodes [1]. This project implmements the data parallism apporach. We'll elaborate a bit more about data parallelism in the following sections.

In data parallelism, the training data is divided into partitions, where the number of partitions is equal to the total number of available nodes. The partitions are assigned to the available nodes.
The model is copied in each of these nodes and each nodes operates on its own subset of the partition. Each node calculates the gradients of the model parameters independently. The calculated gradients of the nodes are aggragated to obtain the average gradients. Finally, each node updates the model parameters using the average gradients. 

Here we also give a brief explanation of the mathematical theory of data parallelism. Let $w$ be the parameters of the model; $\frac{\delta{L}}{\delta{w}}$ is the original gradients of the batch of size $n$; $l_i$ is the loss for data point $i$ and $k$ is the number of nodes. Then we have
$$
\frac{\delta{L}}{\delta{w}}=\frac{\delta[\frac{1}{n}\sum_{i=1}^{n}l_i]}{\delta{w}} \\
                              =\frac{1}{n}\sum_{i=1}^{n}\frac{\delta{l_i}}{\delta{w}} \\
                              =\frac{m_1}{n}\frac{\frac{1}{m_1}\sum_{i=1}^{m_1}l_i}{\delta{w}} 
                               +\frac{m_2}{n}\frac{\frac{1}{m_2}\sum_{i=m_1+1}^{m_1+m2}l_i}{\delta{w}}
                               + \dots
                               + \frac{m_k}{n}\frac{\frac{1}{m_k}\sum_{i=m_{k-1}+1}^{m_{k-1}+m_{k}}l_i} {\delta{w}} \\
                              =\frac{m_1}{n}\frac{\delta{l_1}}{\delta{w}}+\frac{m_2}{n}\frac{\delta{l_2}}{\delta{w}}
                              +\dots+\frac{m_k}{n}\frac{\delta{l_k}}{\delta{w}}
$$
where $m_k$ is the number of data points assigned to node $k$, and 
$$
m_1+m_2+\dots+m_{k}=n
$$
If $m_1=m_2=\dots=m_k=\frac{n}{k}$, we have
$$
\frac{\delta{L}}{\delta{w}}=\frac{1}{k}[\frac{\delta{l_1}}{\delta{w}}+\frac{\delta{l_2}}{\delta{w}}+\dots+\frac{\delta{l_k}}{\delta{w}}]
$$
where $\frac{\delta{l_k}}{\delta{w}}$ means the gradients calculated by node $k$ based on the data points $\{m_{k-1}+1,m_{k-1}+2,\dots,m_{k-1}+m_k\}$.
According to the above equation, we could know that the average gradients of all the nodes are equal to the original gradients [2]. 

We have two implementations of distributed trainig, which are based on different communication frameworks, i.e., mpi4py [3] and nccl [4]. The source code of the project can be found here: [TODO]. We intorduce these two implementations in section 2 and section 3 respectively.

To test distributed training, you need to run this notebook with multiple GPUs. Run  `nvidia-smi` to check how many GPUs are available on your machine. 

In [3]:
!nvidia-smi

Mon Jan  9 15:08:52 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.161.03   Driver Version: 470.161.03   CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA A100-SXM...  On   | 00000000:4E:00.0 Off |                    0 |
| N/A   25C    P0    50W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA A100-SXM...  On   | 00000000:B7:00.0 Off |                    0 |
| N/A   28C    P0    53W / 400W |      0MiB / 40536MiB |      0%      Default |
|       

Clone the cod and install necessary packages:

In [4]:
# from google.colab import drive
# drive.mount('/content/drive')
# %cd /content/drive/MyDrive/

# !git clone https://github.com/jzh18/hw4.git
# !pip3 install --upgrade --no-deps git+https://github.com/dlsys10714/mugrade.git
# !pip3 install pybind11
# !pip3 install mpi4py
# %cd /content/drive/MyDrive/hw4

Prepare data:

In [5]:
# Download the datasets you will be using for this assignment

import urllib.request
import os

!mkdir -p './data/ptb'
# Download Penn Treebank dataset
ptb_data = "https://raw.githubusercontent.com/wojzaremba/lstm/master/data/ptb."
for f in ['train.txt', 'test.txt', 'valid.txt']:
    if not os.path.exists(os.path.join('./data/ptb', f)):
        urllib.request.urlretrieve(ptb_data + f, os.path.join('./data/ptb', f))

# Download CIFAR-10 dataset
if not os.path.isdir("./data/cifar-10-batches-py"):
    urllib.request.urlretrieve("https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz", "./data/cifar-10-python.tar.gz")
    !tar -xvzf './data/cifar-10-python.tar.gz' -C './data'

Compile the code:

In [6]:
!make

-- Found pybind11: /home/x_huzha/.conda/envs/dlsys/lib/python3.8/site-packages/pybind11/include (found version "2.10.2")
  Policy CMP0074 is not set: find_package uses <PackageName>_ROOT variables.
  Run "cmake --help-policy CMP0074" for policy details.  Use the cmake_policy

  Environment variable CUDA_ROOT is set to:

    /software/sse/manual/CUDA/11.3.1_465.19.01

  For compatibility, CMake is ignoring the variable.

-- Determining NCCL version from /software/sse/manual/CUDA/11.3.1_465.19.01/include/nccl.h...
-- NCCL version: 2.11.4

-- Found NCCL (include: /software/sse/manual/CUDA/11.3.1_465.19.01/include, library: /software/sse/manual/CUDA/11.3.1_465.19.01/lib64/libnccl.so)
-- Found cuda, building cuda backend
Mon Jan  9 15:10:15 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.161.03   Driver Version: 470.161.03   CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| 

# 2. Distributed training with mpi4py

In this section, we introduce the usage and implementation details of distributed trainig based on mpi4py framework.

## 2.1 Usage

In this section, we demostrate how to use distributed training.

In this project, we tried to create the process similar to what horovod provides. The training process will take place in different process at the same time, and each process would communicate with each other through Message Passing Interface (MPI) protocol.

Let's see how it works.

In the file `train_resnet.py`, we use distributed training to train a ResNet9 model. Let's walk through the code in `train_resnet.py` briefly to show how to use distributed training.

Firstly, import the packages we need.
```
import sys
import numpy as np
sys.path.append('./python')
sys.path.append('./apps')
import needle as ndl
from simple_training import train_cifar10, evaluate_cifar10
from models import ResNet9
```

After importing what we need from the basic needle framework, we now can import the ddp (distributed data parallel) from apps:
```
import apps.ddp as ddp
```

Here, we are going to initialize everything we need:

```
# this function initialize the ddp functionality
# and return a desired cuda device
rank, device = ddp.init()

dataset = ndl.data.CIFAR10Dataset("data/cifar-10-batches-py", train=True)

#  this function do the partition for dataset and
#  returns a dataloader and batch_size for the current process
train_dataloader, bsz = ddp.partition_dataset(
    dataset=dataset, batch_size=128, device=device, dtype='float32')

model = ResNet9(device=device, dtype="float32")
#  Before training, we must broadcast the parameters to different process
ddp.broadcast_parameters(model)

model.train()
opt = ndl.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
#  After defining the optimizer, we need to call this to
#  make the optimizer work for distributed class
opt  = ddp.DistributedOptimizer(opt)

loss_fn = ndl.nn.SoftmaxLoss()
```

After the initialization, the training step is very simple. Here we can see that the training process is similar to what we normally do in needle framework:
```
n_epochs = 1
for i in range(n_epochs):
    if rank == 0:
        print(f'epoch: {i+1}/{n_epochs}')
    for batch in train_dataloader:
        opt.reset_grad()
        X, y = batch
        out = model(X)
        correct = np.sum(np.argmax(out.numpy(), axis=1) == y.numpy())
        loss = loss_fn(out, y)
        loss.backward()
        opt.step()
```

Using pytorch to find how many gpu available:

In [7]:
import torch
num_of_gpus = torch.cuda.device_count()
print(num_of_gpus)

  from .autonotebook import tqdm as notebook_tqdm


3


We have 3 GPUs here. Now, let's train the ResNet model using distributed training with the 3 GPUs!

In [9]:
!mpiexec -np 3 python apps/distribute_training.py

Use cuda: 1
Use cuda: 0
Use cuda: 2
partitioned dataset length: 16666
partitioned dataset length: 16666
partitioned dataset length: 16666
0  correct: 0.34255370214808595  loss: [1.850796]
0  correct: 0.3422536901476059  loss: [1.8461978]
0  correct: 0.35077403096123844  loss: [1.8468144]
Time: 30.70683526992798
0.21972878915156607 [4.4106684]
0.21408856354254172 [4.6044602]
0.22146885875435018 [4.3124213]


## 2.2 Implementation

In this section, we show the implementation details of distributed training in our project. Most of the code relevant to distributed training are located in the file `apps/ddp.py`.


The `DataPartitioner` in the file divides a dataset into multiple partitions with the size specified by users. The code are shown as below:
```
class DataPartitioner(object):
    """ Partitions a dataset into different chuncks. """

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])
```



The `broadcast_parameters` function broadcasts the model parameters to all the nodes. The following shows the code of `broadcast_parameters` function:
```
def broadcast_parameters(model, root_rank=0):
    if is_nccl:
        // use nccl as backend
        ...
    else:
       // use mpi4py as backend
        for p in model.parameters():
            p_data = p.numpy()
            p_data = comm.bcast(p_data, root=0)
            p.data = ndl.Tensor(p_data, device=device, dtype=p.dtype)
```

The `DistributedOptimizer` class uses the all-reduce functionality to aggrate the gradients calculated by each nodes and calculate the mean of these gradients. The model of each node update the model parameters based on the mean gradients. The code of `DistributedOptimizer` are shown below.
```
class DistributedOptimizer(ndl.optim.Optimizer):
    def __init__(self, opt):
        super().__init__(opt.params)
        self.opt = opt

    def step(self):
        self.average_gradients()
        self.opt.step()

    def average_gradients(self):
        for p in self.params:
            if p.grad is None:
                continue
            sendbuf = np.ascontiguousarray(p.grad.numpy())
            recvbuf = np.empty_like(sendbuf, dtype=p.dtype)
            comm.Allreduce(sendbuf, recvbuf, op=MPI.SUM)
            recvbuf = recvbuf / world_size
            p.grad.data = ndl.Tensor(recvbuf, device=device, dtype=p.grad.dtype)
```


In order to select the GPU we want to run the training workloads, we add a function named `SetDevice(int32_t device_id)` in `src/ndarray_backend_cuda.cu`. Users need to specify the device_id when invoking `needle.cuda(device_id)`. For example,
`needle.cuda(1)` return a device which represents GPU 1. The code of `SetDevice(int32_t device_id)` are shown below.
```
void SetDevice(int id)
{
    mess.localRank=id;
    cudaSetDevice(id);
}
```



# 3. Distributed training with NCCL
In section two, we implemented distributed trainging using MPI communication API. This type of communication is very inconvenient, we need to turn the data into numpy, and use CPU to communicate. It doesn't take advantage of multiple GPUs. Therefore it is essential to use the NVIDIA Collective Communication Library(NCCL), which is developed by NVIDIA official.

To enable direct communication between GPUs in NCCL, we should crreate a communicator first. In terms of concrete implementation, first we need to call the `ncclGetUniqueId()` function, it will return an ID, which will be used by all processees and threads to synchronize and understand they are part of the same communicator. Then we can use `ncclCommInitRank()` to create the communicator objects. The key issue is that we need to broadcast ID to all participating threads and processes using any CPU communication system. In the original MPI with CUDA program, we can call the CUDA-based MPI API to finish the broadcast. But in our project, we call CUDA program via Python, MPI is also based on Python. As a result, we can't use the CUDA-based MPI API but we can use the Python-based. 

Our solutions are as follows:

1. Python program calls CUDA API, CUDA program gets the ID and returns it to Python.
2. Python program calls Python-based MPI API to broadcast the ID.
3. All processees and threads get the same ID, calls CUDA API to establish a connection.


The relevant codes arre as follows:

Python code:
```
def init():
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank() # call MPI API to get world_size and rank
    device = ndl.cuda(rank) # choose different GPUs
    print(f'Use cuda: {rank}')

    if rank==0:
        vec = device.get_id() # get ID
    else:
        vec = None
    vec = comm.bcast(vec, root=0) # broadcast ID

    device.init_nccl(vec,rank,size) # establish a connection
    return rank, size, device
```

CUDA code:
```
struct CudaCommAndStream{
    int nRanks,localRank,myRank;
    ncclUniqueId id;
    ncclComm_t comm;
    cudaStream_t s;
}mess;
void SetDevice(int id) # set different device
{
    mess.localRank=id;
    cudaSetDevice(id);
}
std::vector<uint8_t> GetId()
{
    ncclGetUniqueId(&mess.id); # get id 
    auto vec = std::vector<uint8_t>(reinterpret_cast<uint8_t*>(&mess.id),reinterpret_cast<uint8_t*>(&mess.id) + NCCL_UNIQUE_ID_BYTES); # put id into vector
    return vec;
}

void InitNccl(std::vector<uint8_t> vec,int rank,int size) 
{
    mess.nRanks = size;
    mess.myRank = rank;
    std::memcpy(&mess.id, vec.data(), vec.size()); # change vector to id
    ncclCommInitRank(&mess.comm, mess.nRanks, mess.id, mess.myRank); # establish a connection
    cudaStreamCreate(&mess.s);
}
PYBIND11_MODULE(ndarray_backend_cuda, m) {
    ...
    m.def("set_device", SetDevice);
    m.def("get_id", GetId);
    m.def("init_nccl", InitNccl);
}

```

## Usage
Before running, we should install NCCL and modify CMakeLists.txt to find library path of NCCL in order to compile successfully.

In [None]:
!make

Using nvidia-smi to find how many gpu available

In [1]:
!nvidia-smi

Mon Jan  9 07:09:13 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.39       Driver Version: 460.39       CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:1A:00.0 Off |                  Off |
| N/A   27C    P0    24W / 250W |      2MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla P100-PCIE...  Off  | 00000000:1B:00.0 Off |                  Off |
| N/A   25C    P0    25W / 250W |      2MiB / 16280MiB |      0%      Default |
|       

Using mpiexec to run the code

In [10]:
!mpiexec -np {num_of_gpus} python apps/distribute_training.py

Use cuda: 2
Use cuda: 3
Use cuda: 7
Use cuda: 4
Use cuda: 5
Use cuda: 1
Use cuda: 0
Use cuda: 6
partitioned dataset length: 6250
partitioned dataset length: 6250
partitioned dataset length: 6250
partitioned dataset length: 6250
partitioned dataset length: 6250
partitioned dataset length: 6250
partitioned dataset length: 6250
partitioned dataset length: 6250
0  correct: 0.33664  loss: 0  correct: [1.8683679]
0  correct: 0  correct: 0.33888  loss: 0.33664  loss: 0  correct: 0.34624  loss: 0.3392  loss: 0  correct: [1.8647475]
[1.8663205]
0.33904  loss: [1.8584825]
[1.8618884]
0  correct: [1.8806661]
0.3328  loss: Time: 43.870232820510864
[1.8832192]
0  correct: 0.33728  loss: [1.8480227]
0.24752 [5.1031213]
0.25008 [5.1413326]
0.25008 0.25232 [5.1249537]
[5.0528364]
0.24896 [5.10206]
0.2496 [4.9337025]
0.24448 [5.0527062]
0.25712 [5.2262936]


Compare efficiency with simple training

In [9]:
!mpiexec -np 1 python apps/distribute_training.py

Use cuda: 0
partitioned dataset length: 50000
0  correct: 0.34488  loss: [1.8502939]
Time: 49.71820831298828
0.2096 [4.8264384]
