In [3]:
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist

print(torch.__version__)
print(torch.cuda.is_available())
print(torch.cuda.device_count())
print(torch.cuda.get_device_name())

1.2.0
True
4
Tesla V100-SXM2-16GB


In [4]:
!ifconfig

enP5p1s0f0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 169.254.10.231  netmask 255.255.255.0  broadcast 169.254.10.255
        inet6 fe80::a94:efff:fe80:317  prefixlen 64  scopeid 0x20<link>
        ether 08:94:ef:80:03:17  txqueuelen 1000  (Ethernet)
        RX packets 234867969  bytes 20748221612 (19.3 GiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 208396066  bytes 42416777951 (39.5 GiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
        device interrupt 80  

enP5p1s0f1: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.20.12  netmask 255.255.255.0  broadcast 192.168.20.255
        inet6 fe80::a94:efff:fe80:318  prefixlen 64  scopeid 0x20<link>
        ether 08:94:ef:80:03:18  txqueuelen 1000  (Ethernet)
        RX packets 165965522  bytes 16537219704 (15.4 GiB)
        RX errors 0  dropped 173  overruns 0  frame 0
        TX packets 234344805  bytes 184344974912 (171.

In [5]:
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

In [6]:
model = ConvNet()
print(model)

ConvNet(
  (layer1): Sequential(
    (0): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc): Linear(in_features=1568, out_features=10, bias=True)
)


## a single gpu on a single node

In [5]:
def train(gpu, epochs):
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='/home/kexu6/DATA',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, 
                    epochs, 
                    i + 1, 
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

In [6]:
train(gpu=0, epochs=10)

Epoch [1/10], Step [100/600], Loss: 2.1626
Epoch [1/10], Step [200/600], Loss: 1.9929
Epoch [1/10], Step [300/600], Loss: 1.9224
Epoch [1/10], Step [400/600], Loss: 1.7479
Epoch [1/10], Step [500/600], Loss: 1.6264
Epoch [1/10], Step [600/600], Loss: 1.5411
Epoch [2/10], Step [100/600], Loss: 1.4387
Epoch [2/10], Step [200/600], Loss: 1.3242
Epoch [2/10], Step [300/600], Loss: 1.2894
Epoch [2/10], Step [400/600], Loss: 1.1754
Epoch [2/10], Step [500/600], Loss: 1.1271
Epoch [2/10], Step [600/600], Loss: 1.1246
Epoch [3/10], Step [100/600], Loss: 1.1838
Epoch [3/10], Step [200/600], Loss: 1.0215
Epoch [3/10], Step [300/600], Loss: 1.0146
Epoch [3/10], Step [400/600], Loss: 0.9684
Epoch [3/10], Step [500/600], Loss: 0.9130
Epoch [3/10], Step [600/600], Loss: 0.9182
Epoch [4/10], Step [100/600], Loss: 0.9589
Epoch [4/10], Step [200/600], Loss: 0.8122
Epoch [4/10], Step [300/600], Loss: 0.8425
Epoch [4/10], Step [400/600], Loss: 0.8240
Epoch [4/10], Step [500/600], Loss: 0.7570
Epoch [4/10

## Data Paralell
https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html

Pytorch has two ways to split models and data across multiple GPUs: `nn.DataParallel` and `nn.DistributedDataParallel`. `nn.DataParallel` is easier to use (just wrap the model and run your training script). However, because it uses one process to compute the model weights and then distribute them to each GPU during each batch, networking quickly becomes a bottle-neck and GPU utilization is often very low. Furthermore, nn.DataParallel requires that all the GPUs be on the same node and doesn’t work with Apex for mixed-precision training.

In [7]:
def train(gpu, epochs):
    torch.manual_seed(0)
    model = ConvNet()
    
    if torch.cuda.device_count() > 1:
        print("Using ", torch.cuda.device_count(), " GPUs!")
        model = nn.DataParallel(model)

    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='/home/kexu6/DATA',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, 
                    epochs, 
                    i + 1, 
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

In [8]:
train(gpu=0, epochs=10)

Using  4  GPUs!
Epoch [1/10], Step [100/600], Loss: 2.1628
Epoch [1/10], Step [200/600], Loss: 1.9898
Epoch [1/10], Step [300/600], Loss: 1.9229
Epoch [1/10], Step [400/600], Loss: 1.7464
Epoch [1/10], Step [500/600], Loss: 1.6278
Epoch [1/10], Step [600/600], Loss: 1.5438
Epoch [2/10], Step [100/600], Loss: 1.4396
Epoch [2/10], Step [200/600], Loss: 1.3246
Epoch [2/10], Step [300/600], Loss: 1.2891
Epoch [2/10], Step [400/600], Loss: 1.1769
Epoch [2/10], Step [500/600], Loss: 1.1300
Epoch [2/10], Step [600/600], Loss: 1.1270
Epoch [3/10], Step [100/600], Loss: 1.1857
Epoch [3/10], Step [200/600], Loss: 1.0241
Epoch [3/10], Step [300/600], Loss: 1.0166
Epoch [3/10], Step [400/600], Loss: 0.9708
Epoch [3/10], Step [500/600], Loss: 0.9145
Epoch [3/10], Step [600/600], Loss: 0.9214
Epoch [4/10], Step [100/600], Loss: 0.9587
Epoch [4/10], Step [200/600], Loss: 0.8138
Epoch [4/10], Step [300/600], Loss: 0.8442
Epoch [4/10], Step [400/600], Loss: 0.8242
Epoch [4/10], Step [500/600], Loss: 0.

## Distributed Data Paralell
https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

https://github.com/yangkky/distributed_tutorial/blob/master/src/mnist-distributed.py

https://github.com/pytorch/examples/blob/master/imagenet/main.py

https://medium.com/intel-student-ambassadors/distributed-training-of-deep-learning-models-with-pytorch-1123fa538848

http://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/

http://seba1511.net/dist_blog/

Works for both single-node(multi-GPU) and multi-node data parallel training. It is proven to be significantly faster than `torch.nn.DataParallel` for single-node multi-GPU data parallel training. `nccl` backend is currently the fastest and highly recommended backend to be used with distributed training and this applies to both single-node and multi-node distributed training.

Multiprocessing with DistributedDataParallel duplicates the model on each GPU on each computer node. The GPUs can all be on the same node or spread across multiple nodes. If you have 2 computer nodes with 4 GPUs each, you have a total of 8 model replicas. Each replica is controlled by one process and handles a portion of the input data.  Every process does identical tasks, and each process communicates with all the others. During the backwards pass, gradients from each node are averaged. Only gradients are passed between the processes/GPUs so that network communication is less of a bottleneck.

During training, each process loads its own minibatches from disk and passes them to its GPU. Each GPU does its own forward pass, and then the gradients are all-reduced across the GPUs. Gradients for each layer do not depend on previous layers, so the gradient all-reduce is calculated concurrently with the backwards pass to futher alleviate the networking bottleneck. At the end of the backwards pass, every node has the averaged gradients, ensuring that the model weights stay synchronized.

All this requires that the multiple processes, possibly on multiple nodes, are synchronized and communicate. Pytorch does this through its `distributed.init_process_group` function. This function needs to know where to find process 0 so that all the processes can sync up and the total number of processes to expect. Each individual process also needs to know the total number of processes as well as its rank within the processes and which GPU to use. It’s common to call the total number of processes the `world size`. Finally, each process needs to know which slice of the data to work on so that the batches are non-overlapping. Pytorch provides `nn.utils.data.DistributedSampler` to accomplish this.

In [22]:
def train(gpu, world_size, nr, gpus):
    ############################################################
    rank = nr * gpus + gpu  # global rank of the process                       
    dist.init_process_group(backend='nccl', 
                            init_method='env://',                                   
                            world_size=world_size,                              
                            rank=rank)                                                          
    ############################################################
    
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    
    ###############################################################
    # Wrap the model
    model = nn.parallel.DistributedDataParallel(model,
                                                device_ids=[gpu])
    ###############################################################

    # Data loading code
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )                                               
    ################################################################
    # makes sure that each process gets a different slice of the training data
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                    num_replicas=world_size,
                                                                    rank=rank)
    ################################################################
    # Use the nn.utils.data.DistributedSampler instead of shuffling the usual way
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=train_sampler)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, 
                    epochs, 
                    i + 1, 
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

In [23]:
gpus = 8 # number of gpus on each node
nodes = 4 # total number of nodes we’re going to use
world_size = gpus * nodes # total number of processes
nr = i # the rank of the current node within all the nodes, and goes from 0 to args.nodes - 1
os.environ['MASTER_ADDR'] = '169.254.10.231' # what IP address to look at for process 0. It needs this so that all the processes can sync up initially             
os.environ['MASTER_PORT'] = '8888'  
# spawn args.gpus processes, each of which runs train(i, args), where i goes from 0 to args.gpus - 1
mp.spawn(train, nprocs=gpus, args=(gpu, world_size, nr, gpus))

NameError: name 'i' is not defined

In [1]:
(wmlce-v1.6.2-py3.7) [kexu6@hal12 src]$ python mnist_distributed.py -n 1 -g 4 -nr 0 --epochs 10
Epoch [1/10], Step [50/150], Loss: 2.2473
Epoch [1/10], Step [100/150], Loss: 2.1133
Epoch [1/10], Step [150/150], Loss: 2.0436
Epoch [2/10], Step [50/150], Loss: 2.0311
Epoch [2/10], Step [100/150], Loss: 1.9043
Epoch [2/10], Step [150/150], Loss: 1.8384
Epoch [3/10], Step [50/150], Loss: 1.8513
Epoch [3/10], Step [100/150], Loss: 1.7253
Epoch [3/10], Step [150/150], Loss: 1.6633
Epoch [4/10], Step [50/150], Loss: 1.6923
Epoch [4/10], Step [100/150], Loss: 1.5687
Epoch [4/10], Step [150/150], Loss: 1.5136
Epoch [5/10], Step [50/150], Loss: 1.5525
Epoch [5/10], Step [100/150], Loss: 1.4323
Epoch [5/10], Step [150/150], Loss: 1.3860
Epoch [6/10], Step [50/150], Loss: 1.4303
Epoch [6/10], Step [100/150], Loss: 1.3143
Epoch [6/10], Step [150/150], Loss: 1.2771
Epoch [7/10], Step [50/150], Loss: 1.3240
Epoch [7/10], Step [100/150], Loss: 1.2118
Epoch [7/10], Step [150/150], Loss: 1.1835
Epoch [8/10], Step [50/150], Loss: 1.2314
Epoch [8/10], Step [100/150], Loss: 1.1234
Epoch [8/10], Step [150/150], Loss: 1.1032
Epoch [9/10], Step [50/150], Loss: 1.1503
Epoch [9/10], Step [100/150], Loss: 1.0464
Epoch [9/10], Step [150/150], Loss: 1.0338
Epoch [10/10], Step [50/150], Loss: 1.0792
Epoch [10/10], Step [100/150], Loss: 0.9793
Epoch [10/10], Step [150/150], Loss: 0.9734
Training complete in: 0:00:26.337336

SyntaxError: invalid syntax (<ipython-input-1-5f034cb220d0>, line 1)

## With Apex for mixed precision
* mnist apex: https://github.com/yangkky/distributed_tutorial/blob/master/src/mnist-mixed.py
* apex examples: https://github.com/nvidia/apex/tree/master/examples
* apex tutorial: https://devblogs.nvidia.com/apex-pytorch-easy-mixed-precision-training/
* apex tutorial: https://developer.nvidia.com/automatic-mixed-precision
* apex doc: https://docs.nvidia.com/deeplearning/sdk/mixed-precision-training/index.html

Mixed precision training: majority of the network uses FP16 arithmetic, while automatically casting potentially unstable operations to FP32.

Advantages:
* reducing memory storage/bandwidth demands
* use larger batch sizes
* take advantage of NVIDIA Tensor Cores for gemms and convolutions

In [25]:
from apex.parallel import DistributedDataParallel as DDP
from apex import amp

In [None]:
def train(gpu, world_size, nr, gpus):

    rank = nr * gpus + gpu  # global rank of the process                       
    dist.init_process_group(backend='nccl', 
                            init_method='env://',                                   
                            world_size=world_size,                              
                            rank=rank)                                                          
    
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    
    ###############################################################
    # Wrap the model
    model, optimizer = amp.initialize(model, optimizer, 
                                      opt_level='O2')
    model = DDP(model)
    ###############################################################

    # Data loading code
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )                                               

    # makes sure that each process gets a different slice of the training data
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                    num_replicas=world_size,
                                                                    rank=rank)
    # Use the nn.utils.data.DistributedSampler instead of shuffling the usual way
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=train_sampler)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            
            ##############################################################
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
            ##############################################################
        
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, 
                    epochs, 
                    i + 1, 
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

## Horovod

https://docs.databricks.com/applications/deep-learning/distributed-training/mnist-pytorch.html

https://horovod.readthedocs.io/en/latest/pytorch.html

https://github.com/horovod/horovod/blob/master/docs/pytorch.rst