<a href="https://colab.research.google.com/github/takzen/pytorch-black-belt/blob/main/notebooks/43_DDP_Concepts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# --------------------------------------------------------------
# ☁️ COLAB SETUP (Automatyczna instalacja środowiska)
# --------------------------------------------------------------
import sys
import os

# Sprawdzamy, czy jesteśmy w Google Colab
if 'google.colab' in sys.modules:
    print('☁️ Wykryto środowisko Google Colab. Konfiguruję...')

    # 1. Pobieramy plik requirements.txt bezpośrednio z repozytorium
    !wget -q https://raw.githubusercontent.com/takzen/ai-engineering-handbook/main/requirements.txt -O requirements.txt

    # 2. Instalujemy biblioteki
    print('⏳ Instaluję zależności (to może chwilę potrwać)...')
    !pip install -q -r requirements.txt

    print('✅ Gotowe! Środowisko jest zgodne z repozytorium.')
else:
    print('💻 Wykryto środowisko lokalne. Zakładam, że masz już uv/venv.')


# 🥋 Lekcja 43: DDP (Distributed Data Parallel) - Anatomia Synchronizacji

Dlaczego `nn.DataParallel` (DP) jest złe?
Bo kopiuje model do VRAM przy każdym kroku (Forward), a potem go kasuje. Narzut komunikacyjny jest gigantyczny.

**DDP (Distributed Data Parallel)** jest inne:
1.  Model jest kopiowany raz (na starcie).
2.  Procesy żyją niezależnie (nie blokują się przez GIL).
3.  Jedyny moment komunikacji to **Uśrednianie Gradientów (AllReduce)**.

**Matematyka DDP:**
*   GPU 0: Obliczył gradient $g_0 = 10$.
*   GPU 1: Obliczył gradient $g_1 = 12$.
*   **AllReduce:** Oba GPU ustalają: "Nasz wspólny gradient to $(10+12)/2 = 11$".
*   Update: Oba GPU odejmują $11$ od wag. Pozostają identyczne.

Zasymulujemy to ręcznie na dwóch "wirtualnych" GPU.

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import copy

# Konfiguracja
# Udajemy, że mamy 2 urządzenia (nawet jeśli to CPU)
RANK_0_DEVICE = "cpu"
RANK_1_DEVICE = "cpu"

# 1. TWORZYMY MODEL (Baza)
# nn.Linear(10, 1) tworzy wagi o kształcie [1, 10]
base_model = nn.Linear(10, 1, bias=False)

# Ustawiamy wagi na sztywno, żeby start był identyczny
with torch.no_grad():
    base_model.weight.fill_(1.0)

# 2. Rozsyłamy model na "GPU" (Repliki)
model_gpu0 = copy.deepcopy(base_model).to(RANK_0_DEVICE)
model_gpu1 = copy.deepcopy(base_model).to(RANK_1_DEVICE)

# Każdy ma swój optymalizator
opt_gpu0 = optim.SGD(model_gpu0.parameters(), lr=0.1)
opt_gpu1 = optim.SGD(model_gpu1.parameters(), lr=0.1)

print("Symulacja: Dwa procesy wystartowały z identycznym modelem.")

# --- POPRAWKA ---
# Zamiast .item() na całym tensorze, bierzemy pierwszy element [0, 0]
print(f"Waga GPU0 (pierwsza): {model_gpu0.weight[0, 0].item()}")
print(f"Waga GPU1 (pierwsza): {model_gpu1.weight[0, 0].item()}")

Symulacja: Dwa procesy wystartowały z identycznym modelem.
Waga GPU0 (pierwsza): 1.0
Waga GPU1 (pierwsza): 1.0


## Krok 1: Forward/Backward (Desynchronizacja)

Każdy proces dostaje **inne dane** (dzięki `DistributedSampler`).
Dlatego każdy wyliczy **inny gradient**.
Jeśli zrobimy `step()` teraz, modele się "rozjadą" i trening pójdzie do kosza.

In [3]:
# Dane dla GPU 0 (np. pierwsza połowa batcha)
data_0 = torch.tensor([[1.0] * 10]).to(RANK_0_DEVICE) # Same jedynki
target_0 = torch.tensor([[100.0]]).to(RANK_0_DEVICE)

# Dane dla GPU 1 (np. druga połowa batcha)
data_1 = torch.tensor([[2.0] * 10]).to(RANK_1_DEVICE) # Same dwójki
target_1 = torch.tensor([[200.0]]).to(RANK_1_DEVICE)

# --- PROCES GPU 0 ---
opt_gpu0.zero_grad()
pred_0 = model_gpu0(data_0)
loss_0 = (pred_0 - target_0)**2
loss_0.backward()

# --- PROCES GPU 1 ---
opt_gpu1.zero_grad()
pred_1 = model_gpu1(data_1)
loss_1 = (pred_1 - target_1)**2
loss_1.backward()

print("--- GRADIENTY LOKALNE ---")
# Gradienty są różne, bo dane były różne!
grad_0 = model_gpu0.weight.grad
grad_1 = model_gpu1.weight.grad

print(f"Grad GPU0: {grad_0[0,0].item():.2f}")
print(f"Grad GPU1: {grad_1[0,0].item():.2f}")
print("Modele chcą iść w różnych kierunkach!")

--- GRADIENTY LOKALNE ---
Grad GPU0: -180.00
Grad GPU1: -720.00
Modele chcą iść w różnych kierunkach!


## Krok 2: AllReduce (Synchronizacja)

To jest ten moment, który DDP robi automatycznie (poprzez backend `nccl` na NVIDIA lub `gloo` na CPU).
Musimy uśrednić gradienty ze wszystkich GPU.

$$ G_{global} = \frac{G_0 + G_1 + ... + G_k}{K} $$

Następnie nadpisujemy lokalne gradienty tym globalnym.

In [4]:
# Symulacja operacji DIST.ALL_REDUCE
with torch.no_grad():
    # 1. Sumujemy (Reduce)
    global_grad = grad_0 + grad_1
    
    # 2. Dzielimy przez liczbę GPU (Average)
    global_grad = global_grad / 2.0
    
    # 3. Rozsyłamy z powrotem do modeli (Broadcast)
    # Nadpisujemy lokalny .grad
    model_gpu0.weight.grad.copy_(global_grad)
    model_gpu1.weight.grad.copy_(global_grad)

print("--- PO ALL-REDUCE ---")
print(f"Grad GPU0: {model_gpu0.weight.grad[0,0].item():.2f}")
print(f"Grad GPU1: {model_gpu1.weight.grad[0,0].item():.2f}")
print("Gradienty są teraz identyczne. Jesteśmy zsynchronizowani.")

--- PO ALL-REDUCE ---
Grad GPU0: -450.00
Grad GPU1: -450.00
Gradienty są teraz identyczne. Jesteśmy zsynchronizowani.


In [5]:
# Krok 3: Optimizer Step
opt_gpu0.step()
opt_gpu1.step()

print("\n--- WAGI PO KROKU ---")
# POPRAWKA: Indeksowanie [0, 0]
w0 = model_gpu0.weight[0, 0].item()
w1 = model_gpu1.weight[0, 0].item()

print(f"Waga GPU0: {w0:.4f}")
print(f"Waga GPU1: {w1:.4f}")

if w0 == w1:
    print("✅ SUKCES! Modele pozostały bliźniakami.")
else:
    print("❌ BŁĄD! Modele się rozjechały.")


--- WAGI PO KROKU ---
Waga GPU0: 46.0000
Waga GPU1: 46.0000
✅ SUKCES! Modele pozostały bliźniakami.


## Boilerplate (Jak to wygląda w kodzie?)

W prawdziwym skrypcie nie robisz tego ręcznie. Używasz wrappera `DistributedDataParallel`.

Oto "Szablon Startowy", który każdy inżynier DDP ma pod ręką.
*(Ten kod nie zadziała w notatniku, bo wymaga uruchomienia przez `torchrun`, ale jest referencją)*.

```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

def main():
    # 1. Inicjalizacja grupy procesowej
    dist.init_process_group("nccl")
    
    # 2. Sprawdzenie, kim jestem (Rank)
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    device = torch.device(f"cuda:{rank}")
    
    # 3. Model na GPU
    model = MyModel().to(device)
    # Magia: DDP automatycznie rejestruje hooki na gradientach, żeby robić AllReduce
    model = DDP(model, device_ids=[rank])
    
    # 4. Sampler (Żeby każdy GPU dostał inne dane!)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, sampler=sampler, batch_size=32)
    
    # 5. Pętla
    for epoch in range(10):
        sampler.set_epoch(epoch) # Ważne dla tasowania!
        for batch in loader:
            ...

## 🥋 Black Belt Summary

1.  **DistributedSampler:** Kluczowy element. Dzieli tort danych na kawałki. Bez tego każdy GPU uczyłby się na tym samym (strata zasobów) lub losowo (ryzyko duplikatów).
2.  **SyncBatchNorm:** Zwykły BatchNorm działa tylko lokalnie (na 1 GPU). Jeśli masz mały batch per GPU (np. 2), BN zwariuje. Musisz użyć `nn.SyncBatchNorm.convert_sync_batchnorm(model)`, żeby statystyki były liczone globalnie (kosztuje trochę czasu na komunikację).
3.  **`torchrun`:** Nie uruchamiaj skryptów przez `python train.py`. Używaj `torchrun --nproc_per_node=4 train.py`. To automatycznie zarządza rangami.