In [1]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

### basics
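- all-to-one: reduce; one-to-all: broadcast; all-to-all: all-reduce (every rank receives the reduced result — this is how DDP averages gradients)
- rank: the integer ID of each process, in the range [0, world_size - 1]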

### ddp setup

In [4]:
# Number of CUDA devices visible to this kernel; each DDP process (rank) is bound to one of them.
world_size = torch.cuda.device_count()
world_size

4

In [5]:
def ddp_setup(rank, world_size):
    """Initialise the default NCCL process group for this rank and bind it to its GPU.

    Args:
        rank: Index of this process in ``[0, world_size - 1]``; also used as the CUDA device index.
        world_size: Total number of processes taking part in training.
    """
    # Single-node rendezvous: every rank connects to the master at localhost:12355.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # Bind this process to its GPU before creating the NCCL group so NCCL uses that device.
    torch.cuda.set_device(rank)
    # The default process group can only be initialised once per process; calling
    # init_process_group a second time raises a RuntimeError.
    init_process_group(backend='nccl', rank=rank, world_size=world_size)

In [6]:
class Trainer:
    """Minimal per-rank training loop around a DistributedDataParallel-wrapped model.

    Each rank builds its own ``Trainer``; DDP keeps the replicas in sync by
    averaging gradients across ranks during ``backward()``.
    """

    def __init__(self, model: torch.nn.Module, train_dataloader: DataLoader, optimizer: torch.optim.Optimizer,
                 gpu_id: int, loss_fn=F.mse_loss) -> None:
        """
        Args:
            model: Module to train; moved to ``gpu_id`` and wrapped in DDP.
            train_dataloader: This rank's loader, normally built with a ``DistributedSampler``.
            optimizer: Optimizer over ``model.parameters()``.
            gpu_id: Device for this rank (the CUDA index, equal to the rank on one node).
            loss_fn: Callable ``loss_fn(output, target) -> scalar tensor``. Defaults to MSE
                because the notebook's model emits one regression value per sample.
                ``F.cross_entropy`` over a single logit is identically zero, so training did nothing.
        """
        self.gpu_id = gpu_id
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        # Module.to moves parameters in place, so the optimizer's parameter references stay valid.
        # Wrap the moved module explicitly.
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])

    def _run_batch(self, xs, ys):
        """Run one optimisation step on a batch already on ``self.gpu_id``; return the detached loss."""
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = self.loss_fn(output, ys)
        loss.backward()  # under DDP, gradients are averaged across ranks during backward
        self.optimizer.step()
        return loss.detach()

    def _run_epoch(self, epoch):
        """Train for one epoch over this rank's share of the dataset."""
        # Read the configured batch size. The old code built a fresh iterator and drew
        # a full batch just to print this number.
        batch_size = self.train_dataloader.batch_size
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | BatchSize: {batch_size} | Steps: {len(self.train_dataloader)}')
        # DistributedSampler seeds its shuffle from the epoch. Without set_epoch, every
        # epoch repeats the same order. Plain samplers have no set_epoch, so skip them.
        sampler = self.train_dataloader.sampler
        if hasattr(sampler, 'set_epoch'):
            sampler.set_epoch(epoch)
        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)

    def train(self, max_epoch: int):
        """Run ``max_epoch`` epochs, numbered from 0."""
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

In [10]:
class MyTrainDataset(Dataset):
    """Synthetic dataset of ``size`` random ``(x, y)`` pairs; ``x`` has shape (20,), ``y`` shape (1,).

    All samples are generated eagerly in ``__init__``. ``DistributedSampler`` hands each
    rank a disjoint set of *indices*, which only partitions the data if every rank holds
    the same dataset. Pass the same ``seed`` on every rank to get that. With ``seed=None``
    the global torch RNG is used, as before.
    """

    def __init__(self, size, seed=None):
        self.size = size
        # A private generator keeps the samples reproducible without touching the global RNG.
        generator = torch.Generator().manual_seed(seed) if seed is not None else None
        self.data = [(torch.randn(20, generator=generator), torch.randn(1, generator=generator))
                     for _ in range(size)]

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]

In [11]:
train_dataset = MyTrainDataset(size=2048)  # 2048 random (features, target) samples for inspection

In [12]:
# Peek at one sample: a (features, target) tuple of tensors with shapes (20,) and (1,).
train_dataset[0]

(tensor([-0.3474,  0.5461,  0.7366, -0.1509, -0.3166,  0.4327, -1.9924,  0.4900,
          1.1023, -1.7980, -0.9315,  0.0735, -1.6287, -1.2957,  0.4347, -0.7403,
          0.3555, -0.5719,  0.1192, -1.9336]),
 tensor([-1.0166]))

In [13]:
def main(rank: int, world_size: int, max_epochs: int, batch_size: int, seed: int = 0):
    """Per-rank DDP entry point: set up the process group, train, then tear the group down.

    Args:
        rank: This process's rank in ``[0, world_size - 1]``; also its GPU index.
        world_size: Total number of ranks.
        max_epochs: Number of epochs to train.
        batch_size: Per-rank batch size.
        seed: Seed shared by all ranks so each one builds an identical dataset.
    """
    ddp_setup(rank, world_size)
    try:
        # DistributedSampler partitions indices, which only splits the data correctly if
        # every rank generated the same random dataset, so seed identically on all ranks.
        torch.manual_seed(seed)
        train_dataset = MyTrainDataset(2048)
        # DistributedSampler already shuffles (reseeded each epoch via set_epoch).
        # DataLoader raises ValueError if shuffle=True is combined with a sampler.
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False,
                                      sampler=DistributedSampler(train_dataset))
        model = torch.nn.Linear(20, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

        trainer = Trainer(model=model, gpu_id=rank, optimizer=optimizer, train_dataloader=train_dataloader)
        trainer.train(max_epochs)
    finally:
        # Always release the process group, even if training raises.
        destroy_process_group()


In [14]:
# Re-check the visible GPU count right before launching training; this is the number of DDP ranks.
world_size = torch.cuda.device_count()
world_size

4

In [None]:
!python ddp_gpus.py --max_epochs 5 --batch_size 32  #%%
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [None]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

### basics
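- all-to-one: reduce; one-to-all: broadcast; all-to-all: all-reduce (every rank receives the reduced result — this is how DDP averages gradients)
- rank: the integer ID of each process, in the range [0, world_size - 1]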

### ddp setup

In [None]:
# Number of CUDA devices visible to this kernel; each DDP process (rank) is bound to one of them.
world_size = torch.cuda.device_count()
world_size

In [None]:
def ddp_setup(rank, world_size):
    """Initialise the default NCCL process group for this rank and bind it to its GPU.

    Args:
        rank: Index of this process in ``[0, world_size - 1]``; also used as the CUDA device index.
        world_size: Total number of processes taking part in training.
    """
    # Single-node rendezvous: every rank connects to the master at localhost:12355.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # Bind this process to its GPU before creating the NCCL group so NCCL uses that device.
    torch.cuda.set_device(rank)
    # The default process group can only be initialised once per process; calling
    # init_process_group a second time raises a RuntimeError.
    init_process_group(backend='nccl', rank=rank, world_size=world_size)

In [None]:
class Trainer:
    """Minimal per-rank training loop around a DistributedDataParallel-wrapped model.

    Each rank builds its own ``Trainer``; DDP keeps the replicas in sync by
    averaging gradients across ranks during ``backward()``.
    """

    def __init__(self, model: torch.nn.Module, train_dataloader: DataLoader, optimizer: torch.optim.Optimizer,
                 gpu_id: int, loss_fn=F.mse_loss) -> None:
        """
        Args:
            model: Module to train; moved to ``gpu_id`` and wrapped in DDP.
            train_dataloader: This rank's loader, normally built with a ``DistributedSampler``.
            optimizer: Optimizer over ``model.parameters()``.
            gpu_id: Device for this rank (the CUDA index, equal to the rank on one node).
            loss_fn: Callable ``loss_fn(output, target) -> scalar tensor``. Defaults to MSE
                because the notebook's model emits one regression value per sample.
                ``F.cross_entropy`` over a single logit is identically zero, so training did nothing.
        """
        self.gpu_id = gpu_id
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        # Module.to moves parameters in place, so the optimizer's parameter references stay valid.
        # Wrap the moved module explicitly.
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])

    def _run_batch(self, xs, ys):
        """Run one optimisation step on a batch already on ``self.gpu_id``; return the detached loss."""
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = self.loss_fn(output, ys)
        loss.backward()  # under DDP, gradients are averaged across ranks during backward
        self.optimizer.step()
        return loss.detach()

    def _run_epoch(self, epoch):
        """Train for one epoch over this rank's share of the dataset."""
        # Read the configured batch size. The old code built a fresh iterator and drew
        # a full batch just to print this number.
        batch_size = self.train_dataloader.batch_size
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | BatchSize: {batch_size} | Steps: {len(self.train_dataloader)}')
        # DistributedSampler seeds its shuffle from the epoch. Without set_epoch, every
        # epoch repeats the same order. Plain samplers have no set_epoch, so skip them.
        sampler = self.train_dataloader.sampler
        if hasattr(sampler, 'set_epoch'):
            sampler.set_epoch(epoch)
        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)

    def train(self, max_epoch: int):
        """Run ``max_epoch`` epochs, numbered from 0."""
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

In [None]:
class MyTrainDataset(Dataset):
    """Synthetic dataset of ``size`` random ``(x, y)`` pairs; ``x`` has shape (20,), ``y`` shape (1,).

    All samples are generated eagerly in ``__init__``. ``DistributedSampler`` hands each
    rank a disjoint set of *indices*, which only partitions the data if every rank holds
    the same dataset. Pass the same ``seed`` on every rank to get that. With ``seed=None``
    the global torch RNG is used, as before.
    """

    def __init__(self, size, seed=None):
        self.size = size
        # A private generator keeps the samples reproducible without touching the global RNG.
        generator = torch.Generator().manual_seed(seed) if seed is not None else None
        self.data = [(torch.randn(20, generator=generator), torch.randn(1, generator=generator))
                     for _ in range(size)]

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]

In [None]:
train_dataset = MyTrainDataset(size=2048)  # 2048 random (features, target) samples for inspection

In [None]:
# Peek at one sample: a (features, target) tuple of tensors with shapes (20,) and (1,).
train_dataset[0]

In [None]:
def main(rank: int, world_size: int, max_epochs: int, batch_size: int, seed: int = 0):
    """Per-rank DDP entry point: set up the process group, train, then tear the group down.

    Args:
        rank: This process's rank in ``[0, world_size - 1]``; also its GPU index.
        world_size: Total number of ranks.
        max_epochs: Number of epochs to train.
        batch_size: Per-rank batch size.
        seed: Seed shared by all ranks so each one builds an identical dataset.
    """
    ddp_setup(rank, world_size)
    try:
        # DistributedSampler partitions indices, which only splits the data correctly if
        # every rank generated the same random dataset, so seed identically on all ranks.
        torch.manual_seed(seed)
        train_dataset = MyTrainDataset(2048)
        # DistributedSampler already shuffles (reseeded each epoch via set_epoch).
        # DataLoader raises ValueError if shuffle=True is combined with a sampler.
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False,
                                      sampler=DistributedSampler(train_dataset))
        model = torch.nn.Linear(20, 1)
        optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

        trainer = Trainer(model=model, gpu_id=rank, optimizer=optimizer, train_dataloader=train_dataloader)
        trainer.train(max_epochs)
    finally:
        # Always release the process group, even if training raises.
        destroy_process_group()


In [None]:
# Re-check the visible GPU count right before launching training; this is the number of DDP ranks.
world_size = torch.cuda.device_count()
world_size

distributed training的基本思路是：
- 假如有一个长度为2048的数据集和4个GPU，那么在每个epoch中，DistributedSampler会把样本索引分成4份，每个GPU（rank）只处理其中的512个样本。每次backward时，DDP会对所有GPU上的梯度做all-reduce（求平均），每个GPU都得到相同的平均梯度，而不是只把结果reduce到rank=0的GPU上。4个GPU合起来就完成了一个epoch的训练。
- 如果数据集长度不能被GPU数量整除，DistributedSampler默认会重复部分样本进行填充（设置drop_last=True时则丢弃多余的样本），以保证每个GPU上的数据量相同。
- 在一个epoch中，每个GPU处理的是不同的数据，但各GPU上的模型参数始终保持一致：DDP在构造时会把rank=0的模型参数广播（broadcast）到其他GPU，之后每一步各GPU都用相同的平均梯度更新参数。

In [16]:
!python ddp_gpus.py --max_epochs 5 --batch_size 32

/bin/bash: python: command not found
