# Distributed Training - Multi-GPU Jobs

## Distributed Training Overview

### Single GPU vs Multi-GPU

| Aspect | Single GPU | 4 GPUs (DDP) | 8 GPUs (DDP) |
|--------|-----------|--------------|---------------|
| **Global Batch Size** | 32 | 128 (32×4) | 256 (32×8) |
| **Training Time (ideal)** | 10 hours | ~2.5 hours | ~1.25 hours |
| **Speedup (ideal)** | 1× | ~4× | ~8× |
| **Memory** | 16GB | 64GB total | 128GB total |
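
The time and speedup rows above assume perfectly linear scaling. A minimal sketch of the underlying arithmetic separates the ideal time from the efficiency you would compute from a measured run. The helper names are hypothetical and not part of the platform:

```python
def global_batch_size(per_gpu_batch: int, num_gpus: int) -> int:
    # Each DDP process consumes its own per-GPU batch every step.
    return per_gpu_batch * num_gpus


def ideal_time(single_gpu_hours: float, num_gpus: int) -> float:
    # Perfect linear scaling: N GPUs finish in 1/N of the single-GPU time.
    return single_gpu_hours / num_gpus


def scaling_efficiency(single_gpu_hours: float, measured_hours: float, num_gpus: int) -> float:
    # Speedup = T_1 / T_N; efficiency = speedup / N (1.0 means perfectly linear).
    speedup = single_gpu_hours / measured_hours
    return speedup / num_gpus


print(global_batch_size(32, 4), ideal_time(10.0, 4), round(scaling_efficiency(10.0, 2.6, 4), 3))
```

Plugging in 2.6 hours on 4 GPUs (the BERT-Large figure used in the scaling comparison later in this notebook) gives roughly 96% efficiency instead of the ideal 100%.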

### When to Use Multi-GPU
- ✅ Large datasets (>100K samples)
- ✅ Large models (>100M parameters)
- ✅ Production training (time-critical)
- ✅ Hyperparameter sweeps (parallel experiments)
- ❌ Small experiments (<1 hour on 1 GPU)
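
The checklist can be read as a simple decision rule. The thresholds below come from the list. The function name, and the assumption that a short single-GPU run overrides the other signals, are this sketch's own. Hyperparameter sweeps are left out because they parallelize across independent experiments rather than across GPUs within one job:

```python
def recommend_multi_gpu(num_samples: int, num_params: int,
                        single_gpu_hours: float, production: bool = False) -> bool:
    # ❌ Small experiments: under an hour on one GPU is not worth the DDP setup cost.
    if single_gpu_hours < 1.0:
        return False
    # ✅ Any one of: large dataset, large model, or time-critical production run.
    return num_samples > 100_000 or num_params > 100_000_000 or production


print(recommend_multi_gpu(500_000, 340_000_000, 10.0))
```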

In [2]:
import sys
sys.path.insert(0, '..')

from src.scheduler.job_queue import JobConfig, get_job_queue, Priority
from src.resources.resource_pool import ResourcePoolManager, PoolType
from src.resources.gpu_manager import get_gpu_manager
from src.monitoring.logger import setup_logging

setup_logging(level="INFO")

# Create resource pool manager instance
resource_pool_manager = ResourcePoolManager()

print("✓ Setup complete!")

2025-10-09 21:09:53 - root - INFO - Logging initialized at level INFO [logger.py:202]
✓ Setup complete!


## 1. Multi-GPU Resource Allocation

In [5]:
# Check available GPUs
gpu_manager = get_gpu_manager()
pool_manager = resource_pool_manager

print(f"Total GPUs: {gpu_manager.num_gpus}")

# Look up the production pool (the pool the job below requests)
prod_pool = pool_manager.get_pool(PoolType.PRODUCTION)


Total GPUs: 0
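
A DDP job needs all of its GPUs at once, so allocation is all-or-nothing. The following hypothetical sketch does not use the platform's `ResourcePoolManager` API. It picks the requested number of free GPU IDs and builds the `CUDA_VISIBLE_DEVICES` string a launcher could export for the job:

```python
from typing import List, Optional, Tuple


def allocate_gpus(free_gpu_ids: List[int], num_requested: int) -> Optional[Tuple[List[int], str]]:
    """All-or-nothing allocation: return (gpu_ids, CUDA_VISIBLE_DEVICES value) or None."""
    if num_requested <= 0 or len(free_gpu_ids) < num_requested:
        return None  # a 4-GPU DDP job cannot start on 3 GPUs
    chosen = sorted(free_gpu_ids)[:num_requested]
    return chosen, ",".join(str(i) for i in chosen)


print(allocate_gpus([], 4))                # no GPUs visible, as on the machine above
print(allocate_gpus([3, 0, 2, 1, 5], 4))
```

Processes started with that variable set see only the chosen GPUs, renumbered from 0. This is why each DDP rank can address its GPU as `cuda:{local_rank}`.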


## 2. Distributed Training Job Configuration

In [6]:
# Configure 4-GPU distributed training job
distributed_job = JobConfig(
    job_id="distributed-001",
    user_id="ml-team",
    job_type="fine_tuning",
    
    # Multi-GPU settings
    pool_type="production",
    num_gpus=4,  # Request 4 GPUs
    is_preemptible=False,  # Don't interrupt
    
    # Model and data
    model_name="bert-large-uncased",  # Large model
    dataset_path="./data/large_dataset.csv",
    output_dir="./output/distributed/run_001",
    
    priority="MEDIUM",
    estimated_duration=7200,  # 2 hours
    
    config={
        # Distributed training config
        "distributed": True,
        "backend": "nccl",  # NVIDIA Collective Communications Library
        "world_size": 4,  # 4 GPUs
        
        # Per-GPU batch size (total = 32 × 4 = 128)
        "per_device_train_batch_size": 32,
        "per_device_eval_batch_size": 32,
        
        # Optimization
        "gradient_accumulation_steps": 2,  # Effective batch = 128 × 2 = 256
        "fp16": True,  # Mixed precision for speed
        
        # Training
        "learning_rate": 3e-5,
        "num_train_epochs": 3,
        "max_seq_length": 512,
        
        # Checkpointing
        "save_steps": 1000,
        "save_total_limit": 3,
        "eval_steps": 500,
        
        # Distributed-specific
        "ddp_find_unused_parameters": False,
        "dataloader_num_workers": 4,
        "dataloader_pin_memory": True
    }
)

print("Distributed Training Configuration:")
print(f"  GPUs: {distributed_job.num_gpus}")
print(f"  Backend: {distributed_job.config['backend']}")
print(f"  Per-GPU Batch: {distributed_job.config['per_device_train_batch_size']}")
print(f"  Total Batch: {distributed_job.num_gpus * distributed_job.config['per_device_train_batch_size']}")
print(f"  With Grad Accum: {distributed_job.num_gpus * distributed_job.config['per_device_train_batch_size'] * distributed_job.config['gradient_accumulation_steps']}")
print(f"  Mixed Precision: {distributed_job.config['fp16']}")

Distributed Training Configuration:
  GPUs: 4
  Backend: nccl
  Per-GPU Batch: 32
  Total Batch: 128
  With Grad Accum: 256
  Mixed Precision: True
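
The printed totals follow from multiplying the per-GPU batch, the GPU count and the accumulation steps. A small sketch with hypothetical helper names also estimates the optimizer steps per run. This is useful for sanity-checking `save_steps` and `eval_steps`. The dataset size here is an assumption, and the step count is approximate because it ignores `DistributedSampler` padding and partial batches:

```python
import math


def effective_batch_size(per_device_batch: int, num_gpus: int, grad_accum_steps: int = 1) -> int:
    # Samples that contribute to one optimizer step across all GPUs.
    return per_device_batch * num_gpus * grad_accum_steps


def approx_optimizer_steps(num_samples: int, per_device_batch: int, num_gpus: int,
                           grad_accum_steps: int, epochs: int) -> int:
    # Approximation: ignores sampler padding and how the last partial batch is handled.
    per_epoch = math.ceil(num_samples / effective_batch_size(per_device_batch, num_gpus, grad_accum_steps))
    return per_epoch * epochs


print(effective_batch_size(32, 4, 2))                        # settings from distributed_job
print(approx_optimizer_steps(100_000, 32, 4, 2, epochs=3))   # hypothetical 100K-sample dataset
```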


## 3. PyTorch DistributedDataParallel (DDP) Setup

This is how the platform sets up distributed training internally.

In [7]:
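# Example DDP setup code (for reference; this cell only prints it)
print("""
Internal DDP Setup (PyTorch), e.g. launched with: torchrun --nproc_per_node=4 train.py

```python
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# create_model, train_dataset, num_epochs and compute_loss are placeholders.

# torchrun sets RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT
local_rank = int(os.environ['LOCAL_RANK'])  # 0, 1, 2, 3 on one 4-GPU node

# Initialize process group; rank and world size are read from the environment
dist.init_process_group(backend='nccl', init_method='env://')

# Set device for this process
torch.cuda.set_device(local_rank)
device = torch.device(f'cuda:{local_rank}')

# Wrap model with DDP
model = create_model().to(device)
model = DDP(model, device_ids=[local_rank])
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-5)

# Distributed sampler: each rank gets a disjoint shard of the dataset
# (num_replicas and rank default to the initialized process group)
train_sampler = DistributedSampler(train_dataset)

# DataLoader with distributed sampler
train_loader = DataLoader(
    train_dataset,
    batch_size=32,  # Per-GPU batch
    sampler=train_sampler,
    num_workers=4,
    pin_memory=True
)

# Training loop (assumes the dataset yields (inputs, targets) pairs)
for epoch in range(num_epochs):
    train_sampler.set_epoch(epoch)  # Shuffle differently each epoch

    for inputs, targets in train_loader:
        # Each GPU processes a different batch
        inputs = inputs.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)
        outputs = model(inputs)
        loss = compute_loss(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        # Gradients are all-reduced (averaged) across GPUs during backward()

        optimizer.step()

    # Only rank 0 saves checkpoints (model.module unwraps DDP)
    if dist.get_rank() == 0:
        torch.save(model.module.state_dict(), f'checkpoint_epoch{epoch}.pt')

dist.destroy_process_group()
```

Key Points:
- Each GPU runs its own process (rank 0-3)
- Data is split across GPUs (DistributedSampler)
- Gradients are synchronized via all-reduce
- Only rank 0 saves checkpoints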
""")



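Two of the key points can be checked without GPUs. The first is that each rank trains on a disjoint shard. The second is that averaging the per-rank gradients reproduces the full-batch gradient. This pure-Python sketch assumes a one-parameter linear model with mean-squared-error loss. `shard_indices` mimics the strided split that `DistributedSampler` performs with `shuffle=False`, padding by wrapping around, but it is an illustration rather than PyTorch's code:

```python
import math


def shard_indices(n_samples: int, world_size: int, rank: int) -> list:
    # Pad by wrapping around so every rank gets the same count, then take
    # every world_size-th index starting at this rank.
    per_rank = math.ceil(n_samples / world_size)
    total = per_rank * world_size
    padded = [i % n_samples for i in range(total)]
    return padded[rank:total:world_size]


def grad_mse(w: float, xs: list, ys: list) -> float:
    # d/dw of mean((w*x - y)^2) = mean(2*x*(w*x - y))
    return sum(2 * x * (w * x - y) for x, y in zip(xs, ys)) / len(xs)


xs = [float(i) for i in range(8)]
ys = [3.0 * x + 1.0 for x in xs]
w, world_size = 0.5, 4

# Each "rank" computes a gradient on its own shard only.
local_grads = []
for rank in range(world_size):
    idx = shard_indices(len(xs), world_size, rank)
    local_grads.append(grad_mse(w, [xs[i] for i in idx], [ys[i] for i in idx]))

# All-reduce (sum) then divide by world size: the averaging DDP performs.
averaged = sum(local_grads) / world_size
print(averaged, grad_mse(w, xs, ys))
```

The two values match here because every shard has the same size. When a dataset does not divide evenly, the padded duplicates make the averaged gradient differ slightly from the true full-batch gradient.
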
## 4. Submit Distributed Job

In [10]:
job_queue = get_job_queue()

try:
    job_id = job_queue.submit_job(distributed_job, Priority.MEDIUM)
    print(f"✓ Distributed job submitted: {job_id}")
    print(f"  Waiting for {distributed_job.num_gpus} GPUs...")
    print(f"  Job will start when resources available")
except Exception as e:
    print(f"✗ Submission failed: {e}")

2025-10-09 21:17:02 - src.scheduler.job_queue - INFO - Submitted job distributed-001 to queue 'default' (priority=MEDIUM) [job_queue.py:340]
✓ Distributed job submitted: distributed-001
  Waiting for 4 GPUs...
  Job will start when resources available
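
"Job will start when resources available" describes gang scheduling: the job waits until all 4 GPUs are free at the same time. The toy priority queue below is a hypothetical illustration and not the platform's `job_queue`. The numeric priorities are an assumption (lower number means more urgent). It starts jobs in priority order only while the job at the head of the queue fits:

```python
import heapq
from dataclasses import dataclass, field
from itertools import count

_submission_order = count()


@dataclass(order=True)
class PendingJob:
    priority: int                       # assumed: lower number = more urgent
    seq: int                            # FIFO tie-breaker within one priority
    job_id: str = field(compare=False)
    num_gpus: int = field(compare=False)


def submit(queue: list, job_id: str, num_gpus: int, priority: int) -> None:
    heapq.heappush(queue, PendingJob(priority, next(_submission_order), job_id, num_gpus))


def start_ready_jobs(queue: list, free_gpus: int):
    """Start jobs in priority order while the head of the queue fits; no backfilling."""
    started = []
    while queue and queue[0].num_gpus <= free_gpus:
        job = heapq.heappop(queue)
        free_gpus -= job.num_gpus
        started.append(job.job_id)
    return started, free_gpus


queue = []
submit(queue, "distributed-001", num_gpus=4, priority=1)
print(start_ready_jobs(queue, free_gpus=3))  # still waiting for 4 GPUs
print(start_ready_jobs(queue, free_gpus=4))  # starts once all 4 are free
```

Stopping at the first job that does not fit prevents small jobs from starving a large one. The cost is idle GPUs while the large job waits. Real schedulers often add backfilling to recover that capacity.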


## 5. Performance Optimization Tips

In [11]:
print("""
🚀 Distributed Training Optimization:

1. **Batch Size Scaling**
   - Scale batch size linearly with GPUs
   - 1 GPU: batch=32 → 4 GPUs: batch=128
   - Adjust learning rate: linear (LR × N_GPUs) is the common rule for SGD;
     sqrt (LR × sqrt(N_GPUs)) is often used with Adam; add warmup

2. **Gradient Accumulation**
   - Simulate larger batches
   - 4 GPUs × batch 32 × accum 2 = effective 256
   - Useful when GPU memory is limited

3. **Mixed Precision (FP16)**
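   - Faster training on GPUs with Tensor Cores
   - Less activation memory (FP32 master weights are kept)
   - Enable with: fp16=True
   - Typically uses PyTorch native AMP (torch.cuda.amp); NVIDIA Apex is not required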

4. **Data Loading**
   - num_workers=4 per GPU
   - pin_memory=True for faster GPU transfer
   - Prefetch data during GPU compute

5. **NCCL Backend**
   - Fastest for NVIDIA GPUs
   - Optimized all-reduce
   - No MPI needed; NCCL ships with PyTorch CUDA builds

6. **Gradient Checkpointing**
   - Save memory for large models
   - Trade compute for memory
   - Enable with: gradient_checkpointing=True

7. **Communication Optimization**
   - ddp_find_unused_parameters=False (faster)
   - Bucket gradients for fewer syncs
   - Overlap compute and communication
""")

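The square-root rule from tip 1 and its common alternative, linear scaling, are simple to compute, and both are usually paired with a warmup. Here is a sketch with hypothetical function names. Neither rule is guaranteed to suit a given model, so treat the result as a starting point for tuning:

```python
import math


def scale_learning_rate(base_lr: float, base_batch: int, new_batch: int, rule: str = "sqrt") -> float:
    ratio = new_batch / base_batch
    if rule == "linear":
        return base_lr * ratio             # LR grows in proportion to the global batch
    if rule == "sqrt":
        return base_lr * math.sqrt(ratio)  # gentler growth
    raise ValueError(f"unknown rule: {rule!r}")


def warmup_lr(target_lr: float, step: int, warmup_steps: int) -> float:
    # Ramp linearly up to target_lr over warmup_steps, then hold (decay omitted).
    if warmup_steps <= 0 or step >= warmup_steps:
        return target_lr
    return target_lr * (step + 1) / warmup_steps


# Hypothetical: 3e-5 tuned at batch 32 on 1 GPU, moving to batch 128 on 4 GPUs.
print(scale_learning_rate(3e-5, 32, 128, "sqrt"), scale_learning_rate(3e-5, 32, 128, "linear"))
```
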


## 6. Monitoring Distributed Training

In [12]:
# Monitor all GPUs in distributed job
if gpu_manager.num_gpus >= 4:
    print("GPU Utilization (All 4 GPUs):\n")
    
    for gpu_id in range(min(4, gpu_manager.num_gpus)):
        gpu_info = gpu_manager.get_gpu_info(gpu_id)
        if gpu_info:
            print(f"GPU {gpu_id}:")
            print(f"  Utilization: {gpu_info.utilization:.1f}%")
            print(f"  Memory: {gpu_info.used_memory/(1024**3):.1f}GB / {gpu_info.total_memory/(1024**3):.1f}GB")
            print(f"  Temperature: {gpu_info.temperature:.1f}°C\n")
    
    print("\n✓ All GPUs should show similar utilization (~95-100%)")
    print("  If one GPU is low, check for data loading bottleneck")
else:
    print("Fewer than 4 GPUs available")

Fewer than 4 GPUs available
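
"If one GPU is low" can be made concrete by comparing each GPU against the median. The helper name and the 10-point tolerance in this sketch are assumptions, and the readings are hypothetical:

```python
from statistics import median


def find_lagging_gpus(utilization_by_gpu: dict, tolerance_pct: float = 10.0) -> list:
    """Return GPU ids whose utilization is more than tolerance_pct points below the median."""
    if not utilization_by_gpu:
        return []
    typical = median(utilization_by_gpu.values())
    return sorted(gpu for gpu, util in utilization_by_gpu.items() if util < typical - tolerance_pct)


# Hypothetical readings: GPU 2 is starved (e.g., waiting on the data loader).
print(find_lagging_gpus({0: 97.0, 1: 96.0, 2: 60.0, 3: 98.0}))
```

On a machine with at least 4 GPUs, the readings could be collected from `gpu_manager.get_gpu_info(gpu_id).utilization`, as in the cell above.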


## 7. Scaling Comparison

### Example: BERT-Large Fine-tuning

In [13]:
import pandas as pd

# Scaling efficiency data
scaling_data = pd.DataFrame([
    {"GPUs": 1, "Batch": 16, "Time (hours)": 10.0, "Speedup": 1.0, "Efficiency": 100.0},
    {"GPUs": 2, "Batch": 32, "Time (hours)": 5.2, "Speedup": 1.92, "Efficiency": 96.0},
    {"GPUs": 4, "Batch": 64, "Time (hours)": 2.6, "Speedup": 3.85, "Efficiency": 96.2},
    {"GPUs": 8, "Batch": 128, "Time (hours)": 1.4, "Speedup": 7.14, "Efficiency": 89.3},
])

print("BERT-Large Fine-tuning Scaling:\n")
print(scaling_data.to_string(index=False))

print("\n📊 Analysis:")
print("  - Near-linear speedup up to 4 GPUs (96% efficiency)")
print("  - 8 GPUs: 7.14× speedup (89% efficiency)")
print("  - Efficiency loss due to communication overhead")
print("  - Sweet spot: 4 GPUs for most use cases")

BERT-Large Fine-tuning Scaling:

 GPUs  Batch  Time (hours)  Speedup  Efficiency
    1     16          10.0     1.00       100.0
    2     32           5.2     1.92        96.0
    4     64           2.6     3.85        96.2
    8    128           1.4     7.14        89.3

📊 Analysis:
  - Near-linear speedup up to 4 GPUs (96% efficiency)
  - 8 GPUs: 7.14× speedup (89% efficiency)
  - Efficiency loss due to communication overhead
  - Sweet spot: 4 GPUs for most use cases
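
To reason about GPU counts beyond the table, one option is to fit Amdahl's law, S(N) = 1 / ((1 - p) + p/N), to a measured point. This is a swapped-in model and not how the numbers above were produced. It treats every loss as a fixed non-parallel fraction, whereas communication overhead can grow with N. The fit is therefore only a rough guide: fitted to the 8-GPU row, it predicts about 3.80× at 4 GPUs against the measured 3.85×.

```python
def amdahl_parallel_fraction(num_gpus: int, measured_speedup: float) -> float:
    # Invert S = 1 / ((1 - p) + p / N) for p; requires num_gpus > 1.
    return (1 - 1 / measured_speedup) / (1 - 1 / num_gpus)


def amdahl_speedup(num_gpus: int, parallel_fraction: float) -> float:
    return 1 / ((1 - parallel_fraction) + parallel_fraction / num_gpus)


# Fit to the 8-GPU row of the table, then extrapolate.
p = amdahl_parallel_fraction(8, 7.14)
for n in (2, 4, 8, 16):
    s = amdahl_speedup(n, p)
    print(f"{n:>2} GPUs: speedup {s:.2f}, efficiency {100 * s / n:.1f}%")
```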
