In [1]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
import torch.multiprocessing as ma
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

# 1. Basics

- Collective communication: all-to-one is *reduce*; one-to-all is *broadcast*.
- rank: the integer id of each process, in the range [0, world_size - 1].

# 2. DDP setup

In [3]:
# Count the CUDA devices visible to this kernel. `main` uses each rank directly
# as its GPU index, so this is the largest usable world_size on this machine.
world_size = torch.cuda.device_count()
world_size

1

In [4]:
def ddp_setup(rank, world_size, backend="nccl",
              master_addr="localhost", master_port="12355"):
    """Initialise the default torch.distributed process group for this process.

    Args:
        rank: unique id of this process, in ``[0, world_size - 1]``; also used
            as the CUDA device index.
        world_size: total number of participating processes.
        backend: torch.distributed backend. ``"nccl"`` (NVIDIA Collective
            Communication Library) is the GPU backend used by default.
        master_addr: rendezvous host of rank 0 (default matches the original
            hard-coded value).
        master_port: rendezvous port of rank 0 (str or int).
    """
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = str(master_port)

    if torch.cuda.is_available():
        # Pin this process to its own GPU *before* the process group is
        # created, so NCCL communicators are not all built on cuda:0.
        torch.cuda.set_device(rank)

    init_process_group(backend=backend, rank=rank, world_size=world_size)

# 3. Custom Trainer

In [5]:
class Trainer:
    """Minimal DDP training loop: each process trains one replica on one device.

    Args:
        model: module to train; it is moved to ``gpu_id`` and wrapped in DDP.
        train_dataloader: loader over this rank's data. If its ``sampler`` has
            ``set_epoch`` (e.g. ``DistributedSampler``) it is called every epoch
            so the shuffle order changes between epochs.
        optimizer: optimizer built over ``model.parameters()``.
        gpu_id: device of this process (``main`` passes the rank).
        loss_fn: callable ``loss_fn(output, target) -> scalar tensor``.
            Defaults to ``F.cross_entropy`` for backward compatibility.
            NOTE: with a single-output model (e.g. ``Linear(20, 1)``) and float
            targets, ``cross_entropy`` is taken over a class dimension of size
            1 and is identically 0, so no gradient flows; use ``F.mse_loss``.
    """

    def __init__(self,
                 model: torch.nn.Module,
                 train_dataloader: DataLoader,
                 optimizer: torch.optim.Optimizer,
                 gpu_id: int,
                 loss_fn=F.cross_entropy) -> None:

        self.gpu_id = gpu_id
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        # Move parameters to the device first, then wrap: DDP expects the
        # module to already live on the device listed in ``device_ids``.
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])

    def _run_batch(self, xs, ys):
        """Run one optimisation step and return the (detached) batch loss.

        DDP synchronises gradients across ranks during ``backward()``.
        """
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = self.loss_fn(output, ys)
        loss.backward()
        self.optimizer.step()
        return loss.detach()

    def _run_epoch(self, epoch):
        """Train on every batch of this rank's shard for one epoch."""
        # Must happen before the loader is iterated: DistributedSampler derives
        # its shuffle from the epoch number.
        set_epoch = getattr(self.train_dataloader.sampler, "set_epoch", None)
        if set_epoch is not None:
            set_epoch(epoch)

        print(f"GPU: {self.gpu_id}")
        print(f"Epoch: {epoch}")
        print(f"Batchsize: {self.train_dataloader.batch_size}")
        print(f"Step: {len(self.train_dataloader)}")

        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)

    def train(self, max_epoch: int):
        """Run ``max_epoch`` epochs (epochs are numbered from 0)."""
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

# 4. Pipeline

In [6]:
class MyTrainDataset(Dataset):
    """Synthetic in-memory dataset of ``size`` random (features, target) pairs.

    Item ``i`` is ``(torch.rand(20), torch.rand(1))``: 20 features and one
    target, all drawn uniformly from [0, 1) when the dataset is constructed.

    Args:
        size: number of samples.
        seed: optional seed for a private ``torch.Generator``. Processes that
            use the same seed (e.g. all DDP ranks) get identical data.
            ``None`` (default) draws from the global RNG, as before.
    """

    def __init__(self, size, seed=None):
        self.size = size
        gen = None
        if seed is not None:
            gen = torch.Generator().manual_seed(seed)
        # Draw order (features, then target, per sample) is kept so the global
        # RNG path reproduces the original data for a given torch.manual_seed.
        self.data = [(torch.rand(20, generator=gen), torch.rand(1, generator=gen))
                     for _ in range(size)]

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]

In [7]:
# Build the synthetic dataset in the notebook kernel to inspect a sample.
train_dataset = MyTrainDataset(size=2048)

In [8]:
# Peek at one sample: a (features, target) tuple of tensors.
train_dataset[0]

(tensor([0.7611, 0.3523, 0.7436, 0.6182, 0.3252, 0.4338, 0.6137, 0.3898, 0.6130,
         0.6168, 0.8746, 0.4442, 0.3539, 0.3470, 0.6931, 0.1793, 0.8233, 0.9674,
         0.3574, 0.3794]),
 tensor([0.1624]))

In [9]:
def main(rank: int, world_size: int, max_epochs: int, batch_size: int,
         dataset_size: int = 2048, seed: int = 0):
    """Per-process DDP entry point: set up, train, and tear down one rank.

    Args:
        rank: id of this process; also used as its GPU index.
        world_size: total number of processes.
        max_epochs: number of epochs to train.
        batch_size: per-process (per-GPU) batch size.
        dataset_size: number of synthetic samples (default 2048).
        seed: global RNG seed applied before building the dataset and model.
    """
    ddp_setup(rank, world_size)
    try:
        # Every rank builds its own copy of the random dataset. Seeding first
        # makes those copies identical, so DistributedSampler partitions one
        # dataset instead of handing each rank unrelated random data.
        torch.manual_seed(seed)
        train_dataset = MyTrainDataset(dataset_size)
        train_dataloader = DataLoader(train_dataset,
                                      batch_size=batch_size,
                                      pin_memory=True,
                                      # shuffling is delegated to the sampler;
                                      # DataLoader rejects shuffle=True with a sampler
                                      shuffle=False,
                                      sampler=DistributedSampler(train_dataset))
        model = torch.nn.Linear(20, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

        trainer = Trainer(
            model=model,
            gpu_id=rank,
            optimizer=optimizer,
            train_dataloader=train_dataloader
        )

        trainer.train(max_epochs)
    finally:
        # Release the process group even if training raised.
        destroy_process_group()

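# 5. 分布式训练 (distributed training)

数据量：2048；每个 GPU 的 `batch_size` 为 32。有 2 个 GPU 时，每一步的全局批次（global batch）为 32 × 2 = 64，即全局批次 64 在每个 GPU 上分到 64 / 2 = 32 条。

`DistributedSampler` 把 2048 条数据平均分给 2 个 GPU（每个 GPU 1024 条），因此每个 GPU 每个 epoch 运行 1024 / 32 = 32 步（step）。（本机 `world_size` 为 1 时，单卡每个 epoch 运行 2048 / 32 = 64 步。）

`(32 步 × 32 条/步) × 2 个 GPU = 2048`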

In [10]:
# Re-check the visible CUDA device count right before launching the run.
# Each rank is used as its GPU index, so at most this many processes can run.
world_size = torch.cuda.device_count()
world_size

1

In [None]:
# Launch the distributed run as a separate Python process. `ddp_gpus.py` is not
# shown in this notebook — presumably it contains the code above and spawns
# `main` once per GPU (TODO: confirm). The flags presumably map to main's
# `max_epochs` and `batch_size` arguments.
!python ddp_gpus.py --max_epochs 5 --batch_size 32