# 다중 GPU를 사용한 GNN의 확률적(Storchastic) 학습 


이번 튜토리얼에서는 Multi GPU 환경에서 노드 분류를 위한 다중 레이어 GraphSAGE 모델을 학습하는 방법을 배워보겠습니다.  
사용할 데이터셋은 OGB에서 제공하는 Amazon Copurchase Network으로, 240만 노드와 6100만 엣지를 포함하고 있으므로, 단일 GPU에는 올라가지 않습니다.  


이 튜토리얼은 다음 내용을 포함하고 있습니다.  

* `torch.nn.parallel.DistributedDataParallel` 메서드를 사용해 그래프 크기에 상관없이 GNN 모델을 단일 머신, 다중 GPU으로 학습하기.

PyTorch `DistributedDataParallel` (혹은 짧게 말해 DDP)는 multi-GPU 학습의 일반적인 해결책입니다.  
DGL과 PyTorch DDP를 결합하는 것은 매우 쉬운데, 평범한 PyTorch 어플리케이션에서 적용하는 방법과 같이 하면 됩니다.

* 데이터를 각 GPU에 대해 분할하기
* PyTorch DDP를 사용해 모델 파라미터를 분배합니다
* 이웃 샘플링 전략을 각자의 방법으로 커스터마이징합니다.

In [1]:
import numpy as np
import dgl
import torch
import dgl.nn as dglnn
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel
import torch.nn.functional as F
import torch.multiprocessing as mp
import sklearn.metrics
import tqdm

import utils

Using backend: pytorch


## 데이터셋 로드하기

아래 코드는 첫번째 튜토리얼에서 복사되었습니다.

In [2]:
def load_data():
    import pickle

    with open('data.pkl', 'rb') as f:
        data = pickle.load(f)
    graph, node_features, node_labels, train_nids, valid_nids, test_nids = data
    utils.prepare_mp(graph)
    
    num_features = node_features.shape[1]
    num_classes = (node_labels.max() + 1).item()
    
    return graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes

## 이웃 샘플링 커스터마이징하기

이전 튜토리얼에서, `NodeDataLoader`와 `MultiLayerNeighborSampler`를 사용하는 방법을 배워 보았습니다.  
사실, `MultiLayerNeighborSampler`를 우리 마음대로 정한 샘플링 전략으로 대체할 수 있습니다.  

커스터마이징은 간단합니다.  
각 GNN 레이어에 대해, message passing에서 포함되는 엣지를 그래프로 지정해주면 됩니다.  
이 그래프는 기존 그래프와 같은 노드를 갖게 됩니다.  

예를 들어, `MultiLayerNeighborSampler`는 아래와 같이 구현됩니다.

In [3]:
class MultiLayerNeighborSampler(dgl.dataloading.BlockSampler):
    def __init__(self, fanouts):
        super().__init__(len(fanouts), return_eids=False)
        self.fanouts = fanouts
        
    def sample_frontier(self, layer_id, g, seed_nodes):
        fanout = self.fanouts[layer_id]
        return dgl.sampling.sample_neighbors(g, seed_nodes, fanout)

## Distributed Data Parallel (DDP)를 위한 데이터 로더 정의하기

PyTorch DDP에서, 각 worker process는 정수값인 *rank*로 할당됩니다.  
이 rank는 worker process가 데이터셋의 어떤 파티션을 처리할지를 나타냅니다.  

따라서 데이터 로더 관점에서의 단일 GPU 경우와 다중 GPU 학습 간 유일한 차이점은,  
데이터 로더가 노드의 일부 파티션에 대해서만 iterate한다는 점입니다.


In [4]:
def create_dataloader(rank, world_size, graph, nids):
    partition_size = len(nids) // world_size
    partition_offset = partition_size * rank
    nids = nids[partition_offset:partition_offset+partition_size]
    
    sampler = MultiLayerNeighborSampler([4, 4, 4])
    dataloader = dgl.dataloading.NodeDataLoader(
        graph, nids, sampler,
        batch_size=1024,
        shuffle=True,
        drop_last=False,
        num_workers=0
    )
    
    return dataloader

## 모델 정의하기

모델 구현은 첫번째 튜토리얼에서 본 것과 정확히 동일합니다.

In [5]:
class SAGE(nn.Module):
    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        
    def forward(self, bipartites, x):
        for l, (layer, bipartite) in enumerate(zip(self.layers, bipartites)):
            x = layer(bipartite, x)
            if l != self.n_layers - 1:
                x = F.relu(x)
        return x

## 모델을 여러 GPU에 분배하기

PyTorch DDP는 모델의 분산과 가중치의 synchronization을 관리해 줍니다.  
DGL에서는, 모델을 단순히 `torch.nn.parallel.DistributedDataParallel`으로 감싸 줌으로써 이 PyTorch DDP의 이점을 그대로 누릴 수 있습니다.

분산 학습에서 추천되는 방식은 한 GPU에 학습 process를 하나만 가져가는 것입니다.  
이로써, 모델 instantiation 중에 process rank를 지정해줄 수도 있게 되는데, 이 rank가 GPU ID와 동일해지게 됩니다.

In [6]:
def init_model(rank, in_feats, n_hidden, n_classes, n_layers):
    model = SAGE(in_feats, n_hidden, n_classes, n_layers).to(rank)
    return DistributedDataParallel(model, device_ids=[rank], output_device=rank)

## 1개 process를 위한 학습 루프

학습 루프는 다른 PyTorch DDP 어플리케이션과 똑같이 생겼습니다.

In [7]:
@utils.fix_openmp
def train(rank, world_size, data):
    # data is the output of load_data
    torch.distributed.init_process_group(
        backend='nccl',
        init_method='tcp://127.0.0.1:12345',
        world_size=world_size,
        rank=rank)
    torch.cuda.set_device(rank)
    
    graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes = data
    
    train_dataloader = create_dataloader(rank, world_size, graph, train_nids)
    # We only use one worker for validation
    valid_dataloader = create_dataloader(0, 1, graph, valid_nids)
    
    model = init_model(rank, num_features, 128, num_classes, 3)
    opt = torch.optim.Adam(model.parameters())
    torch.distributed.barrier()
    
    best_accuracy = 0
    best_model_path = 'model.pt'
    for epoch in range(10):
        model.train()

        for step, (input_nodes, output_nodes, bipartites) in enumerate(train_dataloader):
            bipartites = [b.to(rank) for b in bipartites]
            inputs = node_features[input_nodes].cuda()
            labels = node_labels[output_nodes].cuda()
            predictions = model(bipartites, inputs)

            loss = F.cross_entropy(predictions, labels)
            opt.zero_grad()
            loss.backward()
            opt.step()

            accuracy = sklearn.metrics.accuracy_score(labels.cpu().numpy(), predictions.argmax(1).detach().cpu().numpy())

            if rank == 0 and step % 10 == 0:
                print('Epoch {:05d} Step {:05d} Loss {:.04f}'.format(epoch, step, loss.item()))

        torch.distributed.barrier()
        
        if rank == 0:
            model.eval()
            predictions = []
            labels = []
            with torch.no_grad():
                for input_nodes, output_nodes, bipartites in valid_dataloader:
                    bipartites = [b.to(rank) for b in bipartites]
                    inputs = node_features[input_nodes].cuda()
                    labels.append(node_labels[output_nodes].numpy())
                    predictions.append(model.module(bipartites, inputs).argmax(1).cpu().numpy())
                predictions = np.concatenate(predictions)
                labels = np.concatenate(labels)
                accuracy = sklearn.metrics.accuracy_score(labels, predictions)
                print('Epoch {} Validation Accuracy {}'.format(epoch, accuracy))
                if best_accuracy < accuracy:
                    best_accuracy = accuracy
                    torch.save(model.module.state_dict(), best_model_path)
                    
        torch.distributed.barrier()

In [8]:
if __name__ == '__main__':
    procs = []
    data = load_data()
    for proc_id in range(4):    # 4 gpus
        p = mp.Process(target=train, args=(proc_id, 4, data))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

Epoch 00000 Step 00000 Loss 5.7553
Epoch 00000 Step 00010 Loss 2.6858
Epoch 00000 Step 00020 Loss 2.1455
Epoch 00000 Step 00030 Loss 1.7148
Epoch 00000 Step 00040 Loss 1.6470
Epoch 0 Validation Accuracy 0.7247158151717824
Epoch 00001 Step 00000 Loss 1.3390
Epoch 00001 Step 00010 Loss 1.3108
Epoch 00001 Step 00020 Loss 1.3176
Epoch 00001 Step 00030 Loss 1.4312
Epoch 00001 Step 00040 Loss 1.1797
Epoch 1 Validation Accuracy 0.7972687739999491
Epoch 00002 Step 00000 Loss 1.0574
Epoch 00002 Step 00010 Loss 1.1461
Epoch 00002 Step 00020 Loss 1.0746
Epoch 00002 Step 00030 Loss 1.0027
Epoch 00002 Step 00040 Loss 0.9308
Epoch 2 Validation Accuracy 0.8152480736464665
Epoch 00003 Step 00000 Loss 0.9768
Epoch 00003 Step 00010 Loss 1.0767
Epoch 00003 Step 00020 Loss 0.9237
Epoch 00003 Step 00030 Loss 1.0979
Epoch 00003 Step 00040 Loss 0.8528
Epoch 3 Validation Accuracy 0.83111664928922
Epoch 00004 Step 00000 Loss 0.9134
Epoch 00004 Step 00010 Loss 0.9284
Epoch 00004 Step 00020 Loss 0.8158
Epoch 000

## 결론

이 튜토리얼에서, GPU에 올라가지 않는 대규모 데이터에서 노드 분류를 위한 다중 레이어 GraphSAGE 모델을 학습하는 방법을 배웠습니다.  
여기서 배운 이 방법은 어떤 사이즈의 그래프에서든 확장될 수 있으며,  
단일 머신의 *몇 개의 GPU 에서든* 작동합니다.

## 추가 자료: DDP로 학습할 때의 주의점

DDP 코드를 작성할 때, 이 두가지 에러를 겪을 수 있습니다.  

* `Cannot re-initialize CUDA in forked subprocess`  

    이는 `mp.Process`를 사용해 subprocess를 만들기 전에 CUDA context를 초기화 해서 발생합니다.
    해결책은 다음과 같습니다.  
    
    * `mp.Process`를 호출하기 전에, CUDA context를 초기화할 수 있는 모든 가능한 코드를 제거합니다.  
    예를 들어, `mp.Process`를 호출하기 전에 GPU의 갯수를 `torch.cuda.device_count()`로 확인할 수 없습니다.  
    왜냐하면, 갯수를 확인하는 `torch.cuda.device_count()`는 CUDA context를 초기화하기 때문입니다.  
    
    CUDA context가 초기화 되었는지의 여부를 `torch.cuda.is_initialized()`로 확인해볼 수 있습니다.
    
    * `mp.Process`로 forking하지 마시고, `torch.multiprocessing.spawn()`를 사용해 process를 생성하세요.  
    (전자 방식의) 불리점은, 파이썬이 이 방법으로 생성된 모든 process에 대해 그래프 storage를 복제한다는 점입니다.  
    메모리 소비량이 선형적으로 증가하게 되지요.
    
* 학습 프로세스가 미니배치 iteration중에 멈춤
    이 원인은 다음과 같습니다. [lasting bug in the interaction between GNU OpenMP and `fork`](https://github.com/pytorch/pytorch/issues/17199)  
    다른 해결책은, `mp.Process`의 목표 함수를 데코레이터 `utils.fix_openmp`를 사용해 감싸는 것입니다.  
    이 방식은 이 튜토리얼에서 구현되어 있습니다.