<a href="https://colab.research.google.com/github/learn2Pro/rl_learning/blob/master/llm/ddp_toy_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [5]:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

## basics
- All-to-one: `reduce`; one-to-all: `broadcast`.
- `rank`: the unique index of a process, ranging over `[0, world_size - 1]`.


In [6]:
# Number of CUDA devices PyTorch can see; the bare name on the last line
# makes the notebook display the value.
world_size = torch.cuda.device_count()
world_size

1

In [8]:
def ddp_setup(rank, world_size):
  """Initialise the default process group for single-node DDP training.

  Sets the rendezvous address/port environment variables, binds the calling
  process to the CUDA device whose index equals ``rank``, and creates the
  default process group with the NCCL backend.

  Args:
    rank: Unique identifier of each process
    world_size: Total number of processes
  """
  # Rendezvous endpoint that every process uses to find the group
  # (hosted by the rank 0 process).
  os.environ['MASTER_ADDR'] = 'localhost'
  os.environ['MASTER_PORT'] = '12355'
  # Select the device before creating the group so NCCL communicators are
  # created on this process's own GPU rather than the default device 0.
  torch.cuda.set_device(rank)
  # The backend name is 'nccl' (letter L); the previous 'ncc1' is not a
  # registered backend and made init_process_group fail.
  init_process_group(backend='nccl', rank=rank, world_size=world_size)

## custom trainer

In [14]:
class Trainer:
  """Minimal DistributedDataParallel training loop for one process/device.

  The model is moved to ``gpu_id`` and wrapped in ``DDP``; each epoch walks
  the dataloader once, running a cross-entropy forward/backward pass and an
  optimizer step per batch.
  """

  def __init__(self,
               model:torch.nn.Module,
               train_dataloader: DataLoader,
               optimizer: torch.optim.Optimizer,
               gpu_id:int
               ) -> None:
    """Store the training components and wrap the model for DDP.

    Args:
      model: Network to train; moved to ``gpu_id`` before wrapping.
      train_dataloader: Loader whose ``sampler`` exposes ``set_epoch``.
      optimizer: Optimizer built over ``model``'s parameters.
      gpu_id: Device this process trains on.
    """
    self.gpu_id = gpu_id
    self.train_dataloader = train_dataloader
    self.optimizer = optimizer
    self.model = model.to(gpu_id)
    # Wrap the module that was just placed on the device.
    self.model = DDP(self.model, device_ids=[gpu_id])

  def _run_batch(self, xs, ys):
    """Run one optimisation step (forward, loss, backward, update).

    Named with a single leading underscore: a double underscore would be
    name-mangled to ``_Trainer__run_batch`` and the ``self._run_batch``
    call in ``_run_epoch`` would raise ``AttributeError``.
    """
    self.optimizer.zero_grad()
    output = self.model(xs)
    loss = F.cross_entropy(output, ys)
    loss.backward()
    self.optimizer.step()

  def _run_epoch(self, epoch):
    """Log the epoch header and train on every batch of the dataloader."""
    # Peek at one batch only to report the batch size in the log line.
    batch_size = len(next(iter(self.train_dataloader))[0])
    print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')
    # Pass the epoch number to the sampler before iterating the loader.
    self.train_dataloader.sampler.set_epoch(epoch)
    for xs, ys in self.train_dataloader:
      xs = xs.to(self.gpu_id)
      ys = ys.to(self.gpu_id)
      self._run_batch(xs, ys)

  def train(self,max_epoch:int):
    """Train for ``max_epoch`` epochs, numbered from 0."""
    for epoch in range(max_epoch):
      self._run_epoch(epoch)


## pipeline

In [17]:
class MyTrainDataset(Dataset):
  """In-memory toy dataset of random (features, target) pairs.

  Every sample is a tuple of a 20-element tensor and a 1-element tensor,
  both drawn with ``torch.rand`` when the dataset is constructed.
  """

  def __init__(self, size):
    """Generate ``size`` random samples up front and keep them in a list."""
    self.size = size
    samples = []
    for _ in range(size):
      # Draw features first, then the target, one sample at a time.
      features = torch.rand(20)
      target = torch.rand(1)
      samples.append((features, target))
    self.data = samples

  def __getitem__(self, index):
    """Return the stored (features, target) tuple at ``index``."""
    return self.data[index]

  def __len__(self):
    """Return the number of samples requested at construction."""
    return self.size

In [18]:
train_dataset = MyTrainDataset(2048)

In [19]:
train_dataset[0]

(tensor([0.8135, 0.9584, 0.3655, 0.7456, 0.4496, 0.4581, 0.1562, 0.7645, 0.9840,
         0.8844, 0.4045, 0.5310, 0.8722, 0.1511, 0.6923, 0.5194, 0.1092, 0.4003,
         0.3517, 0.5437]),
 tensor([0.4592]))

In [22]:
def main(rank:int, world_size: int, max_epochs:int, batch_size:int):
  """Per-process entry point: set up DDP, train the toy model, tear down.

  Args:
    rank: Index of this process; also passed to ``Trainer`` as its ``gpu_id``.
    world_size: Total number of processes in the group.
    max_epochs: Number of epochs to train for.
    batch_size: Batch size of this process's dataloader.
  """
  ddp_setup(rank, world_size)
  try:
    train_dataset = MyTrainDataset(2048)
    train_dataloader = DataLoader(train_dataset,
                                  batch_size=batch_size,
                                  pin_memory=True,
                                  # `shuffle` and `sampler` are mutually exclusive in
                                  # DataLoader (passing both raises ValueError);
                                  # DistributedSampler shuffles by default.
                                  shuffle=False,
                                  sampler=DistributedSampler(train_dataset)
                                  )
    model = torch.nn.Linear(20, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    trainer = Trainer(model=model, gpu_id=rank, optimizer=optimizer, train_dataloader=train_dataloader)
    trainer.train(max_epochs)
  finally:
    # Always release the process group, even if training raised.
    destroy_process_group()

In [24]:
# Number of CUDA devices PyTorch can see; the bare name on the last line
# makes the notebook display the value.
world_size = torch.cuda.device_count()
world_size

1