In [1]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

### basics
- All to one：reduce； one to All：broadcast
- rank：[0, world_size-1]

### ddp setup
- MASTER_ADDR: 主节点的IP地址，这里是localhost表示单机多卡
- MASTER_PORT: 主节点监听的端口，需要是一个空闲端口
- 所有进程通过这些信息找到彼此并进行通信
- backend="nccl": 使用NVIDIA的NCCL库，这是GPU间最高效的通信后端
- rank=rank: 传入当前进程的唯一标识
- world_size=world_size: 传入总进程数

In [3]:
# Count the CUDA devices visible to this process and display the result.
world_size = torch.cuda.device_count()
world_size

9

In [None]:
def ddp_setup(rank, world_size):
    """Initialise the distributed process group for one worker process.

    Args:
        rank: Unique index of this process in ``[0, world_size - 1]``; it is
            also used as the index of the CUDA device the process is bound to.
        world_size: Total number of processes taking part in training.
    """
    # Rendezvous endpoint shared by all processes (single node: localhost).
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    # Bind this process to its own GPU *before* the NCCL process group is
    # created, so the group is set up against the intended device instead of
    # whatever the current default device happens to be.
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

### custom Trainer（自定义 trainer）

In [None]:
class Trainer:
    """Minimal training loop around a DistributedDataParallel-wrapped model.

    Args:
        model: Module to train; it is moved to ``gpu_id`` and wrapped in DDP.
        train_dataloader: Loader yielding ``(xs, ys)`` batches. Its sampler
            must expose ``set_epoch`` (e.g. ``DistributedSampler``).
        optimizer: Optimizer built over ``model``'s parameters.
        gpu_id: Device index for this process (used as the rank as well).
    """

    def __init__(self,
                 model: torch.nn.Module,
                 train_dataloader: DataLoader,
                 optimizer: torch.optim.Optimizer,
                 gpu_id: int) -> None:
        # The GPU index doubles as the process rank.
        self.gpu_id = gpu_id
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        # Move the model to this process's device, then wrap it so gradients
        # are synchronised across processes during backward().
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])

    def _run_batch(self, xs, ys):
        """Run one optimisation step on a single batch."""
        self.optimizer.zero_grad()
        output = self.model(xs)
        # The targets are real-valued with the same shape as the output, so
        # this is a regression problem. cross_entropy over a single-column
        # output is identically zero (log_softmax of one logit is 0), which
        # yields zero gradients and a model that never learns; use MSE instead.
        loss = F.mse_loss(output, ys)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        """Train for one full pass over this process's shard of the data."""
        # Peek at one batch only to report the effective batch size.
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')
        # Tell the sampler which epoch this is so the shuffle order changes
        # from epoch to epoch.
        self.train_dataloader.sampler.set_epoch(epoch)
        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)

    def train(self, max_epoch: int):
        """Run ``max_epoch`` consecutive epochs, numbered from 0."""
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

### pipeline

In [6]:
class MyTrainDataset(Dataset):
    """Synthetic in-memory dataset of random ``(features, target)`` pairs.

    Each sample is a tuple of a 20-element tensor drawn from ``torch.rand``
    and a 1-element tensor drawn from ``torch.randn``.
    """

    def __init__(self, size):
        self.size = size
        samples = []
        for _ in range(size):
            # Draw features first, then the target, once per sample.
            features = torch.rand(20)
            target = torch.randn(1)
            samples.append((features, target))
        self.data = samples

    def __len__(self):
        """Return the number of samples requested at construction time."""
        return self.size

    def __getitem__(self, index):
        """Return the ``(features, target)`` tuple stored at ``index``."""
        return self.data[index]

In [7]:
# Build a 2048-sample synthetic dataset to inspect interactively.
train_dataset = MyTrainDataset(2048)

In [12]:
# Show the first (features, target) pair.
train_dataset[0]

(tensor([0.1571, 0.6422, 0.2426, 0.7534, 0.4267, 0.6078, 0.6869, 0.1893, 0.2173,
         0.2950, 0.5808, 0.7997, 0.5400, 0.4954, 0.8197, 0.6861, 0.7217, 0.2654,
         0.7464, 0.7950]),
 tensor([-0.5326]))

### 分布式训练主函数：
假设在4GPU上运行：
- 4个进程同时执行这个函数
- 每个进程的 rank 不同 (0, 1, 2, 3)
- 每个进程的 gpu_id 对应各自的GPU
- DistributedSampler 自动将数据分成4份，每个GPU处理不同部分
    - DistributedSampler 的作用：
        - ✅ 数据分区 - 将数据均匀分配到所有GPU
        - ✅ 避免重复 - 各GPU拿到的数据互不重叠，合起来覆盖整个数据集（当样本数不能被GPU数整除时，默认会重复少量样本来补齐；设置 drop_last=True 则改为丢弃多余样本）
        - ✅ 支持洗牌 - 通过 set_epoch() 控制数据顺序
        - ✅ 梯度独立性 - 每个GPU计算不同数据的梯度

In [14]:
def main(rank: int, world_size: int, max_epochs: int, batch_size: int):
    """Entry point executed by every worker process of the DDP job.

    Args:
        rank: Index of this process; also used as its GPU index.
        world_size: Total number of worker processes.
        max_epochs: Number of epochs to train for.
        batch_size: Per-process batch size passed to the DataLoader.
    """
    # Set up the distributed environment for this process.
    ddp_setup(rank, world_size)

    try:
        # Data: the DistributedSampler gives each process its own shard, so
        # the DataLoader's own shuffling must stay disabled.
        train_dataset = MyTrainDataset(2048)
        train_dataloader = DataLoader(train_dataset,
                                      batch_size=batch_size,
                                      pin_memory=True,
                                      shuffle=False,
                                      sampler=DistributedSampler(train_dataset))

        # Model and optimizer. Note the class is `torch.optim.SGD`;
        # `torch.optim.sgd` is a module and is not callable.
        model = torch.nn.Linear(20, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

        # Build the trainer and run the training loop.
        trainer = Trainer(model=model, gpu_id=rank, optimizer=optimizer, train_dataloader=train_dataloader)
        trainer.train(max_epochs)
    finally:
        # Always release the process group, even if training raised.
        destroy_process_group()

### 分布式 training

In [15]:
# Count the CUDA devices visible to this process and display the result.
world_size = torch.cuda.device_count()
world_size

9

共 2048 条数据，每个 GPU 的 batch_size 为 32，共 9 个 GPU。

每个 GPU 分到 ⌈2048/9⌉ = 228 条数据，因此每个 epoch 有 ⌈228/32⌉ = 8 个 Steps（最后一个 batch 不满 32 条）。

In [16]:
!python ddp_gpus.py --max_epochs 5 --batch_size 32 

[GPU: 2] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 6] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 8] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 3] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 0] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 7] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 1] Epoch: 0 | Batchsize: 32 | Steps: 8[GPU: 4] Epoch: 0 | Batchsize: 32 | Steps: 8

[GPU: 5] Epoch: 0 | Batchsize: 32 | Steps: 8
[GPU: 1] Epoch: 1 | Batchsize: 32 | Steps: 8
[GPU: 2] Epoch: 1 | Batchsize: 32 | Steps: 8
[GPU: 3] Epoch: 1 | Batchsize: 32 | Steps: 8[GPU: 4] Epoch: 1 | Batchsize: 32 | Steps: 8

[GPU: 0] Epoch: 1 | Batchsize: 32 | Steps: 8
[GPU: 7] Epoch: 1 | Batchsize: 32 | Steps: 8
[GPU: 8] Epoch: 1 | Batchsize: 32 | Steps: 8[GPU: 6] Epoch: 1 | Batchsize: 32 | Steps: 8

[GPU: 5] Epoch: 1 | Batchsize: 32 | Steps: 8
[GPU: 5] Epoch: 2 | Batchsize: 32 | Steps: 8
[GPU: 3] Epoch: 2 | Batchsize: 32 | Steps: 8
[GPU: 2] Epoch: 2 | Batchsize: 32 | Steps: 8
[GPU: 1] Epoch: 2 | Batchsize: 32 | Steps: 8
[GPU: 4] E

- python ddp_gpus.py: 手动进程管理，需要代码内指定GPU数量
    > world_size = torch.cuda.device_count()<br>
    > mp.spawn(main, args=(world_size, args.max_epochs, args.batch_size), nprocs=world_size)

- torchrun ddp_gpus_torchrun.py: 自动进程管理，通过命令行参数指定GPU数量，是PyTorch官方推荐的方式

### torchrun 运行

In [17]:
!torchrun ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

[GPU: 0] Epoch: 0 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 1 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 2 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 3 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 4 | Batchsize: 32 | Steps: 64


In [18]:
!torchrun --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

W1004 06:38:47.707927 139864687366528 torch/distributed/run.py:779] 
W1004 06:38:47.707927 139864687366528 torch/distributed/run.py:779] *****************************************
W1004 06:38:47.707927 139864687366528 torch/distributed/run.py:779] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W1004 06:38:47.707927 139864687366528 torch/distributed/run.py:779] *****************************************
[GPU: 1] Epoch: 0 | Batchsize: 32 | Steps: 32
[GPU: 0] Epoch: 0 | Batchsize: 32 | Steps: 32
[GPU: 0] Epoch: 1 | Batchsize: 32 | Steps: 32
[GPU: 1] Epoch: 1 | Batchsize: 32 | Steps: 32
[GPU: 0] Epoch: 2 | Batchsize: 32 | Steps: 32
[GPU: 1] Epoch: 2 | Batchsize: 32 | Steps: 32
[GPU: 0] Epoch: 3 | Batchsize: 32 | Steps: 32
[GPU: 1] Epoch: 3 | Batchsize: 32 | Steps: 32
[GPU: 0] Epoch: 4 | Batchsize: 32 | Steps: 32
[GPU: 1] Epoch: 4 |