In [1]:
%%time 
%reload_ext autoreload
%autoreload 2
import os, sys
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"

import torch
import ipdb
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler  # distributes data across GPUs
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

CPU times: user 312 ms, sys: 81.1 ms, total: 393 ms
Wall time: 434 ms


---

### Multi-GPU training with DDP  `https://www.youtube.com/watch?v=-LAtx9Q6DA8`

In [3]:
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        rank: int,
        save_every: int, 
    ) -> None:
        
        self.rank = rank
        self.model = model.to(rank)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        
        loss = F.cross_entropy(output, targets.view(-1))
        loss.backward()
        list_loss.append(loss.item())
        self.optimizer.step()

    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.rank}] Epoch {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_data)}")
        for source, targets in self.train_data:
            source = source.to(self.rank)
            targets = targets.to(self.rank)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        ckp = self.model.state_dict()
        PATH = "checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)


def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=True
    )

def main(total_epochs=50, save_every=10, batch_size=32):
    train_set = MyTrainDataset(2048)  # load your dataset
    model = torch.nn.Linear(20, 5)  # load your model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    train_data = DataLoader(train_set, batch_size=batch_size, pin_memory=True, shuffle=True)
    trainer = Trainer(model, train_data, optimizer, device, save_every)
    trainer.train(total_epochs)
    
device = 0  # shorthand for cuda:0
list_loss = []
main()
plt.plot(list_loss, alpha=.5)

[GPU0] Epoch 0 | Batchsize: 32 | Steps: 64
Epoch 0 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 1 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 2 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 3 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 4 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 5 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 6 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 7 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 8 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 9 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 10 | Batchsize: 32 | Steps: 64
Epoch 10 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 11 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 12 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 13 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 14 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 15 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 16 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 17 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 18 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 19 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 20 | Batch

NameError: name 'plt' is not defined

In [5]:
train_set = MyTrainDataset(2048)  # load your dataset
batch_size = 32
train_data = DataLoader(train_set, batch_size=batch_size, pin_memory=True, shuffle=True)

for source, targets in train_data:
    print(source.shape, targets.shape)


torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 20]) torch.Size([32, 1])
torch.Size([32, 

---
### `https://pytorch.org/tutorials/intermediate/ddp_tutorial.html`

In [5]:
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()


In [6]:
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()

def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)
