In [36]:
!pip install multiprocess matplotlib



In [37]:
%matplotlib inline

In [38]:
import os
import sys
import tempfile
import torch

import torch.nn.functional as F

import torch.nn as nn
import torch.optim as optim
import multiprocess as mp

from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist

import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np

In [39]:
import time, gc

# Timing utilities
start_time = None

def start_timer():
    global start_time
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.synchronize()
    start_time = time.time()

def end_timer_and_print(local_msg):
    torch.cuda.synchronize()
    end_time = time.time()
    print("\n" + local_msg)
    print("Total execution time = {:.3f} sec".format(end_time - start_time))
    print("Max memory used by tensors = {} bytes".format(torch.cuda.max_memory_allocated()))

In [40]:
nodes=1
gpus=2
nr=0
num_workers=1

batch_size=4
epochs=2

world_size = gpus * nodes

In [41]:
def dataloader(gpu,world_size,batch_size,num_workers):
    transform = transforms.Compose(
    [transforms.ToTensor(),
     #transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    trainset = torchvision.datasets.CIFAR10(root='/home/mohsin/data', train=True,
                                        download=True, transform=transform)
    trainSampler = torch.utils.data.distributed.DistributedSampler(trainset,
                                                               num_replicas=world_size,
                                                               rank=gpu)
    trainloader = torch.utils.data.DataLoader(trainset, 
                                          batch_size=batch_size,
                                          shuffle=False, 
                                          num_workers=num_workers,
                                          pin_memory=True,
                                          sampler=trainSampler)
                                         

    testset = torchvision.datasets.CIFAR10(root='/home/mohsin/data', train=False,
                                       download=True, transform=transform)
    testSampler = torch.utils.data.distributed.DistributedSampler(testset,
                                                                  num_replicas=world_size,
                                                                  rank=gpu)
    testloader = torch.utils.data.DataLoader(testset, 
                                             batch_size=batch_size,
                                             shuffle=False, 
                                             num_workers=num_workers,
                                             pin_memory=True,
                                             sampler=testSampler)

    classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
    return trainloader,testloader

Lets check the shape of the training dataloader

THe above shows that we have a total of 50,000 pictures of 10 classes in training dataset. 

Setting the batch_size=4 means we that our input will be 4 pictures i.e. 4*(3x32x32) pixels fed to our model at a time.
This implies that our training loop will do 50000/4 = 12500 trips across the PCIe bus. 

Let us show some of the training images

### 2. Define a Convolutional Neural Network
Copy the neural network from the Neural Networks section before and modify it to
take 3-channel images (instead of 1-channel images as it was defined).



In [42]:
# Our naive model

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        # Layers
        self.conv1 = nn.Conv2d(3, 24, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(24, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    # Activations    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

an alternative model from the torchvision catalouge

net=torchvision.models.vgg11()

### 3. Define a Loss function and optimizer
Let's use a Classification Cross-Entropy loss and SGD with momentum.



### 4. Train the network

This is when things start to get interesting.
We simply have to loop over our data iterator, and feed the inputs to the
network and optimize.



In [43]:
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

In [44]:
def train (gpu_id,nr,gpus,world_size,epochs,batch_size):
    start_timer()
    trainloader,testloader = dataloader(gpu_id,world_size,batch_size,num_workers)
    rank = nr*gpus + gpu_id
    setup(rank,world_size)
    torch.manual_seed(0)
    net=Net()
    if torch.cuda.is_available():
        torch.cuda.set_device(gpu_id)
        net.cuda(gpu_id)
    device = torch.device('cuda:%d'%(gpu_id) if torch.cuda.is_available() else 'cpu')
    
    criterion = nn.CrossEntropyLoss().cuda(gpu_id)
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
    
    # Wrap model as DDP
    net = torch.nn.parallel.DistributedDataParallel(net)
    for epoch in range(epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data[0].to(device), data[1].to(device)
        
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if (i % 2000 == 1999) and (gpu_id == 0):    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0

    end_timer_and_print('Finished Training')

In [45]:
with mp.Pool(gpus) as pool:
    print(pool.starmap(train, [(i,nr,gpus,world_size,epochs,batch_size) for i in range(gpus)]))



Files already downloaded and verifiedFiles already downloaded and verified

Files already downloaded and verifiedFiles already downloaded and verified

[1,  2000] loss: 2.241
[1,  4000] loss: 1.993
[1,  6000] loss: 1.783
[2,  2000] loss: 1.597
[2,  4000] loss: 1.513
[2,  6000] loss: 1.470

Finished Training

Finished TrainingTotal execution time = 59.700 sec

Total execution time = 59.705 secMax memory used by tensors = 3844096 bytes

Max memory used by tensors = 3844096 bytes
[None, None]
