In [None]:
# Understanding Distributed Data Parallel (DDP) in PyTorch
"""
DDP implements data parallelism at the module level which can run across multiple machines.
Data parallelism is a way to process multiple data batches across multiple devices
simultaneously to achieve better performance. In PyTorch, the DistributedSampler
ensures each device gets a non-overlapping input batch. The model is replicated on
all the devices; each replica calculates gradients and simultaneously synchronizes
with the others using the ring all-reduce algorithm.
"""

# Note: Try to use this notebook with multiple GPUs only


In [2]:
import torch

In [3]:
torch.__version__

'2.0.1+cu118'

In [5]:
# Example: build the device remapping a process of rank 2 would hand to torch.load
# so that tensors tagged with GPU 0 are placed on that process's own GPU instead.
rank = 2
device_template = 'cuda:%d'
map_location = {device_template % 0: device_template % rank}
map_location

{'cuda:0': 'cuda:2'}

In [None]:
# Let's see how multiple GPU devices are detected and how computation is placed on them.
# The multi-GPU part of this demo needs at least two GPUs (cuda:0 and cuda:1).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# `>= 2` rather than `== 2`: a machine with more than two GPUs also exposes
# cuda:0 and cuda:1, so the demo should run there as well.
if torch.cuda.device_count() >= 2:
  device0 = torch.device('cuda:0')
  device1 = torch.device('cuda:1')
  print(device1)

  x = torch.tensor([2., 3.], device=device)   # index-less 'cuda' -> the default (current) GPU
  y = torch.tensor([4., 7.], device=device0)  # placed on the specific GPU requested

  # Context GPU: inside this block the current CUDA device is GPU 1.
  with torch.cuda.device(1):
    a = torch.tensor([9., 6.], device=device)  # index-less 'cuda' resolves to the context GPU
    b = torch.tensor([9., 6.]).cuda()          # .cuda() also targets the context GPU

    c = a + b  # both operands are on the context GPU, so the sum is computed there
    # Note: cross-GPU operations are not allowed by default. If two operands are
    # on two different devices, the computation cannot be done.


In [None]:
# torch.distributed leverages message passing semantics allowing each process to communicate data
# to any of the other processes. As opposed to the multiprocessing (torch.multiprocessing)
# package, processes can use different communication backends and are not restricted to
# being executed on the same machine.
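# torch.distributed supports three built-in backends (Gloo, NCCL, and MPI), each with
# different capabilities. The two used most often, Gloo and NCCL, are described below.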

"""
Rule of thumb

 -> Use the NCCL backend for distributed GPU training

 -> Use the Gloo backend for distributed CPU training.

"""

# Backends for torch.distributed

# 1. Gloo
"""
 It supports all point-to-point and collective operations on CPU, and all collective
 operations on GPU. The implementation of the collective operations for CUDA tensors
 is not as optimized as the ones provided by the NCCL backend.

 In order to use multiple GPUs, let us also make the following modifications:

 1. Use device = torch.device("cuda:{}".format(rank))

 2. Replace model = Net() with model = Net().to(device)

 3. Use data, target = data.to(device), target.to(device)

"""

# 2. NCCL Backend
"""
 It provides an optimized implementation of collective operations against CUDA tensors.
 If you only use CUDA tensors for your collective operations, consider using this
 backend for the best in class performance

 It needs to be initialized using the torch.distributed.init_process_group()
 function before calling any other methods.
 """


In [None]:
import os
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.nn as nn
import torch.optim as optim


# Processes for NCCL backend DDP

# Returns True if the distributed package is available.
torch.distributed.is_available()

# Signature shown for reference only. It is kept as a comment because `args` and
# `kwargs` are placeholders here; executing the line would raise a NameError and
# break "Restart & Run All".
#   torch.distributed.init_process_group(*args, **kwargs)
"""
Initializes the default distributed process group, and this will also initialize the distributed package.

There are 2 main ways to initialize a process group:
 1. Specify store, rank, and world_size explicitly.

 2. Specify init_method (a URL string) which indicates where/how to discover peers.
    Optionally specify rank and world_size, or encode all required parameters in
    the URL and omit them.

If neither is specified, init_method is assumed to be “env://”.

"""
# world_size = Number of processes participating in the job. Required if store is specified.
# rank = Rank of the current process (it should be a number between 0 and world_size-1).
#         Required if store is specified.

# For more reference, visit: https://pytorch.org/docs/stable/distributed.html#module-torch.distributed


In [None]:
# Setting the environment and initializing the process group
def setup(rank, world_size):

  # ************ important step
  os.environ['MASTER_ADDR'] = 'localhost'
  os.environ['MASTER_PORT'] = '12355'

  #************** important step
  # initialize the process group
  dist.init_process_group("nccl", rank=rank, world_size=world_size)

#***************
# Destroy the process group created for the training computation once it is finished
def cleanup():
  """Destroy the default process group if one is currently initialized.

  Does nothing when the distributed package is unavailable or no process group
  has been initialized, so it is safe to call more than once or from error
  handling paths.
  """
  if dist.is_available() and dist.is_initialized():
    dist.destroy_process_group()


#**************
# Define Simple Linear model for demo purpose
class SimpleLinearModel(nn.Module):
  """Minimal demo network: one linear layer with 10 inputs and 10 outputs, stored as ``net``."""

  def __init__(self):
    super().__init__()
    self.net = nn.Linear(in_features=10, out_features=10)

  def forward(self, x):
    """Apply the linear layer to ``x`` and return the result."""
    projected = self.net(x)
    return projected

In [None]:
# training step using DDP methodology
def demo_training(rank, world_size):
  """Run a single DDP forward/backward/optimizer step on the GPU with index ``rank``.

  Args:
    rank: Rank of this process; also used as the GPU index for the model,
      the DDP ``device_ids`` and the labels.
    world_size: Total number of processes, forwarded to ``setup``.
  """
  print(f"Running basic DDP example on rank {rank}.")
  setup(rank, world_size)

  # The process group exists from here on; tear it down even if a step below
  # raises, so a failing rank does not leave the group behind.
  try:
    # create model and move it to GPU with id rank
    model = SimpleLinearModel().to(rank)
    #******* Important step
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # DistributedSampler chunks the input data across all distributed processes.
    # the effective batch size is 32 * nprocs
    #
    #   train_data = torch.utils.data.DataLoader(
    #     dataset=train_dataset,
    #     batch_size=32,
    #     shuffle=False,
    #     sampler=DistributedSampler(train_dataset),
    #   )

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 10).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()
  finally:
    cleanup()

In [None]:
# run the code to spawn model on each processor before starting training
def run_demo(demo_fn, world_size):
  """Spawn ``world_size`` processes that each run ``demo_fn``, joining on them.

  ``world_size`` is used twice: as the number of processes to start and as the
  single extra positional argument handed to ``demo_fn``.
  """
  spawn_options = {
    "args": (world_size,),
    "nprocs": world_size,
    "join": True,
  }
  mp.spawn(demo_fn, **spawn_options)

In [None]:
# Note: DDP won't run inside a Jupyter notebook; it needs to be run as a Python script.
# Please refer to the Python script and training model added in this repo.


In [None]:
# Issues with the DDP training methodology

# Skewed Processing Speeds
"""
Different processes are expected to launch the same number of synchronizations and
reach these synchronization points in the same order and enter each synchronization
point at roughly the same time. Otherwise, fast processes might arrive early and
timeout while waiting for stragglers.
"""
# Note: Developers are responsible for balancing the workload distribution across
# processes and for monitoring that balance.

"""
Sometimes, skewed processing speeds are inevitable due to, e.g., network delays,
resource contentions, or unpredictable workload spikes.
To avoid timeouts in these situations, make sure that you pass a sufficiently large
timeout value when calling init_process_group.
-> timeout=datetime.timedelta(seconds=1800) (by default, it is 30 minutes)
"""


In [None]:
# Saving and Loading Checkpoints
"""
It's common practice to use torch.save and torch.load to checkpoint modules during
training and recover from checkpoints.

When using DDP, one optimization is to save the model in only one process (rank 0), then
load it to all processes, reducing write overhead. This is correct because all processes
start from the same parameters and gradients are synchronized in backward passes,
and hence optimizers should keep setting parameters to the same values.
"""
# Note: Make sure no process starts loading before the saving is finished. To achieve
# this, we can add a barrier before starting the next model update cycle:
# torch.distributed.barrier()

"""
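Additionally, when loading the module, you need to provide an appropriate map_location
argument to prevent a process from stepping into others’ devices. If map_location
is missing, torch.load will first load the module to CPU and then copy each parameter
to the device it was saved from, so every process on the same machine would end up
using the same set of devices.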
"""
# Note: For more advanced failure recovery and elasticity support, refer to TorchElastic.

In [None]:
import tempfile
# DDP model training using Checkpoints as fault-tolerance procedure

def demo_training_with_checkpoint(rank, world_size):
  """Run one DDP training step after round-tripping the weights through a checkpoint.

  Rank 0 saves the DDP state dict to a file in the temp directory. After a
  barrier, every rank loads it with a ``map_location`` that remaps ``cuda:0``
  to its own device, runs a single training step, and rank 0 finally deletes
  the checkpoint file.

  Args:
    rank: Rank of this process; also used as its GPU index.
    world_size: Total number of processes, forwarded to ``setup``.
  """
  print(f"Running training with checkpoint example on rank {rank}.")
  setup(rank, world_size)

  # The process group exists from here on; tear it down even if a step below raises.
  try:
    # create model and move it to GPU with id rank
    model = SimpleLinearModel().to(rank)
    #******* Important step
    ddp_model = DDP(model, device_ids=[rank])

    # &&&&&&&&&&&&&&&&&&&&&&&&&&
    CHK_PATH = os.path.join(tempfile.gettempdir(), "model.checkpoint")
    if rank == 0:
      torch.save(ddp_model.state_dict(), CHK_PATH)

    # Use a barrier() to make sure that other process loads the model after process 0 saves it.
    dist.barrier()

    # configure map_location properly, e.g: {'cuda:0': 'cuda:2'} for rank = 2
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}

    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state dict needs; the file lives at a predictable path in
    # a shared temp directory, so arbitrary pickled objects should not be trusted.
    state_dict = torch.load(CHK_PATH, map_location=map_location, weights_only=True)
    ddp_model.load_state_dict(state_dict)
    # &&&&&&&&&&&&&&&&&&&&&&&&&&

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # DistributedSampler chunks the input data across all distributed processes.
    # the effective batch size is 32 * nprocs
    #
    #   train_data = torch.utils.data.DataLoader(
    #     dataset=train_dataset,
    #     batch_size=32,
    #     shuffle=False,
    #     sampler=DistributedSampler(train_dataset),
    #   )

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 10).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Remove the temporary checkpoint so repeated runs do not leave it behind.
    # No extra barrier is used here: every rank loads the file before its own
    # backward pass, and DDP's gradient all-reduce in backward already acts as
    # a synchronization point (per the PyTorch DDP tutorial).
    if rank == 0:
      os.remove(CHK_PATH)
  finally:
    cleanup()