## Section 8.8 of GNNs in Action
## Parallel & Distributed Processing

This notebook demonstrates how to use the `DistributedDataParallel` (DDP) class to train a GNN model on a machine with two GPUs.

NOTE: You must enable the GPU T4 x2 accelerator to run this code.
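DDP keeps one model replica per process. During `backward()` it all-reduces the gradients so that every replica applies the same averaged update. The short NumPy sketch below checks the property this relies on: with a mean-reduced loss and equal-sized shards, averaging the per-rank gradients reproduces the full-batch gradient. It uses a toy least-squares model, not the notebook's GCN, and all names in it are illustrative.

```python
import numpy as np

def mse_grad(w, X, y):
    # Gradient of mean((X @ w - y) ** 2) with respect to w
    residual = X @ w - y
    return 2.0 * X.T @ residual / len(y)

rng = np.random.default_rng(0)
X = rng.normal(size=(8, 3))
y = rng.normal(size=8)
w = rng.normal(size=3)

world_size = 2
# Each simulated "rank" receives an equal, disjoint slice of the batch
X_shards = np.array_split(X, world_size)
y_shards = np.array_split(y, world_size)

# Every rank computes a local gradient on its own shard only
local_grads = [mse_grad(w, Xs, ys) for Xs, ys in zip(X_shards, y_shards)]

# All-reduce (sum) followed by division by world_size: the averaging DDP applies
averaged = sum(local_grads) / world_size

full_batch = mse_grad(w, X, y)
print(np.allclose(averaged, full_batch))  # True
```

If the shards had different sizes, the plain average would weight the smaller shard's examples more heavily and the equality would no longer hold exactly.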

In [None]:
!pip uninstall torch -y

In [None]:
# Pin the release that the PyG wheel index used below (torch-2.0.1+cu117) is built for
!pip install torch==2.0.1

In [None]:
# Find the CUDA version PyTorch was installed with
!python -c "import torch; print(torch.version.cuda)"

In [None]:
# PyTorch version
!python -c "import torch; print(torch.__version__)"

In [None]:
# Install PyTorch Geometric
!pip install torch_geometric

# Optional dependencies:
!pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.0.1+cu117.html

In [None]:
# Install OGB plus the GPU-monitoring (GPUtil, pynvml) and FLOPs-counting (thop) helpers
!pip install ogb
!pip install gputil
!pip install nvidia-ml-py3
!pip install thop

In [None]:
# Download (on first run) and load ogbn-products; this dataset is passed to every training process
from ogb.nodeproppred import PygNodePropPredDataset
dataset = PygNodePropPredDataset(name='ogbn-products')

## Encapsulate model and training into a file
We write the GCN model and training function into a separate Python file, 'my_module.py'. This separation is essential when using PyTorch's multiprocessing library. mp.spawn starts fresh Python interpreters, and each one must import the function it runs by its module and name. Functions defined in a notebook cell live in the notebook's interactive "__main__" namespace, which the spawned processes cannot import. Placing the model and training function in a module lets each process import these components cleanly, avoiding problems with variable scope, function lookup, and Python's "__main__" guard during multiprocessing.
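The underlying mechanism is that pickle, which multiprocessing uses to hand work to child processes, stores a function by reference (its module and qualified name) rather than by its code. The stdlib-only sketch below shows that lookup succeeding for a module-level function and failing for a function with no importable name. A function defined in a notebook cell pickles as `__main__.main`, and the lookup fails in a spawned process whose `__main__` is a different module, typically with an `AttributeError`. The helper names here are illustrative.

```python
import math
import pickle

# A module-level function is pickled by reference: the payload records the
# module and name, and unpickling looks the function up again by that name
payload = pickle.dumps(math.sqrt)
restored = pickle.loads(payload)
print(restored is math.sqrt)  # True

def make_local():
    # A function defined inside another function has no importable name
    def helper(x):
        return x + 1
    return helper

try:
    pickle.dumps(make_local())
    local_ok = True
except (pickle.PicklingError, AttributeError):
    # Older Python versions raise AttributeError here; newer ones raise PicklingError
    local_ok = False
print(local_ok)  # False
```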

As for the script itself, it trains a Graph Convolutional Network (GCN) on the ogbn-products node-classification dataset with distributed data parallelism, using PyTorch and PyTorch Geometric. The code is divided into several parts, each serving a specific purpose. First, it imports the necessary libraries, including those for distributed computing, memory tracking (psutil, pynvml), and FLOPs counting (thop). The get_cpu_memory_usage function uses psutil to report the resident memory of the running process, and get_gpu_memory_usage uses pynvml to report the memory used on a GPU. The GCN class defines the network architecture: two GCN layers followed by a fully connected layer that outputs scores for the 47 product categories.
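For intuition about what each GCN layer computes, here is a dense NumPy sketch of the standard GCN propagation rule, $H' = \hat{D}^{-1/2}(A + I)\hat{D}^{-1/2} H W$, where $\hat{D}$ is the degree matrix of $A + I$. PyG's GCNConv applies this symmetric normalization with self-loops by default, operating on the sparse edge_index and adding a bias. The dense matrices here are only practical for tiny graphs, and `gcn_layer` is an illustrative name.

```python
import numpy as np

def gcn_layer(A, H, W):
    """Dense GCN propagation: D^-1/2 (A + I) D^-1/2 H W.

    A: (n, n) symmetric adjacency matrix without self-loops
    H: (n, f_in) node features
    W: (f_in, f_out) weight matrix
    """
    A_hat = A + np.eye(A.shape[0])   # add a self-loop to every node
    deg = A_hat.sum(axis=1)          # node degrees including the self-loop
    d_inv_sqrt = 1.0 / np.sqrt(deg)
    # Symmetric normalization via broadcasting instead of diagonal matrices
    A_norm = d_inv_sqrt[:, None] * A_hat * d_inv_sqrt[None, :]
    return A_norm @ H @ W

A = np.array([[0.0, 1.0], [1.0, 0.0]])  # two nodes joined by one edge
H = np.array([[1.0], [3.0]])
W = np.array([[1.0]])
# Every entry of A_norm is 0.5, so each node gets 0.5 * 1 + 0.5 * 3 = 2.0
print(gcn_layer(A, H, W))
```

For reference, with GCNConv storing a weight matrix plus a separate bias vector, the layer sizes in the GCN class give 100·128 + 128 = 12,928, 128·128 + 128 = 16,512, and 128·47 + 47 = 6,063 parameters, or 35,503 in total.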

The train function trains the model for one epoch. For each mini-batch it computes the loss, backpropagates, and updates the model's weights, recording GPU memory after each step and estimating FLOPs with thop. At the end of the epoch it prints the average loss, average and peak memory usage, FLOPs per batch, and epoch time. The main function sets up distributed training, with each process assigned to its own GPU. It initializes the process group, the neighbor-sampling data loader, the DDP-wrapped model, and the optimizer, then runs the training loop for a fixed number of epochs. Finally, it prints the average epoch time, average memory usage, and total convergence time.
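Much of the per-batch work depends on how many nodes neighbor sampling pulls in. The NeighborLoader in main seeds each batch with `batch_size=128 * 10` training nodes and samples `num_neighbors=[15, 10, 5]` neighbors per node over three hops. The hypothetical helper below (not part of PyG) computes the resulting worst-case batch size. It ignores duplicate nodes and nodes with fewer neighbors than the fan-out, so real batches are smaller.

```python
def max_sampled_nodes(batch_size, num_neighbors):
    """Upper bound on the number of nodes in one neighbor-sampled mini-batch.

    Each hop multiplies the current frontier by that hop's fan-out; the bound
    ignores duplicates and nodes whose degree is below the fan-out.
    """
    total = batch_size      # the seed nodes themselves
    frontier = batch_size
    for fanout in num_neighbors:
        frontier *= fanout  # nodes sampled at this hop
        total += frontier
    return total

print(max_sampled_nodes(128 * 10, [15, 10, 5]))  # 1172480
```

Only the first batch.batch_size nodes of each sampled batch are seed nodes; the rest are sampled neighbors. The fan-out list is usually sized to the number of message-passing layers, whereas this notebook samples three hops for a two-layer GCN.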


In [None]:
%%writefile my_module.py

# Importing necessary libraries
import torch
from torch.nn import Linear
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch_geometric.nn import GCNConv
from torch_geometric.loader import NeighborLoader
from ogb.nodeproppred import PygNodePropPredDataset
import time
import os
import psutil  # Library for retrieving system-level information, for our case, CPU memory usage
import GPUtil
import pynvml
import numpy as np  
import logging  
from thop import profile  

# Suppress INFO and WARNING messages from thop
logging.getLogger('thop').setLevel(logging.ERROR)

# Initialize NVML library
pynvml.nvmlInit()

# Function to get the current process's memory usage
def get_cpu_memory_usage():
    """
    Returns the resident (CPU) memory used by the current process.

    :return: Memory usage in bytes
    """
    process = psutil.Process(os.getpid())
    return process.memory_info().rss


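def get_gpu_memory_usage():
    """
    Returns the memory currently used on this process's CUDA device, via pynvml.

    NVML reports device-wide usage (all processes on that GPU), and this
    assumes NVML and CUDA enumerate the GPUs in the same order.

    :return: GPU memory usage in bytes
    """
    # Query the GPU this process uses (cuda:0 unless torch.cuda.set_device was called)
    handle = pynvml.nvmlDeviceGetHandleByIndex(torch.cuda.current_device())
    info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    return info.used  # Used GPU memory in bytes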



# GCN (Graph Convolutional Network) model definition
class GCN(torch.nn.Module):
    def __init__(self):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(100, 128)  # First GCN layer
        self.conv2 = GCNConv(128, 128)  # Second GCN layer
        self.fc = Linear(128, 47)  # Final fully connected layer, 47 classes for ogbn-products dataset

    def forward(self, x, edge_index):
        """
        Forward pass through the network.

        :param x: Input features
        :param edge_index: Edge indices defining the graph structure
        :return: Output after passing through network
        """
        x = self.conv1(x, edge_index).relu()  # ReLU adds the non-linearity between layers
        x = self.conv2(x, edge_index).relu()
        x = self.fc(x)
        return x

# Function for training the model
def train(model, trainloader, criterion, optimizer, device, epoch):
    model.train()
    total_loss = 0.0
    num_batches = 0

    start_time = time.time()

    memory_tracking = {
        key: [] for key in [
            'optimizer.zero_grad', 'model forward pass', 'target processing',
            'loss calculation', 'loss.backward', 'optimizer.step'
        ]
    }

    max_memory = 0  # Initialize the max_memory here
    max_memory_step = ""
    
    total_flops = 0  # Initialize total FLOPs here

    for batch in trainloader:
        optimizer.zero_grad()
        memory = get_gpu_memory_usage()
        memory_tracking['optimizer.zero_grad'].append(memory)
        if memory > max_memory:  # Update max_memory if the current memory is greater
            max_memory = memory
            max_memory_step = 'optimizer.zero_grad'

        out = model(batch.x.float(), batch.edge_index)
        memory = get_gpu_memory_usage()
        memory_tracking['model forward pass'].append(memory)
        if memory > max_memory:  # Update max_memory if the current memory is greater
            max_memory = memory
            max_memory_step = 'model forward pass'

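        # NeighborLoader puts the seed (training) nodes first in every batch;
        # restrict predictions and labels to them so that sampled neighbors,
        # which may be validation or test nodes, do not contribute to the loss
        out = out[:batch.batch_size]
        target = batch.y[:batch.batch_size].view(-1)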
        memory = get_gpu_memory_usage()
        memory_tracking['target processing'].append(memory)
        if memory > max_memory:  # Update max_memory if the current memory is greater
            max_memory = memory
            max_memory_step = 'target processing'

        loss = criterion(out, target)
        memory = get_gpu_memory_usage()
        memory_tracking['loss calculation'].append(memory)
        if memory > max_memory:  # Update max_memory if the current memory is greater
            max_memory = memory
            max_memory_step = 'loss calculation'

        loss.backward()
        memory = get_gpu_memory_usage()
        memory_tracking['loss.backward'].append(memory)
        if memory > max_memory:  # Update max_memory if the current memory is greater
            max_memory = memory
            max_memory_step = 'loss.backward'

        optimizer.step()
        memory = get_gpu_memory_usage()
        memory_tracking['optimizer.step'].append(memory)
        if memory > max_memory:  # Update max_memory if the current memory is greater
            max_memory = memory
            max_memory_step = 'optimizer.step'

        # Calculating FLOPs
        with torch.no_grad():
            macs, params = profile(model.module, inputs=(batch.x.float().to(device), batch.edge_index.to(device)), verbose=False)
            total_flops += macs * 2  # Convert MACs to FLOPs
        
        total_loss += loss.item()
        num_batches += 1

    avg_loss = total_loss / num_batches if num_batches > 0 else 0

    # Calculate and print memory stats
    avg_memories = {k: np.mean(v) if v else 0 for k, v in memory_tracking.items()}
    overall_avg_memory = np.mean([mem for mem in avg_memories.values() if mem > 0])

    end_time = time.time()
    epoch_time = end_time - start_time

    print(f"Device: {device}\n"
          f"Epoch: {epoch + 1}\n"
          f"Avg Loss: {avg_loss:.4f}\n"
          f"Avg Memory Usage: {overall_avg_memory/(1024 * 1024):.2f} MB\n"
          f"Max Memory Usage: {max_memory/(1024 * 1024):.2f} MB at {max_memory_step}\n"
          f"Avg FLOPs per Batch: {(total_flops/num_batches if num_batches > 0 else 0):.2f}\n"
          f"Epoch Time: {epoch_time:.2f} s\n"
          f"{'='*40}")

    return epoch_time, overall_avg_memory, max_memory


# Main function for distributed training
def main(rank, world_size, dataset):
    """
    Main training loop for distributed training.

    :param rank: Rank of the current process
    :param world_size: Total number of processes
    :param dataset: Dataset for training
    """
    convergence_start_time = time.time()  # Time at the start of training
    
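    split_idx = dataset.get_idx_split()
    # Give each rank a disjoint slice of the training nodes so the GPUs split
    # each epoch instead of repeating it (DDP expects every rank to run the
    # same number of iterations, so the slices should yield equal batch counts)
    train_idx = split_idx["train"].tensor_split(world_size)[rank]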
    
    # Initializing distributed process group
    dist.init_process_group(
        backend='nccl',
        init_method='tcp://localhost:23456',
        rank=rank,
        world_size=world_size
    )
    device = torch.device(f'cuda:{rank}')  # Set device for the current process
    torch.cuda.set_device(device)  # Make this GPU the default CUDA device in this process
    
    data = dataset[0].to(device)  # Load data to the device

    # Initializing DataLoader with neighbor sampling
    trainloader = NeighborLoader(
        data,
        num_neighbors=[15, 10, 5],
        batch_size=128 * 10,
        input_nodes=train_idx,
        shuffle=True,
        persistent_workers=False
    )
  
    model = GCN().to(device)
    model = DistributedDataParallel(model, device_ids=[rank])  # Wrap model for distributed training

    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)  # Using Adam optimizer
    
    num_epochs = 2  # Total number of epochs
    total_epoch_time = 0
    total_memory_usage = 0
    
    # Training loop
    for epoch in range(num_epochs):
        epoch_time, avg_memory_usage, max_memory_usage = train(model, trainloader, criterion, optimizer, device, epoch)
    
        total_epoch_time += epoch_time
        total_memory_usage += avg_memory_usage
    
    # Calculating and printing average values and total convergence time
    avg_epoch_time = total_epoch_time / num_epochs
    avg_memory_usage = total_memory_usage / num_epochs / (1024 * 1024)  # Convert bytes to MB
    convergence_time = time.time() - convergence_start_time
    
    print(f"Average Epoch Time: {avg_epoch_time}s")
    print(f"Average Memory Usage: {avg_memory_usage}MB")
    print(f"Total Convergence Time: {convergence_time}s")

    dist.destroy_process_group()  # Release the NCCL process group before exiting
    pynvml.nvmlShutdown()

The script in the next cell initializes and runs the distributed training process. It begins by importing the main function from "my_module.py". This function contains the logic for training the GCN model, monitoring its performance, and tracking resource-usage metrics such as memory, FLOPs, and time.

The script then imports PyTorch's multiprocessing module, whose spawn function launches the training processes so that they run in parallel, one per GPU.

Inside the if __name__ == '__main__': block, the world_size variable is set to 2, so two separate processes are spawned for training, each using its own GPU.

The mp.spawn function launches the distributed training by creating world_size processes. Each process runs main(rank, world_size, dataset): spawn supplies the process index (0 or 1) as the first argument, rank, and appends the tuple passed as args. The processes then train concurrently, one per GPU.
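To make the calling convention concrete, here is a sequential, single-process stand-in for mp.spawn. `fake_spawn` is hypothetical and exists only for illustration; like the real function, it calls `fn(rank, *args)` once per process index. The worker also shows one common way to give each rank a disjoint slice of the training nodes, so that two GPUs split the epoch instead of repeating it. In PyTorch the same split could be written `train_idx.tensor_split(world_size)[rank]`.

```python
import numpy as np

def fake_spawn(fn, args=(), nprocs=1):
    # Hypothetical stand-in for mp.spawn that runs sequentially in one process.
    # Like mp.spawn, it prepends the process index (rank) to the user's args.
    return [fn(rank, *args) for rank in range(nprocs)]

def worker(rank, world_size, train_idx):
    # Each rank keeps only its own disjoint, contiguous slice of the seed nodes
    shard = np.array_split(train_idx, world_size)[rank]
    return rank, shard

train_idx = np.arange(10)  # stand-in for split_idx["train"]
results = fake_spawn(worker, args=(2, train_idx), nprocs=2)
for rank, shard in results:
    print(rank, shard)
# prints "0 [0 1 2 3 4]" and then "1 [5 6 7 8 9]"
```

Unlike this stand-in, the real mp.spawn runs each call in a separate process and does not return the workers' results.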



In [None]:
%%time  

# Importing the 'main' function from 'my_module.py' which contains the 
# entire training process and resource tracking mechanisms.
from my_module import main  

# Importing the multiprocessing module from PyTorch, enabling the ability to 
# use multiple processes for parallel and distributed training.
import torch.multiprocessing as mp

# Ensuring the main code is run under the __main__ scope to avoid potential
# issues with multiprocessing across different modules.
if __name__ == '__main__':
    
    # Setting the 'world_size' variable to 2, indicating that two processes 
    # will be spawned for distributed training.
    # In the context of multi-GPU training, this typically means 
    # training will utilize two GPUs.
    
    world_size = 2  
    
    # The 'mp.spawn' function is used to spawn 'world_size' number of 
    # processes that will execute the 'main' function.
    # Each process will run on a separate GPU (or other resources), 
    # enabling parallel and distributed training.
    # The 'args' parameter is used to pass arguments to the 'main' 
    # function in each process.
    # 'nprocs' is set to 'world_size', ensuring the number of processes 
    # spawned equals the defined 'world_size'.
    # 'join=True' means the main process will wait for all spawned 
    # processes to complete before proceeding.
    
    mp.spawn(main, args=(world_size, dataset,), nprocs=world_size, join=True)
