In [1]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

## basics

- All-to-one: reduce; one-to-all: broadcast
- rank: an integer in `[0, world_size-1]`

## ddp setup

In [3]:
# Number of CUDA devices visible to this kernel; intended as the DDP world
# size (one process per GPU).
world_size = torch.cuda.device_count()
world_size

8

In [4]:
def ddp_setup(rank, world_size):
    """
    Initialize the default process group for single-node, multi-GPU DDP.

    Args:
        rank: Unique identifier of each process; also used as the CUDA
            device index bound to this process
        world_size: Total number of processes
    """
    # Rendezvous endpoint of the rank 0 process.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    # Bind this process to its own GPU *before* creating the process group.
    # The PyTorch DDP tutorial calls set_device first to prevent hangs or
    # excessive memory use on GPU 0.
    torch.cuda.set_device(rank)
    # nccl: NVIDIA Collective Communication Library, used for communication
    # between GPUs in the distributed setting.
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

## Custom Trainer

In [5]:
class Trainer:
    """
    Minimal DDP training loop bound to one process / one GPU.

    Args:
        model: Module to train; it is moved to ``gpu_id`` and wrapped in DDP.
        train_dataloader: Loader to iterate each epoch. Its ``sampler`` must
            provide ``set_epoch`` (e.g. a ``DistributedSampler``).
        optimizer: Optimizer constructed over ``model``'s parameters.
        gpu_id: CUDA device index used by this process (its rank).
        loss_fn: Optional callable ``(output, target) -> scalar loss``.
            Defaults to ``F.cross_entropy``.
            NOTE(review): with a single-output model and float targets of
            shape (N, 1), cross-entropy over one class is identically zero
            and produces zero gradients; pass e.g. ``F.mse_loss`` for such
            regression-style data.
    """
    def __init__(self, 
                 model: torch.nn.Module, 
                 train_dataloader: DataLoader, 
                 optimizer: torch.optim.Optimizer, 
                 gpu_id: int,
                 loss_fn=None) -> None:
        # rank
        self.gpu_id = gpu_id
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        self.loss_fn = F.cross_entropy if loss_fn is None else loss_fn
        # Move the parameters to this process's GPU, then wrap with DDP so
        # gradients are synchronized across processes during backward().
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])
    
    def _run_batch(self, xs, ys):
        """Run one optimization step (forward, loss, backward, update)."""
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = self.loss_fn(output, ys)
        loss.backward()
        self.optimizer.step()
    
    def _run_epoch(self, epoch):
        """Train over every batch of the dataloader once."""
        loader = self.train_dataloader
        # Tell the sampler which epoch this is before the single iteration
        # of the loader below.
        loader.sampler.set_epoch(epoch)
        # Report the configured batch size instead of pulling (and throwing
        # away) a batch from a second iterator, which also failed with
        # StopIteration on an empty loader.
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {loader.batch_size} | Steps: {len(loader)}')
        for xs, ys in loader:
            # non_blocking lets the copy overlap with compute when the
            # loader yields pinned-memory tensors; otherwise it is a plain copy.
            xs = xs.to(self.gpu_id, non_blocking=True)
            ys = ys.to(self.gpu_id, non_blocking=True)
            self._run_batch(xs, ys)
    
    def train(self, max_epoch: int):
        """Run ``max_epoch`` epochs, numbered from 0."""
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

In [None]:
# class Trainer:
#     def __init__(self, 
#                  model: torch.nn.Module, 
#                  train_dataloader: DataLoader, 
#                  optimizer: torch.optim.Optimizer, 
#                  gpu_id: int) -> None:
#         # Store the GPU ID of the current process (i.e. its rank)
#         self.gpu_id = gpu_id
        
#         # Move the model to the GPU assigned to the current process
#         self.model = model.to(gpu_id)
        
#         # Store the dataloader used for training
#         self.train_dataloader = train_dataloader
        
#         # Store the optimizer
#         self.optimizer = optimizer
        
#         # Wrap the model with DistributedDataParallel to enable distributed training
#         # device_ids specifies the GPU device used by the current process
#         self.model = DDP(model, device_ids=[gpu_id])
    
#     def _run_batch(self, xs, ys):
#         # Clear the gradients from the previous step
#         self.optimizer.zero_grad()
        
#         # Forward pass: compute the model output
#         output = self.model(xs)
        
#         # Compute the loss (cross-entropy)
#         loss = F.cross_entropy(output, ys)
        
#         # Backward pass: compute gradients
#         loss.backward()
        
#         # Update the parameters
#         self.optimizer.step()
    
#     def _run_epoch(self, epoch):
#         # Get the number of samples in one batch (for the log message)
#         batch_size = len(next(iter(self.train_dataloader))[0])
        
#         # Print the current epoch, batch size, and total steps (number of batches per epoch)
#         print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')
        
#         # Set the sampler's epoch at the start of each epoch so the data is shuffled differently
#         self.train_dataloader.sampler.set_epoch(epoch)
        
#         # Iterate over every batch in the dataloader
#         for xs, ys in self.train_dataloader:
#             # Move the input data to the current process's GPU
#             xs = xs.to(self.gpu_id)
            
#             # Move the label data to the current process's GPU
#             ys = ys.to(self.gpu_id)
            
#             # Run the training step for a single batch
#             self._run_batch(xs, ys)
    
#     def train(self, max_epoch: int):
#         # Train for each epoch
#         for epoch in range(max_epoch):
#             # Run one full training pass (one epoch)
#             self._run_epoch(epoch)

## pipeline

In [6]:
class MyTrainDataset(Dataset):
    """In-memory synthetic dataset of random (features, target) pairs.

    Each sample is a tuple of a 20-element feature tensor and a 1-element
    target tensor, both drawn from ``torch.rand`` at construction time.
    """

    def __init__(self, size):
        samples = []
        for _ in range(size):
            # Draw the features first, then the target, for every sample.
            features = torch.rand(20)
            target = torch.rand(1)
            samples.append((features, target))
        self.size = size
        self.data = samples

    def __len__(self):
        """Number of samples requested at construction."""
        return self.size

    def __getitem__(self, index):
        """Return the stored (features, target) tuple at ``index``."""
        return self.data[index]

In [None]:
# class MyTrainDataset(Dataset):
#     def __init__(self, size):  # Constructor: initialize the dataset size
#         self.size = size  # Store the size of the dataset
#         self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]  # Generate random (input, output) pairs as the dataset contents

#     def __len__(self):  # Return the size of the dataset
#         return self.size  # Number of samples in the dataset

#     def __getitem__(self, index):  # Fetch a single sample by index
#         return self.data[index]  # Return the sample stored at the given index

In [7]:
# Build a 2048-sample synthetic dataset for interactive inspection
# (main() constructs its own instance).
train_dataset = MyTrainDataset(2048)

In [8]:
# Inspect one sample: a (features, target) tuple of a 20-element and a
# 1-element tensor.
train_dataset[0]

(tensor([0.9667, 0.0344, 0.4949, 0.0568, 0.7420, 0.0653, 0.5933, 0.2974, 0.1428,
         0.3780, 0.9968, 0.7291, 0.5143, 0.4751, 0.9737, 0.4574, 0.8796, 0.5992,
         0.0597, 0.3505]),
 tensor([0.5779]))

In [9]:
def main(rank: int, world_size: int, max_epochs: int, batch_size: int):
    """
    Per-process entry point: set up DDP, train, and tear the group down.

    Args:
        rank: Index of this process; also used as its GPU id.
        world_size: Total number of processes, forwarded to ``ddp_setup``.
        max_epochs: Number of epochs to train.
        batch_size: Per-process batch size passed to the DataLoader.
    """
    ddp_setup(rank, world_size)
    # The group now exists, so always tear it down, even if training raises.
    try:
        train_dataset = MyTrainDataset(2048)
        train_dataloader = DataLoader(train_dataset,
                                      batch_size=batch_size,
                                      pin_memory=True,
                                      # shuffle must stay False when an explicit sampler is
                                      # supplied; ordering is delegated to the sampler.
                                      shuffle=False,
                                      # batch input: split across the GPUs, without any
                                      # overlapping samples between GPUs
                                      sampler=DistributedSampler(train_dataset))
        model = torch.nn.Linear(20, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

        trainer = Trainer(model=model, gpu_id=rank, optimizer=optimizer, train_dataloader=train_dataloader)
        trainer.train(max_epochs)
    finally:
        destroy_process_group()

In [None]:
# def main(rank: int, world_size: int, max_epochs: int, batch_size: int):
#     # Call ddp_setup to initialize the distributed training environment, passing this process's rank and the total number of processes (world_size)
#     ddp_setup(rank, world_size)
    
#     # Create an instance of the custom MyTrainDataset with 2048 samples
#     train_dataset = MyTrainDataset(2048)
    
#     # Load the dataset with a DataLoader: batch_size samples per batch, pin_memory enabled to speed up host-to-GPU transfers
#     # shuffle=False means the DataLoader itself does not shuffle, because a DistributedSampler is used to assign data to the different GPUs
#     # DistributedSampler ensures each GPU process receives a different subset of the data, with no overlapping samples
#     train_dataloader = DataLoader(train_dataset, 
#                                 batch_size=batch_size, 
#                                 pin_memory=True, 
#                                 shuffle=False, 
#                                 sampler=DistributedSampler(train_dataset))
    
#     # Define a simple linear model with input dimension 20 and output dimension 1
#     model = torch.nn.Linear(20, 1)
    
#     # Use the SGD (stochastic gradient descent) optimizer with learning rate 1e-3 to update the model parameters
#     optimzer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
#     # Initialize the Trainer with the model, GPU ID, optimizer and dataloader, ready to start training
#     trainer = Trainer(model=model, gpu_id=rank, optimizer=optimzer, train_dataloader=train_dataloader)
    
#     # Start training for at most max_epochs epochs
#     trainer.train(max_epochs)
    
#     # Destroy the distributed process group and release its resources
#     destroy_process_group()

## Distributed training

In [17]:
# Number of visible CUDA devices (one DDP process is intended per device).
world_size = torch.cuda.device_count()
world_size

8

In [18]:
# 2048 samples / batch size 32 = 64 steps per epoch when one process sees
# the whole dataset.
2048/32

64.0

In [19]:
# Sanity check: 32 steps x batch size 32 x 2 = 2048 samples.
# NOTE(review): presumably the 2-process split; confirm against the run output.
(32*32) * 2

2048

In [20]:
# Training is launched through the standalone script below; the in-process
# launch is kept for reference only.
# NOTE(review): ddp_gpus.py is not shown here; presumably it wraps the same main().
# mp.spawn(main, args=(world_size, 10, 32), nprocs=world_size)
!python ddp_gpus.py --max_epochs 5 --batch_size 32 

[W518 14:40:19.035822065 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [localhost]:12355 (errno: 97 - Address family not supported by protocol).
[W518 14:40:19.035860824 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [localhost]:12355 (errno: 97 - Address family not supported by protocol).
[W518 14:40:19.036353298 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [::ffff:127.0.0.1]:12355 (errno: 97 - Address family not supported by protocol).
[W518 14:40:19.050114151 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [::ffff:127.0.0.1]:12355 (errno: 97 - Address family not supported by protocol).
[W518 14:40:19.054724000 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [localhost]:12355 (errno: 97 - Address family not supported by protocol).
[W518 14:40:19.057531352 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [loc

## Running with torchrun

- Docs: https://pytorch.org/docs/stable/elastic/run.html
- `torchrun` supports fault-tolerant (elastic) launches.
- With `torchrun`:
    - Worker `RANK` and `WORLD_SIZE` are assigned automatically.
    - It replaces the deprecated `python -m torch.distributed.launch --use-env`.

In [14]:
# Default launch without --nproc-per-node: the output below shows a single
# worker (GPU 0) running 64 steps per epoch.
!torchrun ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

[W518 13:52:47.541889790 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [::ffff:127.0.0.1]:29500 (errno: 97 - Address family not supported by protocol).
[W518 13:52:48.781252687 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [localhost]:29500 (errno: 97 - Address family not supported by protocol).
[GPU: 0] Epoch: 0 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 1 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 2 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 3 | Batchsize: 32 | Steps: 64
[GPU: 0] Epoch: 4 | Batchsize: 32 | Steps: 64


In [15]:
# Launch two worker processes on this node.
!torchrun --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

W0518 13:52:54.965000 43619 site-packages/torch/distributed/run.py:766] 
W0518 13:52:54.965000 43619 site-packages/torch/distributed/run.py:766] *****************************************
W0518 13:52:54.965000 43619 site-packages/torch/distributed/run.py:766] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0518 13:52:54.965000 43619 site-packages/torch/distributed/run.py:766] *****************************************
[W518 13:52:54.044864284 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [localhost]:29500 (errno: 97 - Address family not supported by protocol).
[W518 13:52:56.230635954 socket.cpp:755] [c10d] The client socket cannot be initialized to connect to [localhost]:29500 (errno: 97 - Address family not supported by protocol).
[W518 13:52:56.231561212 socket.cpp:755] [c10d] The client socket

In [16]:
# Same script through the legacy launcher; the output below reports it as
# deprecated in favour of torchrun.
!python -m torch.distributed.launch --use-env --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

and will be removed in future. Use torchrun.
Note that --use-env is set by default in torchrun.
If your script expects `--local-rank` argument to be set, please
change it to read from `os.environ['LOCAL_RANK']` instead. See 
https://pytorch.org/docs/stable/distributed.html#launch-utility for 
further instructions

  main()
W0518 13:53:03.041000 46459 site-packages/torch/distributed/run.py:766] 
W0518 13:53:03.041000 46459 site-packages/torch/distributed/run.py:766] *****************************************
W0518 13:53:03.041000 46459 site-packages/torch/distributed/run.py:766] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
W0518 13:53:03.041000 46459 site-packages/torch/distributed/run.py:766] *****************************************
[W518 13:53:03.121082584 socket.cpp:755] [c10d] The client socket cannot be initialized to c