### Model training with the `DistributedDataParallel` strategy

In [6]:
import torch
from torch.xpu import device


# Creating a multilayer perceptron with two hidden layers

class NeuralNetwork(torch.nn.Module):
    """Multilayer perceptron with two hidden layers of 30 and 20 units.

    A ReLU follows each hidden layer. The output layer has no activation,
    so ``forward`` returns raw logits.

    Args:
        num_inputs: Size of each input feature vector.
        num_outputs: Number of output units (one logit per class).
    """

    def __init__(self, num_inputs, num_outputs):
        super().__init__()
        hidden_1, hidden_2 = 30, 20
        # Layers are created in input-to-output order and stored in a
        # Sequential, so their positions are 0 (first hidden), 2 (second
        # hidden) and 4 (output), with the ReLUs in between.
        stack = [
            torch.nn.Linear(num_inputs, hidden_1),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_1, hidden_2),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_2, num_outputs),
        ]
        self.layers = torch.nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the logits."""
        return self.layers(x)


# Creating a small toy dataset
# Training split: five 2-D points, the first three labelled 0 and the last two labelled 1
X_train = torch.tensor(
    [[-1.2, 3.1], [-0.9, 2.9], [-0.5, 2.6], [2.3, -1.1], [2.7, -1.5]]
)
y_train = torch.tensor([0, 0, 0, 1, 1])

# Test split: one point per class
x_test = torch.tensor([[-0.8, 2.8], [2.6, -1.6]])
y_test = torch.tensor([0, 1])


In [7]:
# Defining a custom Dataset class
from torch.utils.data import Dataset

class ToyDataset(Dataset):
    """Dataset that pairs each feature row with the label at the same index.

    Args:
        X: Indexable collection of feature rows.
        y: Labels; its ``shape[0]`` defines the dataset length.
    """

    def __init__(self, X, y):
        self.features, self.labels = X, y

    def __len__(self):
        # The length is taken from the labels, not the features
        return self.labels.shape[0]

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.features[index], self.labels[index]


# Wrap the raw tensors so that a DataLoader can index them example by example
train_ds = ToyDataset(X=X_train, y=y_train)
test_ds = ToyDataset(X=x_test, y=y_test)

In [8]:
# Instantiating data loaders
from torch.utils.data import DataLoader

# Fix the global seed before building the loaders (the training loader shuffles)
torch.manual_seed(123)

# Training loader: batches of two, drawn in shuffled order, loaded in the main process
train_loader = DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=0)

# Test loader: same batch size, but examples keep their dataset order
test_loader = DataLoader(test_ds, batch_size=2, shuffle=False, num_workers=0)

In [12]:
import torch
import os

import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
from torch.utils.data import DistributedSampler



def ddp_setup(rank, world_size):
    """Initialise distributed training for the current process.

    Publishes the rendezvous address and port through the ``MASTER_ADDR``
    and ``MASTER_PORT`` environment variables, initialises the process
    group with the NCCL backend, and selects the CUDA device whose index
    equals ``rank``.

    Args:
        rank: Identifier of this process; also used as the GPU index.
        world_size: Total number of processes taking part in training.
    """
    # Rendezvous point: every process must agree on the same address and port.
    os.environ["MASTER_ADDR"] = "localhost"  # address of the main node
    # The port has to go into MASTER_PORT. Writing it to MASTER_ADDR would
    # overwrite the address above and leave the port undefined.
    os.environ["MASTER_PORT"] = "12345"  # any free port on the machine

    init_process_group(
        backend="nccl",  # NVIDIA Collective Communication Library
        rank=rank,
        world_size=world_size,
    )
    # Make GPU `rank` the current device for this process.
    torch.cuda.set_device(rank)

In [9]:
def prepare_dataset():
    """Build the distributed training loader and pair it with the test loader.

    Returns:
        tuple: ``(train_loader, test_loader)``. The training loader is
        created here from the module-level ``train_ds``; the test loader is
        the module-level ``test_loader``, returned unchanged.
    """
    # The sampler decides which examples this process sees, so the loader
    # itself is told not to shuffle.
    sampler = DistributedSampler(train_ds)
    distributed_loader = DataLoader(
        train_ds,
        batch_size=2,
        shuffle=False,
        sampler=sampler,
        pin_memory=True,  # page-locked host memory for faster CPU-to-GPU copies
        drop_last=True,  # discard a final batch smaller than batch_size
    )
    return distributed_loader, test_loader

# Think of it like dealing cards in a poker game:
# - DistributedSampler is the dealer
# - Each GPU is a player
# - Cards (data) are dealt evenly
# - No player (GPU) gets the same cards (data)

In [13]:
def compute_accuracy(model, dataloader, device):
    """Return the fraction of examples that ``model`` classifies correctly.

    Args:
        model: Module mapping a batch of features to per-class logits. It
            is switched to evaluation mode as a side effect.
        dataloader: Iterable yielding ``(features, labels)`` batches.
        device: Device on which the forward pass runs; every batch is moved
            there before it is fed to the model.

    Returns:
        float: Number of correct predictions divided by the number of
        examples seen.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model = model.eval()
    correct = 0
    total_examples = 0

    with torch.no_grad():  # no gradients are needed for evaluation
        for features, labels in dataloader:
            # Move the batch to the requested device; without this a model
            # on a GPU is fed CPU tensors and the forward pass fails.
            features, labels = features.to(device), labels.to(device)

            logits = model(features)
            predictions = torch.argmax(logits, dim=1)
            # Boolean tensor: True where the prediction matches the label
            compare = labels == predictions
            print(f" compare is: {compare}")
            correct += torch.sum(compare)
            total_examples += len(compare)

    # .item() converts the 0-dim tensor into a Python scalar
    return (correct / total_examples).item()

In [None]:
def main(rank, world_size, num_epochs):
    """Train and evaluate the model in one distributed worker process.

    Sets up the process group, builds the data loaders, trains a
    ``NeuralNetwork`` wrapped in ``DistributedDataParallel`` for
    ``num_epochs`` epochs, prints training and test accuracy, and finally
    tears the process group down.

    Args:
        rank: Identifier of this process; also used as the GPU index.
        world_size: Total number of processes taking part in training.
        num_epochs: Number of passes over the training loader.
    """
    ddp_setup(rank, world_size)
    try:
        train_loader, test_loader = prepare_dataset()

        model = NeuralNetwork(num_inputs=2, num_outputs=2)
        model.to(rank)  # move the model to this process's GPU
        optimiser = torch.optim.SGD(model.parameters(), lr=0.5)
        # DDP synchronises the gradients between the processes during backward()
        model = DDP(model, device_ids=[rank])

        for epoch in range(num_epochs):
            # DistributedSampler derives its shuffle from the epoch number;
            # without this call every epoch would use the same ordering.
            train_loader.sampler.set_epoch(epoch)

            for features, labels in train_loader:
                features, labels = features.to(rank), labels.to(rank)  # rank is the GPU ID
                logits = model(features)
                loss = F.cross_entropy(logits, labels)

                optimiser.zero_grad()
                loss.backward()
                optimiser.step()

                print(f"[GPU{rank}] Epoch: {epoch+1:03d}/{num_epochs:03d}"
                      f" | Batch size {labels.shape[0]:03d}"
                      f" | Train/Val Loss: {loss:.2f}")

        model.eval()
        train_acc = compute_accuracy(model, train_loader, device=rank)
        print(f"[GPU{rank}] Training accuracy", train_acc)

        test_acc = compute_accuracy(model, test_loader, device=rank)
        print(f"[GPU{rank}] Test accuracy", test_acc)
    finally:
        # Release the process group even when training or evaluation raises
        destroy_process_group()

In [None]:
if __name__ == "__main__":
    # One worker process is launched per available GPU.
    world_size = torch.cuda.device_count()
    print("Number of GPUs available:", world_size)

    torch.manual_seed(123)
    num_epochs = 3

    # `spawn` starts `nprocs=world_size` processes, each running `main`.
    # `main` takes `rank` as its first parameter, but it is not listed in
    # `args`: `spawn` passes the process index as the first argument
    # automatically, and that index doubles as the GPU ID.
    mp.spawn(main, args=(world_size, num_epochs), nprocs=world_size)

The `main` function sets up the distributed environment via `ddp_setup` (another function we defined), loads the training and test sets, sets up the model, and carries out the training. Compared to single-GPU training, we now transfer the model and data to the target device via `.to(rank)`, where `rank` refers to the GPU device ID. We also wrap the model in `DDP`, which synchronises the gradients between the different GPUs during training. After training finishes and we have evaluated the model, we call `destroy_process_group()` to cleanly exit distributed training and free the allocated resources.

Earlier I mentioned that each GPU will receive a different subsample of the training data. To ensure this, we set `sampler=DistributedSampler(train_ds)` in the training loader.