[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/juansensio/blog/blob/master/071_pytorch_distributed/071_pytorch_distributed.ipynb)

In [1]:
import os
from sklearn.model_selection import train_test_split

def setup(path='./data', test_size=0.2, random_state=42):
    """Index an image-folder dataset and make a stratified train/val split.

    ``path`` must contain one sub-directory per class; every file inside a class
    directory is taken as an image of that class. Labels are indices into the
    returned ``classes`` list (sorted directory names).

    Args:
        path: root directory of the dataset.
        test_size: validation size, passed to sklearn's ``train_test_split``.
        random_state: seed passed to ``train_test_split``.

    Returns:
        (classes, train_images, train_labels, val_images, val_labels): the image
        lists hold file paths, the label lists hold integer class indices.
    """
    # Only directories are classes; stray files (e.g. .DS_Store) would otherwise
    # make the os.listdir call below fail.
    classes = sorted(
        entry for entry in os.listdir(path)
        if os.path.isdir(os.path.join(path, entry))
    )

    print("Generating images and labels ...")
    images, encoded = [], []
    for ix, label in enumerate(classes):
        # os.listdir order is filesystem-dependent: sort it so the seeded split
        # below picks the same files on every machine.
        _images = sorted(os.listdir(f'{path}/{label}'))
        images += [f'{path}/{label}/{img}' for img in _images]
        encoded += [ix]*len(_images)
    print(f'Number of images: {len(images)}')

    # train / val split (stratified so both splits keep the class proportions)
    print("Generating train / val splits ...")
    train_images, val_images, train_labels, val_labels = train_test_split(
        images,
        encoded,
        stratify=encoded,
        test_size=test_size,
        random_state=random_state
    )

    print("Training samples: ", len(train_labels))
    print("Validation samples: ", len(val_labels))

    return classes, train_images, train_labels, val_images, val_labels

# ./data holds one sub-folder per class; split with setup's defaults (test_size=0.2, seed 42).
classes, train_images, train_labels, val_images, val_labels = setup(path='./data')

Generating images and labels ...
Number of images: 27000
Generating train / val splits ...
Training samples:  21600
Validation samples:  5400


In [2]:
import torch
from skimage import io 

class Dataset(torch.utils.data.Dataset):
    """Image-file classification dataset returning ``(image, label)`` tensors.

    Each image is read with ``io.imread``. It is reduced to the channels listed in
    ``bands`` (indices along the last axis) and divided by ``scale``. It is then
    clipped to [0, 1] and returned channels-first as ``float32``.
    NOTE(review): the default ``bands=(3, 2, 1)`` / ``scale=4000`` presumably
    pick an RGB-like composite of a multi-band raster; confirm for other data.

    Args:
        images: list of image file paths.
        labels: list of integer class indices aligned with ``images``.
        bands: channel indices to keep, in output channel order.
        scale: value that maps to 1.0 before clipping.
    """

    def __init__(self, images, labels, bands=(3, 2, 1), scale=4000):
        self.images = images
        self.labels = labels
        self.bands = bands
        self.scale = scale

    def __len__(self):
        return len(self.images)

    def _to_tensor(self, img):
        """Select ``bands`` from a channels-last array, normalize to [0, 1], return CHW."""
        img = img[..., list(self.bands)]
        return torch.tensor(img / self.scale, dtype=torch.float).clip(0, 1).permute(2, 0, 1)

    def __getitem__(self, ix):
        img = self._to_tensor(io.imread(self.images[ix]))
        label = torch.tensor(self.labels[ix], dtype=torch.long)
        return img, label
    
ds = dict(
    train=Dataset(train_images, train_labels),
    val=Dataset(val_images, val_labels),
)

# Only the training split is shuffled; pin_memory=True allows faster host-to-GPU copies.
batch_size = 1024
dl = {
    split: torch.utils.data.DataLoader(
        ds[split],
        batch_size=batch_size,
        shuffle=(split == 'train'),
        num_workers=20,
        pin_memory=True,
    )
    for split in ('train', 'val')
}

In [3]:
import torch.nn.functional as F
import torchvision

class Model(torch.nn.Module):
    """Pretrained ResNet-50 with a freshly initialized ``n_outputs``-way head.

    Autocast is entered inside ``forward`` rather than only around the training
    loop. ``torch.nn.DataParallel`` runs each replica's forward in its own thread,
    and autocast state is thread-local.
    """

    def __init__(self, n_outputs=10, use_amp=True):
        super().__init__()
        backbone = torchvision.models.resnet50(pretrained=True)
        # replace the ImageNet classifier with one sized for our classes
        backbone.fc = torch.nn.Linear(backbone.fc.in_features, n_outputs)
        self.model = backbone
        self.use_amp = use_amp

    def forward(self, x, log=False):
        # log=True prints the input shape each replica receives (shows DataParallel's batch split)
        if log:
            print(x.shape)
        with torch.cuda.amp.autocast(enabled=self.use_amp):
            logits = self.model(x)
        return logits

In [4]:
from tqdm import tqdm
import numpy as np

def step(model, batch, device):
    """Forward one ``(inputs, targets)`` batch on ``device`` and score it.

    Returns:
        (loss, acc): the cross-entropy loss tensor and the batch accuracy as a
        Python float in [0, 1].
    """
    inputs, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    logits = model(inputs)
    loss = F.cross_entropy(logits, targets)
    n_correct = logits.argmax(dim=1).eq(targets).sum().item()
    return loss, n_correct / targets.shape[0]

def train_amp(model, dl, optimizer, epochs=10, device="cpu", use_amp = True, prof=None, end=0):
    """Train ``model`` with optional mixed precision, validating after each epoch.

    Args:
        model: module to train; moved to ``device`` first.
        dl: dict with 'train' and 'val' DataLoaders yielding ``(x, y)`` batches.
        optimizer: optimizer over ``model``'s parameters.
        epochs: number of epochs.
        device: device inputs and labels are moved to (via ``step``).
        use_amp: enable autocast and the ``GradScaler``.
        prof: optional profiler. When given, ``prof.step()`` is called after each
            training batch, and training stops (without validation) once
            ``batch_idx`` reaches ``end``.
        end: index of the last training batch processed when profiling.

    Returns:
        dict of per-epoch lists 'loss', 'acc', 'val_loss', 'val_acc'. Epoch
        metrics are averaged per sample, so a smaller last batch is not
        over-weighted.
    """
    model.to(device)
    hist = {'loss': [], 'acc': [], 'val_loss': [], 'val_acc': []}
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    def _avg(total, count):
        # nan (as np.mean([]) would give) when the loader yielded no samples
        return total / count if count else float('nan')

    for e in range(1, epochs+1):
        # train
        model.train()
        loss_sum, acc_sum, seen = 0.0, 0.0, 0
        bar = tqdm(dl['train'])
        stop = False
        for batch_idx, batch in enumerate(bar):
            optimizer.zero_grad()

            # AMP: forward + loss under autocast, backward through the loss scaler
            with torch.cuda.amp.autocast(enabled=use_amp):
                loss, acc = step(model, batch, device)
            scaler.scale(loss).backward()
            # To clip gradients, unscale them first:
            # scaler.unscale_(optimizer); torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
            scaler.step(optimizer)
            scaler.update()

            # accumulate sample-weighted sums (the last batch may be smaller)
            n = batch[1].size(0)
            loss_sum += loss.item() * n
            acc_sum += acc * n
            seen += n
            bar.set_description(f"training... loss {_avg(loss_sum, seen):.4f} acc {_avg(acc_sum, seen):.4f}")
            # profiling
            if prof is not None:
                if batch_idx >= end:
                    stop = True
                    break
                prof.step()
        hist['loss'].append(_avg(loss_sum, seen))
        hist['acc'].append(_avg(acc_sum, seen))
        if stop:
            break
        # eval
        model.eval()
        loss_sum, acc_sum, seen = 0.0, 0.0, 0
        bar = tqdm(dl['val'])
        with torch.no_grad():
            for batch in bar:
                loss, acc = step(model, batch, device)
                n = batch[1].size(0)
                loss_sum += loss.item() * n
                acc_sum += acc * n
                seen += n
                bar.set_description(f"evaluating... loss {_avg(loss_sum, seen):.4f} acc {_avg(acc_sum, seen):.4f}")
        hist['val_loss'].append(_avg(loss_sum, seen))
        hist['val_acc'].append(_avg(acc_sum, seen))
        # log
        log = f'Epoch {e}/{epochs}'
        for k, v in hist.items():
            log += f' {k} {v[-1]:.4f}'
        print(log)

    return hist

In [5]:
# Baseline: a single (unwrapped) model trained with AMP on "cuda" for 3 epochs.
model = Model()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
hist = train_amp(model, dl, optimizer, device="cuda", epochs=3)

training... loss 0.3928 acc 0.8706: 100%|██████████| 22/22 [00:07<00:00,  3.02it/s]
evluating... loss 4.5814 acc 0.4061: 100%|██████████| 6/6 [00:01<00:00,  4.16it/s]
  0%|          | 0/22 [00:00<?, ?it/s]

Epoch 1/3 loss 0.3928 acc 0.8706 val_loss 4.5814 val_acc 0.4061


training... loss 0.1117 acc 0.9636: 100%|██████████| 22/22 [00:06<00:00,  3.35it/s]
evluating... loss 0.6180 acc 0.8515: 100%|██████████| 6/6 [00:01<00:00,  4.05it/s]
  0%|          | 0/22 [00:00<?, ?it/s]

Epoch 2/3 loss 0.1117 acc 0.9636 val_loss 0.6180 val_acc 0.8515


training... loss 0.0684 acc 0.9780: 100%|██████████| 22/22 [00:06<00:00,  3.48it/s]
evluating... loss 0.5459 acc 0.8631: 100%|██████████| 6/6 [00:01<00:00,  4.01it/s]

Epoch 3/3 loss 0.0684 acc 0.9780 val_loss 0.5459 val_acc 0.8631





## Data Parallel

Varias GPUs en la misma máquina: `torch.nn.DataParallel` replica el modelo en cada GPU y reparte cada batch entre ellas, todo dentro de un único proceso.

In [6]:
# Wrap the model in DataParallel only when more than one GPU is visible.
model = Model()
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    model = torch.nn.DataParallel(model)

Let's use 2 GPUs!


In [7]:
model.cuda()

# DataParallel splits the batch along dim 0, so with 2 GPUs each replica gets
# half of it (log=True prints one shape per replica).
output = model(torch.randn(32, 3, 32, 32).cuda(), log=True)

output.shape

torch.Size([16, 3, 32, 32])torch.Size([16, 3, 32, 32])



torch.Size([32, 10])

In [8]:
# Double the batch size: DataParallel splits each batch across the GPUs, so with
# 2 GPUs each replica still processes 1024 samples per step.
batch_size = 2048
dl = {
    split: torch.utils.data.DataLoader(
        ds[split],
        batch_size=batch_size,
        shuffle=(split == 'train'),
        num_workers=20,
        pin_memory=True,
    )
    for split in ('train', 'val')
}

optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
hist = train_amp(model, dl, optimizer, device="cuda", epochs=3)

training... loss 0.6510 acc 0.7866: 100%|██████████| 11/11 [00:05<00:00,  2.15it/s]
evluating... loss 70.5833 acc 0.1393: 100%|██████████| 3/3 [00:02<00:00,  1.45it/s]
  0%|          | 0/11 [00:00<?, ?it/s]

Epoch 1/3 loss 0.6510 acc 0.7866 val_loss 70.5833 val_acc 0.1393


training... loss 0.1157 acc 0.9635: 100%|██████████| 11/11 [00:05<00:00,  2.18it/s]
evluating... loss 4.0169 acc 0.5263: 100%|██████████| 3/3 [00:02<00:00,  1.40it/s]
  0%|          | 0/11 [00:00<?, ?it/s]

Epoch 2/3 loss 0.1157 acc 0.9635 val_loss 4.0169 val_acc 0.5263


training... loss 0.0609 acc 0.9805: 100%|██████████| 11/11 [00:04<00:00,  2.20it/s]
evluating... loss 1.1383 acc 0.7805: 100%|██████████| 3/3 [00:02<00:00,  1.46it/s]

Epoch 3/3 loss 0.0609 acc 0.9805 val_loss 1.1383 val_acc 0.7805





## Distributed Data Parallel

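Ver el script que acompaña a este notebook: DDP lanza un proceso por GPU, por lo que no se ejecuta directamente desde aquí.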

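## Model Parallel

En lugar de replicar el modelo, lo dividimos entre GPUs: las primeras capas de la ResNet se ejecutan en `cuda:0` y el resto en `cuda:1`. Es útil cuando el modelo no cabe en una sola GPU.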

In [45]:
class ModelParallel(torch.nn.Module):
    """Pretrained ResNet-50 split across two devices (naive model parallelism).

    The first ``split`` top-level children of the ResNet live on ``gpu1``. By
    default these are conv1, bn1, relu, maxpool, layer1 and layer2. The remaining
    children except the original ``fc`` are placed on ``gpu2``, followed by a new
    flatten + linear head. The batch runs through the two stages one after the
    other (no pipelining). ``forward`` returns its output on ``gpu2``, so targets
    must be moved there before computing the loss.

    Args:
        gpu1, gpu2: devices of the first and second stage.
        n_outputs: number of classes of the new head.
        use_amp: run the forward pass under autocast.
        split: index in ``list(resnet.children())`` where the second stage
            starts (default 6, i.e. right after layer2).

    Raises:
        ValueError: if ``split`` would leave the first stage empty or include
            the replaced ``fc`` layer.
    """

    def __init__(self, gpu1, gpu2, n_outputs=10, use_amp=True, split=6):
        super().__init__()
        resnet = torchvision.models.resnet50(pretrained=True)
        children = list(resnet.children())
        # the last child is the pretrained fc, which is replaced by the new head
        if not 0 < split < len(children):
            raise ValueError(f'split must be in [1, {len(children) - 1}], got {split}')
        self.backbone1 = torch.nn.Sequential(*children[:split]).to(gpu1)
        self.backbone2 = torch.nn.Sequential(
            *children[split:-1],
            torch.nn.Flatten(),
            torch.nn.Linear(resnet.fc.in_features, n_outputs)
        ).to(gpu2)
        self.use_amp = use_amp
        self.gpu1 = gpu1
        self.gpu2 = gpu2

    def forward(self, x):
        with torch.cuda.amp.autocast(enabled=self.use_amp):
            # inputs may arrive on any device; each stage gets its data moved in
            x = x.to(self.gpu1)
            x = self.backbone1(x)
            x = x.to(self.gpu2)
            x = self.backbone2(x)
            return x

In [46]:
gpu1, gpu2 = torch.device('cuda:0'), torch.device('cuda:1')

modelParallel = ModelParallel(gpu1, gpu2)

# The CPU input is moved to gpu1 inside forward(); the output ends up on gpu2.
output = modelParallel(torch.randn(32, 3, 32, 32))

output.shape

torch.Size([32, 10])

In [44]:
# First stage: the first 6 ResNet-50 children (stem, layer1, layer2), placed on gpu1.
modelParallel.backbone1

Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)


In [33]:
# Second stage: remaining ResNet-50 children except fc, plus Flatten and the new Linear head, placed on gpu2.
modelParallel.backbone2

Sequential(
  (0): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(512, 1024, kernel_size=(1, 1), stride=(2, 2), bias=False)
        (1): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): Bottleneck(
      (conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, aff

In [53]:
def step_mp(model, batch, device):
    """Score one batch for a model that places its inputs itself.

    Only the targets are moved to ``device``, which must be the device the
    model's output lives on. The inputs are passed to ``model`` unchanged.

    Returns:
        (loss, acc): cross-entropy loss tensor and batch accuracy as a float.
    """
    inputs, targets = batch
    targets = targets.to(device)
    logits = model(inputs)
    loss = F.cross_entropy(logits, targets)
    n_correct = logits.argmax(dim=1).eq(targets).sum().item()
    return loss, n_correct / targets.shape[0]

def train_mp(model, dl, optimizer, device, epochs=10, use_amp = True, prof=None, end=0):
    """Train a model-parallel ``model`` with optional AMP, validating each epoch.

    Unlike ``train_amp``, the model is not moved to a device and inputs are left
    where they are. The model is expected to place them on its stages, as
    ``ModelParallel.forward`` does. Batches are scored with ``step_mp``.

    Args:
        model: module whose parameters already live on their devices.
        dl: dict with 'train' and 'val' DataLoaders yielding ``(x, y)`` batches.
        optimizer: optimizer over ``model``'s parameters.
        device: device the labels are moved to; must match the device of the
            model's output (``gpu2`` for ``ModelParallel``).
        epochs: number of epochs.
        use_amp: enable autocast and the ``GradScaler``.
        prof: optional profiler. When given, ``prof.step()`` is called after each
            training batch, and training stops (without validation) once
            ``batch_idx`` reaches ``end``.
        end: index of the last training batch processed when profiling.

    Returns:
        dict of per-epoch lists 'loss', 'acc', 'val_loss', 'val_acc', averaged
        per sample.
    """
    hist = {'loss': [], 'acc': [], 'val_loss': [], 'val_acc': []}
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    def _avg(total, count):
        # nan (as np.mean([]) would give) when the loader yielded no samples
        return total / count if count else float('nan')

    for e in range(1, epochs+1):
        # train
        model.train()
        loss_sum, acc_sum, seen = 0.0, 0.0, 0
        bar = tqdm(dl['train'])
        stop = False
        for batch_idx, batch in enumerate(bar):
            optimizer.zero_grad()

            # AMP; step_mp leaves the inputs alone so the model can send them
            # straight to its first stage (no detour through `device`)
            with torch.cuda.amp.autocast(enabled=use_amp):
                loss, acc = step_mp(model, batch, device)
            scaler.scale(loss).backward()
            # To clip gradients, unscale them first:
            # scaler.unscale_(optimizer); torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
            scaler.step(optimizer)
            scaler.update()

            # accumulate sample-weighted sums (the last batch may be smaller)
            n = batch[1].size(0)
            loss_sum += loss.item() * n
            acc_sum += acc * n
            seen += n
            bar.set_description(f"training... loss {_avg(loss_sum, seen):.4f} acc {_avg(acc_sum, seen):.4f}")
            # profiling
            if prof is not None:
                if batch_idx >= end:
                    stop = True
                    break
                prof.step()
        hist['loss'].append(_avg(loss_sum, seen))
        hist['acc'].append(_avg(acc_sum, seen))
        if stop:
            break
        # eval
        model.eval()
        loss_sum, acc_sum, seen = 0.0, 0.0, 0
        bar = tqdm(dl['val'])
        with torch.no_grad():
            for batch in bar:
                loss, acc = step_mp(model, batch, device)
                n = batch[1].size(0)
                loss_sum += loss.item() * n
                acc_sum += acc * n
                seen += n
                bar.set_description(f"evaluating... loss {_avg(loss_sum, seen):.4f} acc {_avg(acc_sum, seen):.4f}")
        hist['val_loss'].append(_avg(loss_sum, seen))
        hist['val_acc'].append(_avg(acc_sum, seen))
        # log
        log = f'Epoch {e}/{epochs}'
        for k, v in hist.items():
            log += f' {k} {v[-1]:.4f}'
        print(log)

    return hist

In [55]:
batch_size = 1024
dl = {
    split: torch.utils.data.DataLoader(
        ds[split],
        batch_size=batch_size,
        shuffle=(split == 'train'),
        num_workers=20,
        pin_memory=True,
    )
    for split in ('train', 'val')
}
modelParallel = ModelParallel(gpu1, gpu2)
optimizer = torch.optim.Adam(params=modelParallel.parameters(), lr=1e-3)
# labels go to gpu2, where ModelParallel produces its output
hist = train_mp(modelParallel, dl, optimizer, device=gpu2, epochs=3)

training... loss 0.4027 acc 0.8729: 100%|██████████| 22/22 [00:07<00:00,  3.01it/s]
evluating... loss 2.9557 acc 0.6414: 100%|██████████| 6/6 [00:01<00:00,  3.39it/s]
  0%|          | 0/22 [00:00<?, ?it/s]

Epoch 1/3 loss 0.4027 acc 0.8729 val_loss 2.9557 val_acc 0.6414


training... loss 0.1123 acc 0.9647: 100%|██████████| 22/22 [00:07<00:00,  3.01it/s]
evluating... loss 0.3366 acc 0.9054: 100%|██████████| 6/6 [00:01<00:00,  3.34it/s]
  0%|          | 0/22 [00:00<?, ?it/s]

Epoch 2/3 loss 0.1123 acc 0.9647 val_loss 0.3366 val_acc 0.9054


training... loss 0.0522 acc 0.9828: 100%|██████████| 22/22 [00:07<00:00,  3.04it/s]
evluating... loss 0.2378 acc 0.9343: 100%|██████████| 6/6 [00:01<00:00,  3.41it/s]

Epoch 3/3 loss 0.0522 acc 0.9828 val_loss 0.2378 val_acc 0.9343





Model Parallel se puede combinar con DDP: cada proceso de DDP reparte su copia del modelo entre varias GPUs.

## Refs

- https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html