"""
As Data Parallel uses threading to achieve parallelism, it suffers from a major, well-known limitation that arises from Python's Global Interpreter Lock (GIL). Because of the way the CPython interpreter is designed, threads cannot execute Python code truly in parallel, so perfect parallelism is not achievable with threading alone. Let's see what the GIL is.

Global Interpreter Lock (GIL)
As mentioned above, the way the Python interpreter is implemented makes it very difficult to achieve perfect parallelism with threads. The reason is the Global Interpreter Lock.

GIL
The Python Global Interpreter Lock, or GIL, is in simple terms a mutex (a lock) that allows only one thread to hold control of the Python interpreter. As a result, only one thread can execute Python bytecode at any point in time.
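One way to observe the GIL is to time a CPU-bound pure-Python function run sequentially and then in several threads. The sketch below uses only the standard library; `count_down`, the loop size, and the worker count are arbitrary illustrative choices. On a standard CPython build the threaded run is typically no faster than the sequential one, because the threads take turns holding the GIL. Exact timings depend on the machine, and the experimental free-threaded build of CPython 3.13 (which can run without the GIL) may behave differently.

```python
import threading
import time


def count_down(n):
    """CPU-bound pure-Python loop; the running thread holds the GIL."""
    while n > 0:
        n -= 1
    return n


def run_sequential(n, times):
    """Run count_down `times` times, one after another."""
    start = time.perf_counter()
    for _ in range(times):
        count_down(n)
    return time.perf_counter() - start


def run_threaded(n, times):
    """Run count_down `times` times, each in its own thread."""
    threads = [threading.Thread(target=count_down, args=(n,)) for _ in range(times)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


N, WORKERS = 2_000_000, 4
print(f"sequential: {run_sequential(N, WORKERS):.2f}s")
print(f"{WORKERS} threads: {run_threaded(N, WORKERS):.2f}s")
```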

Mutex
A mutex (mutual exclusion object) synchronizes access to a shared resource. It is a locking mechanism that ensures only one thread can acquire the mutex at a time and enter the critical section; other threads must wait until it is released.
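Python's standard library exposes a basic mutex as `threading.Lock`. A small sketch: several threads increment a shared counter, and the lock turns each increment into a critical section. The GIL alone does not make `counter += 1` atomic, since it compiles to several bytecode steps; the thread and iteration counts here are arbitrary.

```python
import threading

counter = 0
counter_lock = threading.Lock()  # the mutex guarding `counter`


def increment(times):
    global counter
    for _ in range(times):
        # Only the thread holding the lock can enter this critical section,
        # so the read-modify-write of `counter` is never interleaved.
        with counter_lock:
            counter += 1


def run(num_threads=4, times=10_000):
    """Start `num_threads` threads that each increment the counter `times` times."""
    global counter
    counter = 0
    threads = [threading.Thread(target=increment, args=(times,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


print(run())  # 40000
```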

For CPU-bound Python code, this largely defeats the purpose of using threads in the first place, which is why PyTorch provides another mechanism that can achieve perfect parallelism.

"""

"""Distributed Data Parallel in PyTorch
DDP in PyTorch does the same thing, but far more efficiently, and it gives us finer control while achieving perfect parallelism. DDP uses multiprocessing instead of threading: it replicates the model across multiple GPUs, and each replica is driven by its own process, which runs the forward and backward passes independently. A process here is a separate running instance of the Python interpreter executing your training script. Usually we spawn processes so that there is one process per GPU.

Each process does identical work, but on a different batch of data. During the backward pass, the processes communicate with one another to all-reduce (average) their gradients. As a result, every process holds the same averaged gradients before the optimizer step, and because all replicas start from the same weights and apply the same update, the model weights stay synchronized.
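The gradient averaging described above can be illustrated in plain Python. This is a conceptual simulation with hypothetical helper names, not how DDP is implemented (DDP overlaps bucketed all-reduce calls, e.g. over NCCL, with the backward pass). It shows why replicas that start from identical weights and apply the same averaged gradient stay identical.

```python
def all_reduce_mean(local_grads):
    """Conceptual all-reduce: every replica receives the element-wise mean
    of all replicas' gradients."""
    world_size = len(local_grads)
    mean = [sum(values) / world_size for values in zip(*local_grads)]
    # Each replica ends up with its own copy of the same averaged gradient
    return [list(mean) for _ in range(world_size)]


def sgd_step(weights, grads, lr):
    """Plain SGD update: w <- w - lr * g."""
    return [w - lr * g for w, g in zip(weights, grads)]


# Two replicas start from identical weights (same seed) ...
weights = [[1.0, -1.0], [1.0, -1.0]]
# ... but compute different gradients on different mini-batches.
local_grads = [[0.25, 0.5], [0.75, 0.0]]

averaged = all_reduce_mean(local_grads)
weights = [sgd_step(w, g, lr=0.5) for w, g in zip(weights, averaged)]
print(averaged)  # [[0.5, 0.25], [0.5, 0.25]]
print(weights)   # [[0.75, -1.125], [0.75, -1.125]]
```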
"""

In [None]:
"""
“node” is a machine in your distributed architecture. In layman’s terms, a single system that has multiple GPUs can be called a node.

“global rank” is a unique identification number for each process across all nodes in our architecture.

“local rank” is a unique identification number for each process within a node (usually the index of the GPU it drives).

“world” is the union of all of the above: it can have multiple nodes, where each node spawns multiple processes (ideally, one for each GPU).

“world_size” is the total number of processes in the world, which equals number of nodes * number of GPUs per node when there is one process per GPU.
"""
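These definitions can be written down directly. A sketch assuming one process per GPU and the same number of GPUs on every node, which is the same formula `train()` below uses as `args.nr * args.gpus + gpu`; the 2-node, 4-GPU layout and the helper names are hypothetical.

```python
def global_rank(node_rank, local_rank, gpus_per_node):
    """Global rank of a process, with one process per GPU on every node."""
    return node_rank * gpus_per_node + local_rank


def world_layout(nodes, gpus_per_node):
    """Map (node_rank, local_rank) -> global rank for the whole world."""
    return {
        (node, local): global_rank(node, local, gpus_per_node)
        for node in range(nodes)
        for local in range(gpus_per_node)
    }


layout = world_layout(nodes=2, gpus_per_node=4)
world_size = len(layout)   # 2 nodes * 4 GPUs = 8 processes
print(world_size)          # 8
print(layout[(1, 2)])      # node 1, local rank 2 -> global rank 6
```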

In [1]:
import torch
import torch.nn as nn
import torch.distributed as dist
import torchvision
import torchvision.transforms as transforms

import argparse
from datetime import datetime
import os
import torch.multiprocessing as mp

In [2]:
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # Two 2x2 max-pools shrink 28x28 MNIST images to 7x7; layer2 outputs 32 channels
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out
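The `7*7*32` input size of `self.fc` follows from the standard output-size formula for convolution and pooling layers, floor((W + 2P - K) / S) + 1 (with dilation 1). A worked check in plain Python, assuming 28x28 MNIST inputs:

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output height/width of a Conv2d or MaxPool2d layer (dilation 1)."""
    return (size + 2 * padding - kernel) // stride + 1


size = 28                                              # MNIST images are 28x28
size = conv_out(size, kernel=5, stride=1, padding=2)   # layer1 conv: 28
size = conv_out(size, kernel=2, stride=2)              # layer1 pool: 14
size = conv_out(size, kernel=5, stride=1, padding=2)   # layer2 conv: 14
size = conv_out(size, kernel=2, stride=2)              # layer2 pool: 7
flat_features = size * size * 32                       # 32 channels out of layer2
print(flat_features)  # 1568, i.e. 7*7*32
```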

In [None]:

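def initialize():
    # Settings for a single-node run with one process per visible GPU.
    # When running as a script, these could come from the command line
    # instead (see the commented-out argparse code below).
    args = argparse.Namespace(
        nodes=1,                          # total number of nodes
        gpus=torch.cuda.device_count(),   # GPUs (= processes) per node
        nr=0,                             # rank of this node, 0..nodes-1
        epochs=2,                         # number of training epochs
    )
#     parser = argparse.ArgumentParser()
#     parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
#     parser.add_argument('-g', '--gpus', default=1, type=int,
#                         help='number of gpus per node')
#     parser.add_argument('-nr', '--nr', default=0, type=int,
#                         help='ranking within the nodes')
#     parser.add_argument('--epochs', default=2, type=int, metavar='N',
#                         help='number of total epochs to run')
#     args = parser.parse_args()

    # Total number of processes across all nodes
    args.world_size = args.gpus * args.nodes
    # Rendezvous point read by init_method='env://': address and free port of
    # the machine hosting the rank-0 process (on one machine, '127.0.0.1' works)
    os.environ['MASTER_ADDR'] = '192.168.1.3'
    os.environ['MASTER_PORT'] = '8888'
    return args


In [None]:
def train(gpu, args):
    # `gpu` is the local rank (0..gpus-1) passed in by mp.spawn;
    # the global rank is unique across all nodes
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=args.world_size,
        rank=rank
    )

    # Same seed in every process, so all replicas start from identical weights
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    # Wrapper around our model to handle parallel training
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

    # Data loading code. Every process calls download=True; downloading the
    # files once beforehand avoids processes racing to write the same files.
    train_dataset = torchvision.datasets.MNIST(root='./',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)

    # Sampler that gives each process a distinct subset of the dataset,
    # so no sample is repeated across processes within an epoch
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=args.world_size,
        rank=rank
    )

    # We pass in the train_sampler; shuffle must stay False because the
    # sampler already handles shuffling
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=train_sampler)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        # Reseed the sampler's shuffle so every epoch uses a different order
        train_sampler.set_epoch(epoch)
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize (DDP all-reduces gradients inside backward())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, args.epochs, i + 1, total_step, loss.item()))
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))
    # Release the resources held by the process group
    dist.destroy_process_group()


In [None]:
if __name__ == '__main__':
    args = initialize()
    # Start one process per GPU; mp.spawn calls train(i, args) for i = 0..gpus-1.
    # Spawned processes must be able to import train(), so when running from
    # Jupyter this usually means moving train() and ConvNet into a .py module.
    mp.spawn(train, nprocs=args.gpus, args=(args,))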
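Two details of the sampler and loader in `train()` are easy to miss: each process iterates over only its own 1/world_size share of the dataset, so `len(train_loader)` shrinks as GPUs are added while the effective global batch size grows to `batch_size * world_size`; and the shuffle order depends on the epoch passed to `DistributedSampler.set_epoch()`. The plain-Python sketch below approximates `DistributedSampler`'s default behaviour (`drop_last=False`). It is an assumption-laden stand-in: it uses `random.Random` instead of `torch.randperm`, so the actual orderings differ from PyTorch's, and the 2-GPU setup and the helper name `shard_indices` are hypothetical.

```python
import math
import random


def shard_indices(dataset_len, num_replicas, rank, epoch=0, seed=0, shuffle=True):
    """Approximate DistributedSampler's default behaviour (drop_last=False)."""
    indices = list(range(dataset_len))
    if shuffle:
        # All ranks shuffle with the same seed + epoch, so they agree on one
        # permutation; a new epoch value gives a new permutation.
        random.Random(seed + epoch).shuffle(indices)
    num_samples = math.ceil(dataset_len / num_replicas)  # samples per rank
    total_size = num_samples * num_replicas
    # Pad by wrapping around so the list divides evenly across ranks
    # (assumes the padding is shorter than the dataset).
    indices += indices[: total_size - dataset_len]
    # Rank r takes every num_replicas-th index starting at position r
    return indices[rank:total_size:num_replicas]


dataset_len, world_size, batch_size = 60_000, 2, 100   # MNIST train split, 2 GPUs
shards = [shard_indices(dataset_len, world_size, r) for r in range(world_size)]

per_process = len(shards[0])                            # 30000 samples per process
steps_per_epoch = math.ceil(per_process / batch_size)   # len(train_loader): 300
global_batch = batch_size * world_size                  # 200 samples per step
print(per_process, steps_per_epoch, global_batch)       # 30000 300 200

# Padding example: 10 samples over 3 ranks, rank 1 wraps around to index 0
print(shard_indices(10, 3, 1, shuffle=False))           # [1, 4, 7, 0]
```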