# 2 Distributed Data Parallel Training

## 2.1 Single-Node Distributed Communication in PyTorch

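```python
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def setup(rank, world_size):
    # Every process must agree on the rendezvous address and port.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    # Gloo runs on CPU, so this demo needs no GPU.
    dist.init_process_group("gloo", rank=rank, world_size=world_size)


def distributed_demo(rank, world_size):
    setup(rank, world_size)
    data = torch.randint(0, 10, (3,))
    print(f"rank {rank} data (before all-reduce): {data}")
    # all_reduce sums (the default op) the tensor across all ranks, in place.
    dist.all_reduce(data, async_op=False)
    print(f"rank {rank} data (after all-reduce): {data}")
    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = 4
    # mp.spawn passes the process index (used as the rank) as the first argument.
    # Run as a script: with the spawn start method, functions defined in a
    # notebook cell usually cannot be located by the child processes.
    mp.spawn(fn=distributed_demo, args=(world_size,), nprocs=world_size, join=True)
```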
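After `dist.all_reduce`, every rank holds the element-wise sum of all ranks' tensors. Backends pick among several algorithms internally; one standard choice (used by NCCL, among others) is the ring all-reduce. The sketch below is a pure-Python simulation of that algorithm, not real communication: plain lists stand in for per-rank tensors, and the names `split_chunks` and `ring_all_reduce` are hypothetical. In each of its two phases, every rank sends n-1 chunks of about L/n elements, so each rank transmits roughly 2(n-1)/n * L elements in total.

```python
from typing import List


def split_chunks(length: int, n: int) -> List[range]:
    """Split indices 0..length-1 into n contiguous, nearly equal chunks."""
    base, extra = divmod(length, n)
    chunks, start = [], 0
    for i in range(n):
        end = start + base + (1 if i < extra else 0)
        chunks.append(range(start, end))
        start = end
    return chunks


def ring_all_reduce(buffers: List[List[float]]) -> List[List[float]]:
    """Simulate a sum all-reduce over len(buffers) ranks arranged in a ring."""
    n = len(buffers)
    data = [list(b) for b in buffers]  # per-rank copies; inputs stay untouched
    chunks = split_chunks(len(data[0]), n)

    def ring_step(chunk_for_rank, combine):
        # Every rank sends one chunk to its right neighbour "simultaneously":
        # snapshot all outgoing payloads before anyone applies what it receives.
        outgoing = []
        for r in range(n):
            c = chunk_for_rank(r)
            outgoing.append((r, c, [data[r][i] for i in chunks[c]]))
        for r, c, payload in outgoing:
            dst = (r + 1) % n
            for i, v in zip(chunks[c], payload):
                data[dst][i] = combine(data[dst][i], v)

    # Phase 1, reduce-scatter: afterwards rank r owns the full sum of chunk (r + 1) % n.
    for s in range(n - 1):
        ring_step(lambda r: (r - s) % n, lambda old, new: old + new)
    # Phase 2, all-gather: finished chunks travel around the ring and overwrite stale ones.
    for s in range(n - 1):
        ring_step(lambda r: (r + 1 - s) % n, lambda old, new: new)
    return data
```

For example, `ring_all_reduce([[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]])` returns three copies of `[111, 222, 333, 444]`, the same result each rank sees after the demo's `dist.all_reduce`.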

- **Node**: a machine on the network.
- **Worker**: an instance of a program that participates in distributed training. In this assignment, each worker is a single process, so we use "worker", "process", and "worker process" interchangeably. In practice, however, a worker may use multiple processes.
- **World size**: the total number of workers in a process group.
- **Global rank**: an integer ID between 0 and `world_size - 1` that uniquely identifies a worker in the process group. For example, with a world size of two, one process has rank 0 (the master process) and the other has rank 1.
- **Local world size**: when an application runs across several nodes, the number of workers running on a given node. For example, an application that spawns 4 workers on each of 2 nodes has a world size of 8 and a local world size of 4. On a single node, the local world size equals the global world size.
- **Local rank**: an integer ID between 0 and `local_world_size - 1` that uniquely identifies a worker among those running on the same machine.
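The relationship between these IDs can be made concrete with a little arithmetic. This is a minimal sketch assuming every node runs the same number of workers; the helper names `global_rank`, `split_rank`, and `rank_info_from_env` are hypothetical. The environment variable names are the ones `torchrun` exports to each worker; the single-process defaults are an assumption for running without a launcher.

```python
import os
from typing import Dict, Tuple


def global_rank(node_rank: int, local_rank: int, local_world_size: int) -> int:
    """Map (node, local index) to a global rank, assuming equal workers per node."""
    if not 0 <= local_rank < local_world_size:
        raise ValueError("local_rank must be in [0, local_world_size)")
    return node_rank * local_world_size + local_rank


def split_rank(rank: int, local_world_size: int) -> Tuple[int, int]:
    """Inverse of global_rank: return (node_rank, local_rank)."""
    return divmod(rank, local_world_size)


def rank_info_from_env() -> Dict[str, int]:
    """Read the IDs torchrun sets; fall back to a single-process setup."""
    defaults = {"RANK": 0, "LOCAL_RANK": 0, "WORLD_SIZE": 1, "LOCAL_WORLD_SIZE": 1}
    return {name: int(os.environ.get(name, d)) for name, d in defaults.items()}


# The example from the definitions: 4 workers on each of 2 nodes.
local_world_size, num_nodes = 4, 2
ranks = [global_rank(node, lr, local_world_size)
         for node in range(num_nodes) for lr in range(local_world_size)]
print(ranks)
print(split_rank(6, local_world_size))
```

With 4 workers per node, global ranks run from 0 to 7, so the world size is 8, and global rank 6 is local rank 2 on node 1.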

### 2.1.1 Best Practices for Benchmarking Distributed Applications

Throughout this portion of the assignment, you will benchmark distributed applications to better understand the overhead of communication. Here are a few best practices:
- Whenever possible, run all benchmarks on the same machine so that comparisons are controlled.
- Perform several warm-up steps before timing the operation of interest. This is especially important for NCCL communication calls; five warm-up iterations are generally sufficient.
- When benchmarking on GPUs, call `torch.cuda.synchronize()` to wait for CUDA operations to complete. This is necessary even for communication operations called with `async_op=False`, which return once the operation is queued on the GPU rather than when the communication actually finishes.
- Timings may vary slightly across ranks, so it is common to aggregate measurements across ranks to improve estimates. The all-gather collective (specifically `dist.all_gather_object`) is useful for collecting results from all ranks.
- In general, debug locally with Gloo on CPU, and then, as a given problem requires, benchmark with NCCL on GPU. Switching backends only involves changing the `init_process_group` call and the device that tensors are moved to.
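A minimal sketch that applies these practices to an all-reduce benchmark: warm-up iterations, `torch.cuda.synchronize()` around the timed region when on GPU, and `dist.all_gather_object` to average over ranks. It assumes the default process group is already initialized (and, for NCCL, that each rank has called `torch.cuda.set_device`); the function name `benchmark_all_reduce` and its parameters are hypothetical.

```python
import time

import torch
import torch.distributed as dist


def benchmark_all_reduce(num_bytes: int, warmup: int = 5, iters: int = 10) -> float:
    """Mean seconds per all-reduce, averaged over all ranks.

    Assumes dist.init_process_group(...) has already been called.
    """
    on_gpu = dist.get_backend() == "nccl"
    device = torch.device("cuda", torch.cuda.current_device()) if on_gpu else torch.device("cpu")
    # float32 elements take 4 bytes; zeros keep values from growing under repeated sums.
    tensor = torch.zeros(num_bytes // 4, dtype=torch.float32, device=device)

    def sync() -> None:
        # With NCCL, async_op=False only means "queued on the GPU", so block
        # until the device has actually finished before reading the clock.
        if on_gpu:
            torch.cuda.synchronize()

    for _ in range(warmup):  # warm-up absorbs one-time costs such as lazy setup
        dist.all_reduce(tensor, async_op=False)
    sync()
    dist.barrier()  # start the timed region on all ranks together

    start = time.perf_counter()
    for _ in range(iters):
        dist.all_reduce(tensor, async_op=False)
    sync()
    local_mean = (time.perf_counter() - start) / iters

    # all_gather_object collects one Python float from every rank.
    per_rank = [None] * dist.get_world_size()
    dist.all_gather_object(per_rank, local_mean)
    return sum(per_rank) / len(per_rank)
```

Inside an `mp.spawn` worker, one would call `setup(rank, world_size)` from the demo and then, for example, `benchmark_all_reduce(100 * 10**6)`. Every rank returns the same averaged value, so printing it on rank 0 alone is enough.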

**Benchmark Results (MacBook)**

| Backend | Workers | Data Size | Time (s)      |
|---------|---------|-----------|---------------|
| gloo    | 2       | 1MB       | 0.002193      |
| gloo    | 2       | 10MB      | 0.011302      |
| gloo    | 2       | 100MB     | 0.091152      |
| gloo    | 2       | 1000MB    | 0.893033      |
| gloo    | 4       | 1MB       | 0.004053      |
| gloo    | 4       | 10MB      | 0.022453      |
| gloo    | 4       | 100MB     | 0.226364      |
| gloo    | 4       | 1000MB    | 2.076443      |
| gloo    | 6       | 1MB       | 0.007497      |
| gloo    | 6       | 10MB      | 0.036168      |
| gloo    | 6       | 100MB     | 0.382041      |
| gloo    | 6       | 1000MB    | 3.864817      |

NCCL: skipped, because CUDA is not available on this machine.

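Raw times are hard to compare across worker counts, because the amount of data each rank moves also changes with the world size. One common normalization is the "bus bandwidth" convention from NCCL-tests: for all-reduce, algorithm bandwidth (bytes / time) is scaled by 2(n-1)/n, the per-rank traffic of a ring all-reduce. The sketch below applies it to the Gloo rows, assuming each time is for one all-reduce and that "MB" means 10**6 bytes (neither is stated in the table); the function names are hypothetical.

```python
def algbw(num_bytes: float, seconds: float) -> float:
    """Algorithm bandwidth: bytes in the buffer divided by elapsed time."""
    return num_bytes / seconds


def busbw_all_reduce(num_bytes: float, seconds: float, world_size: int) -> float:
    """All-reduce bus bandwidth, following the NCCL-tests convention.

    Scales algbw by 2(n-1)/n so that results are comparable across world sizes.
    """
    return algbw(num_bytes, seconds) * 2 * (world_size - 1) / world_size


# 1000MB Gloo rows from the table above (assumed: 1000MB = 1e9 bytes).
rows = {2: 0.893033, 4: 2.076443, 6: 3.864817}
for n, t in rows.items():
    print(f"{n} workers: busbw ~ {busbw_all_reduce(1e9, t, n) / 1e9:.2f} GB/s")
```

Under these assumptions, the 1000MB rows work out to roughly 1.12, 0.72, and 0.43 GB/s for 2, 4, and 6 workers. If bus bandwidth stayed constant, the time would grow only by the 2(n-1)/n factor (about 1.67x from 2 to 6 workers); the measured time grows by about 4.3x.
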
**Benchmark Results (HPC)**

Not collected: all GPUs on the cluster were in use at the time.

## 2.2 A Naive Implementation of Distributed Data Parallel Training


| Metric              | Naive           | Flat            | Overlap         |
|:--------------------|:----------------|:----------------|:----------------|
| model               | xl              | xl              | xl              |
| backend             | nccl            | nccl            | nccl            |
| world_size          | 2               | 2               | 2               |
| global_batch_size   | 4               | 4               | 4               |
| warmup_steps        | 5               | 5               | 5               |
| benchmark_steps     | 10              | 10              | 10              |
| step_time_avg_s     | 0.4166874293936417   | 0.41147000561468305   | 0.355364911374636      |
| step_time_std_s     | 0.00018683517882813022 | 0.0005754676821822966 | 0.00040293421827531745 |
| comm_time_avg_s     | 0.09035780989797786    | 0.0848893636954017    | 0.0011527655879035592  |
| comm_time_std_s     | 0.000104253896714834   | 0.0005444497450712978 | 0.00011374137002487174 |
| comm_frac_avg       | 0.21684792522827329    | 0.2063061437328047    | 0.0032438155791756707  |
| comm_frac_std       | 0.00021810614287490294 | 0.0010456305515322477 | 0.00031916462254377004 |
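The code behind the "Naive" and "Flat" columns is not shown here. As a hedged sketch of what these variants typically mean: the naive version issues one all-reduce per parameter gradient, while the flat version concatenates all gradients into a single buffer so that only one all-reduce is issued per step. The helper names below are hypothetical; they assume an initialized process group and are meant to be called after `loss.backward()` and before `optimizer.step()`.

```python
import torch
import torch.distributed as dist
import torch.nn as nn


def broadcast_parameters(model: nn.Module, src: int = 0) -> None:
    # Start every rank from rank `src`'s weights so the replicas stay identical.
    for p in model.parameters():
        dist.broadcast(p.data, src=src)


def all_reduce_grads_naive(model: nn.Module) -> None:
    # One all-reduce per parameter tensor, then divide to get the mean gradient.
    world_size = dist.get_world_size()
    for p in model.parameters():
        if p.grad is not None:
            dist.all_reduce(p.grad, op=dist.ReduceOp.SUM)
            p.grad.div_(world_size)


def all_reduce_grads_flat(model: nn.Module) -> None:
    # Concatenate every gradient into one buffer so there is a single call.
    grads = [p.grad for p in model.parameters() if p.grad is not None]
    if not grads:
        return
    flat = torch.cat([g.reshape(-1) for g in grads])
    dist.all_reduce(flat, op=dist.ReduceOp.SUM)
    flat.div_(dist.get_world_size())
    # Copy the averaged values back into each gradient tensor.
    offset = 0
    for g in grads:
        n = g.numel()
        g.copy_(flat[offset:offset + n].view_as(g))
        offset += n
```

The flat variant trades an extra copy into and out of the buffer for fewer, larger collective calls, which avoids paying the per-call launch overhead once per parameter tensor.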

## 2.3 Improving Upon the Minimal DDP Implementation

| Metric            | bucket_size=1          | bucket_size=10         | bucket_size=100        | bucket_size=1000       |
|:------------------|:-----------------------|:-----------------------|:-----------------------|:-----------------------|
| bucket_size       | 1                      | 10                     | 100                    | 1000                   |
| model             | xl                     | xl                     | xl                     | xl                     |
| backend           | nccl                   | nccl                   | nccl                   | nccl                   |
| world_size        | 2                      | 2                      | 2                      | 2                      |
| global_batch_size | 4                      | 4                      | 4                      | 4                      |
| warmup_steps      | 5                      | 5                      | 5                      | 5                      |
| benchmark_steps   | 10                     | 10                     | 10                     | 10                     |
| step_time_avg_s   | 0.35737448240397496    | 0.3586082030669786     | 0.3521354146883823     | 0.35815223670797425    |
| step_time_std_s   | 0.00045482918231892186 | 0.00023495819585062406 | 0.00018689529789812692 | 0.0003563481417223787  |
| comm_time_avg_s   | 0.0017477366141974925  | 0.0014774692826904356  | 0.0008597417967393994  | 0.0006798887276090682  |
| comm_time_std_s   | 0.00012864381462239098 | 0.00012264102032681393 | 7.771309629903895e-05  | 4.797181851323098e-05  |
| comm_frac_avg     | 0.004890260091267846   | 0.004119833181109983   | 0.0024415040799994334  | 0.0018983396234894985  |
| comm_frac_std     | 0.0003561744808705491  | 0.00033953065310998607 | 0.0002206324158818065  | 0.00013422975145629642 |

```python
import pandas as pd

results_1 = {'bucket_size': 1, 'model': 'xl', 'backend': 'nccl', 'world_size': 2, 'global_batch_size': 4, 'warmup_steps': 5, 'benchmark_steps': 10, 'step_time_avg_s': 0.35737448240397496, 'step_time_std_s': 0.00045482918231892186, 'comm_time_avg_s': 0.0017477366141974925, 'comm_time_std_s': 0.00012864381462239098, 'comm_frac_avg': 0.004890260091267846, 'comm_frac_std': 0.0003561744808705491}
results_10 = {'bucket_size': 10, 'model': 'xl', 'backend': 'nccl', 'world_size': 2, 'global_batch_size': 4, 'warmup_steps': 5, 'benchmark_steps': 10, 'step_time_avg_s': 0.3586082030669786, 'step_time_std_s': 0.00023495819585062406, 'comm_time_avg_s': 0.0014774692826904356, 'comm_time_std_s': 0.00012264102032681393, 'comm_frac_avg': 0.004119833181109983, 'comm_frac_std': 0.00033953065310998607}
results_100 = {'bucket_size': 100, 'model': 'xl', 'backend': 'nccl', 'world_size': 2, 'global_batch_size': 4, 'warmup_steps': 5, 'benchmark_steps': 10, 'step_time_avg_s': 0.3521354146883823, 'step_time_std_s': 0.00018689529789812692, 'comm_time_avg_s': 0.0008597417967393994, 'comm_time_std_s': 7.771309629903895e-05, 'comm_frac_avg': 0.0024415040799994334, 'comm_frac_std': 0.0002206324158818065}
results_1000 = {'bucket_size': 1000, 'model': 'xl', 'backend': 'nccl', 'world_size': 2, 'global_batch_size': 4, 'warmup_steps': 5, 'benchmark_steps': 10, 'step_time_avg_s': 0.35815223670797425, 'step_time_std_s': 0.0003563481417223787, 'comm_time_avg_s': 0.0006798887276090682, 'comm_time_std_s': 4.797181851323098e-05, 'comm_frac_avg': 0.0018983396234894985, 'comm_frac_std': 0.00013422975145629642}

# One row per run; transpose so that each run becomes a column and each metric a row.
df = pd.DataFrame([results_1, results_10, results_100, results_1000]).transpose()
# DataFrame.to_markdown requires the optional `tabulate` package.
print(df.to_markdown())
```

|                   | 0                      | 1                      | 2                      | 3                      |
|:------------------|:-----------------------|:-----------------------|:-----------------------|:-----------------------|
| bucket_size       | 1                      | 10                     | 100                    | 1000                   |
| model             | xl                     | xl                     | xl                     | xl                     |
| backend           | nccl                   | nccl                   | nccl                   | nccl                   |
| world_size        | 2                      | 2                      | 2                      | 2                      |
| global_batch_size | 4                      | 4                      | 4                      | 4                      |
| warmup_steps      | 5                      | 5                      | 5                      | 5                      |
| benchmark_steps   | 10