# Distributed Data Parallelism
---

In this notebook, you will learn the concept of distributed data parallelism. The content will also walk you through the building blocks of distributed data parallel training, which include stochastic gradient descent, model synchronization, hyperparameter tuning, the parameter server architecture, and All-Reduce. Lastly, you will learn how to implement these concepts with `torchrun` and `slurm` in multi-GPU and multi-node settings.

#### Overview
The standard practice for speeding up model training is parallel execution, and the most popular form of parallel model training is `data parallelism`. In data parallel training, each GPU/node holds a full copy of the model. The input data is partitioned into disjoint subsets, and each GPU/node is responsible for training the model on only one of these partitions. Since each GPU trains its local model on a subset (not the whole set) of the input data, we need to periodically conduct a procedure called model synchronization. Model synchronization ensures that, after each training iteration, all the GPUs involved in the training job are on the same page. This guarantees that the model copies held on different GPUs have the same parameter values.

`Data parallelism` can also be applied at the model serving stage. Given that the fully trained model may need to serve many inference tasks, splitting the inference input data can also reduce the end-to-end model serving time. One significant difference compared to data parallel training is that in data parallel inference, the GPUs/nodes involved in a single job do not need to communicate, which means that the model synchronization phase of data parallel training is completely removed.

### Fundamentals of Data Parallelism

Let’s examine some fundamental theories about data parallel training, such as `stochastic gradient descent (SGD)` and `model synchronization`. But before that, let's examine the system architecture for data-parallel training. 

<center><img src="images/data-parallel-training-workflow.png" width="350px" height="350px" alt-text="workflow"/></center>
<center>Simplified workflow of data parallel training</center></br>

The simplified workflow for data-parallel training is depicted in the diagram above, emphasizing two bandwidths (the `data loading bandwidth` and the `model training bandwidth`). As we can see, the main difference between single-GPU training and data parallel training is that we split the data loading bandwidth between multiple workers/GPUs (shown as blue arrows in the diagram). In data parallel training, different batches of input data are trained on different GPUs. Consequently, none of the GPUs can see the full training data. Thus, traditional gradient descent optimization cannot be applied here. A stochastic approximation of gradient descent is needed instead; the same approximation can also be used in the single-GPU case. One popular stochastic approximation method is SGD. Also, besides the three usual steps in single-GPU training (data loading, training, and model update), data-parallel training includes an additional step called `model synchronization`, which collects and aggregates the local gradients generated by different GPUs/nodes.

#### Stochastic gradient descent (SGD)

Theoretically, traditional gradient descent (GD) for single-GPU training computes a gradient from each data point of the training dataset, where `g_i`, the gradient from the `i-th` training data point, is calculated as follows:  
`g_i = dL(w_i) / dw`  
Then, we sum up the gradients from all the training data points (`g_all += g_i`) and do a single-step model update with `w = w - a*g_all`, where `a` is the learning rate.

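```python
# Pseudocode: g_i is the gradient from the i-th data point, a is the learning rate
g_all = 0
for i in dataset:
    g_all += g_i       # accumulate gradients over the whole dataset
w = w - a*g_all        # single model update after seeing all the data
```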
However, in data parallel training, each GPU can only see part of (not the full) training dataset, which makes traditional GD optimization impossible since we cannot calculate g_all in this case. Thus SGD is used. With SGD, instead of updating the model weights (w) after generating the gradients from all the training data, SGD allows for model weight updates using a single or a few training samples (for example, a mini-batch). With this, workers in data-parallel training can update their model weights using their local (not global) training samples. 

```python
for i in dataset:
    w = w - a*g_i 
```

This implies that the model parameters of different workers can differ after each training iteration. Therefore, periodic `model synchronization` is needed to guarantee that all the workers are on the same page, meaning that they hold the same model parameters after each training iteration.
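To make the difference between the two update rules concrete, here is a minimal sketch on a toy one-parameter problem. The dataset, the squared-error loss, the learning rate value, and the function names are all illustrative assumptions and are not part of the notebook's training code.

```python
# Toy problem: fit y = w * x with squared-error loss 0.5 * (w*x - y)**2.
dataset = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]  # (x, y) pairs; the true w is 2.0
a = 0.05  # learning rate (illustrative value)


def grad(w, x, y):
    """Gradient of 0.5 * (w*x - y)**2 with respect to w."""
    return (w * x - y) * x


def gd_step(w):
    """Full-batch GD: sum the gradients over the whole dataset, then update once."""
    g_all = sum(grad(w, x, y) for x, y in dataset)
    return w - a * g_all


def sgd_epoch(w):
    """SGD: update after every sample, using only that sample's gradient."""
    for x, y in dataset:
        w = w - a * grad(w, x, y)
    return w


w_gd, w_sgd = 0.0, 0.0
for _ in range(50):
    w_gd = gd_step(w_gd)
    w_sgd = sgd_epoch(w_sgd)

print(f"GD:  w = {w_gd:.4f}")
print(f"SGD: w = {w_sgd:.4f}")
```

Both loops approach the same solution here, but only the SGD variant can run when a worker sees just part of the data.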

#### Model synchronization 

Model synchronization is needed to force all the workers to have the same view of the model parameters. Let's consider a simple four-GPU setting in a data-parallel training job. Each GPU maintains a copy of the full ML model locally inside its on-device memory. Let's assume all the GPUs are initialized with the same model parameters. After the first training iteration, each GPU will generate its local gradients as `∇𝑊𝑖`, where `i` refers to the `i-th` GPU. Given that they are training on different local training inputs, the gradients from different GPUs may differ. To guarantee that all four GPUs apply the same model updates, we need to conduct model synchronization before the model parameter updates. Model synchronization does two things:

- Collects and sums up all the gradients from all the GPUs in use, as shown here:

```text
∇𝑊 = ∇𝑊1 + ∇𝑊2 + ∇𝑊3 + . . . + ∇𝑊𝑁
```
  
- Broadcasts the aggregated gradients to all the GPUs

Then, we can use these aggregated gradients, ∇𝑊, for the model updates, which guarantees that the updated model parameters remain the same on all the GPUs after this first data parallel training iteration. This process is repeated to guarantee that the model parameters remain the same after every training iteration in a particular data parallel training job. In practice, model synchronization has two main variants in real system implementations: the `parameter server architecture` and the `All-Reduce architecture`.
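The two synchronization steps can be simulated without any GPUs. The sketch below uses plain Python lists to stand in for four workers; the gradient values, the learning rate, and the variable names are hypothetical and chosen only for illustration.

```python
# Four simulated workers, each holding an identical copy of a one-parameter model.
num_workers = 4
lr = 0.1                      # learning rate (illustrative value)
weights = [1.0] * num_workers

# Hypothetical local gradients; they differ because each worker sees different data.
local_grads = [0.2, -0.1, 0.4, 0.3]

# Without synchronization, every replica would drift to a different value.
unsynced = [w - lr * g for w, g in zip(weights, local_grads)]

# 1) Collect and sum the gradients from all the workers.
grad_sum = sum(local_grads)

# 2) Broadcast the aggregated gradient so every worker holds the same value.
synced_grads = [grad_sum] * num_workers

# Every worker applies the same update, so the replicas stay identical.
weights = [w - lr * g for w, g in zip(weights, synced_grads)]

print("without sync:", unsynced)
print("with sync:   ", weights)
```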

#### Hyperparameter tuning 

- Global batch size: The global batch size refers to how many training samples will be loaded into all the GPUs for training simultaneously. In data parallel training, this global batch size is the first hyperparameter that must be searched or fine-tuned. If the global batch size is too large, the training model may not converge. If it is too small, it is just a waste of distributed computational resources.
  
- Learning rate adjustment: Recent research suggests that, for large-batch data parallel training, we should have a warmup stage at the beginning of training. This warmup policy suggests that we start data parallel training with a relatively small learning rate and then gradually increase it over several epochs of training, stopping once it reaches a predefined peak learning rate.

- Model synchronization schemes: We need to initialize a group of processes to run our data parallel training job in a distributed manner. Each process is responsible for handling model synchronization on one machine or one GPU. A PyTorch example that initializes a process group with a model synchronization backend is as follows:

```python
torch.distributed.init_process_group(backend='nccl', init_method='...', world_size=N, timeout=M)
```
PyTorch mainly supports three different communication backends: `NCCL` (GPU only), `Gloo` (CPU, with partial GPU support), and `MPI` (CPU only).
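The first two hyperparameters above can be illustrated with a few lines of plain Python. This is a minimal sketch under assumptions: the linear shape of the warmup, the function name `warmup_lr`, and all numeric values are illustrative choices, not settings taken from the notebook's code.

```python
def warmup_lr(epoch, peak_lr=0.4, start_lr=0.01, warmup_epochs=5):
    """Linearly increase the learning rate from start_lr to peak_lr, then hold it."""
    if epoch >= warmup_epochs:
        return peak_lr
    return start_lr + (peak_lr - start_lr) * epoch / warmup_epochs


# Global batch size = per-GPU batch size x number of GPUs (world size).
per_gpu_batch_size = 128
world_size = 4
global_batch_size = per_gpu_batch_size * world_size

schedule = [round(warmup_lr(e), 4) for e in range(8)]
print("global batch size:", global_batch_size)
print("learning rate per epoch:", schedule)
```

In a real job, the value returned for each epoch would be written into the optimizer's learning rate before that epoch starts.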



### Parameter Server Architecture

In the `parameter server` paradigm, model parameters are kept in one place (a centralized node). Whenever a GPU/node needs to conduct model training, it pulls the parameters from the centralized node, trains the model, and then pushes model updates back to the centralized node. Model consistency is guaranteed since all the GPUs/nodes pull from the same centralized node. The `parameter server architecture` mainly consists of two roles: `parameter server` and `worker`. The `parameter server` is the master node in the traditional Master/Worker architecture. Workers are the compute nodes or GPUs responsible for model training. The total training data is split among all the workers. Each worker trains its local model with the assigned training data partition. The duties of the parameter server are twofold:

- Aggregate model updates from all the workers.
- Update the model parameters held on the parameter server. 

<center><img src="images/parameter-server-arch.png" width="350px" height="350px" alt-text="workflow"/></center>

The diagram above shows a simplified parameter server architecture with two workers and one parameter server. The whole system works through the following four stages:

1. Pull Weights: All the workers pull the model parameters/weights from the centralized parameter server.
1. Push Gradients: Each worker trains its local model with its local training data partition and generates local gradients. Then, all the workers push their local gradients to the centralized parameter server.
1. Aggregate Gradients: After collecting all the gradients sent from the worker nodes, the parameter server sums up all the gradients.
1. Model Update: Once the aggregated gradients have been calculated, the parameter server uses them to update the model parameters on this centralized server.

These four steps are executed among the parameter server and workers for each training iteration and looped through for the whole model training process. However, the communication (pull weights and push gradients) in the parameter server architecture can often be the training bottleneck. Details on this are beyond the scope of this notebook.
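The four stages can be mimicked in a single process. The sketch below is a toy simulation, not a real distributed implementation: the `ParameterServer` class, the `local_gradients` helper, the loss, and the data values are all hypothetical names and numbers introduced for illustration.

```python
class ParameterServer:
    """Holds the single authoritative copy of the model weights."""

    def __init__(self, weights, lr):
        self.weights = list(weights)
        self.lr = lr
        self.pending = []            # gradients pushed during the current iteration

    def pull(self):
        return list(self.weights)    # workers receive a copy of the weights

    def push(self, grads):
        self.pending.append(grads)

    def aggregate_and_update(self):
        # Sum the pushed gradients element-wise, then update the central weights.
        summed = [sum(g) for g in zip(*self.pending)]
        self.weights = [w - self.lr * g for w, g in zip(self.weights, summed)]
        self.pending.clear()


def local_gradients(weights, data):
    """Toy gradient of 0.5 * (w - x)**2, summed over one worker's data partition."""
    return [sum(w - x for x in data) for w in weights]


server = ParameterServer(weights=[0.0, 0.0], lr=0.1)
partitions = [[1.0, 2.0], [3.0, 4.0]]          # two workers with disjoint data

for _ in range(3):                              # three training iterations
    for data in partitions:
        w = server.pull()                       # 1. pull weights
        server.push(local_gradients(w, data))   # 2. push gradients
    server.aggregate_and_update()               # 3. aggregate gradients, 4. model update

print(server.weights)
```

Every pull and push goes through the one server object, which is why this link tends to become the bottleneck as the number of workers grows.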




### All-Reduce Architecture 

The idea of the All-Reduce paradigm is that every GPU/node keeps a copy of the model parameters, and the model copies are forced to synchronize periodically. Each GPU trains its local model replica using its own training data partition. After each training iteration, the model replicas that are held on different GPUs can be different since they are trained with different input data. Therefore, a global synchronization step is injected after each training iteration. This averages the parameters that are held on different GPUs so that model consistency can be guaranteed in this fully distributed manner. In the `All-Reduce` architecture, every node/GPU is equivalent, and all of them are worker nodes/GPUs. The burden of implementing communication protocols is left to standard collective communication libraries like NCCL. The All-Reduce paradigm is borrowed from the traditional Message Passing Interface (MPI) domain. Before illustrating All-Reduce, let’s briefly look at the Reduce collective primitive. The Reduce operator `(+)` is used to aggregate the values from different nodes/GPUs and store them in a single node/GPU. That is, there is one node/GPU that maintains the aggregated value. Some of the most common Reduce operators are `Sum`, `Averaging`, and `Multiplication`.

<center><img src="images/all-reduce-primitive.png" width="300px" height="300px" alt-text="all-reduce"/></center>
<center>The All-Reduce primitive in a three-worker setting</center><br/>
    
All-Reduce allows all the nodes/GPUs to get the same aggregated value. In data parallel training, it lets every worker obtain the gradients aggregated from all the worker nodes. This gradient aggregation is the model synchronization procedure in the All-Reduce architecture. It guarantees that all the workers use the same gradient to update the model in the current training iteration. A naive All-Reduce uses all-to-all communication: every worker needs to send its value to all the other workers. In practice, the All-Reduce function is commonly implemented as `Ring All-Reduce`, which is available in `NVIDIA NCCL`, `Uber Horovod`, and `Facebook Gloo`.

#### Ring All-Reduce Illustration

For simplicity, let’s illustrate Ring All-Reduce with three workers as follows.

<center><img src="images/RingAll-reduce.png" width="700px" height="700px" alt-text="all-reduce"/></center>


- **Step 1**: `Worker 1` has a value of `a`, `Worker 2` has a value of `b`, and `Worker 3` has a value of `c`.
- **Step 2**: `Worker 1` has a value of `a`. `Worker 1` passes this value, `a`, to `Worker 2`. `Worker 2` gets `a+b`. `Worker 3` still has a value of `c`.
- **Step 3**: `Worker 1` has a value of `a`. `Worker 2` has a value of `a+b`, which it passes to `Worker 3`. `Worker 3` now has a value of `a+b+c`.
- **Step 4**: `Worker 3` passes `a+b+c` to `Worker 1`. `Worker 1` now has `a+b+c`. `Worker 2` now has `a+b`. `Worker 3` now has `a+b+c`.
- **Step 5**: `Worker 1`, which has `a+b+c`, passes `a+b+c` to `Worker 2`. `Worker 2` now has `a+b+c`, and `Worker 3` has `a+b+c` as well.
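The five steps above can be replayed with a short simulation. This sketch follows the simplified walkthrough in this notebook (one pass around the ring to reduce, one more pass to distribute the result); production libraries typically split tensors into chunks and overlap the transfers, which is not modeled here. The function name `ring_all_reduce` and the sample values are illustrative assumptions.

```python
def ring_all_reduce(values):
    """Simplified ring All-Reduce: a reduce pass followed by a broadcast pass."""
    n = len(values)
    held = list(values)
    history = [list(held)]                  # Step 1: initial values

    # Reduce pass: each worker adds the running sum received from its left neighbour.
    for i in range(1, n):
        held[i] = held[i - 1] + held[i]
        history.append(list(held))

    # Broadcast pass: the total travels from the last worker around the ring.
    for i in range(n - 1):
        src = (i - 1) % n                   # left neighbour on the ring
        held[i] = held[src]
        history.append(list(held))

    return held, history


# a = 1, b = 2, c = 3 for Worker 1, Worker 2, and Worker 3.
final, history = ring_all_reduce([1, 2, 3])
for step, state in enumerate(history, start=1):
    print(f"Step {step}: {state}")
```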




### Building a Data Parallel Training

This section walks you through the process of implementing data parallelism in two different settings: single-machine multi-GPU and multi-machine/node multi-GPU. Also, we will illustrate how to checkpoint the model and its relevant metadata during training. We will focus on using a simple CNN as our model and MNIST as our dataset. The data parallel training pipeline of each worker consists of the following steps:

1. Input Pre-Processing: Given the raw training input data, we need to pre-process it. Common input pre-processing techniques include image crop, image flip, input data normalization, and many more.
2. Input Data Partition: Split the whole input dataset into multiple chunks and assign each chunk to one accelerator for the model training process.
3. Data Loading: Load the data partition into the accelerators we use to train the model.
4. Training: Train the model locally with its training input data.
5. Model Synchronization: After generating the local gradient, synchronize it with the other worker nodes.
6. Model Update: After getting the synchronized gradients, update the local model parameters with the aggregated gradients.
7. Repeat Steps 4 to 6 for the successive training iterations
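Step 2 (Input Data Partition) can be sketched with index arithmetic alone. The strided split below is one simple way to give each worker a disjoint chunk; it is similar in spirit to what a distributed sampler does, but the function name `partition_indices` and the sizes are hypothetical, and real samplers usually also shuffle and pad so that every worker receives the same number of samples.

```python
def partition_indices(dataset_size, world_size, rank):
    """Return the sample indices assigned to one worker (strided split)."""
    return list(range(rank, dataset_size, world_size))


dataset_size, world_size = 10, 4
partitions = [partition_indices(dataset_size, world_size, r) for r in range(world_size)]

for rank, indices in enumerate(partitions):
    print(f"rank {rank}: {indices}")
```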
    
For the implementation, we will consider two hardware settings. First, we will use a single machine with multiple GPUs. In this setting, all the in-parallel training tasks can be launched using either a single process or multiple processes. The second type is multiple machines/nodes with multiple GPUs. In this setting, we must configure the network communication portals among all the machines. We also need to form a process group to synchronize both the cross-machine and cross-GPU training processes.

#### Single-Machine/Node multi-GPU

Let’s check the hardware configuration by running the command in the cell below.

In [None]:
!srun --partition=gpu -n2 --gres=gpu:4 nvidia-smi

**Steps**:


First, we need to set the default device/accelerator in the system as follows: 

```python
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
```

Second, with our pre-defined model, we must pass the model to all the available devices as follows: 

```python
model = torch.nn.DataParallel(model)
```

Then, PyTorch will conduct the data parallel training under the hood. When you run the whole data parallel training job, the system will launch a single process with multiple threads. Each thread is responsible for running training tasks on a single GPU. We set the default GPU to Worker 1. `nn.DataParallel()` works as follows: 

- We initialize the model on Worker 1 and let Worker 1 split the input training data.
- Worker 1 will broadcast the model parameters to all the other workers (that is, Worker 2 and Worker 3). In addition, Worker 1 will also send different input data partitions to different workers (Input1 to Worker 2 and Input2 to Worker 3).
- Then, we can start the data parallel training on all the devices.

During each training iteration, Worker 1 needs to handle extra operations besides its local training:

<center><img src="images/model-synchronization.png" width="350px" height="350px" alt-text="all-reduce"/></center>
<center> Model synchronization in nn.DataParallel() </center></br>

After each worker generates its local gradients (for example, `Gradients0` on `Worker 1` and `Gradients1` on `Worker 2`), they send their local gradients to Worker 1. After Worker 1 aggregates the gradients from all the workers as `Gradients_sum`, it broadcasts `Gradients_sum` to all the other workers.

When executing your program, you can also choose the devices you want to use by passing another argument, `device_ids`. For example, if you just want to use two GPUs, you can pass in the parameters as follows:
```python
model = torch.nn.DataParallel(model, device_ids=[0,1]) 
``` 
The implementation of `nn.DataParallel()` involves many `all-to-one` and `one-to-all` communications, which makes the default root node/GPU the communication bottleneck. Thus, we should adopt a scheme that evenly distributes the workloads and network communications. 


Let's follow the steps below to run our `nn.DataParallel()` [code](../source_code/dp/main.py) using one node (`--nnodes=1`) with four GPUs (`--nproc-per-node=4`):

**Steps:**

- Open the terminal from the head node.
- Execute the `srun` command to get on the compute node (the `-w dgx01` flag targets a specific node and is optional):
  
```text
tade-headnode:~$ srun -p gpu -N 1 --gres=gpu:4 -w dgx01 --pty bash
```
- On the GPU node, run the command below to execute the `nn.DataParallel()` [code](../source_code/dp/main.py).

```text

cd ../source_code && srun -p gpu -N 1 --gres=gpu:4  torchrun --nnodes=1 --nproc-per-node=4  dp/main.py
```

In [None]:
!cd ../source_code && srun -p gpu -N 1 --gres=gpu:4  torchrun --nnodes=1 --nproc-per-node=4  dp/main.py

**Likely output on DGX A100**:

```python
W0210 12:48:11.410000 1059798 site-packages/torch/distributed/run.py:792] 
W0210 12:48:11.410000 1059798 site-packages/torch/distributed/run.py:792] *****************************************
W0210 12:48:11.410000 1059798 site-packages/torch/distributed/run.py:792] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0210 12:48:11.410000 1059798 site-packages/torch/distributed/run.py:792] *****************************************
Using  4 GPUs for data parallel training
Epoch 0
Using  4 GPUs for data parallel training
Epoch 0
NCCL version 2.21.5+cuda12.4
NCCL version 2.21.5+cuda12.4
Using  4 GPUs for data parallel training
Epoch 0
Using  4 GPUs for data parallel training
Epoch 0
...
batch 468, loss 0.7718946933746338
Training Done!
batch 466, loss 1.082253336906433
batch 464, loss 1.0795007944107056
batch 457, loss 0.5857658982276917
batch 467, loss 0.6964226365089417
...
Training Done!
batch 466, loss 0.7101627588272095
...
batch 460, loss 0.8381197452545166
batch 468, loss 0.7506725192070007
Training Done!
...
batch 466, loss 0.7794694304466248
batch 467, loss 1.0738880634307861
batch 468, loss 0.5498190522193909
Training Done!
```

#### Multi-machine/Node multi-GPU Distributed Training

In this section, we use PyTorch distributed training for CIFAR-10 classification with a ResNet model wrapped in `DistributedDataParallel`. First, we will discuss multi-process implementations for the multi-machine/node case. Before we proceed, we need to define some concepts for multi-machine/node cases:

- **rank**: A unique sequence number for each GPU across all the machines/nodes
- **local_rank**: A sequence number for the GPUs within a machine/node
- **world_size**: A count of all the GPUs in all the machines/nodes, which is just the total number of GPUs among all the machines/nodes
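The relationship between these three terms can be written down in a few lines. This is a sketch under assumptions: every node has the same number of GPUs and ranks are numbered node by node, which is a common convention but may not match how every launcher assigns ranks. The helper name `global_rank` is hypothetical.

```python
def global_rank(node_rank, local_rank, gpus_per_node):
    """Global rank, assuming ranks are numbered node by node."""
    return node_rank * gpus_per_node + local_rank


nnodes, gpus_per_node = 2, 4
world_size = nnodes * gpus_per_node

all_ranks = []
for node_rank in range(nnodes):
    for local_rank in range(gpus_per_node):
        rank = global_rank(node_rank, local_rank, gpus_per_node)
        all_ranks.append(rank)
        print(f"node {node_rank}, local_rank {local_rank} -> rank {rank} of {world_size}")
```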

For example, we have two machines, each with two GPUs. The `local_rank` for two GPUs within each machine will be `0 and 1`. Rank numbers are unique per GPU among all the machines/nodes; hence, the rank number will range from 0 to 3. Since there are four GPUs in total, `world_size` is `4`.  To implement this approach:

- `nn.parallel.DistributedDataParallel()` is used instead of `nn.DataParallel()`
  
```python 
from torch.nn.parallel import DistributedDataParallel as DDP 
```
- Also import other relevant libraries for distributed data-parallel training and torch's multi-processing library as shown in the code [here](../source_code/test_ddp.py)
- If we are not running a slurm script, we can define several system setups, such as network environments for the master node IP address and port number within the code. For a one-size-fits-all approach, it is better to get the master node IP address dynamically within the slurm script, as we don't know the exact node that will be selected.
  
```python
import os

def net_setup():
    # Example values: use your own master node IP address and a free port
    os.environ['MASTER_ADDR'] = '172.31.26.15'
    os.environ['MASTER_PORT'] = '12345'
```

- Also, parse some important parameters from the user, such as the number of epochs (`--num_epochs`), batch size (`--batch_size`), and learning rate (`--learning_rate`). Other parameters, such as the number of GPUs per node (`--nproc_per_node`), number of machines or nodes (`--nnodes`), and node rank (`--node_rank`), can be passed through the `torchrun` command.

```python
def main(): 
    ... 
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default) 
    ...
```

Next, we need to define our distributed training function.

- Set the random seeds. `torch.manual_seed` is used to guarantee that we initialize the same model weights on all the worker nodes.

```python 
def set_random_seeds(random_seed=0):

    torch.manual_seed(random_seed)
    ...
    np.random.seed(random_seed)
    random.seed(random_seed)
```
- Get the local rank

```python
if local_rank is None:
        local_rank = int(os.environ["LOCAL_RANK"])
        print('Local rank ', local_rank)
```
      
- Initialize the distributed backend (e.g., NCCL or Gloo), which takes care of synchronizing nodes/GPUs:
  
```python

torch.distributed.init_process_group(backend="nccl") 
```

- Move the model to the GPU assigned to the current process and wrap it with `DistributedDataParallel()`, passing each GPU's `local_rank` number:

```python
    model = torchvision.models.resnet18(pretrained=False)

    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
```

- Load the pre-downloaded data (`download` should be set to `False` because downloading is not multiprocess safe) and restrict data loading to a subset of the dataset exclusive to the current process:

```python
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=False, transform=transform)
    ...
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
```


With all these setups in place, we can train the model by looping over the dataset multiple times, saving and evaluating the model routinely.

```python
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))

        # Save and evaluate model routinely
        if epoch % 10 == 0:
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                ...

        ddp_model.train()

        for data in train_loader:
            ...
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            ...
            loss.backward()
            optimizer.step()
```

With that, we have finished implementing data parallel training in the multi-machine/node, multi-GPU setting.

There are two ways to run the code:

#### 1.  Run `torchrun` command as shown below on two separate nodes. 
  
```text
# Master node:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="172.31.26.15" --master_port=1234 test_ddp.py

# Worker node:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="172.31.26.15" --master_port=1234 test_ddp.py
```
Description:

```text
 --nproc_per_node: number of GPUs per node
 --nnodes: number of nodes
 --node_rank: rank of the node (0 for the master node, 1 for the worker node)
 --master_addr: IP address of the master node
 --master_port: port on the master node used for communication
```
With this approach, you must know the master node's IP address, which you can get by running the command `hostname -i` on its compute node. For example, you can execute an `srun` command to move to the compute node (i.e., the master node) and get the node's IP address.

```text
 > srun -p gpu -N 1 --gres=gpu:4 --pty bash
 > hostname -i
```
**Note**: *The partition (`-p`) might differ from cluster to cluster. You can target a particular node by specifying `-w` within the `srun` command (e.g., `-w dgx01`).*



Let's follow the steps below to run our `nn.parallel.DistributedDataParallel()` [code](../source_code/test_ddp.py) using two nodes (`--nnodes=2`) with four GPUs each (`--nproc-per-node=4`):

**Step 1:**
- Open a terminal from the Jupyter Notebook.
- Execute this command to get to the compute node:
  
 `srun -p gpu -N 1 --gres=gpu:4  --pty bash`
 
- Run the [master node script](../source_code/slurm/ddp_master_node.sh) using the command below. 
  
`workspace/source_code/slurm/ddp_master_node.sh`

**Likely output**:

```python

Master IP Address:  10.184.92.71
W0312 06:37:46.399000 2230576 site-packages/torch/distributed/run.py:792] 
W0312 06:37:46.399000 2230576 site-packages/torch/distributed/run.py:792] *****************************************
W0312 06:37:46.399000 2230576 site-packages/torch/distributed/run.py:792] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0312 06:37:46.399000 2230576 site-packages/torch/distributed/run.py:792] *****************************************
Local rank  3
Local rank  0
Local rank  1
Local rank  2
...
```
The master node will not proceed but will wait until the worker node starts.
- Next, copy the master IP address from the terminal (e.g. 10.184.92.71)
- Open the worker node [script file](../source_code/slurm/ddp_worker_node.sh) and paste the master IP address (e.g. `MasterIPdress=10.184.92.71`). Then, press Ctrl + S to save the file
- Open another terminal from the Jupyter Notebook
- Execute this command to get to the computing node: `srun -p gpu -N 1 --gres=gpu:4 --pty bash`
 
- Run the worker node script using this command: `workspace/source_code/slurm/ddp_worker_node.sh`
  
**Likely output**

```python
...
W0210 11:54:37.849000 982813 site-packages/torch/distributed/run.py:792] 
W0210 11:54:37.849000 982813 site-packages/torch/distributed/run.py:792] *****************************************
W0210 11:54:37.849000 982813 site-packages/torch/distributed/run.py:792] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0210 11:54:37.849000 982813 site-packages/torch/distributed/run.py:792] *****************************************
Local rank  2
Local rank  1
Local rank  0
Local rank  3
...
NCCL version 2.21.5+cuda12.4
Local Rank: 3, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 2, Epoch: 0, Training ...
---------------------------------------------------------------------------
Epoch: 0, Accuracy: 0.0
---------------------------------------------------------------------------
Local Rank: 1, Epoch: 1, Training ...
Local Rank: 3, Epoch: 1, Training ...
Local Rank: 2, Epoch: 1, Training ...
Local Rank: 0, Epoch: 1, Training ...
Local Rank: 3, Epoch: 2, Training ...
Local Rank: 0, Epoch: 2, Training ...
...
```
Both master and worker nodes will proceed to execute the DDP program. Please note that you must `cancel` the job submission to be able to run the next instructions given in the notebook. 

- From one of the open terminals, run the command `squeue --me`. You will see output similar to that shown below.
  ```text
   JOBID PARTITION  NAME       USER  ST       TIME    TIME_LEFT  CPUS MIN_MEM  NODE NODELIST(REASON)
  15648   gpu       bash       tade  R      1:32:46   10:27:14     2     0      1     dgx02
  15647   gpu       bash       tade  R      1:36:31   10:23:29     2     0      1     dgx01
  ```
- To cancel each job, use the command `scancel <JOBID>`. For example, `scancel 15648` and `scancel 15647`.

#### 2.  Run `torchrun` with Slurm Command

A major disadvantage of the above approach is that if you intend to run the program on five or more nodes, you will need to open five or more terminals and start the processes in each of them, which takes a lot of effort. The Slurm approach simplifies the process: a single script containing Slurm commands lets you run your application on many nodes from one point. Let's dive into a sample Slurm script for our `nn.parallel.DistributedDataParallel()` code.

**Sbatch section:** This section sets the number of nodes, the GPU partition, the number of tasks per node, the number of threads to be executed per task, the number of GPUs to use per node, the name given to the submitted job, the name of the file that receives training progress and results, and the name of the file that receives errors and warnings.

```text
#!/bin/bash
#SBATCH --nodes=2               # number of nodes
#SBATCH --partition=gpu         # GPU partition
#SBATCH --ntasks-per-node=4     # number of tasks per node
#SBATCH --cpus-per-task=8       # number of threads per task
#SBATCH --threads-per-core=1    # number of threads per core
#SBATCH --gres=gpu:4            # number of gpus per node
#SBATCH --time=12:00:00         # format: HH:MM:SS
...
#SBATCH --job-name=ddpslurm     # job name
#SBATCH -o %x.output%j          # name of the file to output training progress and results (stdout)
#SBATCH -e %x.error%j           # name of the file to output errors and warnings (stderr)
```

**Libraries/virtual environment settings**: This section might differ from cluster to cluster. The dependencies required by your application determine it. In the example below, we load a conda library and activate a conda environment that contains all the dependencies needed to run our `nn.parallel.DistributedDataParallel()` code.

```text

# load conda env

module load conda/2023
conda init
source activate env_mnode
```

**Getting master node IP address**: This is the part of the slurm script that dynamically gets the master node IP address (head_node_ip). 

```text
# get master node IP Address 
...
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
```

**Executing Srun command**: This section launches the training processes on the two allocated nodes using the simple command below:

`srun python source_code/slurm_ddp.py`

*For reference, you can also launch the job with the `torchrun` command. With torchrun you set two [Rendezvous](https://pytorch.org/docs/stable/elastic/rendezvous.html#torch.distributed.elastic.rendezvous.RendezvousHandler) flags: `--rdzv_backend`, which here selects the `c10d` backend (backed by a TCPStore by default), and `--rdzv_endpoint`, which takes the IP address and port number of the rendezvous host. Because torchrun itself starts the `--nproc_per_node` worker processes, `srun` should start only one torchrun per node in this variant (for example with `--ntasks-per-node=1`). You can read more about the [Rendezvous flags here](https://pytorch.org/docs/stable/elastic/run.html). For example: `srun torchrun --nproc_per_node=4 --nnodes=2 --rdzv_backend c10d --rdzv_endpoint $head_node_ip:29500 source_code/test_ddp.py`*
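
Whichever launcher is used, each process needs to know its global rank, its local rank on the node, and the total number of processes. The sketch below shows one way a training script could read these values: `torchrun` exports `RANK`, `LOCAL_RANK`, and `WORLD_SIZE`, while `srun` exports `SLURM_PROCID`, `SLURM_LOCALID`, and `SLURM_NTASKS` for every task. The helper name `resolve_dist_env` is hypothetical, and this is not necessarily how `slurm_ddp.py` or `test_ddp.py` do it, since their contents are not shown here.

```python
import os


def resolve_dist_env(env=None):
    """Return (rank, local_rank, world_size) from launcher environment variables.

    Assumption: the process was started either by torchrun or by srun.
    """
    env = os.environ if env is None else env
    if "RANK" in env:
        # Variables exported by torchrun for every worker it spawns.
        return int(env["RANK"]), int(env["LOCAL_RANK"]), int(env["WORLD_SIZE"])
    # Variables exported by srun for every task of the job step.
    return (
        int(env["SLURM_PROCID"]),
        int(env["SLURM_LOCALID"]),
        int(env["SLURM_NTASKS"]),
    )


# Example with a hand-written environment standing in for a real srun task.
fake_slurm_env = {"SLURM_PROCID": "5", "SLURM_LOCALID": "1", "SLURM_NTASKS": "8"}
print(resolve_dist_env(fake_slurm_env))  # (5, 1, 8)
```

In a real script the local rank would typically select the GPU, and the rank and world size would be passed on when the process group is initialized. Under plain `srun`, the master address and port also have to be made available to every process, for instance from `head_node_ip`.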



Let's follow the steps below to run the slurm script [found here](../source_code/slurm/ddp_multinode.slurm):

- Open a terminal from the head node and navigate to the project workspace folder.

- Use the `sbatch` command to submit the Slurm script: `sbatch workspace/source_code/slurm/ddp_multinode.slurm`

- Check if your job was submitted successfully using the `squeue --me` command. This will show the list of jobs you submitted.

````text

 JOBID PARTITION   NAME       USER ST         TIME    TIME_LEFT  CPUS MIN_MEM  NODE NODELIST(REASON)
 14695  gpu      ddpslurm      tade  R         0:05     11:59:55   512    0      2   dgx[01-02]
````
- On successful execution, you can follow the training progress in the output file (`ddpslurm.outputxxx`) in the workspace directory. If you find no output there, check the error file (`ddpslurm.errorxxx`) for warnings and errors.

Likely output from `ddpslurm.outputxxx` (`<NAME>.output<JOBID>`):

```text
...
Local rank  1
Local rank  0
Local rank  1
Local rank  2
Local rank  0
Local rank  3
NCCL version 2.21.5+cuda12.4
Local rank  2
Local rank  3
Local Rank: 3, Epoch: 0, Training ...
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 2, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 2, Epoch: 0, Training ...
Local Rank: 3, Epoch: 0, Training ...
Local Rank: 0, Epoch: 0, Training ...
---------------------------------------------------------------------------
Epoch: 0, Accuracy: 0.0
---------------------------------------------------------------------------
---------------------------------------------------------------------------
Epoch: 0, Accuracy: 0.0
---------------------------------------------------------------------------
Local Rank: 3, Epoch: 1, Training ...
Local Rank: 2, Epoch: 1, Training ...
Local Rank: 2, Epoch: 1, Training ...
...
```
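
The log above contains each local rank from 0 to 3 twice, which matches the job layout requested in the sbatch section: 2 nodes with 4 tasks per node, for 8 processes in total. The sketch below spells out that arithmetic. It assumes tasks are placed in blocks, node by node, so that consecutive global ranks share a node; the function name `process_layout` is hypothetical.

```python
def process_layout(nodes: int, tasks_per_node: int) -> dict:
    """Map every global rank to (node index, local rank).

    Assumption: block placement, i.e. global ranks 0..tasks_per_node-1 run
    on the first node, the next tasks_per_node ranks on the second, etc.
    """
    return {
        node * tasks_per_node + local: (node, local)
        for node in range(nodes)
        for local in range(tasks_per_node)
    }


layout = process_layout(nodes=2, tasks_per_node=4)
print(len(layout))  # 8 processes in total
print(layout[5])    # (1, 1): global rank 5 is local rank 1 on the second node
```

Since the log prints only the local rank, lines from the two nodes are indistinguishable, and their interleaving varies from run to run.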


### Model Checkpointing and Fault Tolerance

In distributed training, a single process failure can disrupt the entire training job. Because a job that spans many GPUs and nodes has more points of failure than a single-process job, making your training script robust is particularly important. PyTorch offers a utility called `torchrun` that provides [fault-tolerance](https://pytorch.org/tutorials/beginner/ddp_series_fault_tolerance.html) and [elastic training](https://pytorch.org/docs/stable/elastic/run.html). When a failure occurs, torchrun logs the errors and attempts to automatically restart all the processes, which then resume from the last saved “snapshot” of the training job. The snapshot saves more than just the model state; it can include the number of epochs run, the optimizer state, or any other stateful attribute of the training job necessary for its continuity. In the distributed setting, checkpointing amounts to saving the model in parallel, with each process writing its own snapshot. Below is a sample custom checkpointing function:

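```python
import torch

def checkpointing(rank, epoch, net, optimizer, loss):
    # Each process writes its own file, named after its rank.
    path = f"model{rank}.pt"
    torch.save({
        'epoch': epoch,                         # epoch at which the snapshot is taken
        'model_state': net.state_dict(),        # model parameters and buffers
        'loss': loss,                           # loss value at save time
        'optim_state': optimizer.state_dict(),  # optimizer state
    }, path)
    print(f"Checkpointing model {rank} done.")
```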
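
A snapshot is only useful if the training script loads it again after a restart. The following is a minimal sketch of a matching restore step, not a definitive implementation: the function name `load_checkpoint` is hypothetical, the dictionary keys are the ones written by `checkpointing()` above, and it assumes the snapshot was saved at the end of an epoch, so training continues with the next one.

```python
import os

import torch


def load_checkpoint(path, net, optimizer, device="cpu"):
    """Restore model and optimizer state and return the epoch to resume from.

    Returns 0 when no checkpoint file exists, i.e. on a fresh start.
    """
    if not os.path.exists(path):
        return 0
    # map_location places the saved tensors on the requested device.
    snapshot = torch.load(path, map_location=device)
    net.load_state_dict(snapshot["model_state"])
    optimizer.load_state_dict(snapshot["optim_state"])
    # Assumption: the saved epoch had finished, so resume with the next one.
    return snapshot["epoch"] + 1
```

A training loop could then start with `start_epoch = load_checkpoint(f"model{rank}.pt", net, optimizer)` and iterate over `range(start_epoch, num_epochs)`, where `num_epochs` is a placeholder for the total number of epochs. Note that the state dict of a model wrapped in `DistributedDataParallel` carries a `module.` prefix on its keys, so the model should be wrapped the same way when saving and when loading.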

Let's proceed to the `Fully Sharded Data Parallelism (FSDP)` notebook. Here, you will learn a concept that improves on data parallelism. Please click the [Next Link](fsdp.ipynb).


---
## Licensing

Copyright © 2025 OpenACC-Standard.org. This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0). These materials include references to hardware and software developed by other entities; all applicable licensing and copyrights apply.