# multi-gpu pytorch

In [40]:
!nvidia-smi

Sun Jun  7 18:09:23 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.82       Driver Version: 440.82       CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla T4            Off  | 00000000:00:1B.0 Off |                    0 |
| N/A   33C    P8     9W /  70W |     11MiB / 15109MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            Off  | 00000000:00:1C.0 Off |                    0 |
| N/A   34C    P8     9W /  70W |     11MiB / 15109MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla T4            Off  | 00000000:00:1D.0 Off |                    0 |
| N/A   

Interesting to note that workspaces running on a multi-GPU instance only record metrics for the *first* GPU device in the details graph. We could do a better job with that.

For reference, here's the basic complete distributed example (copied over from the `scratch` workspace):

```python
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import os

def init_process(rank, size, backend='gloo'):
    """Join the process group as worker `rank` out of `size` workers.

    The rendezvous address and port are fixed to the local machine, then the
    calling process registers itself with the requested `backend`.
    """
    os.environ.update({
        'MASTER_ADDR': '127.0.0.1',
        'MASTER_PORT': '29500',
    })
    dist.init_process_group(backend=backend, rank=rank, world_size=size)

def example(rank, world_size):
    """Run a single forward/backward/optimizer step of a toy DDP model.

    Args:
        rank: index of this worker process within the process group.
        world_size: total number of worker processes in the group.

    The compute device is chosen separately from the rank, so the same code
    runs on a single-GPU machine (all workers share GPU 0), on a multi-GPU
    machine (one GPU per worker, wrapping around if there are more workers
    than GPUs), and on a machine without CUDA (CPU).
    """
    init_process(rank, world_size)

    # Keep `rank` intact: it identifies this worker in the process group and in
    # the log line below. Only the device index is derived from it.
    n_gpus = torch.cuda.device_count()
    if n_gpus > 0:
        device = torch.device('cuda', rank % n_gpus)
        device_ids = [device.index]
    else:
        # DDP expects device_ids=None for modules that live on the CPU.
        device = torch.device('cpu')
        device_ids = None

    # create local model
    model = nn.Linear(10, 10).to(device)
    # construct DDP model
    ddp_model = DDP(model, device_ids=device_ids)
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(device))
    labels = torch.randn(20, 10).to(device)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

    print(f"Finished process {rank}/{world_size}.")

def main():
    """Start two worker processes running `example` and block until both finish."""
    n_workers = 2
    mp.spawn(example, args=(n_workers,), nprocs=n_workers, join=True)


if __name__ == "__main__":
    main()
```

In [6]:
import torch.nn as nn

# Stand-alone 10 -> 10 linear layer kept at notebook level for the scratch module below.
tmp = nn.Linear(in_features=10, out_features=10)

In [15]:
class TmpModule(nn.Module):
    """Minimal module that wraps a single layer and forwards inputs through it.

    Args:
        layer: the layer to wrap. When omitted, the notebook-level `tmp` layer
            is used, which is what `TmpModule()` has always done.
    """

    def __init__(self, layer=None):
        super().__init__()
        # Fall back to the shared notebook-level layer so `TmpModule()` keeps working.
        self.ff1 = tmp if layer is None else layer

    def forward(self, X):
        """Apply the wrapped layer to `X` and return the result."""
        return self.ff1(X)

In [49]:
%%writefile ../models/2_pytorch_distributed_model.py
import numpy as np
import nlp
import transformers
import torch
import torch.nn as nn
from transformers import GPT2Config, GPT2Model
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from torch.utils.tensorboard import SummaryWriter
import os

# NEW
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler

# NEW
def init_process(rank, size, backend='gloo'):
    """Register this worker with the single-machine process group.

    Sets the rendezvous address/port to fixed local values and then joins the
    group as worker `rank` of `size` using `backend`.
    """
    rendezvous = {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500'}
    for key, value in rendezvous.items():
        os.environ[key] = value
    dist.init_process_group(backend, world_size=size, rank=rank)

class IMDBDataset(Dataset):
    """Map-style dataset of IMDB reviews encoded as fixed-length GPT-2 token ids.

    Args:
        part: name of the IMDB split to load, e.g. 'train'.
    """

    def __init__(self, part):
        # Load the split the caller asked for rather than always using 'train'.
        self.dataset = nlp.load_dataset('imdb')[part]
        self.tokenizer = transformers.GPT2Tokenizer.from_pretrained('gpt2')

    def __getitem__(self, idx):
        """Return `{'text': tensor of 1024 token ids, 'label': scalar tensor}` for row `idx`."""
        review = self.dataset[idx]
        label = torch.tensor(review['label'])
        text = torch.tensor(self.tokenizer.encode(review['text']))
        # The default GPT2 context length is 1024 tokens. The IMDB review corpus is pretty
        # long, and the GPT2 BPE tokenizer is pretty verbose, so we exceed this token limit
        # in ~3% of cases. Since this is a simple benchmark we ignore the problem: the
        # right-hand pad width goes negative for over-long reviews, which makes
        # ConstantPad1d clip the trailing tokens. One zero is prepended on the left and the
        # result is always exactly 1024 ids long.
        text = nn.ConstantPad1d((1, 1024 - text.shape[0] - 1), 0)(text)
        return {'text': text, 'label': label}

    def __len__(self):
        """Number of reviews in the loaded split."""
        return self.dataset.num_rows


class IMDBSentimentClassificationModel(nn.Module):
    """GPT-2 encoder followed by a small linear head scoring two sentiment classes.

    The encoder is constructed from a default `GPT2Config` (not via
    `from_pretrained`). `forward` returns, for every example in the batch,
    log-probabilities over the two classes, which is the input `nn.NLLLoss`
    expects.
    """

    def __init__(self):
        super().__init__()
        self.gpt2_config = transformers.GPT2Config()
        self.gpt2_model = transformers.GPT2Model(self.gpt2_config)
        self.head = nn.Sequential(
            # Input width follows the encoder's hidden size (768 for the default config).
            nn.Linear(self.gpt2_config.n_embd, 2**6),
            nn.Linear(2**6, 2**4),
            nn.Linear(2**4, 2),
            # Normalise over the class dimension (the last one). Normalising over
            # dim 0 would mix examples within a batch instead of producing
            # per-example class log-probabilities.
            nn.LogSoftmax(dim=-1),
        )

    def forward(self, tokens):
        """Map a (batch, sequence) tensor of token ids to (batch, 2) log-probabilities."""
        # Index rather than tuple-unpack: this works whether the encoder returns a
        # plain tuple or an output object, and regardless of how many extra
        # entries follow the hidden states.
        hidden_states = self.gpt2_model(tokens)[0]
        # Classify from the hidden state at the last sequence position.
        final_hidden_state = hidden_states[:, -1, :]
        return self.head(final_hidden_state)


def get_dataloader(rank, world_size):
    """Build the training DataLoader for worker `rank` of `world_size`.

    A DistributedSampler with shuffling enabled assigns each worker its own
    share of the IMDB training set; batches hold 4 examples.
    """
    train_set = IMDBDataset('train')
    shard_sampler = DistributedSampler(
        train_set, num_replicas=world_size, rank=rank, shuffle=True
    )
    return DataLoader(train_set, sampler=shard_sampler, batch_size=4)

def get_model():
    """Construct a new IMDBSentimentClassificationModel instance."""
    classifier = IMDBSentimentClassificationModel()
    return classifier

def train(rank, num_epochs, world_size):
    """Train the sentiment model in one worker of a single-machine DDP job.

    Args:
        rank: index of this worker; also used as the CUDA device index it runs on.
        num_epochs: number of passes over this worker's share of the data.
        world_size: total number of worker processes.

    Every worker prints its own progress. Rank 0 additionally writes the
    per-batch loss to TensorBoard and saves one checkpoint per epoch.
    """
    print(f"Rank {rank}/{world_size} training process initialized.\n")

    model = get_model()
    model.cuda(rank)
    model.train()

    # NEW
    init_process(rank, world_size)

    # NEW
    # Since this is a single-instance multi-GPU training script, it's important
    # that only one process handle downloading of the data, to avoid race conditions
    # implicit in having multiple processes attempt to write to the same file
    # simultaneously.
    if rank == 0:
        nlp.load_dataset('imdb')
        transformers.GPT2Tokenizer.from_pretrained('gpt2')
    dist.barrier()
    print(f"Rank {rank}/{world_size} training process passed data download barrier.\n")

    # NEW
    model = DistributedDataParallel(model, device_ids=[rank])

    dataloader = get_dataloader(rank, world_size)

    loss_fn = nn.NLLLoss()

    # NEW
    # Since we are computing the average of several batches at once (an effective batch size of
    # world_size * batch_size) we scale the learning rate to match.
    optimizer = Adam(model.parameters(), lr=1e-3 * world_size)

    # Only rank 0 logs to TensorBoard, so only rank 0 opens a writer.
    writer = SummaryWriter('/spell/tensorboards/model_1') if rank == 0 else None

    for epoch in range(1, num_epochs + 1):
        # A shuffling DistributedSampler derives its ordering from the epoch number;
        # without this call every epoch would iterate the data in the same order.
        dataloader.sampler.set_epoch(epoch)

        losses = []

        for idx, batch in enumerate(dataloader):
            tokens, labels = batch['text'], batch['label']
            tokens = tokens.cuda(rank)
            labels = labels.cuda(rank)

            model.zero_grad()
            y_pred = model(tokens)

            loss = loss_fn(y_pred, labels)
            loss.backward()
            optimizer.step()

            # Record the loss exactly once, as a plain float: keeping the tensor
            # itself would hold on to its autograd graph and break the numpy
            # statistics computed at the end of the epoch.
            batch_loss = loss.item()
            losses.append(batch_loss)

            if idx % 10 == 0:
                print(
                    f'Finished epoch {epoch}, rank {rank}/{world_size}, batch {idx}. '
                    f'Loss: {batch_loss:.3f}.\n'
                )
            if writer is not None:
                # Give every point its own x-coordinate so the curve is not collapsed
                # onto a single step.
                global_step = (epoch - 1) * len(dataloader) + idx
                writer.add_scalar('training loss', batch_loss, global_step)

        print(
            f'Finished epoch {epoch}, rank {rank}/{world_size}. '
            f'Avg Loss: {np.mean(losses)}; Median Loss: {np.median(losses)}.\n'
        )

        if rank == 0:
            os.makedirs('/spell/checkpoints/', exist_ok=True)
            # NOTE: this is the state dict of the DDP wrapper, so parameter names
            # carry the wrapper's `module.` prefix.
            torch.save(model.state_dict(), f'/spell/checkpoints/model_{epoch}.pth')

    if writer is not None:
        # Flush pending events to disk before the worker exits.
        writer.close()

# NEW
NUM_EPOCHS = 20
WORLD_SIZE = torch.cuda.device_count()


def main():
    """Spawn WORLD_SIZE training processes running `train` and wait for all of them."""
    spawn_args = (NUM_EPOCHS, WORLD_SIZE)
    mp.spawn(train, args=spawn_args, nprocs=WORLD_SIZE, join=True)


if __name__ == "__main__":
    main()


Overwriting ../models/2_pytorch_distributed_model.py


In [29]:
# import torch.multiprocessing as mp
# mp.spawn?

In [48]:
import torch
# Number of CUDA devices torch can see from this kernel; the training script
# uses the same call to decide how many worker processes to spawn.
torch.cuda.device_count()

4

In [44]:
!python ../models/2_pytorch_distributed_model.py

Rank 3/4 training process initialized.

Rank 0/4 training process initialized.

Rank 1/4 training process initialized.

Rank 2/4 training process initialized.

Using downloaded and verified file: /mnt/pascal_voc_segmentation/VOCtrainval_11-May-2012.tar
Using downloaded and verified file: /mnt/pascal_voc_segmentation/VOCtrainval_11-May-2012.tarUsing downloaded and verified file: /mnt/pascal_voc_segmentation/VOCtrainval_11-May-2012.tar
Using downloaded and verified file: /mnt/pascal_voc_segmentation/VOCtrainval_11-May-2012.tar

Finished epoch 1, rank 0/4, batch 0. Loss: 3.202.

Finished epoch 1, rank 1/4, batch 0. Loss: 3.182.

Finished epoch 1, rank 3/4, batch 0. Loss: 3.134.

Finished epoch 1, rank 2/4, batch 0. Loss: 3.235.

Finished epoch 1, rank 0/4, batch 10. Loss: 1.991.

Finished epoch 1, rank 1/4, batch 10. Loss: 2.106.

Finished epoch 1, rank 3/4, batch 10. Loss: 1.620.

Finished epoch 1, rank 2/4, batch 10. Loss: 1.957.

Finished epoch 1, rank 0/4, batch 20. Loss: 1.036.

Fini

In [47]:
%ls ../checkpoints/

model_1.pth  model_2.pth  model_3.pth  model_4.pth  model_5.pth
