In [None]:
import torch
from torch.nn import DataParallel, Linear
import torchvision
from torch.utils.data.dataset import Dataset
from torch.utils.data.dataloader import DataLoader
import numpy as np
import  matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
%matplotlib inline


### Load MNIST Data

In [None]:
mnist_train = torchvision.datasets.MNIST(root = './MNIST_data',train=True,download=True)
mnist_train

#A dataset returning pil image and label for each __getitem__ call
print(mnist_train)

In [None]:
plt.figure(figsize = (12,20))
count = 7
for i in range(count):
    plt.subplot(1,count,i+1)
    index = np.random.randint(0,len(mnist_train),(1,))[0]
    im_array = np.asarray(mnist_train[index][0])
    label = mnist_train[index][1]
    imshow(im_array,cmap='gray',)
    plt.title(str(index) + ' label:'+str(label))

### Convert Images to array

In [None]:
class MyDataset(Dataset):
    def __init__(self, mnist_dataset):
        super(MyDataset,self).__init__()
        self.mnist_dataset = mnist_dataset
    def __getitem__(self, index):
        item = self.mnist_dataset[index]
        image = item[0]
        label = item[1]
        image_array = np.array(image,dtype = np.float32).reshape((-1))/255
        return image_array, label
    def __len__(self):
        return len(self.mnist_dataset)

In [None]:
train_ds = MyDataset(mnist_train)
print(train_ds[0][0].shape,train_ds[0][1])
train_ds[0][0][200:250],train_ds[0][1]

## Args

In [None]:
import argparse
args = argparse.Namespace(
    batch = 16,
    lr = 0.001,
    epochs =1
)
args

## Model

In [7]:
class LinearModel(torch.nn.Module):
    def __init__(self, input_size, output_size, verbose = False):
        super(LinearModel,self).__init__()
        self.fc1 = torch.nn.Linear(input_size,64)
        self.relu = torch.nn.ReLU()
        self.fc2 = torch.nn.Linear(64,output_size)
        self.verbose = verbose
    def forward(self, X):
        
        if self.verbose:
            print('X shape = {}, current cuda device = {}' \
                  .format(X.shape,torch.cuda.current_device() if torch.cuda.is_available() else 'NAN'))
        return self.fc2(self.relu(self.fc1(X)))

In [8]:
m = LinearModel(784,10,True)
m(torch.Tensor(train_ds[0][0]))

X shape = torch.Size([784]), current cuda device = 0


tensor([-0.0154,  0.1979,  0.1550,  0.0505,  0.0616, -0.0447, -0.0850, -0.0380,
        -0.1357, -0.1196], grad_fn=<AddBackward0>)

## DataLoader

In [9]:
dataloader = DataLoader(train_ds, batch_size=args.batch)

In [10]:
#converts arrays into tensor and creates batches
for data in dataloader:
    print(data)
    #print(model(data[0]))
    break

[tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]]), tensor([5, 0, 4, 1, 9, 2, 1, 3, 1, 4, 3, 5, 3, 6, 1, 7])]


## Train

In [11]:
from tqdm import tqdm
def train(model, optimizer, dataloader, device):
    model.train()
    print('using device',device)
    losses = []
    for epoch in range(args.epochs):
        for x,y in tqdm(dataloader, position=0):
            optimizer.zero_grad()
            x = x.to(device)
            y = y.to(device)
            
            yhat = model(x)
            
            loss = torch.nn.CrossEntropyLoss()(yhat, y)

            loss.backward()

            optimizer.step()
            losses.append(loss.item())
            
        print ('loss at epoch {} is {}'.format(epoch, losses[-1]))
        for param in model.parameters():
            print(param)

## CPU

In [18]:
model = LinearModel(784,10)
optimizer = torch.optim.Adam(params = model.parameters(), lr=args.lr)

In [19]:
%%time
cpu = torch.device('cpu')
train(model,optimizer,dataloader, cpu)

  0%|          | 1/3750 [00:00<09:08,  6.84it/s]

using device cpu


100%|██████████| 3750/3750 [00:20<00:00, 187.12it/s]

loss at epoch 0 is 0.018243134021759033
CPU times: user 23.5 s, sys: 17.5 s, total: 41.1 s
Wall time: 20 s





## GPU 

In [79]:
gpu = torch.device('cuda')
model = LinearModel(784,10)
model = model.to(gpu)
dataloader = DataLoader(train_ds,batch_size=args.batch)
optimizer = torch.optim.Adam(params = model.parameters(), lr=args.lr)

%time train(model, optimizer, dataloader, gpu)

  1%|          | 38/3750 [00:00<00:09, 374.51it/s]

using device cuda


100%|██████████| 3750/3750 [00:10<00:00, 352.41it/s]

loss at epoch 0 is 0.031419962644577026
CPU times: user 10.7 s, sys: 28.2 ms, total: 10.7 s
Wall time: 10.6 s





#### Higher batch size

In [80]:

dataloader = DataLoader(train_ds,batch_size=args.batch *4)
optimizer = torch.optim.Adam(params = model.parameters(), lr=args.lr*4)
%time train(model, optimizer, dataloader, gpu)

  1%|▏         | 14/938 [00:00<00:06, 134.64it/s]

using device cuda


100%|██████████| 938/938 [00:06<00:00, 140.09it/s]

loss at epoch 0 is 0.056974977254867554
CPU times: user 6.73 s, sys: 0 ns, total: 6.73 s
Wall time: 6.7 s





## DataParallel
single process, multiple GPUs

    This container parallelizes the application of the given :attr:`module` by
    splitting the input across the specified devices by chunking in the batch
    dimension (other objects will be copied once per device). In the forward
    pass, the module is replicated on each device, and each replica handles a
    portion of the input. During the backwards pass, gradients from each replica
    are summed into the original module.

Source: https://pytorch.org/docs/stable/_modules/torch/nn/parallel/data_parallel.html

The batch size should be larger than the number of GPUs used.


1. This is not very optimal as every forward run transfers the model and data portion between GPUs. 
1. Unless the processing in the module takes significant amount of time, this is not very useful.
1. This is even slower than single GPU for simpler neural nets.

        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = self.parallel_apply(replicas, inputs, kwargs)
        return self.gather(outputs, self.output_device)

Issues : https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255

In [75]:
model = LinearModel(784,10,verbose = False).to(gpu)
model = torch.nn.DataParallel(model)

#Each batch is split into number of gpus and data is scattered. Use verbose = True to verify parallel processing
dataloader = DataLoader(train_ds,batch_size=args.batch)
optimizer = torch.optim.Adam(params = model.parameters(), lr=args.lr)

%time train(model, optimizer, dataloader, gpu)

  0%|          | 17/3750 [00:00<00:22, 169.62it/s]

using device cuda


100%|██████████| 3750/3750 [00:21<00:00, 176.08it/s]

loss at epoch 0 is 0.02713838219642639
CPU times: user 25.4 s, sys: 2.39 s, total: 27.8 s
Wall time: 21.3 s





#### Using Higher batch size
Even for higher batch sizes, it's still slower than single GPU

In [76]:
#Batch size can be increased to take advantage of the parallel processing
dataloader = DataLoader(train_ds,batch_size=args.batch *4)
#increase the lr for high batch size. (Provide proof for this)
optimizer = torch.optim.Adam(params = model.parameters(), lr=args.lr * 4)

%time train(model, optimizer, dataloader, gpu)

  1%|▏         | 12/938 [00:00<00:08, 110.95it/s]

using device cuda


100%|██████████| 938/938 [00:08<00:00, 110.32it/s]

loss at epoch 0 is 0.02199617028236389
CPU times: user 9.62 s, sys: 620 ms, total: 10.2 s
Wall time: 8.5 s





## Distributed data parallel
Each GPU can be run in it's own process.

1. Models are not transferred. 
1. Each process has it's own model initialized. 
1. Only gradients are summed up using all_reduce operation
1. The seed should be same everywhere for consistency
    
This should be faster than Dataparallel. The only comppinication cost is all reduce.

Tutorial: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Data sampler source : https://pytorch.org/docs/stable/_modules/torch/utils/data/distributed.html#DistributedSampler

In [12]:
#%%writefile dist_training.py
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
import os,torch,sys
import datetime

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12350'
    print('setting up')
    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size, timeout=datetime.timedelta(0,seconds =  5))

    # Explicitly setting seed to make sure that models created in two processes
    # start from same random weights and biases.
    torch.manual_seed(42)


def cleanup():
    dist.destroy_process_group()
    
def run_training(rank, world_size):
    print('inside run_training rank {} and world_size {}'.format(rank,world_size))
    
    #initialize process group and set seed
    setup(rank,world_size)
    
    #print('After setup rank = ', dist.get_rank())
    device_id = rank
    torch.cuda.set_device(device_id)
    
    #Initialize model in it's own device. Can be any device. 
    #Better to initialize in own device for avoiding memory constraints (theory)
    model = LinearModel(784,10,verbose = False).to(device_id)
    
    #Use Distributed data parallel. Check the documentation of DistributedDataParallel?
    ddp_model = DistributedDataParallel(model,device_ids=[device_id], output_device=device_id)
    
    #Check where all_reduce happens
    optimizer = torch.optim.Adam(ddp_model.parameters(), lr = args.lr)
    
    #A sampler is necessary as each process should work only on a subset of data. 
    #No need to pass rank if process group is initialized. Uses rank = dist.get_rank() to get rank
    distributed_sampler = DistributedSampler(train_ds, num_replicas = world_size)
    
    dataloader = DataLoader(train_ds,batch_size=args.batch, sampler= distributed_sampler)
    train(ddp_model, optimizer, dataloader, device_id)
    
    cleanup()
    


import argparse
if __name__ == '__main2__':
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--rank', help = 'Rank of the process')
    parser.add_argument('--size',help = 'world size')
    args = parser.parse_args()
    print('starting distibuted training', args)

    run_this(args.rank, args.size)
    
#run_training(0,1)

Now Create 4 processes and start training.
If run_training throws cuda error, restart kernel

In [14]:
from torch.multiprocessing import Process
def test (rank, size):
    print('Inside. rank = {}, size = {}'.format(rank,size))
def startprocesses(function, ranks, size):
    processes = []
    for rank in ranks:
        p = Process(target=function, args=(rank, size))
        p.start()
        processes.append(p)
    print(processes)
    for p in processes:
        p.join()
    print('Processes finished execution')
    
#startprocesses(test, [0,1],2)

#One process
%time startprocesses(run_training, [0],1)

#Two processes
#%time startprocesses(run_training, [0,1],2)

inside run_training rank 0 and world_size 1
setting up


Process Process-4:
Traceback (most recent call last):
  File "/anaconda/envs/py36/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/anaconda/envs/py36/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-12-8ad920580000>", line 33, in run_training
    torch.cuda.set_device(device_id)
  File "/anaconda/envs/py36/lib/python3.6/site-packages/torch/cuda/__init__.py", line 265, in set_device
    torch._C._cuda_setDevice(device)
RuntimeError: cuda runtime error (3) : initialization error at /opt/conda/conda-bld/pytorch_1556653099582/work/torch/csrc/cuda/Module.cpp:33


[<Process(Process-4, started)>]
Processes finished execution
CPU times: user 3.48 ms, sys: 7.82 ms, total: 11.3 ms
Wall time: 22.2 ms


In [None]:
run_this(0,1)