## Boilerplate code

In [1]:
%matplotlib inline

In [2]:
import time, gc

# Timing utilities
start_time = None

def start_timer():
    global start_time
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.synchronize()
    start_time = time.time()

def end_timer_and_print(local_msg):
    torch.cuda.synchronize()
    end_time = time.time()
    print("\n" + local_msg)
    print("Total execution time = {:.3f} sec".format(end_time - start_time))
    print("Max memory used by tensors = {} bytes".format(torch.cuda.max_memory_allocated()))

In [3]:
import torch, datetime, os

# Business as usual
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.cuda import amp

import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np



In [4]:
torch.manual_seed(43)
cudnn.deterministic = True
cudnn.benchmark = False

In [5]:
# import and instantiate tensorboard for monitoring model performance
from torch.utils.tensorboard import SummaryWriter

### Additional package 
Required for DDP implementation

In [6]:
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

Setting resources and variables for training in a Jupyter notebook.
In a python script version of the code, this section should be parsed in as arguments.

In [7]:
nodes=1
gpus=2
num_workers=10
batch_size=512
epochs=4

## Miscellaneous utility funtions

In [8]:
def accuracy(outputs, labels):
    _, preds = outputs.max(dim=1)
    return torch.sum(preds == labels).item()

## DataLoader
Add a data management section to load and transform data.
Here we manage not only the data location but also how it is loaded into memory

In [9]:
def dataloader(gpu,world_size,batch_size,num_workers):
# Prepare training data
    train_transform = transforms.Compose(
    [transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    ])


    val_transform = transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    ])
    datadir=os.environ['DATA_DIR']
    trainset = torchvision.datasets.ImageFolder(root=os.path.join(datadir,'train'),
                                                transform=train_transform)
    trainSampler = torch.utils.data.distributed.DistributedSampler(trainset,
                                                               num_replicas=world_size,
                                                               rank=gpu,
                                                               shuffle=True,
                                                               drop_last=True)
    trainloader = torch.utils.data.DataLoader(trainset, 
                                          batch_size=batch_size,
                                          shuffle=False, 
                                          num_workers=num_workers,
                                          pin_memory=True,
                                          sampler=trainSampler)
                                         

    valset = torchvision.datasets.ImageFolder(root=os.path.join(datadir,'val'),
                                              transform=val_transform)
    valSampler = torch.utils.data.distributed.DistributedSampler(valset,
                                                                  num_replicas=world_size,
                                                                  rank=gpu,shuffle=True)
    valloader = torch.utils.data.DataLoader(valset, 
                                             batch_size=batch_size,
                                             shuffle=False, 
                                             num_workers=num_workers,
                                             pin_memory=True,
                                             sampler=valSampler)
    return trainloader,valloader

## Choose a Neural Network architecture



In [10]:
net=torchvision.models.resnet50()

## Training
Some additions and modifications are required to your training section. E.g.
- Define a function for setting up multiple GPU context (using awareness of the environment)
    - Here you can select the backend or the communication library to move data between memory of GPUs
- Define a function and add the training steps in it
    - Wrap model in DistributedDataParallel class
    - The model, loss function and optimizer needs to be offloaded to each device using the corresponding gpu_id
    - Figure out which tasks will be done exclusively master process (gpu_id==0)
        - e.g. printing
- Define a function that setups up the training environment and then calls the training



In [11]:
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

In [12]:
def train (net,gpus,world_size,rank,epochs,batch_size):
    gpu_id=rank
    scaler = amp.GradScaler()
    net.cuda(gpu_id)
    net = nn.SyncBatchNorm.convert_sync_batchnorm(net)
    
    criterion = nn.CrossEntropyLoss().cuda(gpu_id)
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
    
    local_rank = gpu_id #int(os.environ['LOCAL_RANK'])
    trainloader, valloader = dataloader(gpu_id,world_size,
                                        batch_size,
                                        num_workers)
    # Wrap model as DDP
    net = torch.nn.parallel.DistributedDataParallel(net,device_ids=[local_rank],
                                                   output_device=None, )
    start_timer()
    print('Starting training on GPU %d of %d'%(gpu_id,world_size))
    for epoch in range(epochs):  # loop over the dataset multiple times
        train_loss = 0.0
        train_acc  = 0
        trainloader.sampler.set_epoch(epoch)
        for i, data in enumerate(trainloader):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data[0].cuda(gpu_id, non_blocking=True), data[1].cuda(gpu_id,non_blocking=True)
        
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            train_loss += loss.item()
            train_acc  += accuracy(outputs,labels)           

        valloader.sampler.set_epoch(epoch)
        val_loss = 0.0
        val_acc  = 0
        for i, data in enumerate(valloader):
            inputs, labels = data[0].cuda(gpu_id,non_blocking=True), data[1].cuda(gpu_id,non_blocking=True)
            with torch.no_grad():
                outputs = net(inputs)
                loss = criterion(outputs, labels)
            val_loss += loss.item() 
            val_acc  += accuracy(outputs,labels)

        # Gather accuracy metric from all training units on GPU 0  
        # to calculate an average over the size training dataset    
        train_acc = torch.tensor(train_acc).cuda(gpu_id)
        dist.reduce(train_acc,0,dist.ReduceOp.SUM)
        val_acc = torch.tensor(val_acc).cuda(gpu_id)
        dist.reduce(val_acc,0,dist.ReduceOp.SUM)
        
        # Print from GPU 0
        if gpu_id ==0:
            val_loss   = val_loss / len(valloader.dataset.targets)
            val_acc    = 100 * (val_acc.item() / len(trainloader.dataset.targets))
            train_loss = train_loss / len(trainloader.dataset.targets)
            train_acc  = 100 * (train_acc.item() / len(trainloader.dataset.targets))
            print(f'[{epoch + 1}] :Loss (train, val):{train_loss:.3f}, {val_loss:.3f}| Accuracy (train,val): {train_acc:.3f}, {val_acc:.3f}')

    if gpu_id == 0:
        end_timer_and_print('Finished Training')

In [13]:
def main(net,gpus,epochs,batch_size):
    world_size = 2
    setup(rank, world_size)
    train(net,gpus,world_size,rank,epochs,batch_size)
    return True

In [None]:
import multiprocess as mp
num_processes = gpus
# NOTE: this is required for the ``fork`` method to work
net.share_memory()
processes = []
for rank in range(num_processes):
    p = mp.Process(target=main, args=(net,gpus,epochs,batch_size))
    p.start()
    processes.append(p)
for p in processes:
    p.join()




Starting training on GPU 0 of 2



Starting training on GPU 1 of 2
[1] :Loss (train, val):0.006, 0.005| Accuracy (train,val): 0.656, 0.078
