# Chapter 4: Optimization Algorithms for ML Engineers

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/jmamath/interview_prep/blob/main/chapter_04_optimization_algorithms.ipynb)

## Introduction

Picture this: You're training a large language model with billions of parameters. The training process will take weeks and cost hundreds of thousands of dollars in compute. You choose vanilla gradient descent with a fixed learning rate. After two days, the loss hasn't budged. The project is on hold, and your team is frustrated.

Now imagine a different scenario: You use Adam optimizer with a carefully tuned learning rate schedule that includes warmup and cosine decay. The model converges smoothly, the loss drops steadily, and training completes successfully. The difference? A deep understanding of optimization algorithms.

Optimization is the beating heart of machine learning. Every time you train a neural network, fine-tune a transformer, or update a recommendation system, you're running an optimization algorithm. Yet, many ML engineers treat optimizers as black boxes, reaching for Adam without understanding why it works or when it might fail.

In this chapter, we'll demystify the optimization algorithms that power modern machine learning. We'll start with the fundamentals of gradient descent and momentum, build up to adaptive methods like Adam, explore learning rate schedules used in state-of-the-art LLM training, and tackle practical constraints like gradient clipping that prevent training instabilities. By the end, you'll have both the theoretical understanding and practical skills to choose, implement, and debug optimization algorithms for any ML project.

## Learning Objectives
- Implement gradient descent variants (SGD, momentum, Nesterov)
- Understand adaptive learning rate methods (Adam, RMSprop)
- Design and implement learning rate schedules for stable training
- Apply constrained optimization with proximal operators
- Handle gradient instabilities with clipping and trust regions


---


## Problem 1: Gradient Descent Variants (Easy)

### Contextual Introduction
When training a convolutional neural network on MNIST digit classification, you notice that vanilla stochastic gradient descent (SGD) takes many iterations to converge and the training loss oscillates wildly. Your colleague suggests adding momentum, and suddenly the training becomes much smoother and converges faster. What's happening?

Momentum is one of the most important enhancements to gradient descent. Think of it like a ball rolling down a hill: it doesn't just respond to the current slope, but also carries velocity from previous steps. This helps the optimizer navigate ravines and plateaus in the loss landscape much more efficiently.

### Key Concepts
- **Vanilla SGD**: Updates parameters directly using the gradient: θ = θ - lr × ∇L
- **Momentum**: Accumulates a velocity vector: v = β × v + ∇L, then θ = θ - lr × v
- **Nesterov Momentum**: Looks ahead before computing the gradient, providing better convergence
- **Convergence**: Momentum typically provides faster convergence and smoother training curves

### Problem Statement
Implement a gradient descent optimizer that supports vanilla SGD, momentum, and Nesterov accelerated gradient. Your implementation should follow PyTorch's optimizer interface.

**Requirements**:
- Implement vanilla SGD parameter updates
- Implement momentum with velocity accumulation
- Implement Nesterov momentum (lookahead)
- Support multiple parameters

**Example**:
```python
optimizer = GradientDescentOptimizer([param], lr=0.1, momentum=0.9)
optimizer.step()  # Update parameters
```


### Example: Understanding Momentum


In [None]:
import torch
import numpy as np

# Let's optimize a simple quadratic function: f(x) = x^2
# Starting point: x = 10.0
# Learning rate: 0.1
# Momentum: 0.9

print("Optimizing f(x) = x^2 starting from x=10.0\\n")
print("=" * 70)
print("VANILLA SGD (no momentum):")
print("=" * 70)
x_sgd = 10.0
lr = 0.1
for step in range(5):
    grad = 2 * x_sgd  # Gradient of x^2 is 2x
    x_sgd = x_sgd - lr * grad
    print(f"Step {step+1}: x = {x_sgd:.4f}, gradient = {grad:.4f}")

print("\\n" + "=" * 70)
print("SGD WITH MOMENTUM (beta=0.9):")
print("=" * 70)
x_momentum = 10.0
velocity = 0.0
beta = 0.9
for step in range(5):
    grad = 2 * x_momentum
    velocity = beta * velocity + grad  # Accumulate velocity
    x_momentum = x_momentum - lr * velocity
    print(f"Step {step+1}: x = {x_momentum:.4f}, velocity = {velocity:.4f}, gradient = {grad:.4f}")

print("\\n" + "=" * 70)
print(f"RESULT: Vanilla SGD reached x={x_sgd:.4f}, Momentum reached x={x_momentum:.4f}")
print(f"Momentum converges faster due to velocity accumulation!")
print("=" * 70)


### Starter Code


In [None]:
import torch
from typing import List

class GradientDescentOptimizer:
    def __init__(self, params: List[torch.Tensor], lr: float = 0.01, momentum: float = 0.0, nesterov: bool = False):
        """
        Initialize the gradient descent optimizer.
        
        Args:
            params: List of parameters to optimize
            lr: Learning rate
            momentum: Momentum coefficient (0 means no momentum)
            nesterov: Whether to use Nesterov momentum
        """
        self.params = params
        self.lr = lr
        self.momentum = momentum
        self.nesterov = nesterov
        
        # TODO: Initialize velocity buffers for each parameter
        # Hint: Create a list of zero tensors with the same shape as params
        self.velocities = None
    
    def step(self):
        """
        Perform a single optimization step.
        """
        # TODO: Implement the parameter update logic
        # If momentum == 0: vanilla SGD (theta = theta - lr * grad)
        # If momentum > 0 and not nesterov: momentum SGD
        #   v = momentum * v + grad
        #   theta = theta - lr * v
        # If momentum > 0 and nesterov: Nesterov momentum
        #   v = momentum * v + grad
        #   theta = theta - lr * (grad + momentum * v)
        pass
    
    def zero_grad(self):
        """Zero out the gradients of all parameters."""
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


def test_gradient_descent():
    """Test the gradient descent optimizer."""
    print("Testing Gradient Descent Optimizer...\\n")
    
    # Test 1: Vanilla SGD on quadratic function
    print("Test 1: Vanilla SGD")
    x = torch.tensor([10.0], requires_grad=True)
    optimizer = GradientDescentOptimizer([x], lr=0.1, momentum=0.0)
    
    for _ in range(5):
        optimizer.zero_grad()
        loss = x ** 2
        loss.backward()
        optimizer.step()
    
    assert x.item() < 5.0, f"Test 1 Failed: SGD should reduce x, got {x.item()}"
    print(f"✓ Vanilla SGD: x converged from 10.0 to {x.item():.4f}\\n")
    
    # Test 2: Momentum should converge faster
    print("Test 2: Momentum SGD")
    x_momentum = torch.tensor([10.0], requires_grad=True)
    optimizer_momentum = GradientDescentOptimizer([x_momentum], lr=0.1, momentum=0.9)
    
    for _ in range(5):
        optimizer_momentum.zero_grad()
        loss = x_momentum ** 2
        loss.backward()
        optimizer_momentum.step()
    
    assert x_momentum.item() < x.item(), f"Test 2 Failed: Momentum should converge faster than vanilla SGD"
    print(f"✓ Momentum SGD: x converged to {x_momentum.item():.4f} (faster than vanilla)\\n")
    
    # Test 3: Nesterov momentum
    print("Test 3: Nesterov Momentum")
    x_nesterov = torch.tensor([10.0], requires_grad=True)
    optimizer_nesterov = GradientDescentOptimizer([x_nesterov], lr=0.1, momentum=0.9, nesterov=True)
    
    for _ in range(5):
        optimizer_nesterov.zero_grad()
        loss = x_nesterov ** 2
        loss.backward()
        optimizer_nesterov.step()
    
    assert x_nesterov.item() < 5.0, f"Test 3 Failed: Nesterov should converge, got {x_nesterov.item()}"
    print(f"✓ Nesterov Momentum: x converged to {x_nesterov.item():.4f}\\n")
    
    print("✓ All gradient descent tests passed!")

test_gradient_descent()


<details>
<summary><b>Hint</b> (click to expand)</summary>

For momentum SGD:
1. Initialize velocities as zeros with the same shape as parameters
2. Update velocity: `v = momentum * v + param.grad`
3. Update parameter: `param = param - lr * v`

For Nesterov momentum:
- The key difference is looking ahead: update using `grad + momentum * v` instead of just `v`

</details>


---

## Problem 2: Adam Optimizer (Medium)

### Contextual Introduction
You're fine-tuning a BERT model for text classification. The model has millions of parameters with vastly different scales: embedding weights might need large updates while attention weights need tiny, careful adjustments. With vanilla SGD, you'd need to carefully tune the learning rate for each parameter group, which is impractical.

Enter Adam (Adaptive Moment Estimation): an optimizer that automatically adapts the learning rate for each parameter based on the history of gradients. It combines the benefits of momentum (first moment) with adaptive learning rates (second moment), making it the default choice for training transformers and other deep networks.

### Key Concepts
- **First Moment (m)**: Exponential moving average of gradients (like momentum)
- **Second Moment (v)**: Exponential moving average of squared gradients (for adaptive learning rates)
- **Bias Correction**: Early iterations have biased estimates; we correct for this
- **Update Rule**: θ = θ - lr × m̂ / (√v̂ + ε), where m̂ and v̂ are bias-corrected moments

### Problem Statement
Implement the Adam optimizer from scratch. Your implementation should maintain first and second moment estimates, apply bias correction, and handle numerical stability.

**Requirements**:
- Initialize moment buffers (m and v)
- Update moments with exponential moving averages
- Apply bias correction: m̂ = m / (1 - β1^t), v̂ = v / (1 - β2^t)
- Handle numerical stability with epsilon

**Example**:
```python
optimizer = AdamOptimizer([param], lr=0.001, betas=(0.9, 0.999))
optimizer.step()
```

### Example: Adam Step-by-Step


In [None]:
import torch

# Let's trace Adam updates for 3 iterations
# Parameter: theta = 1.0
# Gradient: g = 2.0 (constant for simplicity)
# Hyperparameters: lr=0.1, beta1=0.9, beta2=0.999, epsilon=1e-8

theta = 1.0
g = 2.0
lr = 0.1
beta1 = 0.9
beta2 = 0.999
epsilon = 1e-8

m = 0.0  # First moment
v = 0.0  # Second moment

print("Adam Optimizer - 3 Iteration Trace")
print("=" * 80)
print(f"Initial: theta={theta:.6f}, m={m:.6f}, v={v:.6f}\\n")

for t in range(1, 4):
    print(f"Iteration {t}:")
    print("-" * 80)
    
    # Update biased first moment
    m = beta1 * m + (1 - beta1) * g
    print(f"  1. Update m: m = {beta1} * {0 if t==1 else m/(beta1 + (1-beta1)):.6f} + {1-beta1} * {g:.6f} = {m:.6f}")
    
    # Update biased second moment
    v = beta2 * v + (1 - beta2) * (g ** 2)
    print(f"  2. Update v: v = {beta2} * {0 if t==1 else v/(beta2 + (1-beta2)):.6f} + {1-beta2} * {g**2:.6f} = {v:.6f}")
    
    # Bias correction
    m_hat = m / (1 - beta1 ** t)
    v_hat = v / (1 - beta2 ** t)
    print(f"  3. Bias correction: m_hat = {m:.6f} / {1 - beta1**t:.6f} = {m_hat:.6f}")
    print(f"                      v_hat = {v:.6f} / {1 - beta2**t:.6f} = {v_hat:.6f}")
    
    # Parameter update
    theta = theta - lr * m_hat / (v_hat ** 0.5 + epsilon)
    print(f"  4. Update theta: theta = {theta + lr * m_hat / (v_hat ** 0.5 + epsilon):.6f} - {lr} * {m_hat:.6f} / sqrt({v_hat:.6f}) = {theta:.6f}")
    print()

print("=" * 80)
print(f"\\nFinal: theta={theta:.6f}")
print("\\nNotice how bias correction in early iterations prevents slow startup!")


### Starter Code


In [None]:
import torch
from typing import List, Tuple

class AdamOptimizer:
    def __init__(self, params: List[torch.Tensor], lr: float = 0.001, 
                 betas: Tuple[float, float] = (0.9, 0.999), eps: float = 1e-8):
        """
        Initialize the Adam optimizer.
        
        Args:
            params: List of parameters to optimize
            lr: Learning rate
            betas: Coefficients for computing running averages (beta1, beta2)
            eps: Term added for numerical stability
        """
        self.params = params
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.t = 0  # Time step
        
        # TODO: Initialize first moment (m) and second moment (v) buffers
        # Hint: Create lists of zero tensors matching parameter shapes
        self.m = None
        self.v = None
    
    def step(self):
        """
        Perform a single optimization step.
        """
        self.t += 1
        
        # TODO: Implement Adam update rule for each parameter
        # 1. Update biased first moment: m = beta1 * m + (1 - beta1) * grad
        # 2. Update biased second moment: v = beta2 * v + (1 - beta2) * grad^2
        # 3. Compute bias-corrected moments:
        #    m_hat = m / (1 - beta1^t)
        #    v_hat = v / (1 - beta2^t)
        # 4. Update parameters: param = param - lr * m_hat / (sqrt(v_hat) + eps)
        pass
    
    def zero_grad(self):
        """Zero out the gradients of all parameters."""
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


def test_adam_optimizer():
    """Test the Adam optimizer implementation."""
    print("Testing Adam Optimizer...\\n")
    
    # Test 1: Basic convergence
    print("Test 1: Adam convergence on quadratic")
    x = torch.tensor([5.0], requires_grad=True)
    optimizer = AdamOptimizer([x], lr=0.1)
    
    for _ in range(10):
        optimizer.zero_grad()
        loss = x ** 2
        loss.backward()
        optimizer.step()
    
    assert x.item() < 1.0, f"Test 1 Failed: Adam should converge, got {x.item()}"
    print(f"✓ Adam converged from 5.0 to {x.item():.6f}\\n")
    
    # Test 2: Bias correction (early iterations)
    print("Test 2: Bias correction in early iterations")
    x2 = torch.tensor([1.0], requires_grad=True)
    optimizer2 = AdamOptimizer([x2], lr=0.1, betas=(0.9, 0.999))
    
    # First step
    optimizer2.zero_grad()
    loss = x2 ** 2
    loss.backward()
    initial_x = x2.item()
    optimizer2.step()
    
    # Check that parameter changed
    assert abs(x2.item() - initial_x) > 1e-6, "Test 2 Failed: Parameter should update"
    print(f"✓ Parameter updated from {initial_x:.6f} to {x2.item():.6f}\\n")
    
    # Test 3: Handles different gradient scales
    print("Test 3: Adaptive learning rates for different scales")
    x3 = torch.tensor([10.0, 0.1], requires_grad=True)
    optimizer3 = AdamOptimizer([x3], lr=0.1)
    
    for _ in range(20):
        optimizer3.zero_grad()
        # Different scales: x3[0] has large gradients, x3[1] has small gradients
        loss = x3[0] ** 2 + 100 * x3[1] ** 2
        loss.backward()
        optimizer3.step()
    
    assert x3[0].item() < 2.0 and x3[1].item() < 0.05, "Test 3 Failed: Both parameters should converge"
    print(f"✓ Both parameters converged: x[0]={x3[0].item():.6f}, x[1]={x3[1].item():.6f}\\n")
    
    print("✓ All Adam optimizer tests passed!")

test_adam_optimizer()


<details>
<summary><b>Hint</b> (click to expand)</summary>

Key steps for Adam:
1. Initialize m and v as zero tensors: `self.m = [torch.zeros_like(p) for p in params]`
2. Update moments with exponential moving average (EMA)
3. Bias correction is crucial in early iterations: divide by `(1 - beta^t)`
4. Use `torch.sqrt()` for the square root and add epsilon inside the sqrt for stability
5. Remember to use `.data` when updating parameters to avoid building computation graph

</details>


---

## Problem 3: Learning Rate Scheduling (Medium)

### Contextual Introduction
You're pretraining a GPT-style language model from scratch. If you start with a high learning rate, the training diverges immediately—the loss shoots to infinity. If you start with a low learning rate, training is stable but painfully slow. The solution? A learning rate schedule with warmup.

Modern LLM training uses sophisticated learning rate schedules: linear warmup for the first few thousand steps (to prevent early instability), followed by cosine decay to zero (for smooth convergence). These schedules are not just nice-to-haves; they're essential for stable, efficient training of large models.

### Key Concepts
- **Warmup**: Gradually increase learning rate from 0 to max over initial steps
- **Step Decay**: Reduce learning rate by a factor every N epochs
- **Cosine Annealing**: Smooth decay following a cosine curve: lr = lr_min + 0.5 × (lr_max - lr_min) × (1 + cos(π × t / T))
- **Combined Schedules**: Warmup + cosine decay is the standard for transformer training

### Problem Statement
Implement a learning rate scheduler that supports step decay, cosine annealing, and combined warmup with cosine decay. The scheduler should track the current step and compute the learning rate dynamically.

**Requirements**:
- Implement step decay (reduce by factor every N steps)
- Implement cosine annealing with configurable min/max
- Implement warmup + cosine decay combination
- Return current learning rate for any step

**Example**:
```python
scheduler = LRScheduler(base_lr=0.001, warmup_steps=100, total_steps=1000)
lr = scheduler.get_lr()  # Get current learning rate
```

### Example: Learning Rate Schedules Visualized


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Let's visualize 3 different learning rate schedules
total_steps = 1000
base_lr = 0.1
steps = np.arange(total_steps)

print("Learning Rate Schedule Examples")
print("=" * 80)

# 1. Step Decay: Reduce by 0.5 every 250 steps
print("\\n1. STEP DECAY (reduce by 0.5 every 250 steps):")
step_decay_lr = []
for step in range(total_steps):
    decay_factor = 0.5 ** (step // 250)
    lr = base_lr * decay_factor
    step_decay_lr.append(lr)
    if step in [0, 250, 500, 750]:
        print(f"   Step {step:4d}: lr = {base_lr:.3f} * 0.5^{step//250} = {lr:.6f}")

# 2. Cosine Annealing: Smooth decay from base_lr to 0
print("\\n2. COSINE ANNEALING (smooth decay to 0):")
cosine_lr = []
for step in range(total_steps):
    # Formula: lr = lr_min + 0.5 * (lr_max - lr_min) * (1 + cos(pi * t / T))
    progress = step / total_steps
    lr = 0.5 * base_lr * (1 + np.cos(np.pi * progress))
    cosine_lr.append(lr)
    if step in [0, 250, 500, 750, 999]:
        print(f"   Step {step:4d}: progress={progress:.2f}, lr = {lr:.6f}")

# 3. Warmup + Cosine: Linear warmup (0-100 steps) then cosine decay
print("\\n3. WARMUP + COSINE (100 step warmup, then cosine decay):")
warmup_steps = 100
warmup_cosine_lr = []
for step in range(total_steps):
    if step < warmup_steps:
        # Linear warmup: lr increases from 0 to base_lr
        lr = base_lr * (step / warmup_steps)
    else:
        # Cosine decay from base_lr to 0
        progress = (step - warmup_steps) / (total_steps - warmup_steps)
        lr = 0.5 * base_lr * (1 + np.cos(np.pi * progress))
    warmup_cosine_lr.append(lr)
    if step in [0, 50, 100, 500, 999]:
        phase = "warmup" if step < warmup_steps else "cosine"
        print(f"   Step {step:4d}: {phase:7s}, lr = {lr:.6f}")

# Plot all three schedules
plt.figure(figsize=(12, 6))
plt.plot(steps, step_decay_lr, label='Step Decay', linewidth=2)
plt.plot(steps, cosine_lr, label='Cosine Annealing', linewidth=2)
plt.plot(steps, warmup_cosine_lr, label='Warmup + Cosine', linewidth=2)
plt.xlabel('Training Step', fontsize=12)
plt.ylabel('Learning Rate', fontsize=12)
plt.title('Learning Rate Schedules Comparison', fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("\\n" + "=" * 80)
print("Note: Warmup + Cosine is the most common schedule for LLM training!")


### Starter Code


In [None]:
import numpy as np

class LRScheduler:
    def __init__(self, base_lr: float, warmup_steps: int = 0, total_steps: int = 1000):
        """
        Initialize the learning rate scheduler.
        
        Args:
            base_lr: Base learning rate (maximum LR after warmup)
            warmup_steps: Number of warmup steps
            total_steps: Total number of training steps
        """
        self.base_lr = base_lr
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.current_step = 0
    
    def step(self):
        """Increment the step counter."""
        self.current_step += 1
    
    def step_decay(self, decay_rate: float = 0.5, decay_steps: int = 100) -> float:
        """
        Compute learning rate with step decay.
        
        Args:
            decay_rate: Factor to multiply LR by at each decay step
            decay_steps: Number of steps between each decay
            
        Returns:
            Current learning rate
        """
        # TODO: Implement step decay
        # Formula: lr = base_lr * decay_rate^(current_step // decay_steps)
        pass
    
    def cosine_annealing(self, min_lr: float = 0.0) -> float:
        """
        Compute learning rate with cosine annealing.
        
        Args:
            min_lr: Minimum learning rate
            
        Returns:
            Current learning rate
        """
        # TODO: Implement cosine annealing
        # Formula: lr = min_lr + 0.5 * (base_lr - min_lr) * (1 + cos(pi * t / T))
        # where t is current_step and T is total_steps
        pass
    
    def warmup_cosine(self, min_lr: float = 0.0) -> float:
        """
        Compute learning rate with warmup followed by cosine decay.
        
        Args:
            min_lr: Minimum learning rate after decay
            
        Returns:
            Current learning rate
        """
        # TODO: Implement warmup + cosine schedule
        # If current_step < warmup_steps:
        #     Linear warmup: lr = base_lr * (current_step / warmup_steps)
        # Else:
        #     Cosine decay from base_lr to min_lr
        pass


def test_lr_scheduler():
    """Test the learning rate scheduler."""
    print("Testing Learning Rate Scheduler...\\n")
    
    # Test 1: Step decay
    print("Test 1: Step Decay")
    scheduler = LRScheduler(base_lr=0.1, total_steps=1000)
    
    # Check decay at boundaries
    scheduler.current_step = 0
    lr0 = scheduler.step_decay(decay_rate=0.5, decay_steps=250)
    assert abs(lr0 - 0.1) < 1e-6, f"Test 1a Failed: Expected 0.1, got {lr0}"
    
    scheduler.current_step = 250
    lr250 = scheduler.step_decay(decay_rate=0.5, decay_steps=250)
    assert abs(lr250 - 0.05) < 1e-6, f"Test 1b Failed: Expected 0.05, got {lr250}"
    
    scheduler.current_step = 500
    lr500 = scheduler.step_decay(decay_rate=0.5, decay_steps=250)
    assert abs(lr500 - 0.025) < 1e-6, f"Test 1c Failed: Expected 0.025, got {lr500}"
    
    print(f"✓ Step decay: lr[0]={lr0:.4f}, lr[250]={lr250:.4f}, lr[500]={lr500:.4f}\\n")
    
    # Test 2: Cosine annealing
    print("Test 2: Cosine Annealing")
    scheduler2 = LRScheduler(base_lr=0.1, total_steps=1000)
    
    scheduler2.current_step = 0
    lr_start = scheduler2.cosine_annealing(min_lr=0.0)
    assert abs(lr_start - 0.1) < 1e-6, f"Test 2a Failed: Should start at base_lr"
    
    scheduler2.current_step = 1000
    lr_end = scheduler2.cosine_annealing(min_lr=0.0)
    assert lr_end < 0.01, f"Test 2b Failed: Should decay close to 0"
    
    print(f"✓ Cosine annealing: start={lr_start:.4f}, end={lr_end:.6f}\\n")
    
    # Test 3: Warmup + cosine
    print("Test 3: Warmup + Cosine")
    scheduler3 = LRScheduler(base_lr=0.1, warmup_steps=100, total_steps=1000)
    
    # Check warmup phase
    scheduler3.current_step = 50
    lr_warmup = scheduler3.warmup_cosine(min_lr=0.0)
    assert 0.04 < lr_warmup < 0.06, f"Test 3a Failed: Warmup at step 50 should be ~0.05, got {lr_warmup}"
    
    scheduler3.current_step = 100
    lr_peak = scheduler3.warmup_cosine(min_lr=0.0)
    assert abs(lr_peak - 0.1) < 0.01, f"Test 3b Failed: Should reach base_lr after warmup, got {lr_peak}"
    
    scheduler3.current_step = 1000
    lr_final = scheduler3.warmup_cosine(min_lr=0.0)
    assert lr_final < 0.01, f"Test 3c Failed: Should decay to ~0, got {lr_final}"
    
    print(f"✓ Warmup+Cosine: warmup[50]={lr_warmup:.4f}, peak[100]={lr_peak:.4f}, end={lr_final:.6f}\\n")
    
    print("✓ All learning rate scheduler tests passed!")

test_lr_scheduler()


<details>
<summary><b>Hint</b> (click to expand)</summary>

Key formulas:

**Step Decay:**
```python
num_decays = current_step // decay_steps
lr = base_lr * (decay_rate ** num_decays)
```

**Cosine Annealing:**
```python
progress = current_step / total_steps
lr = min_lr + 0.5 * (base_lr - min_lr) * (1 + np.cos(np.pi * progress))
```

**Warmup + Cosine:**
- During warmup: linear interpolation from 0 to base_lr
- After warmup: apply cosine annealing from base_lr to min_lr
- Remember to adjust the progress calculation for the cosine phase!

</details>


---


## Problem 4: Projected Gradient Descent for Constraints (Medium-Hard)

### Contextual Introduction
You're training a sparse neural network where you want most weights to be exactly zero (not just small). Standard gradient descent will give you small weights, but not exact zeros. Or perhaps you're training a reinforcement learning agent where certain parameters must stay within bounds for safety. How do you enforce these constraints during optimization?

Projected Gradient Descent (PGD) is the answer. The idea is simple: after each gradient step, project the parameters back onto the constraint set. For L1 regularization, this projection is called the "proximal operator" or "soft thresholding," and it's what actually creates sparse solutions.

### Key Concepts
- **Projection**: Map a point to the nearest point in the constraint set
- **Soft Thresholding (L1 Proximal Operator)**: For threshold λ: if |x| < λ, set x = 0; else shrink x toward 0 by λ
- **Box Constraints**: Project onto [min, max] by clipping
- **Sparsity**: Soft thresholding creates exact zeros, inducing sparsity

### Problem Statement
Implement projected gradient descent with support for L1 regularization (soft thresholding) and box constraints. Your implementation should perform a gradient step followed by projection.

**Requirements**:
- Implement soft thresholding (L1 proximal operator)
- Implement box constraint projection
- Combine gradient step with projection
- Demonstrate sparsity-inducing behavior

**Example**:
```python
optimizer = ProjectedGD([param], lr=0.1, constraint='l1', threshold=0.5)
optimizer.step()  # Gradient step + soft thresholding
```



### Example: Soft Thresholding Creates Sparsity


In [None]:
import torch

# Let's see how soft thresholding (L1 proximal operator) creates sparsity
# We'll optimize a simple problem and see weights go to exactly zero

print("Soft Thresholding Example: Creating Sparsity")
print("=" * 80)

# Soft thresholding function (L1 proximal operator)
def soft_threshold(x, threshold):
    """
    Apply soft thresholding: shrinks x toward 0 by threshold, sets to 0 if |x| < threshold
    Formula: sign(x) * max(|x| - threshold, 0)
    """
    return torch.sign(x) * torch.maximum(torch.abs(x) - threshold, torch.tensor(0.0))

# Example values before and after soft thresholding with threshold=0.5
values = torch.tensor([2.0, 1.0, 0.8, 0.4, 0.2, -0.3, -0.7, -1.5])
threshold = 0.5

print(f"\\nThreshold: {threshold}")
print(f"\\n{'Value':<10} {'After Soft Threshold':<25} {'Explanation'}")
print("-" * 80)

for val in values:
    result = soft_threshold(val, threshold)
    if torch.abs(val) < threshold:
        explanation = f"|{val.item():.1f}| < {threshold} → set to 0"
    else:
        explanation = f"shrink by {threshold}: {val.item():.1f} → {result.item():.1f}"
    print(f"{val.item():<10.1f} {result.item():<25.1f} {explanation}")

print("\\n" + "=" * 80)
print("\\nNow let's see soft thresholding in action over 3 gradient steps:")
print("Optimizing f(w) = w^2 with learning rate 0.3 and threshold 0.5\\n")

w = 2.0
lr = 0.3
threshold = 0.5

print(f"Initial: w = {w:.4f}\\n")

for step in range(1, 4):
    print(f"Step {step}:")
    # Gradient step
    grad = 2 * w  # Gradient of w^2
    w_after_grad = w - lr * grad
    print(f"  1. Gradient step: w = {w:.4f} - {lr} * {grad:.4f} = {w_after_grad:.4f}")
    
    # Soft thresholding (projection)
    w = float(soft_threshold(torch.tensor(w_after_grad), threshold).item())
    print(f"  2. Soft threshold: apply threshold {threshold} → w = {w:.4f}")
    print()

print("=" * 80)
print(f"\\nFinal: w = {w:.4f}")
if abs(w) < 1e-6:
    print("\\n🎯 Soft thresholding drove w to EXACTLY zero (sparsity achieved!)")
else:
    print("\\n📉 Soft thresholding is shrinking w toward zero")


### Starter Code


In [None]:
import torch
from typing import List, Optional

class ProjectedGD:
    def __init__(self, params: List[torch.Tensor], lr: float = 0.01, 
                 constraint: str = 'none', threshold: float = 0.0,
                 box_min: Optional[float] = None, box_max: Optional[float] = None):
        """
        Initialize the Projected Gradient Descent optimizer.
        
        Args:
            params: List of parameters to optimize
            lr: Learning rate
            constraint: Type of constraint ('none', 'l1', 'box')
            threshold: Threshold for L1 soft thresholding
            box_min: Minimum value for box constraints
            box_max: Maximum value for box constraints
        """
        self.params = params
        self.lr = lr
        self.constraint = constraint
        self.threshold = threshold
        self.box_min = box_min
        self.box_max = box_max
    
    def soft_threshold(self, x: torch.Tensor, threshold: float) -> torch.Tensor:
        """
        Apply soft thresholding (L1 proximal operator).
        
        Args:
            x: Input tensor
            threshold: Threshold value
            
        Returns:
            Soft-thresholded tensor
        """
        # TODO: Implement soft thresholding
        # Formula: sign(x) * max(|x| - threshold, 0)
        # This sets values with |x| < threshold to exactly 0
        # and shrinks other values toward 0 by threshold
        pass
    
    def project_box(self, x: torch.Tensor, min_val: float, max_val: float) -> torch.Tensor:
        """
        Project onto box constraints [min_val, max_val].
        
        Args:
            x: Input tensor
            min_val: Minimum value
            max_val: Maximum value
            
        Returns:
            Projected tensor
        """
        # TODO: Implement box projection
        # Hint: Use torch.clamp to clip values to [min_val, max_val]
        pass
    
    def step(self):
        """
        Perform a single optimization step with projection.
        """
        # TODO: Implement gradient step + projection
        # 1. Gradient step: param = param - lr * grad
        # 2. Apply projection based on self.constraint:
        #    - 'l1': apply soft_threshold with self.threshold
        #    - 'box': apply project_box with self.box_min and self.box_max
        #    - 'none': no projection
        pass
    
    def zero_grad(self):
        """Zero out the gradients of all parameters."""
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


def test_projected_gd():
    """Test the Projected Gradient Descent optimizer."""
    print("Testing Projected Gradient Descent...\\n")
    
    # Test 1: Soft thresholding
    print("Test 1: Soft Thresholding (L1 proximal operator)")
    x = torch.tensor([2.0, 0.5, 0.3, -0.4, -1.0])
    optimizer = ProjectedGD([x], lr=0.1, constraint='l1', threshold=0.5)
    
    result = optimizer.soft_threshold(x, 0.5)
    expected = torch.tensor([1.5, 0.0, 0.0, 0.0, -0.5])  # Shrink by 0.5, set small values to 0
    assert torch.allclose(result, expected, atol=1e-6), f"Test 1 Failed: Expected {expected}, got {result}"
    print(f"✓ Soft thresholding: {x.tolist()} → {result.tolist()}\\n")
    
    # Test 2: Box constraints
    print("Test 2: Box Constraint Projection")
    x2 = torch.tensor([5.0, 0.0, -3.0, 2.0])
    optimizer2 = ProjectedGD([x2], lr=0.1, constraint='box', box_min=-1.0, box_max=3.0)
    
    result2 = optimizer2.project_box(x2, -1.0, 3.0)
    expected2 = torch.tensor([3.0, 0.0, -1.0, 2.0])  # Clip to [-1, 3]
    assert torch.allclose(result2, expected2, atol=1e-6), f"Test 2 Failed: Expected {expected2}, got {result2}"
    print(f"✓ Box projection to [-1, 3]: {x2.tolist()} → {result2.tolist()}\\n")
    
    # Test 3: L1 induces sparsity
    print("Test 3: L1 Constraint Induces Sparsity")
    x3 = torch.tensor([1.0, 0.5, 0.3], requires_grad=True)
    optimizer3 = ProjectedGD([x3], lr=0.5, constraint='l1', threshold=0.3)
    
    for _ in range(5):
        optimizer3.zero_grad()
        loss = (x3 ** 2).sum()  # Minimize squared values
        loss.backward()
        optimizer3.step()
    
    # Check that small values were driven to exactly 0
    num_zeros = (x3 == 0).sum().item()
    assert num_zeros >= 1, f"Test 3 Failed: L1 should create sparsity (zeros), got {x3}"
    print(f"✓ L1 regularization created {num_zeros} exact zeros: {x3.tolist()}\\n")
    
    print("✓ All projected gradient descent tests passed!")

test_projected_gd()


<details>
<summary><b>Hint</b> (click to expand)</summary>

**Soft Thresholding:**
```python
def soft_threshold(x, threshold):
    return torch.sign(x) * torch.maximum(torch.abs(x) - threshold, torch.tensor(0.0))
```

**Box Projection:**
```python
def project_box(x, min_val, max_val):
    return torch.clamp(x, min=min_val, max=max_val)
```

**Key Insight:**
- Soft thresholding creates **exact zeros**, not just small values
- This is why L1 regularization leads to sparse models
- Always apply projection **after** the gradient step

</details>


---


## Problem 5: Gradient Clipping and Practical Constraints (Hard)

### Contextual Introduction
You're training a large transformer model and everything seems fine for the first few hours. Then suddenly, at step 5000, the loss explodes to infinity. The culprit? Gradient explosion. Or perhaps you're implementing RLHF (Reinforcement Learning from Human Feedback) for a language model, and you need to ensure the new policy doesn't drift too far from the reference policy.

Gradient clipping is one of the most important practical techniques in deep learning. It's not theoretically elegant, but it's absolutely essential for training RNNs, transformers, and other deep networks. Similarly, trust region methods keep optimization steps conservative, preventing catastrophic policy updates.

### Key Concepts
- **Gradient Norm**: Total magnitude of gradients: ||g|| = sqrt(sum(g_i^2))
- **Gradient Clipping by Norm**: If ||g|| > threshold, scale g to have norm = threshold
- **Gradient Clipping by Value**: Clip each gradient element to [-threshold, threshold]
- **Trust Region**: Constrain updates to stay within a "trust region" using KL divergence or L2 penalty

### Problem Statement
Implement an optimizer with gradient clipping (by norm and by value) and support for constrained optimization with KL penalties. Your implementation should detect and prevent gradient explosion while maintaining stable training.

**Requirements**:
- Implement gradient norm clipping (rescale if ||g|| > threshold)
- Implement per-parameter value clipping
- Support constrained optimization with penalty term
- Track gradient statistics for adaptive clipping

**Example**:
```python
optimizer = ClippingOptimizer([param], lr=0.01, clip_norm=1.0)
optimizer.step()  # Clips gradients if norm exceeds 1.0
```


### Example: Gradient Explosion and Clipping


In [None]:
import torch
import numpy as np

print("Gradient Clipping Example: Preventing Explosion")
print("=" * 80)

# Simulate a scenario where gradients grow over time (common in RNNs)
# We'll show what happens with and without clipping

def compute_grad_norm(grads):
    """Compute the L2 norm of gradients."""
    return torch.sqrt(sum(torch.sum(g ** 2) for g in grads)).item()

def clip_by_norm(grads, max_norm):
    """Clip gradients by their global norm."""
    total_norm = compute_grad_norm(grads)
    if total_norm > max_norm:
        scale = max_norm / total_norm
        for g in grads:
            g.mul_(scale)
    return total_norm

# Example: Gradient explosion scenario
print("\\nScenario: Gradients doubling each step (like vanishing gradient problem in reverse)\\n")

print("WITHOUT CLIPPING:")
print("-" * 80)
grad = torch.tensor([1.0, 1.0])  # Initial gradient
for step in range(1, 6):
    norm = compute_grad_norm([grad])
    print(f"Step {step}: gradient = {grad.tolist()}, norm = {norm:.2f}")
    grad = grad * 2  # Gradients double (explosion!)
    if norm > 100:
        print("\\n💥 GRADIENT EXPLOSION! Training would diverge here.")
        break

print("\\n" + "=" * 80)
print("WITH NORM CLIPPING (max_norm=5.0):")
print("-" * 80)
grad = torch.tensor([1.0, 1.0])  # Reset
max_norm = 5.0

for step in range(1, 6):
    original_norm = compute_grad_norm([grad])
    clipped_norm = clip_by_norm([grad], max_norm)
    
    if original_norm > max_norm:
        print(f"Step {step}: original norm = {original_norm:.2f} → CLIPPED to {max_norm:.2f}")
        print(f"         gradient after clipping = {grad.tolist()}")
    else:
        print(f"Step {step}: norm = {original_norm:.2f} (no clipping needed)")
        print(f"         gradient = {grad.tolist()}")
    
    grad = grad * 2  # Try to double (but will be clipped next iteration)

print("\\n✅ Clipping prevents explosion! Training remains stable.")

print("\\n" + "=" * 80)
print("\\nCLIPPING BY VALUE vs BY NORM:")
print("-" * 80)

# Show difference between two clipping strategies
grad_example = torch.tensor([3.0, 4.0])  # Norm = 5.0
print(f"Original gradient: {grad_example.tolist()}, norm = {compute_grad_norm([grad_example]):.2f}")

# Clip by norm (rescale entire gradient)
grad_norm_clip = grad_example.clone()
clip_by_norm([grad_norm_clip], max_norm=2.5)
print(f"\\nClip by NORM (max_norm=2.5): {grad_norm_clip.tolist()}")
print(f"  → Preserves direction, scales magnitude to 2.5")

# Clip by value (clip each element independently)
grad_value_clip = torch.clamp(grad_example, min=-2.5, max=2.5)
print(f"\\nClip by VALUE ([-2.5, 2.5]): {grad_value_clip.tolist()}")
print(f"  → Changes direction! Each element clipped independently")

print("\\n" + "=" * 80)
print("\\n🎯 Best Practice: Use NORM clipping for training (preserves gradient direction)")


### Starter Code


In [None]:
import torch
from typing import List, Optional

class ClippingOptimizer:
    def __init__(self, params: List[torch.Tensor], lr: float = 0.01,
                 clip_norm: Optional[float] = None, clip_value: Optional[float] = None,
                 kl_penalty: float = 0.0):
        """
        Initialize optimizer with gradient clipping.
        
        Args:
            params: List of parameters to optimize
            lr: Learning rate
            clip_norm: Maximum gradient norm (None = no clipping)
            clip_value: Maximum absolute value for each gradient (None = no clipping)
            kl_penalty: KL divergence penalty coefficient for trust region
        """
        self.params = params
        self.lr = lr
        self.clip_norm = clip_norm
        self.clip_value = clip_value
        self.kl_penalty = kl_penalty
        self.grad_norms = []  # Track gradient norms for statistics
    
    def compute_grad_norm(self) -> float:
        """
        Compute the global L2 norm of all gradients.
        
        Returns:
            Total gradient norm
        """
        # TODO: Implement gradient norm computation
        # Formula: sqrt(sum of squared gradients across all parameters)
        # Hint: Use torch.sum(param.grad ** 2) for each parameter
        pass
    
    def clip_by_norm(self):
        """
        Clip gradients by their global norm.
        If total_norm > clip_norm, scale all gradients by clip_norm / total_norm.
        """
        # TODO: Implement gradient clipping by norm
        # 1. Compute total gradient norm
        # 2. If total_norm > self.clip_norm:
        #    - Compute scale = self.clip_norm / total_norm
        #    - Multiply each gradient by scale
        pass
    
    def clip_by_value(self):
        """
        Clip each gradient element to [-clip_value, clip_value].
        """
        # TODO: Implement gradient clipping by value
        # Hint: Use torch.clamp(param.grad, -self.clip_value, self.clip_value)
        pass
    
    def step_with_constraint(self, reference_params: Optional[List[torch.Tensor]] = None):
        """
        Perform optimization step with optional KL constraint penalty.
        
        Args:
            reference_params: Reference parameters for KL penalty (trust region)
        """
        # TODO: Implement constrained optimization step
        # 1. Apply gradient clipping (if enabled)
        # 2. If kl_penalty > 0 and reference_params provided:
        #    - Add penalty to gradient: grad += kl_penalty * (param - reference_param)
        # 3. Update parameters: param = param - lr * grad
        pass
    
    def step(self):
        """
        Perform a standard optimization step with clipping.
        """
        # Record gradient norm before clipping
        if self.clip_norm is not None or self.clip_value is not None:
            original_norm = self.compute_grad_norm()
            self.grad_norms.append(original_norm)
        
        # Apply clipping
        if self.clip_norm is not None:
            self.clip_by_norm()
        if self.clip_value is not None:
            self.clip_by_value()
        
        # Update parameters
        for param in self.params:
            if param.grad is not None:
                param.data -= self.lr * param.grad
    
    def zero_grad(self):
        """Zero out the gradients of all parameters."""
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


def test_clipping_optimizer():
    """Test the clipping optimizer."""
    print("Testing Gradient Clipping Optimizer...\\n")
    
    # Test 1: Gradient norm computation
    print("Test 1: Gradient Norm Computation")
    x = torch.tensor([3.0, 4.0], requires_grad=True)
    loss = (x ** 2).sum()
    loss.backward()
    
    optimizer = ClippingOptimizer([x], lr=0.1)
    norm = optimizer.compute_grad_norm()
    expected_norm = 10.0  # sqrt(6^2 + 8^2) = sqrt(100) = 10
    assert abs(norm - expected_norm) < 1e-5, f"Test 1 Failed: Expected norm {expected_norm}, got {norm}"
    print(f"✓ Gradient norm: {norm:.2f} (gradients: {x.grad.tolist()})\\n")
    
    # Test 2: Clipping triggers when norm exceeds threshold
    print("Test 2: Norm Clipping Triggers")
    x2 = torch.tensor([3.0, 4.0], requires_grad=True)
    loss2 = (x2 ** 2).sum()
    loss2.backward()
    
    optimizer2 = ClippingOptimizer([x2], lr=0.1, clip_norm=5.0)
    original_grad = x2.grad.clone()
    optimizer2.clip_by_norm()
    
    clipped_norm = optimizer2.compute_grad_norm()
    assert abs(clipped_norm - 5.0) < 1e-5, f"Test 2 Failed: Clipped norm should be 5.0, got {clipped_norm}"
    print(f"✓ Clipping: {original_grad.tolist()} (norm=10.0) → {x2.grad.tolist()} (norm=5.0)\\n")
    
    # Test 3: Value clipping
    print("Test 3: Value Clipping")
    x3 = torch.tensor([10.0, -8.0, 2.0], requires_grad=True)
    loss3 = (x3 ** 2).sum()
    loss3.backward()
    
    optimizer3 = ClippingOptimizer([x3], lr=0.1, clip_value=5.0)
    optimizer3.clip_by_value()
    
    expected = torch.tensor([5.0, -5.0, 4.0])  # Clip to [-5, 5]
    assert torch.allclose(x3.grad, expected, atol=1e-5), f"Test 3 Failed: Expected {expected}, got {x3.grad}"
    print(f"✓ Value clipping to [-5, 5]: {[20.0, -16.0, 4.0]} → {x3.grad.tolist()}\\n")
    
    print("✓ All clipping optimizer tests passed!")

test_clipping_optimizer()


<details>
<summary><b>Hint</b> (click to expand)</summary>

**Computing Gradient Norm:**
```python
total_norm = 0.0
for param in self.params:
    if param.grad is not None:
        total_norm += torch.sum(param.grad ** 2).item()
return total_norm ** 0.5
```

**Clipping by Norm:**
```python
total_norm = self.compute_grad_norm()
if total_norm > self.clip_norm:
    scale = self.clip_norm / total_norm
    for param in self.params:
        if param.grad is not None:
            param.grad.mul_(scale)  # In-place scaling
```

**Key Insights:**
- Norm clipping preserves gradient direction (scales uniformly)
- Value clipping can change gradient direction (clips independently)
- For RNNs and transformers, norm clipping is standard (typically max_norm=1.0)
- Track gradient norms over time to detect instabilities early

</details>


---

## Conclusion

Congratulations! You've now implemented the core optimization algorithms that power modern machine learning. Let's recap what we've covered:

1. **Gradient Descent Variants**: From vanilla SGD to momentum and Nesterov, understanding how velocity accumulation leads to faster convergence.

2. **Adam Optimizer**: The workhorse of deep learning, combining adaptive learning rates with momentum for robust training across diverse parameter scales.

3. **Learning Rate Scheduling**: Essential techniques for stable training, especially warmup and cosine decay used in state-of-the-art LLM training.

4. **Projected Gradient Descent**: Handling constraints through soft thresholding and projections, crucial for sparse models and constrained optimization.

5. **Gradient Clipping**: A practical necessity for preventing training instabilities in RNNs, transformers, and other deep networks.

These aren't just theoretical concepts—they're the tools you'll use every day as an ML engineer. Whether you're debugging a training run that won't converge, choosing hyperparameters for a new model, or implementing custom optimization logic, the deep understanding you've gained here will serve you well.

In the next chapter, we'll build on this foundation to explore training efficiency and memory optimization—because understanding optimization is just the first step toward training models efficiently at scale.
