# Ultra-Scale Playbook: Part 4

Welcome back! In this episode we dive into **Data Parallelism** and three key optimizations to maximize GPU utilization and minimize idle time.

## Overview

- Recap of Data Parallelism (DP)
- **Optimization 1:** Overlap gradient synchronization with backward pass
- **Optimization 2:** Bucketed gradient reductions
- **Optimization 3:** Combining DP with gradient accumulation via `no_sync()`
- Hands-on exercises & quiz

## 1 Data Parallelism Recap

- We **replicate** the full model on each GPU (called *replicas*).
- Each GPU processes a different **microbatch** in parallel: forward → backward.
- To keep replicas in sync, we **all-reduce** their gradients **before** `optimizer.step()`.

```python
# pseudo-DDP loop
for x_mb, y_mb in microbatches:
    output = model(x_mb)        # each GPU
    loss = loss_fn(output, y_mb)
    loss.backward()             # triggers all-reduce of grads
optimizer.step()
```
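
To see why averaging gradients keeps the replicas in sync, here is a minimal pure-Python sketch (no GPUs or PyTorch needed). It assumes a hypothetical one-parameter model `y = w * x` with a mean-squared-error loss and equally sized microbatches; `grad_mse` and `all_reduce_mean` are illustrative names, not library functions.

```python
def grad_mse(w, batch):
    """Gradient d/dw of mean((w * x - y) ** 2) over the batch."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)

def all_reduce_mean(values):
    """Stand-in for an averaging all-reduce: every replica receives the mean."""
    mean = sum(values) / len(values)
    return [mean] * len(values)

w = 0.5
data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
microbatches = [data[:2], data[2:]]  # one microbatch per simulated GPU

local_grads = [grad_mse(w, mb) for mb in microbatches]   # differ per replica
synced_grads = all_reduce_mean(local_grads)              # identical everywhere
full_batch_grad = grad_mse(w, data)                      # single-GPU reference

print(local_grads, synced_grads, full_batch_grad)
```

Under these assumptions the synchronized gradient matches the gradient a single GPU would compute on the whole batch, so every replica applies the same update.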

### The naive inefficiency

By default, DDP performs an all-reduce **after every** `backward()`. If you have *N* microbatches per update, that is *N* communications where one would suffice, which wastes GPU cycles.

## 2 Optimization 1: Overlap gradient sync with backward pass

**Idea:** As soon as each layer’s backward finishes, launch its all-reduce **while** computing the next layer’s backward.

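- DDP does this internally by registering a hook on each parameter, which lets communication overlap with computation. The snippet below shows the idea on a plain replica that is not wrapped in DDP.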

```python
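def attach_sync_hook(model):
    # Illustration for a plain replica; DDP already registers its own hooks.
    def sync_grad(param):
        # Runs once param.grad has been accumulated (PyTorch >= 2.1).
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
        param.grad /= dist.get_world_size()  # average, as DDP does
    for p in model.parameters():
        if p.requires_grad:
            p.register_post_accumulate_grad_hook(sync_grad)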
```

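This reduces idle GPU time and can boost throughput, by roughly 1.3× in favorable cases; the actual gain depends on how much of the communication the backward pass can hide.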
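
The effect of overlapping can be estimated with a small timeline model. The sketch below is pure Python and rests on simplifying assumptions: per-layer compute and communication times are made-up numbers in milliseconds, listed in backward order, and communication runs on its own channel, one all-reduce at a time. The function names are hypothetical.

```python
def backward_time_no_overlap(compute, comm):
    """All communication starts only after the whole backward pass is done."""
    return sum(compute) + sum(comm)

def backward_time_with_overlap(compute, comm):
    """A layer's all-reduce starts once its backward has finished and the
    previous all-reduce is complete; compute never waits for communication."""
    compute_done = 0.0
    comm_done = 0.0
    for c, m in zip(compute, comm):
        compute_done += c
        comm_done = max(comm_done, compute_done) + m
    return comm_done

# Hypothetical timings (ms) for four layers, in backward order.
compute_ms = [3.0, 3.0, 3.0, 3.0]
comm_ms = [2.0, 2.0, 2.0, 2.0]

sequential = backward_time_no_overlap(compute_ms, comm_ms)
overlapped = backward_time_with_overlap(compute_ms, comm_ms)
print(sequential, overlapped)
```

In this toy setting only the last layer's all-reduce remains exposed; when communication is slower than compute, more of it spills past the end of the backward pass and the benefit shrinks.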

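```python
import torch
import torch.distributed as dist

# Example: attach a simple all-reduce hook to each parameter.
# Illustration for a plain replica; DDP already registers its own hooks.
def attach_sync_hook(model):
    def sync_grad(param):
        # Runs once param.grad has been accumulated (PyTorch >= 2.1).
        # Synchronous for clarity; pass async_op=True and wait on the returned
        # handles before optimizer.step() to actually overlap with compute.
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
        param.grad /= dist.get_world_size()  # average, as DDP does
    for p in model.parameters():
        if p.requires_grad:
            p.register_post_accumulate_grad_hook(sync_grad)

# Usage on a manually replicated model (pseudo)
# dist.init_process_group('nccl', ...)
# model = model.cuda()        # same initial weights on every rank
# attach_sync_hook(model)     # do not also wrap the model in DDP
```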


## 3 Optimization 2: Bucketed gradient reductions

**Idea:** Group many small gradient tensors into **buckets**, then do **one** all-reduce per bucket instead of per-tensor.

- Large tensors amortize communication startup cost.
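- Buckets often map to layers or parameter groups. PyTorch DDP instead fills them by size (`bucket_cap_mb`, 25 MB by default), in roughly the reverse order of `model.parameters()`.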

```python
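# pseudo-bucket logic (run after backward, once every .grad is populated)
bucket_size = 1_000_000  # number of elements per bucket
buckets, current, count = [], [], 0
for p in model.parameters():
    if p.grad is None:
        continue
    current.append(p.grad)
    count += p.grad.numel()
    if count >= bucket_size:
        buckets.append(current)
        current, count = [], 0
if current:  # flush the final, partially filled bucket
    buckets.append(current)
# all-reduce each bucket once, then copy the results back into .grad
for grads in buckets:
    flat = torch.cat([g.reshape(-1) for g in grads])  # torch.cat makes a copy
    dist.all_reduce(flat, op=dist.ReduceOp.SUM)
    flat /= dist.get_world_size()  # average across replicas
    offset = 0
    for g in grads:
        g.copy_(flat[offset:offset + g.numel()].view_as(g))
        offset += g.numel()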
```
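
Why fewer, larger all-reduces help can be sketched with a simple latency-plus-bandwidth cost model. Everything here is an assumption for illustration: the 50 µs startup latency, the 10 GB/s bandwidth, the 40 equally sized fp32 gradient tensors, and the helper names. The model also ignores the extra traffic factor of real all-reduce algorithms.

```python
def count_buckets(tensor_sizes, bucket_size):
    """Greedy bucketing: close a bucket once it holds >= bucket_size elements."""
    buckets, filled = 0, 0
    for n in tensor_sizes:
        filled += n
        if filled >= bucket_size:
            buckets += 1
            filled = 0
    return buckets + (1 if filled else 0)  # count a trailing partial bucket

def all_reduce_cost(num_calls, total_bytes, latency_s, bandwidth_bytes_per_s):
    """Each call pays a fixed startup latency; the payload pays for bandwidth."""
    return num_calls * latency_s + total_bytes / bandwidth_bytes_per_s

sizes = [250_000] * 40                 # hypothetical gradient tensors (elements)
total_bytes = sum(sizes) * 4           # fp32 = 4 bytes per element
latency_s, bandwidth = 50e-6, 10e9     # assumed interconnect characteristics

n_buckets = count_buckets(sizes, bucket_size=1_000_000)
per_tensor = all_reduce_cost(len(sizes), total_bytes, latency_s, bandwidth)
bucketed = all_reduce_cost(n_buckets, total_bytes, latency_s, bandwidth)
print(n_buckets, per_tensor, bucketed)
```

The bytes moved are identical in both cases; only the number of startup latencies changes, which is why bucketing matters most for models with many small parameters.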

## 4 Optimization 3: DP + Gradient Accumulation → `no_sync()`

When you **accumulate** gradients over *K* microbatches before calling `optimizer.step()`, you only need **one** all-reduce at the end.

```python
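from contextlib import nullcontext

model = torch.nn.parallel.DistributedDataParallel(model)
optimizer.zero_grad()

accum_steps = 4
for i in range(accum_steps):
    # Skip the sync on all but the last microbatch. The forward pass must
    # also run inside no_sync(), otherwise DDP still synchronizes.
    ctx = model.no_sync() if i < accum_steps - 1 else nullcontext()
    with ctx:
        out = model(x[i])
        loss = loss_fn(out, y[i])
        loss.backward()  # all-reduce happens only on the last iteration
optimizer.step()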
```
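
The saving in communication can be checked without any GPUs by counting syncs on a stand-in object. `FakeDDP` below is a hypothetical class written for this sketch, not part of PyTorch; it only mimics the behavior described in the DDP documentation, where the forward pass must run inside `no_sync()` for the following backward to skip the all-reduce.

```python
from contextlib import contextmanager, nullcontext

class FakeDDP:
    """Hypothetical stand-in that only counts gradient synchronizations."""
    def __init__(self):
        self.sync_enabled = True
        self.sync_count = 0
        self._armed = False

    @contextmanager
    def no_sync(self):
        previous = self.sync_enabled
        self.sync_enabled = False
        try:
            yield
        finally:
            self.sync_enabled = previous

    def forward(self):
        # The decision to sync is recorded during the forward pass.
        self._armed = self.sync_enabled

    def backward(self):
        if self._armed:
            self.sync_count += 1  # stands in for one round of all-reduces

def run(model, accum_steps, use_no_sync):
    for i in range(accum_steps):
        skip = use_no_sync and i < accum_steps - 1
        with model.no_sync() if skip else nullcontext():
            model.forward()
            model.backward()
    return model.sync_count

syncs_naive = run(FakeDDP(), accum_steps=4, use_no_sync=False)
syncs_accum = run(FakeDDP(), accum_steps=4, use_no_sync=True)
print(syncs_naive, syncs_accum)
```

With four microbatches the naive loop synchronizes four times per update, while the `no_sync()` loop synchronizes once.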

```python
import torch
import torch.distributed as dist
from contextlib import nullcontext
from torch import nn, optim
from torch.nn.parallel import DistributedDataParallel as DDP

# Pseudo-initialization: DDP needs an initialized process group first
# dist.init_process_group('nccl', rank=..., world_size=...)
model = nn.Linear(128, 10).cuda()
model = DDP(model)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

# Fake microbatches
microbatches = [(torch.randn(8, 128).cuda(), torch.randint(0, 10, (8,)).cuda()) for _ in range(4)]
optimizer.zero_grad()
for i, (x_mb, y_mb) in enumerate(microbatches):
    # Forward and backward both run inside no_sync() for all but the last microbatch
    is_last = i == len(microbatches) - 1
    with nullcontext() if is_last else model.no_sync():
        logits = model(x_mb)
        loss = loss_fn(logits, y_mb)
        loss.backward()
optimizer.step()
print("Done with gradient accumulation + single sync")
```

## 🧠 Exercise 1: Implement Overlap Hook

1. Copy the `attach_sync_hook` function above.
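2. Initialize a small model (e.g. `nn.Linear`) on each rank of a single-node 2-GPU setup. Leave it unwrapped, since DDP would add its own reduction on top of the hook.
3. Measure throughput **with** the hook and **without** it (all-reducing every gradient after `backward()` returns) to see the speed-up.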

## 🧠 Exercise 2: Bucket via Parameter Groups

1. Partition model parameters into 2 buckets (e.g. first half, second half).
2. After a backward pass, manually gather all `.grad` from each bucket, concat, and all-reduce once.
3. Compare to default DDP behavior by timing a few iterations.

## ❓ Quiz

1. **Why overlap gradient sync with backward?**  
   A) To reduce peak memory usage  
   B) To hide communication under computation ✅  
   C) To increase batch size  
   D) To simplify code

2. **Bucketed reductions** help by:  
   A) Reducing number of all-reduce calls ✅  
   B) Eliminating gradient accumulation  
   C) Avoiding model replication  
   D) Merging forward & backward

3. **`model.no_sync()`** is used to:  
   A) Disable gradient computation  
   B) Temporarily disable DDP’s all-reduce hooks ✅  
   C) Stop optimizer steps  
   D) Synchronize parameters

4. **After `loss.backward()` inside `no_sync()`, when do gradients sync?**  
   A) Immediately, per parameter  
   B) During the first forward and backward pass run outside `no_sync()` ✅  
   C) At optimizer initialization  
   D) They never sync