# Stochastic Training of GNN with Multiple GPUs

In this tutorial you will learn how to train a multi-layer GraphSAGE for node classification on Amazon Copurchase Network provided by OGB with multiple GPUs.  The dataset contains 2.4 million nodes and 61 million edges, hence not able to fit in a single GPU.

The contents in this tutorial include how to

* Train a GNN model with a single machine with multiple GPUs on a graph of any size with `torch.nn.parallel.DistributedDataParallel`.

PyTorch `DistributedDataParallel` (or DDP in short) is a common solution for multi-GPU training.  It is easy to combine DGL with PyTorch DDP, as you do the same thing as that in any ordinary PyTorch applications:

* Divide the data to each GPU.
* Distribute the model parameters using PyTorch DDP.
* Customize your neighborhood sampling strategy.

In [None]:
import numpy as np
import dgl
import torch
import dgl.nn as dglnn
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel
import torch.nn.functional as F
import torch.multiprocessing as mp
import sklearn.metrics
import tqdm

import utils

## Load Dataset

The following code is copied from the first tutorial.

In [None]:
def load_data():
    import pickle

    with open('data.pkl', 'rb') as f:
        data = pickle.load(f)
    graph, node_features, node_labels, train_nids, valid_nids, test_nids = data
    utils.prepare_mp(graph)
    
    num_features = node_features.shape[1]
    num_classes = (node_labels.max() + 1).item()
    
    return graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes

## Customize Neighborhood Sampling

Previously we have seen how to use `NodeDataLoader` together with `MultiLayerNeighborSampler`.  In fact, you can replace `MultiLayerNeighborSampler` with your own sampling strategy.

The customization is simple.  For each GNN layer, you only need to specify the edges involved in the message passing as a graph.  For example, here is how `MultiLayerNeighborSampler` is implemented:

In [None]:
class MultiLayerNeighborSampler(dgl.dataloading.BlockSampler):
    def __init__(self, fanouts):
        super().__init__(len(fanouts), return_eids=False)
        self.fanouts = fanouts
        
    def sample_frontier(self, block_id, g, seed_nodes):
        fanout = self.fanouts[block_id]
        return dgl.sampling.sample_neighbors(g, seed_nodes, fanout)

## Defining Data Loader for Distributed Data Parallel (DDP)

In PyTorch DDP each worker process is assigned an integer *rank*.  The rank would indicate which partition of the dataset the worker process will handle.  So the only difference between single GPU and multiple GPU training in terms of data loader is that the data loader will only iterate over a partition of the nodes.

In [None]:
def create_dataloader(rank, world_size, graph, nids):
    partition_size = len(nids) // world_size
    partition_offset = partition_size * rank
    nids = nids[partition_offset:partition_offset+partition_size]
    
    sampler = MultiLayerNeighborSampler([4, 4, 4])
    dataloader = dgl.dataloading.NodeDataLoader(
        graph, nids, sampler,
        batch_size=1024,
        shuffle=True,
        drop_last=False,
        num_workers=0
    )
    
    return dataloader

## Defining Model

The model implementation will be exactly the same as what you have seen in the first tutorial.

In [None]:
class SAGE(nn.Module):
    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        
    def forward(self, blocks, x):
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            x = layer(block, x)
            if l != self.n_layers - 1:
                x = F.relu(x)
        return x

## Distributing the Model to GPUs

PyTorch DDP manages the distribution of models and synchronization of the gradients for you.  In DGL, you can benefit from PyTorch DDP as well by simply wrapping the model with `torch.nn.parallel.DistributedDataParallel`.

The recommended way to distribute training is to have one training process per GPU, so during model instantiation we also specify the process rank, which is equal to the GPU ID.

In [None]:
def init_model(rank, in_feats, n_hidden, n_classes, n_layers):
    model = SAGE(in_feats, n_hidden, n_classes, n_layers).to(rank)
    return DistributedDataParallel(model, device_ids=[rank], output_device=rank)

## The Training Loop for one Process

The training loop looks the same as other PyTorch DDP applications.

In [None]:
@utils.fix_openmp
def train(rank, world_size, data):
    # data is the output of load_data
    torch.distributed.init_process_group(
        backend='nccl',
        init_method='tcp://127.0.0.1:12345',
        world_size=world_size,
        rank=rank)
    torch.cuda.set_device(rank)
    
    graph, node_features, node_labels, train_nids, valid_nids, test_nids, num_features, num_classes = data
    
    train_dataloader = create_dataloader(rank, world_size, graph, train_nids)
    # We only use one worker for validation
    valid_dataloader = create_dataloader(0, 1, graph, valid_nids)
    
    model = init_model(rank, num_features, 128, num_classes, 3)
    opt = torch.optim.Adam(model.parameters())
    torch.distributed.barrier()
    
    best_accuracy = 0
    best_model_path = 'model.pt'
    for epoch in range(10):
        model.train()

        for step, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
            blocks = [b.to(rank) for b in blocks]
            inputs = node_features[input_nodes].cuda()
            labels = node_labels[output_nodes].cuda()
            predictions = model(blocks, inputs)

            loss = F.cross_entropy(predictions, labels)
            opt.zero_grad()
            loss.backward()
            opt.step()

            accuracy = sklearn.metrics.accuracy_score(labels.cpu().numpy(), predictions.argmax(1).detach().cpu().numpy())

            if rank == 0 and step % 10 == 0:
                print('Epoch {:05d} Step {:05d} Loss {:.04f}'.format(epoch, step, loss.item()))

        torch.distributed.barrier()
        
        if rank == 0:
            model.eval()
            predictions = []
            labels = []
            with torch.no_grad():
                for input_nodes, output_nodes, blocks in valid_dataloader:
                    blocks = [b.to(rank) for b in blocks]
                    inputs = node_features[input_nodes].cuda()
                    labels.append(node_labels[output_nodes].numpy())
                    predictions.append(model.module(blocks, inputs).argmax(1).cpu().numpy())
                predictions = np.concatenate(predictions)
                labels = np.concatenate(labels)
                accuracy = sklearn.metrics.accuracy_score(labels, predictions)
                print('Epoch {} Validation Accuracy {}'.format(epoch, accuracy))
                if best_accuracy < accuracy:
                    best_accuracy = accuracy
                    torch.save(model.module.state_dict(), best_model_path)
                    
        torch.distributed.barrier()

In [None]:
if __name__ == '__main__':
    procs = []
    data = load_data()
    for proc_id in range(4):    # 4 gpus
        p = mp.Process(target=train, args=(proc_id, 4, data))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

## Conclusion

In this tutorial, you have learned how to train a multi-layer GraphSAGE for node classification on a large dataset that cannot fit into GPU.  The method you have learned can scale to a graph of any size, and works on a single machine with *any number of* GPU.