# Writing Distributed Applications with PyTorch

이 짧은 튜토리얼에서 우리는 pytorch의 분산 패키지를 이용해 볼 것이다. 분산처리에 대한 준비와 색다른 통신 방법을 사용해보고, 패키지의 내부까지 다뤄보도록 하자.

## Setup

``torch.distributed``같은 pytorch의 분산 패키지는 사용자가 쉽게 작업을 여러 프로세스들이나 기계들에게 나누어 병렬처리할 수 있도록 돕는다. 더 깊게 들어가면, 이 패키지는 프로세스가 다른 프로세스와 데이터를 주고받을 수 있도록 메세지 전달에 대한 특정 문법을 사용한다. ``torch.multiprocessing`` 패키지와는 반대로, 프로세스들은 여러가지의 통신 방법을 사용할 수 있고 같은 컴퓨터상에서 돌아가야 한다는 제약 또한 없다.

분산 패키지를 사용하기 위해서는 먼저 많은 수의 프로세스들을 동시에 실행하는 능력이 있어야 한다. 만약 많은 수의 프로세스들을 돌리고 싶다면 반드시 로컬의 시스템 관리자를 확인해보거나 가장 친숙한 병렬처리 도구를 사용해야 한다. 이 튜토리얼의 목적을 위해서, 아래의 코드를 이용해 한 개의 컴퓨터를 가지고 다수의 프로세스들을 만들어 실행해 볼 것이다.

In [36]:
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    pass

def init_processes(rank, size, fn, backend='tcp'):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
        
    for p in processes:
        p.join()

위의 코드는 각각 분산처리 환경을 만들어주는 두 개의 프로세스를 만들고 이 둘을 이용해서 프로세스 그룹을 초기화 한다. (``torch.init_process_group``) 마지막으로 ``run`` 함수를 실행한다.

``init_processes`` 함수를 보면, 같은 ip 주소와 포트를 이용해도 모든 프로세스가 master(중심이 되는 프로세스)를 통해 병렬처리가 잘 된다. 그리고 코드에서는 TCP를 사용하는데, MPI나 Gloo를 사용할 수도 있다.

``dist.init_process_group``가 만들어내는 마법같은 일은 마지막에 다룰 것이지만, 이 함수는 프로세스들이 서로 자신들의 공간을 공유하면서 통신할 수 있도록 도와준다는 사실을 기억하자.

## Point-to-Point Communication

![%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA%202018-03-22%20%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE%2011.21.07.png](attachment:%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA%202018-03-22%20%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE%2011.21.07.png)

한 프로세스에서 다른 프로세스로 데이터를 보내는 것을 point-to-point communication이라고 한다. ``send``와 ``recv`` 함수를 이용하거나 ``isend``와 ``irecv``를 이용하면 해볼 수 있다.

In [37]:
# 여기서 rank는 프로세스의 번호 같은게 아닌가 생각한다.

import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        dist.send(tensor=tensor, dst=1)
    else:
        dist.recv(tensor=tensor, src=0)
    print('Rank', rank, 'has data', tensor[0])
    

def init_processes(rank, size, fn, backend='tcp'):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
        
    for p in processes:
        p.join()

Rank 0 has data 1.0
Rank 1 has data 1.0


위의 예제에서, 두 프로세스들은 zero tensor를 가지고 시작되어 process 0은 tensor를 1 증가시키고 process 1에게 보냈고, process 1은 process 0에서 건낸 tensor를 받았다. 결과적으로 두 프로세스들은 값이 1인 tensor를 가지고 종료되었다. 이 때 process 1은 받을 데이터를 위한 메모리를 할당해야 한다는 사실을 기억하자.

그리고 ``send``/``recv``는 **blocking**이다. 즉 두 프로세스들은 통신이 완전히 이루어질 때까지 멈춘 상태로 있는다. 그와 반대로 ``isend``/``irecv``는 **non-blocking**이다. 아래의 코드는 이 함수들을 사용한다. 여기서 이 함수들은 ``DistributedRequest`` 객체를 반환하며, 이 객체는 ``wait`` 메서드를 가진다.

In [38]:
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        req = dist.isend(tensor=tensor, dst=1)
        print("Rank 0 started sending")
    else:
        req = dist.irecv(tensor=tensor, src=0)
        print("Rank 1 started receiving")
    req.wait()
    print("Rank", rank, 'has data', tensor[0])

def init_processes(rank, size, fn, backend='tcp'):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
        
    for p in processes:
        p.join()

Rank 1 started receiving
Rank 0 started sending
Rank 0 has data 1.0
Rank 1 has data 1.0


``isend``/``irecv``를 사용할 때, 보낸 tensor와 받은 tensor를 사용하는 것에 주의해야 한다. 이 함수를 사용할 경우, 언제 다른 프로세스와 통신이 끝나는 지 모르기 때문에 ``req.wait()``이 끝날 때까지는 보낸 tensor와 받은 tensor를 건드리지 않아야 한다. 쉽게 말하면

- ``dist.isend()`` 이후에 보낸 tensor를 수정하는 것은 안전하지 못하다.
- ``dist.irecv()`` 이후에 받은 tensor를 읽는 것은 안전하지 못하다.

하지만 ``req.wait()`` 이후에는 프로세스간의 통신이 끝났기 때문에 tensor의 수정이 가능하다.

## Collective Communication

![%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA%202018-03-22%20%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE%2011.19.22.png](attachment:%E1%84%89%E1%85%B3%E1%84%8F%E1%85%B3%E1%84%85%E1%85%B5%E1%86%AB%E1%84%89%E1%85%A3%E1%86%BA%202018-03-22%20%E1%84%8B%E1%85%A9%E1%84%92%E1%85%AE%2011.19.22.png)

point-to-point communication과는 반대로, collective communication은 **group**안의 모든 프로세스들을 위한 통신 패턴을 준비해 놓았다. 여기서 group는 존재하는 모든 프로세스들의 일부를 말한다. group를 만들기 위해서는 우리는 ``dist.new_group()`` 메서드에 rank들의 리스트를 인자로 넘겨줘야한다. 기본적으로 collective communication은 **world**라고 불리는 존재하는 모든 프로세스들에게 적용된다. 아래 코드는 모든 프로세스들을 이용해서 모든 tensor들의 합을 구하는 코드이다. 우리는 여기서 ``dist.all_reduce(tensor, op, group)``를 사용한다.

In [39]:
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
    print("Rank", rank, 'has data', tensor[0])

def init_processes(rank, size, fn, backend='tcp'):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
        
    for p in processes:
        p.join()

Rank 0 has data 2.0
Rank 1 has data 2.0


이 코드에서, group안에서 존재하는 모든 tensor들을 더하고 싶었기 때문에 ``dist.reduce_op.SUM``을 reduce operator로 사용했다. 이 연산자 이외에도

- ``dist.reduce_op.PRODUCT``
- ``dist.reduce_op.MAX``
- ``dist.reduce_op.MIN``

이 있고, 이들은 모두 tensor의 element 수준에서 돌아간다.

``dist.all_reduce(tensor, op, group)``뿐만 아니라 총 6개의 collective communication pattern이 pytorch에 구현이 되어있다.

- ``dist.broadcast(tensor, op, group)``는 src의 tensor를 src를 제외한 모든 프로세스에 복사한다.
- ``dist.reduce(tensor, dst, op, group)``는 op를 모든 tensor에 적용한 뒤, dst에 그 결과를 저장한다. 
- ``dist.all_reduce(tensor, op, group)``는 reduce랑 똑같지만 group안에 존재하는 모든 프로세스에 결과를 저장한다.
- ``dist.scatter(tensor, src, scatter_list, group)``는 `scatter_list[i]`를 복사해서 i번째 프로세스에 저장한다.
- `dist.gather(tensor, dst, gather_list, group)`는 dst안에 존재하는 모든 프로세스의 tensor를 복사해 `gather_list`로 가져온다.
- `dist.all_gather(tensor_list, tensor, group)`는 group안에 존재하는 모든 프로세스의 tensor를 복사해서 tensor_list로 가져와 다시 tensor_list를 모든 프로세스에 복사해서 보내준다.

## Distributed Training

분산처리되는 모델이 어떻게 작동하는지 알아보았다. 이제부터는 이를 이용해서 유용한 걸 만들어보자. 최종적으로 `torch.nn.parallel.DistributedDataParallel`을 만들어 볼텐데, 이는 단순히 예제니까 회사에 가서 쓰지말고 이미 만들어진 더 좋은 걸 써라.

SGD의 분산처리 버전을 만들어 볼텐데, 모든 프로세스 하나 하나가 복사된 모델을 가질 것이고, 각 프로세스마다 데이터셋의 일부를 받아 모델을 훈련시킬 것이다. 이후에 각 모델마다 만들어진 gradient들을 모아 평균을 낼 것이다. 프로세스의 수가 바뀌어도 비슷한 결과를 만들기 위해 먼저 하나의 데이터 셋을 여러 개로 나눌 것이다. (아래의 코드가 아닌 `tnt.dataset.SplitDataset`을 써도 된다.)

In [40]:
from random import Random

class Partition(object):
    
    def __init__(self, data, index):
        self.data = data
        self.index = index
        
    def __len__(self):
        return len(self.index)
    
    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]
        
class DataPartitioner(object):
    
    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)
        
        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]
            
    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

위의 코드를 이용해서 어느 데이터셋이든지 나눌 수 있게 된다.

In [41]:
from torchvision import datasets, transforms

def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 / float(size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                            batch_size=bsz,
                                            shuffle=True)
    return train_set, bsz

2개의 복제된 모델을 가지고 있다고 가정하면, 각 프로세스는 60000 / 2 = 30000개의 샘플들을 가진 `train_set`을 가진다. 또한 전체 데이터셋의 크기를 복제된 모델의 수로 나누기 때문에 항상 전체 데이터셋의 크기는 128이 된다.

아래의 forward-backward-optimize traning는 SGD의 분산처리 버전이다. 이 때 각 프로세스에서 만들어진 gradient들을 평균내는 함수를 사용한다.

In [42]:
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

In [None]:
import os
import torch
import torch.distributed as dist
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from math import ceil

from torch.multiprocessing import Process

# 안 돌아감... ㅠㅠ
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size

def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            data, target = Variable(data), Variable(target)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.data[0]
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print("Rank", dist.get_rank(), ', epoch', epoch, ":", epoch_loss / num_batches)

def init_processes(rank, size, fn, backend='tcp'):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
        
    for p in processes:
        p.join()