# Diffusion Models: From Theory to Practice
## A Comprehensive Hands-on Tutorial

**Based on**: Lecture 16 - Generative Models: Diffusion
**Author**: Ho-min Park
**Adapted for Interactive Learning**

---

## 🎯 Learning Objectives

By the end of this notebook, you will:
1. Understand the mathematical foundations of diffusion models
2. Implement forward and reverse diffusion processes
3. Build a simple diffusion model from scratch
4. Explore advanced techniques like DDIM and classifier-free guidance
5. Apply diffusion models to real-world tasks

---

## 📚 Table of Contents

1. **Setup and Prerequisites**
2. **Part 1: Mathematical Foundations** (Exercises 1-3)
3. **Part 2: Forward and Reverse Processes** (Exercises 4-6)
4. **Part 3: Building a Complete Diffusion Model** (Exercises 7-8)
5. **Part 4: Advanced Techniques** (Exercises 9-10)
6. **Summary and Further Resources**

## 📦 Setup and Imports

First, let's import all necessary libraries and set up our environment.

In [None]:
# Core libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# Deep learning libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Visualization
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import HTML, display
import imageio
from PIL import Image
from tqdm.notebook import tqdm

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)

# Configure plotting
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
%matplotlib inline

print('✅ All libraries imported successfully!')
print(f'PyTorch version: {torch.__version__}')
print(f'CUDA available: {torch.cuda.is_available()}')

---
# Part 1: Mathematical Foundations 🔢

## Understanding Diffusion Models

Diffusion models work by gradually adding noise to data (forward process) and then learning to reverse this process (reverse process). The key insight is that if we can learn to denoise slightly noisy data, we can generate new samples by starting from pure noise and iteratively denoising.

## Exercise 1: Understanding Gaussian Noise Addition

### 📖 Concept
The forward diffusion process gradually transforms data into Gaussian noise through a Markov chain:
$$x_t = \sqrt{1-\beta_t} \cdot x_{t-1} + \sqrt{\beta_t} \cdot \epsilon$$
where $\epsilon \sim \mathcal{N}(0, I)$ and $\beta_t$ is the noise schedule.
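
As a minimal, dependency-free sketch of the update above, one step can be written with plain Python lists. The helper name `forward_step` and the list-of-floats representation are illustrative assumptions, not part of the notebook's code.

```python
import math
import random

def forward_step(x_prev, beta, rng):
    """One forward-diffusion step: x_t = sqrt(1 - beta) * x_{t-1} + sqrt(beta) * eps."""
    keep = math.sqrt(1.0 - beta)   # how much of the previous sample survives
    mix = math.sqrt(beta)          # how much fresh Gaussian noise is mixed in
    return [keep * v + mix * rng.gauss(0.0, 1.0) for v in x_prev]

rng = random.Random(0)
x = [1.0, -0.5, 0.25]              # made-up starting point
for beta in (0.01, 0.1, 0.5):
    x = forward_step(x, beta, rng)
print(x)
```

Two limiting cases help build intuition: with `beta = 0` the sample is returned unchanged, and with `beta = 1` it is replaced entirely by standard Gaussian noise.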

### 💻 Implementation

In [None]:
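def add_gaussian_noise(x, noise_level):
    """Apply one forward-diffusion step to data
    Args:
        x: Input data (x_{t-1})
        noise_level: Noise variance beta_t, between 0 and 1
    Returns:
        Noisy data x_t and the standard-normal noise that was drawn
    """
    noise = np.random.randn(*x.shape)
    # x_t = sqrt(1 - beta_t) * x_{t-1} + sqrt(beta_t) * epsilon
    noisy_x = np.sqrt(1 - noise_level) * x + np.sqrt(noise_level) * noise
    return noisy_x, noise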

# Generate synthetic 2D data
np.random.seed(42)
n_samples = 1000

# Create a spiral dataset
theta = np.linspace(0, 4*np.pi, n_samples)
r = theta / (4*np.pi)
x_original = np.column_stack([
    r * np.cos(theta),
    r * np.sin(theta)
])

# Add noise at different levels
noise_levels = [0.0, 0.05, 0.1, 0.2, 0.5]
fig, axes = plt.subplots(1, 5, figsize=(15, 3))

for idx, noise_level in enumerate(noise_levels):
    x_noisy, _ = add_gaussian_noise(x_original, noise_level)
    axes[idx].scatter(x_noisy[:, 0], x_noisy[:, 1], s=1, alpha=0.5)
    axes[idx].set_title(f'β = {noise_level}')
    axes[idx].set_xlim(-1.5, 1.5)
    axes[idx].set_ylim(-1.5, 1.5)
    axes[idx].set_aspect('equal')

plt.suptitle('Progressive Noise Addition (Forward Process)', fontsize=14)
plt.tight_layout()
plt.show()

print("💡 Key Insight: As β increases, the structure gradually disappears into noise.")

## Exercise 2: Implementing Noise Schedules

### 📖 Concept
The noise schedule $\{\beta_t\}_{t=1}^T$ controls how quickly noise is added. Common schedules include:
- **Linear**: $\beta_t$ increases linearly from $\beta_1$ to $\beta_T$
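- **Cosine**: Defines $\bar{\alpha}_t$ through a squared-cosine curve so that the signal decays more gradually; it was proposed as an improvement over the linear schedule, particularly for low-resolution images
- **Quadratic**: $\beta_t$ grows with $t^2$, so noise is added slowly at first and faster toward the end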
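
One way to compare schedules is to ask how much of the original signal variance is left at the final step, i.e. $\bar{\alpha}_T = \prod_t (1 - \beta_t)$. The sketch below does this in plain Python; the helper names `linear_betas`, `quadratic_betas`, and `final_alpha_bar` are illustrative, and the endpoints `1e-4` and `0.02` with `T = 1000` are assumed to match the defaults used later in this notebook.

```python
import math

def linear_betas(T=1000, beta_start=1e-4, beta_end=0.02):
    """Evenly spaced betas from beta_start to beta_end (inclusive)."""
    return [beta_start + (beta_end - beta_start) * i / (T - 1) for i in range(T)]

def quadratic_betas(T=1000, beta_start=1e-4, beta_end=0.02):
    """Betas that grow with the square of normalized time."""
    return [beta_start + (beta_end - beta_start) * (i / (T - 1)) ** 2 for i in range(T)]

def final_alpha_bar(betas):
    """Product of (1 - beta_t) over all steps: the signal variance fraction left at T."""
    return math.prod(1.0 - b for b in betas)

for name, betas in [("linear", linear_betas()), ("quadratic", quadratic_betas())]:
    print(f"{name:>9}: alpha_bar_T = {final_alpha_bar(betas):.2e}")
```

With these endpoints the linear schedule leaves roughly `4e-5` of the signal variance, so $x_T$ is essentially pure noise; the quadratic schedule with the same endpoints leaves noticeably more.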

### 💻 Implementation

In [None]:
class NoiseSchedule:
    """Different noise schedule implementations"""
    
    def __init__(self, num_timesteps=1000, beta_start=1e-4, beta_end=0.02):
        self.num_timesteps = num_timesteps
        self.beta_start = beta_start
        self.beta_end = beta_end
    
    def linear_schedule(self):
        """Linear noise schedule (original DDPM)"""
        return np.linspace(self.beta_start, self.beta_end, self.num_timesteps)
    
    def cosine_schedule(self, s=0.008):
        """Cosine noise schedule (improved DDPM)"""
        steps = self.num_timesteps + 1
        t = np.linspace(0, self.num_timesteps, steps)
        alphas_cumprod = np.cos(((t / self.num_timesteps) + s) / (1 + s) * np.pi * 0.5) ** 2
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return np.clip(betas, 0.0001, 0.999)
    
    def quadratic_schedule(self):
        """Quadratic noise schedule"""
        t = np.linspace(0, 1, self.num_timesteps)
        return self.beta_start + (self.beta_end - self.beta_start) * t ** 2

# Compare different schedules
scheduler = NoiseSchedule(num_timesteps=1000)

schedules = {
    'Linear': scheduler.linear_schedule(),
    'Cosine': scheduler.cosine_schedule(),
    'Quadratic': scheduler.quadratic_schedule()
}

# Visualize schedules
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

for idx, (name, betas) in enumerate(schedules.items()):
    # Plot beta values
    axes[idx].plot(betas, label='β_t', color='blue', alpha=0.7)
    
    # Calculate and plot cumulative product of alphas
    alphas = 1 - betas
    alphas_cumprod = np.cumprod(alphas)
    axes[idx].plot(alphas_cumprod, label='ᾱ_t', color='red', alpha=0.7)
    
    axes[idx].set_xlabel('Timestep')
    axes[idx].set_ylabel('Value')
    axes[idx].set_title(f'{name} Schedule')
    axes[idx].legend()
    axes[idx].grid(True, alpha=0.3)

plt.suptitle('Comparison of Noise Schedules', fontsize=14)
plt.tight_layout()
plt.show()

print("💡 Key Insight: The cosine schedule provides a smoother transition,")
print("   preventing sudden information loss in early steps.")

### 🎯 Your Turn!
Modify the `NoiseSchedule` class to implement an exponential schedule where $\beta_t = \beta_1 \cdot e^{t \cdot \log(\beta_T/\beta_1) / T}$. Compare it with the existing schedules.

## Exercise 3: The Reparameterization Trick

### 📖 Concept
Instead of computing $x_t$ step by step, we can directly sample it from $x_0$ using:
$$x_t = \sqrt{\bar{\alpha}_t} \cdot x_0 + \sqrt{1-\bar{\alpha}_t} \cdot \epsilon$$
where $\bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s$ and $\alpha_t = 1 - \beta_t$.
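
The closed form follows from unrolling the single-step update: the coefficient on $x_0$ picks up a factor $\sqrt{\alpha_t}$ per step, and the noise variance obeys $v_t = \alpha_t v_{t-1} + \beta_t$, which solves to $v_t = 1 - \bar{\alpha}_t$. The following sketch checks this numerically; the function names are illustrative and a linear schedule with the default endpoints is assumed.

```python
import math

def iterate_coefficients(betas):
    """Track x_t = m * x_0 + sqrt(v) * eps through repeated single steps."""
    m, v = 1.0, 0.0                      # at t = 0: x_0 itself, no noise
    for beta in betas:
        alpha = 1.0 - beta
        m = math.sqrt(alpha) * m         # signal coefficient shrinks
        v = alpha * v + beta             # old noise is scaled, new noise is added
    return m, v

def closed_form_coefficients(betas):
    """Same coefficients computed directly from alpha_bar = prod(alpha_s)."""
    alpha_bar = math.prod(1.0 - b for b in betas)
    return math.sqrt(alpha_bar), 1.0 - alpha_bar

# Assumed linear schedule: 1000 steps from 1e-4 to 0.02
betas = [1e-4 + (0.02 - 1e-4) * i / 999 for i in range(1000)]
print(iterate_coefficients(betas[:500]))
print(closed_form_coefficients(betas[:500]))
```

Both calls should print the same pair up to floating-point round-off, which is why `q_sample` can jump to any timestep in one shot.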

### 💻 Implementation

In [None]:
class ForwardDiffusion:
    """Forward diffusion process with reparameterization"""
    
    def __init__(self, betas):
        self.betas = betas
        self.alphas = 1 - betas
        self.alphas_cumprod = np.cumprod(self.alphas)
        self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = np.sqrt(1 - self.alphas_cumprod)
    
    def q_sample(self, x_0, t, noise=None):
        """Sample x_t from x_0 using reparameterization trick
        Args:
            x_0: Original data
            t: Timestep
            noise: Optional pre-generated noise
        Returns:
            x_t: Noisy data at timestep t
        """
        if noise is None:
            noise = np.random.randn(*x_0.shape)
        
        sqrt_alpha_t = self.sqrt_alphas_cumprod[t]
        sqrt_one_minus_alpha_t = self.sqrt_one_minus_alphas_cumprod[t]
        
        return sqrt_alpha_t * x_0 + sqrt_one_minus_alpha_t * noise

# Demonstrate reparameterization on an image
# Create a simple synthetic image (checkerboard pattern)
def create_checkerboard(size=32, square_size=4):
    """Create a checkerboard pattern"""
    img = np.zeros((size, size))
    for i in range(0, size, square_size*2):
        for j in range(0, size, square_size*2):
            img[i:i+square_size, j:j+square_size] = 1
            img[i+square_size:i+2*square_size, j+square_size:j+2*square_size] = 1
    return img

# Create image and diffusion process
img = create_checkerboard(64, 8)
scheduler = NoiseSchedule(num_timesteps=1000)
betas = scheduler.cosine_schedule()
forward_process = ForwardDiffusion(betas)

# Sample at different timesteps
timesteps = [0, 100, 250, 500, 750, 999]
fig, axes = plt.subplots(1, 6, figsize=(15, 3))

for idx, t in enumerate(timesteps):
    if t == 0:
        noisy_img = img
    else:
        noisy_img = forward_process.q_sample(img, t)
    
    axes[idx].imshow(noisy_img, cmap='gray', vmin=-2, vmax=2)
    axes[idx].set_title(f't = {t}')
    axes[idx].axis('off')

plt.suptitle('Direct Sampling with Reparameterization Trick', fontsize=14)
plt.tight_layout()
plt.show()

print("💡 Key Insight: The reparameterization trick allows us to directly sample")
print("   x_t from x_0 without computing all intermediate steps.")

---
# Part 2: Forward and Reverse Processes 🔄

Now let's implement the core components of diffusion models: the forward process that adds noise and the reverse process that removes it.

## Exercise 4: Understanding the Score Function

### 📖 Concept
The score function $\nabla_x \log p(x)$ indicates the direction of increasing probability density. In diffusion models, we approximate this with a neural network that predicts the noise:
$$\epsilon_\theta(x_t, t) \approx -\sqrt{1-\bar{\alpha}_t} \cdot \nabla_{x_t} \log p(x_t)$$
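
For a toy case where everything is Gaussian, this relation can be checked directly. The sketch below assumes 1D data $x_0 \sim \mathcal{N}(0, s^2)$, so that $p(x_t)$ is Gaussian with a known score, and compares the best linear predictor of $\epsilon$ from $x_t$ with $-\sqrt{1-\bar{\alpha}_t}$ times that score. The values of `alpha_bar`, `s`, and `n` are arbitrary choices for the demonstration.

```python
import math
import random

rng = random.Random(0)
alpha_bar = 0.5          # assumed noise level for the demo
s = 0.5                  # assumed standard deviation of the toy data x_0 ~ N(0, s^2)
n = 200_000

# Draw (x_t, eps) pairs from the forward process and accumulate regression sums
sum_xe = sum_xx = 0.0
for _ in range(n):
    x0 = rng.gauss(0.0, s)
    eps = rng.gauss(0.0, 1.0)
    xt = math.sqrt(alpha_bar) * x0 + math.sqrt(1.0 - alpha_bar) * eps
    sum_xe += xt * eps
    sum_xx += xt * xt

# Best linear predictor of eps from x_t (for Gaussians this is E[eps | x_t])
fitted_slope = sum_xe / sum_xx

# Analytic score of p(x_t) = N(0, var_t) is -x_t / var_t
var_t = alpha_bar * s**2 + (1.0 - alpha_bar)
score_slope = -1.0 / var_t
predicted_slope = -math.sqrt(1.0 - alpha_bar) * score_slope

print(f"fitted: {fitted_slope:.3f}, from score: {predicted_slope:.3f}")
```

The two slopes should agree up to sampling error, illustrating that an ideal noise predictor is a rescaled, sign-flipped score.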

### 💻 Implementation

In [None]:
class SimpleNoisePredictor(nn.Module):
    """Simple neural network for noise prediction"""
    
    def __init__(self, input_dim=2, hidden_dim=128, time_dim=32):
        super().__init__()
        
        # Time embedding layers
        self.time_embed = nn.Sequential(
            nn.Linear(1, time_dim),
            nn.ReLU(),
            nn.Linear(time_dim, time_dim)
        )
        
        # Main network
        self.net = nn.Sequential(
            nn.Linear(input_dim + time_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )
    
    def forward(self, x, t):
        """Predict noise given noisy data and timestep"""
        # Embed timestep
        t_emb = self.time_embed(t.unsqueeze(-1))
        
        # Concatenate with input
        h = torch.cat([x, t_emb], dim=-1)
        
        # Predict noise
        return self.net(h)

# Create and visualize the model
model = SimpleNoisePredictor()
print("Model Architecture:")
print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")
print("\nModel structure:")
print(model)

# Test the model with random input
x_test = torch.randn(32, 2)  # Batch of 32 2D points
t_test = torch.rand(32)  # Random timesteps
noise_pred = model(x_test, t_test)
print(f"\nInput shape: {x_test.shape}")
print(f"Timestep shape: {t_test.shape}")
print(f"Output (predicted noise) shape: {noise_pred.shape}")

## Exercise 5: Training a Simple Diffusion Model

### 📖 Concept
The training algorithm:
1. Sample data $x_0 \sim p_{data}$
2. Sample timestep $t \sim \text{Uniform}\{1, ..., T\}$
3. Sample noise $\epsilon \sim \mathcal{N}(0, I)$
4. Create noisy sample: $x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t} \epsilon$
5. Predict noise: $\hat{\epsilon} = \epsilon_\theta(x_t, t)$
6. Loss: $L = \|\epsilon - \hat{\epsilon}\|^2$

### 💻 Implementation

In [None]:
def train_diffusion_model(model, data, n_epochs=100, batch_size=128, lr=1e-3):
    """Train a simple diffusion model"""
    
    # Setup
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    # Create noise schedule
    scheduler = NoiseSchedule(num_timesteps=1000)
    betas = torch.tensor(scheduler.cosine_schedule(), dtype=torch.float32, device=device)
    alphas = 1 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
    sqrt_one_minus_alphas_cumprod = torch.sqrt(1 - alphas_cumprod)
    
    # Convert data to tensor
    data_tensor = torch.tensor(data, dtype=torch.float32, device=device)
    
    # Training loop
    losses = []
    model.train()
    
    for epoch in tqdm(range(n_epochs), desc='Training'):
        epoch_losses = []
        
        # Shuffle data
        perm = torch.randperm(len(data_tensor), device=device)
        data_shuffled = data_tensor[perm]
        
        for i in range(0, len(data_tensor), batch_size):
            # Get batch
            batch = data_shuffled[i:i+batch_size]
            batch_size_actual = len(batch)
            
            # Sample random timesteps
            t = torch.randint(0, 1000, (batch_size_actual,), device=device)
            
            # Sample noise
            noise = torch.randn_like(batch)
            
            # Create noisy samples
            sqrt_alpha_t = sqrt_alphas_cumprod[t].unsqueeze(-1)
            sqrt_one_minus_alpha_t = sqrt_one_minus_alphas_cumprod[t].unsqueeze(-1)
            x_noisy = sqrt_alpha_t * batch + sqrt_one_minus_alpha_t * noise
            
            # Predict noise
            t_normalized = t.float() / 1000.0  # Normalize timesteps
            noise_pred = model(x_noisy, t_normalized)
            
            # Compute loss
            loss = F.mse_loss(noise_pred, noise)
            
            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())
        
        avg_loss = np.mean(epoch_losses)
        losses.append(avg_loss)
        
        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")
    
    return losses

# Generate training data (Swiss roll dataset)
from sklearn.datasets import make_swiss_roll
n_samples = 5000
noise = 0.1
X, _ = make_swiss_roll(n_samples=n_samples, noise=noise)
# Use only 2D projection
X_2d = X[:, [0, 2]] / 10.0  # Normalize

# Train the model
model = SimpleNoisePredictor()
losses = train_diffusion_model(model, X_2d, n_epochs=100)

# Plot training loss
plt.figure(figsize=(10, 4))
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.grid(True, alpha=0.3)
plt.show()

print("\n💡 Key Insight: The model learns to predict the noise that was added to the data.")

## Exercise 6: Sampling from the Trained Model

### 📖 Concept
DDPM Sampling algorithm:
1. Start with pure noise: $x_T \sim \mathcal{N}(0, I)$
2. For $t = T, T-1, ..., 1$:
   - Predict noise: $\hat{\epsilon} = \epsilon_\theta(x_t, t)$
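   - Compute mean: $\mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}}\left(x_t - \frac{\beta_t}{\sqrt{1-\bar{\alpha}_t}} \hat{\epsilon}\right)$
   - Sample: $x_{t-1} \sim \mathcal{N}(\mu_\theta, \sigma_t^2 I)$, with $\sigma_t^2 = \beta_t \frac{1-\bar{\alpha}_{t-1}}{1-\bar{\alpha}_t}$ (no noise is added at the final step)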
3. Return $x_0$
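
The mean used in DDPM sampling is written in terms of the predicted noise, but it is the same quantity as the mean of the true posterior $q(x_{t-1} \mid x_t, x_0)$ once $x_0$ is expressed through $x_t$ and $\epsilon$. The scalar sketch below checks that equivalence when the noise estimate is exact. The function names and the numeric values are made up for illustration.

```python
import math

def reverse_mean(x_t, eps_hat, beta_t, alpha_bar_t):
    """DDPM mean: (x_t - beta_t / sqrt(1 - alpha_bar_t) * eps_hat) / sqrt(alpha_t)."""
    alpha_t = 1.0 - beta_t
    return (x_t - beta_t / math.sqrt(1.0 - alpha_bar_t) * eps_hat) / math.sqrt(alpha_t)

def posterior_mean(x_t, x_0, beta_t, alpha_bar_t):
    """Mean of q(x_{t-1} | x_t, x_0), written in terms of the clean sample x_0."""
    alpha_t = 1.0 - beta_t
    alpha_bar_prev = alpha_bar_t / alpha_t
    coef_x0 = math.sqrt(alpha_bar_prev) * beta_t / (1.0 - alpha_bar_t)
    coef_xt = math.sqrt(alpha_t) * (1.0 - alpha_bar_prev) / (1.0 - alpha_bar_t)
    return coef_x0 * x_0 + coef_xt * x_t

# Toy scalar example with made-up values
x_0, eps, beta_t, alpha_bar_t = 0.8, -1.3, 0.02, 0.4
x_t = math.sqrt(alpha_bar_t) * x_0 + math.sqrt(1.0 - alpha_bar_t) * eps
print(reverse_mean(x_t, eps, beta_t, alpha_bar_t))
print(posterior_mean(x_t, x_0, beta_t, alpha_bar_t))
```

In practice the network's estimate is imperfect, so the sampler only approximates the true posterior mean at each step.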

### 💻 Implementation

In [None]:
@torch.no_grad()
def ddpm_sample(model, n_samples=100, n_steps=1000, device='cpu'):
    """Sample from diffusion model using DDPM"""
    
    model.eval()
    
    # Setup noise schedule
    scheduler = NoiseSchedule(num_timesteps=n_steps)
    betas = torch.tensor(scheduler.cosine_schedule(), dtype=torch.float32, device=device)
    alphas = 1 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
    
    sqrt_recip_alphas = torch.sqrt(1.0 / alphas)
    sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
    sqrt_one_minus_alphas_cumprod = torch.sqrt(1 - alphas_cumprod)
    
    posterior_variance = betas * (1 - alphas_cumprod_prev) / (1 - alphas_cumprod)
    
    # Start from pure noise
    x = torch.randn(n_samples, 2, device=device)
    
    # Store intermediate steps for visualization
    trajectory = [x.cpu().numpy()]
    
    # Reverse diffusion process
    for t in tqdm(reversed(range(n_steps)), desc='Sampling', total=n_steps):
        # Create batch of timesteps
        t_batch = torch.full((n_samples,), t, device=device, dtype=torch.float32)
        
        # Predict noise
        noise_pred = model(x, t_batch / n_steps)
        
        # Compute x_{t-1}
        mean = sqrt_recip_alphas[t] * (x - betas[t] / sqrt_one_minus_alphas_cumprod[t] * noise_pred)
        
        if t > 0:
            noise = torch.randn_like(x)
            std = torch.sqrt(posterior_variance[t])
            x = mean + std * noise
        else:
            x = mean
        
        # Store every 100th step
        if t % 100 == 0:
            trajectory.append(x.cpu().numpy())
    
    return x.cpu().numpy(), trajectory

# Sample from the trained model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
samples, trajectory = ddpm_sample(model, n_samples=500, device=device)

# Visualize the sampling process
fig, axes = plt.subplots(2, 5, figsize=(15, 6))

# Show trajectory (reverse process)
for i, (ax, step) in enumerate(zip(axes[0], trajectory[:5])):
    ax.scatter(step[:, 0], step[:, 1], s=5, alpha=0.5)
    ax.set_xlim(-3, 3)
    ax.set_ylim(-3, 3)
    ax.set_title(f't = {1000 - i*100}')
    ax.set_aspect('equal')

# Compare with original data
axes[1, 0].scatter(X_2d[:500, 0], X_2d[:500, 1], s=5, alpha=0.5, label='Original')
axes[1, 0].set_title('Original Data')
axes[1, 0].set_xlim(-3, 3)
axes[1, 0].set_ylim(-3, 3)
axes[1, 0].set_aspect('equal')

axes[1, 1].scatter(samples[:, 0], samples[:, 1], s=5, alpha=0.5, label='Generated')
axes[1, 1].set_title('Generated Samples')
axes[1, 1].set_xlim(-3, 3)
axes[1, 1].set_ylim(-3, 3)
axes[1, 1].set_aspect('equal')

# Hide unused axes
for ax in axes[1, 2:]:
    ax.axis('off')

plt.suptitle('DDPM Sampling Process', fontsize=14)
plt.tight_layout()
plt.show()

print("\n💡 Key Insight: If training converged, the generated samples should resemble the original data distribution.")

### 🎯 Your Turn!
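Implement deterministic DDIM sampling for the 2D model: at each step, estimate $\hat{x}_0$ from the predicted noise, then move to $x_{t-1}$ without adding fresh noise. Note that this is not the same as zeroing the variance in the DDPM update above, because the DDIM mean differs. Compare the results with DDPM sampling.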

---
# Part 3: Building a Complete Diffusion Model 🏗️

Let's build a more sophisticated diffusion model with a U-Net architecture for image generation.

## Exercise 7: Implementing a Mini U-Net

### 📖 Concept
U-Net is a widely used architecture for image diffusion models. It features:
- **Encoder path**: Downsamples while increasing channels
- **Decoder path**: Upsamples while decreasing channels
- **Skip connections**: Preserve fine details
- **Time embedding**: Conditions the network on timestep
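
To make the channel bookkeeping of the skip connections concrete, here is a small sketch that traces feature-map shapes through a two-level U-Net like the `MiniUNet` built in this exercise. The helper `trace_unet_shapes` and its dictionary keys are illustrative names, not part of the model code, and the sketch assumes 2x pooling and 2x upsampling at each level.

```python
def trace_unet_shapes(in_channels=1, base=32, size=32):
    """Follow (channels, height/width) through a two-level U-Net."""
    shapes = {}
    shapes["d1"] = (base, size)                       # first encoder block
    shapes["d2"] = (base * 2, size // 2)              # after one 2x pooling
    shapes["bottleneck"] = (base * 2, size // 4)      # after a second pooling
    # Decoder: upsample, then concatenate the matching encoder feature map
    shapes["cat2"] = (shapes["bottleneck"][0] + shapes["d2"][0], size // 2)
    shapes["u2"] = (base * 2, size // 2)
    shapes["cat1"] = (shapes["u2"][0] + shapes["d1"][0], size)
    shapes["u1"] = (base, size)
    shapes["out"] = (in_channels, size)
    return shapes

for name, (channels, hw) in trace_unet_shapes().items():
    print(f"{name:>10}: {channels} channels, {hw}x{hw}")
```

The concatenations explain why the decoder blocks take `base * 4` and `base * 3` input channels: each one receives the upsampled features plus the encoder features from the same resolution.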

### 💻 Implementation

In [None]:
class SinusoidalPositionalEmbedding(nn.Module):
    """Sinusoidal time embeddings"""
    
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
    
    def forward(self, t):
        device = t.device
        half_dim = self.dim // 2
        embeddings = torch.exp(
            -torch.log(torch.tensor(10000.0)) * 
            torch.arange(half_dim, device=device) / half_dim
        )
        embeddings = t[:, None] * embeddings[None, :]
        embeddings = torch.cat([torch.sin(embeddings), torch.cos(embeddings)], dim=-1)
        return embeddings

class ResidualBlock(nn.Module):
    """Residual block with time embedding"""
    
    def __init__(self, in_channels, out_channels, time_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, padding=1)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.time_emb = nn.Linear(time_channels, out_channels)
        self.norm1 = nn.GroupNorm(8, out_channels)
        self.norm2 = nn.GroupNorm(8, out_channels)
        self.relu = nn.ReLU()
        
        if in_channels != out_channels:
            self.shortcut = nn.Conv2d(in_channels, out_channels, 1)
        else:
            self.shortcut = nn.Identity()
    
    def forward(self, x, t):
        h = self.conv1(x)
        h = self.norm1(h)
        h = self.relu(h)
        h = h + self.time_emb(t)[:, :, None, None]
        h = self.conv2(h)
        h = self.norm2(h)
        h = self.relu(h)
        return h + self.shortcut(x)

class MiniUNet(nn.Module):
    """Simplified U-Net for diffusion models"""
    
    def __init__(self, in_channels=1, base_channels=32, time_dim=128):
        super().__init__()
        
        # Time embedding
        self.time_mlp = nn.Sequential(
            SinusoidalPositionalEmbedding(time_dim),
            nn.Linear(time_dim, time_dim * 2),
            nn.ReLU(),
            nn.Linear(time_dim * 2, time_dim),
        )
        
        # Encoder
        self.down1 = ResidualBlock(in_channels, base_channels, time_dim)
        self.down2 = ResidualBlock(base_channels, base_channels * 2, time_dim)
        self.pool = nn.MaxPool2d(2)
        
        # Bottleneck
        self.bottleneck = ResidualBlock(base_channels * 2, base_channels * 2, time_dim)
        
        # Decoder
        self.up2 = ResidualBlock(base_channels * 4, base_channels * 2, time_dim)
        self.up1 = ResidualBlock(base_channels * 3, base_channels, time_dim)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
        
        # Output
        self.out_conv = nn.Conv2d(base_channels, in_channels, 1)
    
    def forward(self, x, t):
        # Time embedding
        t_emb = self.time_mlp(t)
        
        # Encoder
        d1 = self.down1(x, t_emb)
        x1 = self.pool(d1)
        d2 = self.down2(x1, t_emb)
        x2 = self.pool(d2)
        
        # Bottleneck
        bottleneck = self.bottleneck(x2, t_emb)
        
        # Decoder
        u2 = self.upsample(bottleneck)
        u2 = torch.cat([u2, d2], dim=1)
        u2 = self.up2(u2, t_emb)
        
        u1 = self.upsample(u2)
        u1 = torch.cat([u1, d1], dim=1)
        u1 = self.up1(u1, t_emb)
        
        return self.out_conv(u1)

# Create and test the model
unet = MiniUNet(in_channels=1, base_channels=32)
print("Mini U-Net Architecture:")
print(f"Total parameters: {sum(p.numel() for p in unet.parameters()):,}")

# Test with random input
x = torch.randn(4, 1, 32, 32)  # Batch of 4 grayscale 32x32 images
t = torch.rand(4) * 1000  # Random timesteps
output = unet(x, t)
print(f"\nInput shape: {x.shape}")
print(f"Output shape: {output.shape}")
print("\n💡 Key Insight: U-Net preserves spatial dimensions while processing at multiple scales.")

## Exercise 8: Image Generation with Diffusion

### 📖 Concept
Let's train our U-Net on a dataset of simple geometric shapes. Sampling from the trained model is covered in Exercise 9.

### 💻 Implementation

In [None]:
def create_simple_dataset(n_samples=1000, img_size=32):
    """Create a dataset of simple geometric shapes"""
    images = []
    
    for _ in range(n_samples):
        img = np.zeros((img_size, img_size))
        shape_type = np.random.choice(['circle', 'square', 'triangle'])
        
        if shape_type == 'circle':
            center = np.random.randint(8, img_size-8, 2)
            radius = np.random.randint(4, 8)
            y, x = np.ogrid[:img_size, :img_size]
            mask = (x - center[1])**2 + (y - center[0])**2 <= radius**2
            img[mask] = 1
        
        elif shape_type == 'square':
            size = np.random.randint(8, 16)
            x = np.random.randint(0, img_size - size)
            y = np.random.randint(0, img_size - size)
            img[y:y+size, x:x+size] = 1
        
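        else:  # triangle
            pts = np.random.randint(4, img_size-4, (3, 2))  # vertices as (row, col)
            yy, xx = np.mgrid[:img_size, :img_size]
            # Edge-function test: a pixel is inside the triangle when it lies
            # on the same side of all three edges
            def edge(a, b):
                return (xx - a[1]) * (b[0] - a[0]) - (yy - a[0]) * (b[1] - a[1])
            e0, e1, e2 = edge(pts[0], pts[1]), edge(pts[1], pts[2]), edge(pts[2], pts[0])
            mask = ((e0 >= 0) & (e1 >= 0) & (e2 >= 0)) | ((e0 <= 0) & (e1 <= 0) & (e2 <= 0))
            img[mask] = 1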
        
        images.append(img)
    
    return np.array(images)[:, np.newaxis, :, :]

# Create dataset
print("Creating geometric shapes dataset...")
shape_data = create_simple_dataset(n_samples=2000)
print(f"Dataset shape: {shape_data.shape}")

# Visualize some samples
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
for i, ax in enumerate(axes.flat):
    ax.imshow(shape_data[i, 0], cmap='gray')
    ax.axis('off')
plt.suptitle('Sample Geometric Shapes', fontsize=14)
plt.show()

# Quick training (simplified for demonstration)
def train_image_diffusion(model, images, n_epochs=50, batch_size=32, lr=1e-3):
    """Train diffusion model on images"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    # Prepare data
    images_tensor = torch.tensor(images, dtype=torch.float32, device=device)
    
    # Noise schedule
    n_timesteps = 1000
    beta = torch.linspace(1e-4, 0.02, n_timesteps, device=device)
    alpha = 1 - beta
    alpha_bar = torch.cumprod(alpha, dim=0)
    
    losses = []
    model.train()
    
    for epoch in tqdm(range(n_epochs), desc='Training'):
        epoch_losses = []
        
        for i in range(0, len(images_tensor), batch_size):
            batch = images_tensor[i:i+batch_size]
            batch_size_actual = len(batch)
            
            # Random timesteps
            t = torch.randint(0, n_timesteps, (batch_size_actual,), device=device)
            
            # Add noise
            noise = torch.randn_like(batch)
            noisy_images = (torch.sqrt(alpha_bar[t])[:, None, None, None] * batch + 
                          torch.sqrt(1 - alpha_bar[t])[:, None, None, None] * noise)
            
            # Predict noise
            noise_pred = model(noisy_images, t.float())
            
            # Loss
            loss = F.mse_loss(noise_pred, noise)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())
        
        losses.append(np.mean(epoch_losses))
    
    return losses

# Train the model (reduced epochs for demonstration)
unet_model = MiniUNet(in_channels=1, base_channels=16)  # Smaller model for speed
print("\nTraining U-Net on geometric shapes...")
losses = train_image_diffusion(unet_model, shape_data, n_epochs=30)

# Plot loss
plt.figure(figsize=(8, 4))
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss for Image Diffusion')
plt.grid(True, alpha=0.3)
plt.show()

---
# Part 4: Advanced Techniques 🚀

Let's explore advanced techniques that make modern diffusion models powerful.

## Exercise 9: DDIM - Fast Deterministic Sampling

### 📖 Concept
DDIM (Denoising Diffusion Implicit Models) allows:
- **Deterministic sampling**: Same noise → same output
- **Faster generation**: Skip timesteps (e.g., 50 steps instead of 1000)
- **Interpolation**: Smooth transitions between samples

The DDIM update rule:
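$$x_{t-1} = \sqrt{\bar{\alpha}_{t-1}} \cdot \hat{x}_0 + \sqrt{1-\bar{\alpha}_{t-1}} \cdot \hat{\epsilon}_t$$
where $\hat{\epsilon}_t = \epsilon_\theta(x_t, t)$ is the predicted noise and $\hat{x}_0 = (x_t - \sqrt{1-\bar{\alpha}_t} \cdot \hat{\epsilon}_t) / \sqrt{\bar{\alpha}_t}$ is the implied estimate of the clean sample. This is the deterministic ($\eta = 0$) case.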
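
A scalar sketch of this deterministic update is shown below. It first inverts the forward formula to estimate the clean sample, then re-noises that estimate to the lower noise level using the same predicted noise. The function names and numeric values are made up for illustration, and the noise estimate is assumed to be exact so the result can be checked by hand.

```python
import math

def predict_x0(x_t, eps_hat, alpha_bar_t):
    """Invert x_t = sqrt(a) * x_0 + sqrt(1 - a) * eps for x_0."""
    return (x_t - math.sqrt(1.0 - alpha_bar_t) * eps_hat) / math.sqrt(alpha_bar_t)

def ddim_step(x_t, eps_hat, alpha_bar_t, alpha_bar_prev):
    """Deterministic DDIM update (the eta = 0 case)."""
    x0_hat = predict_x0(x_t, eps_hat, alpha_bar_t)
    return math.sqrt(alpha_bar_prev) * x0_hat + math.sqrt(1.0 - alpha_bar_prev) * eps_hat

# Made-up scalar example: build x_t from a known x_0 and eps
x_0, eps = 0.8, -1.3
alpha_bar_t, alpha_bar_prev = 0.2, 0.6
x_t = math.sqrt(alpha_bar_t) * x_0 + math.sqrt(1.0 - alpha_bar_t) * eps
x_prev = ddim_step(x_t, eps, alpha_bar_t, alpha_bar_prev)
print(x_prev)
```

Because the step depends only on the two cumulative values $\bar{\alpha}_t$ and $\bar{\alpha}_{t-1}$, nothing forces the two timesteps to be adjacent, which is what allows DDIM to skip steps.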

### 💻 Implementation

In [None]:
@torch.no_grad()
def ddim_sample(model, image_size=32, n_samples=4, n_steps=50, eta=0.0, device='cpu'):
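    """DDIM sampling - deterministic and fast
    Args:
        model: Trained diffusion model (predicts noise from x_t and the raw timestep)
        image_size: Height and width of the generated square images
        n_samples: Number of images to generate
        n_steps: Number of denoising steps (can be < training timesteps)
        eta: Interpolation parameter (0=deterministic, 1=DDPM-like stochastic sampling)
        device: Device on which to run sampling
    Returns:
        Final samples and a list of intermediate states (both on CPU)
    """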
    model.eval()
    
    # Setup
    n_training_steps = 1000
    beta = torch.linspace(1e-4, 0.02, n_training_steps, device=device)
    alpha = 1 - beta
    alpha_bar = torch.cumprod(alpha, dim=0)
    
    # Select subset of timesteps
    timesteps = torch.linspace(0, n_training_steps - 1, n_steps, dtype=torch.long, device=device)
    
    # Start from noise
    x = torch.randn(n_samples, 1, image_size, image_size, device=device)
    
    # Store trajectory
    trajectory = [x.cpu()]
    
    for i in tqdm(reversed(range(len(timesteps))), desc='DDIM Sampling', total=len(timesteps)):
        t = int(timesteps[i])
        
        # Cumulative alpha at the current and at the next (less noisy) timestep;
        # the final step targets alpha_bar = 1, i.e. a clean sample
        alpha_t = alpha_bar[t]
        alpha_prev = alpha_bar[timesteps[i - 1]] if i > 0 else torch.tensor(1.0, device=device)
        
        # Predict noise
        t_batch = torch.full((n_samples,), t, device=device, dtype=torch.float32)
        noise_pred = model(x, t_batch)
        
        # Predict x_0
        x0_pred = (x - torch.sqrt(1 - alpha_t) * noise_pred) / torch.sqrt(alpha_t)
        
        # DDIM update
        sigma = eta * torch.sqrt((1 - alpha_prev) / (1 - alpha_t)) * torch.sqrt(1 - alpha_t / alpha_prev)
        noise = torch.randn_like(x) if i > 0 else torch.zeros_like(x)
        
        # Clamp guards against tiny negative values from floating-point round-off
        dir_coeff = torch.sqrt(torch.clamp(1 - alpha_prev - sigma**2, min=0.0))
        x = (torch.sqrt(alpha_prev) * x0_pred + 
             dir_coeff * noise_pred + 
             sigma * noise)
        
        if i % 10 == 0:
            trajectory.append(x.cpu())
    
    return x.cpu(), trajectory

# Compare deterministic (η=0) and stochastic (η=1) DDIM sampling
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
unet_model = unet_model.to(device)

print("Generating with DDIM (50 steps, deterministic)...")
ddim_samples, ddim_traj = ddim_sample(unet_model, n_samples=8, n_steps=50, eta=0.0, device=device)

print("Generating with DDIM (50 steps, stochastic)...")
ddim_stoch_samples, _ = ddim_sample(unet_model, n_samples=8, n_steps=50, eta=1.0, device=device)

# Visualize results
fig, axes = plt.subplots(2, 8, figsize=(16, 4))

for i in range(8):
    axes[0, i].imshow(ddim_samples[i, 0], cmap='gray')
    axes[0, i].axis('off')
    axes[0, i].set_title(f'Det. {i+1}')
    
    axes[1, i].imshow(ddim_stoch_samples[i, 0], cmap='gray')
    axes[1, i].axis('off')
    axes[1, i].set_title(f'Stoch. {i+1}')

plt.suptitle('DDIM Sampling: Deterministic (η=0) vs Stochastic (η=1)', fontsize=14)
plt.tight_layout()
plt.show()

print("\n💡 Key Insight: DDIM enables fast sampling with controllable stochasticity.")

## Exercise 10: Classifier-Free Guidance

### 📖 Concept
Classifier-free guidance improves conditional generation without needing a separate classifier:
$$\tilde{\epsilon}_\theta = \epsilon_\theta(x_t, \emptyset) + w \cdot (\epsilon_\theta(x_t, c) - \epsilon_\theta(x_t, \emptyset))$$

Where:
- $\epsilon_\theta(x_t, c)$: Conditional prediction
- $\epsilon_\theta(x_t, \emptyset)$: Unconditional prediction
- $w$: Guidance scale ($w = 0$ is purely unconditional, $w = 1$ is purely conditional, $w > 1$ strengthens the conditioning)
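
The combination itself is a simple elementwise extrapolation from the unconditional prediction toward (and past) the conditional one. A minimal sketch with plain Python lists follows; `guided_noise` is an illustrative helper name and the two prediction vectors are made-up numbers, not model outputs.

```python
def guided_noise(eps_uncond, eps_cond, w):
    """Classifier-free guidance: eps_uncond + w * (eps_cond - eps_uncond), elementwise."""
    return [u + w * (c - u) for u, c in zip(eps_uncond, eps_cond)]

eps_uncond = [0.2, -0.4]   # made-up unconditional prediction
eps_cond = [0.5, -0.1]     # made-up conditional prediction
for w in (0.0, 1.0, 2.0):
    print(w, guided_noise(eps_uncond, eps_cond, w))
```

In a real sampler the same formula is applied to the two network outputs at every denoising step, which means two forward passes per step.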

### 💻 Implementation

In [None]:
class ConditionalDiffusion(nn.Module):
    """Simple conditional diffusion model for demonstration"""
    
    def __init__(self, input_dim=2, n_classes=3, hidden_dim=128, time_dim=32):
        super().__init__()
        
        # Class embedding
        self.class_embed = nn.Embedding(n_classes + 1, hidden_dim)  # +1 for unconditional
        
        # Time embedding
        self.time_embed = nn.Sequential(
            nn.Linear(1, time_dim),
            nn.ReLU(),
            nn.Linear(time_dim, time_dim)
        )
        
        # Main network
        self.net = nn.Sequential(
            nn.Linear(input_dim + hidden_dim + time_dim, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )
    
    def forward(self, x, t, c=None):
        """Forward pass
        Args:
            x: Noisy data
            t: Timestep
            c: Class label (None for unconditional)
        """
        # Time embedding
        t_emb = self.time_embed(t.unsqueeze(-1))
        
        # Class embedding (the last embedding index is reserved for unconditional)
        if c is None:
            uncond_idx = self.class_embed.num_embeddings - 1
            c = torch.full((x.shape[0],), uncond_idx, device=x.device, dtype=torch.long)
        c_emb = self.class_embed(c)
        
        # Concatenate and predict
        h = torch.cat([x, c_emb, t_emb], dim=-1)
        return self.net(h)

def generate_class_data(n_samples=1000, n_classes=3):
    """Generate 2D data with different classes"""
    data = []
    labels = []
    
    samples_per_class = n_samples // n_classes
    
    for class_idx in range(n_classes):
        if class_idx == 0:  # Circle
            theta = np.random.uniform(0, 2*np.pi, samples_per_class)
            r = 1.0 + np.random.randn(samples_per_class) * 0.1
            x = r * np.cos(theta)
            y = r * np.sin(theta)
        elif class_idx == 1:  # Square
            x = np.random.uniform(-1, 1, samples_per_class)
            y = np.random.uniform(-1, 1, samples_per_class)
        else:  # Gaussian blob
            x = np.random.randn(samples_per_class) * 0.3
            y = np.random.randn(samples_per_class) * 0.3
        
        data.append(np.column_stack([x, y]))
        labels.append(np.full(samples_per_class, class_idx))
    
    return np.vstack(data), np.concatenate(labels)

# Generate conditional data
X_cond, y_cond = generate_class_data(n_samples=3000)

# Visualize classes
fig, axes = plt.subplots(1, 3, figsize=(12, 4))
class_names = ['Circle', 'Square', 'Gaussian']

for i in range(3):
    mask = y_cond == i
    axes[i].scatter(X_cond[mask, 0], X_cond[mask, 1], s=5, alpha=0.5)
    axes[i].set_title(f'Class {i}: {class_names[i]}')
    axes[i].set_xlim(-2, 2)
    axes[i].set_ylim(-2, 2)
    axes[i].set_aspect('equal')

plt.suptitle('Conditional Data Classes', fontsize=14)
plt.tight_layout()
plt.show()

# Classifier-free guidance sampling
@torch.no_grad()
def sample_with_guidance(model, class_label, guidance_scale=2.0, n_samples=100, device='cpu'):
    """Sample with classifier-free guidance"""
    model.eval()
    
    # Start from noise
    x = torch.randn(n_samples, 2, device=device)
    
    # Sampling loop (simplified)
    n_steps = 100
    for t in tqdm(reversed(range(n_steps)), total=n_steps, desc=f'Sampling class {class_label}'):
        t_batch = torch.full((n_samples,), t / n_steps, device=device)
        
        # Conditional and unconditional predictions
        c = torch.full((n_samples,), class_label, device=device, dtype=torch.long)
        eps_cond = model(x, t_batch, c)
        eps_uncond = model(x, t_batch, None)
        
        # Apply guidance
        eps = eps_uncond + guidance_scale * (eps_cond - eps_uncond)
        
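        # Crude fixed-size update, for illustration only; a real sampler would
        # apply the DDPM or DDIM update using the noise schedule
        x = x - 0.01 * eps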
    
    return x.cpu().numpy()

print("\n💡 Key Insight: Classifier-free guidance amplifies the conditional signal")
print("   by extrapolating away from the unconditional prediction.")

### 🎯 Your Turn!
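1. Train the conditional diffusion model on the class data, replacing a random fraction of labels with the unconditional index (`n_classes`, i.e. 3 here) so the model also learns the unconditional prediction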
2. Sample from each class with different guidance scales (w=0, 1, 2, 5)
3. Observe how guidance strength affects sample quality and diversity
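
As a starting point for step 1, the following is a minimal sketch of one training step with random label dropout. Several pieces are assumptions rather than the notebook's own setup: the continuous time `t` in [0, 1), the cosine-style `alpha_bar`, the dropout probability `p_uncond=0.1`, and the hypothetical `TinyCondModel`, which only stands in for `ConditionalDiffusion` so the block runs by itself.

```python
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class TinyCondModel(nn.Module):
    """Hypothetical stand-in with the same (x, t, c) call signature."""
    def __init__(self, n_classes=3, dim=2, hidden=32):
        super().__init__()
        self.embed = nn.Embedding(n_classes + 1, hidden)  # +1 for the null class
        self.net = nn.Sequential(
            nn.Linear(dim + hidden + 1, hidden), nn.ReLU(), nn.Linear(hidden, dim)
        )

    def forward(self, x, t, c):
        return self.net(torch.cat([x, self.embed(c), t.unsqueeze(-1)], dim=-1))

def cfg_training_step(model, x0, labels, null_idx, p_uncond=0.1):
    """Noise-prediction loss with a random fraction of labels dropped."""
    n = x0.shape[0]
    t = torch.rand(n, device=x0.device)              # assumed: continuous time in [0, 1)
    alpha_bar = torch.cos(0.5 * math.pi * t) ** 2    # assumed: cosine-style schedule
    noise = torch.randn_like(x0)
    x_t = alpha_bar.sqrt().unsqueeze(-1) * x0 + (1 - alpha_bar).sqrt().unsqueeze(-1) * noise

    # Replace a random fraction of labels with the unconditional index
    drop = torch.rand(n, device=x0.device) < p_uncond
    c = torch.where(drop, torch.full_like(labels, null_idx), labels)

    return F.mse_loss(model(x_t, t, c), noise)

# Usage on random placeholder data (swap in X_cond, y_cond and ConditionalDiffusion)
model = TinyCondModel()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
x0 = torch.randn(256, 2)
labels = torch.randint(0, 3, (256,))
for _ in range(5):
    loss = cfg_training_step(model, x0, labels, null_idx=3)
    opt.zero_grad()
    loss.backward()
    opt.step()
```

Without the label dropout, the unconditional embedding is never trained, and the `eps_uncond` term used during guided sampling would be meaningless.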

---
# Summary and Key Takeaways 📝

## What We've Learned

### 1. **Mathematical Foundations**
- Forward process gradually adds Gaussian noise
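- The reparameterization trick lets us sample $x_t$ directly from $x_0$ in closed form, which makes training efficient
- The score function is proportional to the (negative) predicted noise, which links score matching to denoising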

### 2. **Core Algorithms**
- **DDPM**: Original stochastic sampling
- **DDIM**: Deterministic sampling (with $\eta = 0$) that can use far fewer steps
- Training objective: predict the added noise
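
As a compact recap of the deterministic case, here is a sketch of a single DDIM update with $\eta = 0$. The function name `ddim_step` and the scalar `alpha_bar` values are illustrative assumptions; the check at the end feeds in the true noise in place of a model prediction.

```python
import torch

def ddim_step(x_t, eps_pred, alpha_bar_t, alpha_bar_prev):
    """Deterministic DDIM update (eta = 0) from alpha_bar_t to alpha_bar_prev."""
    # Estimate the clean sample implied by the noise prediction
    x0_pred = (x_t - (1 - alpha_bar_t) ** 0.5 * eps_pred) / alpha_bar_t ** 0.5
    # Move that estimate to the previous (less noisy) level, reusing eps_pred
    return alpha_bar_prev ** 0.5 * x0_pred + (1 - alpha_bar_prev) ** 0.5 * eps_pred

# Sanity check with a perfect noise prediction (illustrative values)
x0 = torch.randn(4, 2)
eps = torch.randn(4, 2)
a_t, a_prev = 0.5, 0.9
x_t = a_t ** 0.5 * x0 + (1 - a_t) ** 0.5 * eps
x_prev = ddim_step(x_t, eps, a_t, a_prev)
```

No fresh noise is injected in this update, so the same starting noise always maps to the same sample; DDPM adds a random term at each step.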

### 3. **Architecture**
- U-Net with skip connections
- Time embeddings condition the network
- Attention mechanisms for long-range dependencies

### 4. **Advanced Techniques**
- Classifier-free guidance for better conditioning
- Latent diffusion for efficiency (Stable Diffusion)
- Various noise schedules (linear, cosine)
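
To compare the two schedules side by side, the sketch below computes the cumulative signal level $\bar{\alpha}_t$ for each. The defaults (`T=1000`, `beta_start=1e-4`, `beta_end=0.02`, offset `s=0.008`) are commonly used values assumed here for illustration and may differ from those used earlier in this notebook.

```python
import numpy as np

def linear_alpha_bar(T=1000, beta_start=1e-4, beta_end=0.02):
    """alpha_bar for a linear beta schedule (assumed default values)."""
    betas = np.linspace(beta_start, beta_end, T)
    return np.cumprod(1.0 - betas)

def cosine_alpha_bar(T=1000, s=0.008):
    """alpha_bar for a cosine schedule, normalized so it starts at 1."""
    steps = np.arange(T + 1) / T
    f = np.cos((steps + s) / (1 + s) * np.pi / 2) ** 2
    return (f / f[0])[1:]

lin = linear_alpha_bar()
cos = cosine_alpha_bar()
mid = len(lin) // 2
print(f"alpha_bar at the midpoint: linear={lin[mid]:.3f}, cosine={cos[mid]:.3f}")
```

With these settings the cosine schedule keeps more signal at intermediate timesteps, while the linear schedule destroys most of it earlier.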

## Key Insights

1. **Gradual is Good**: Small denoising steps are easier to learn than direct generation
2. **Flexibility**: Same model can be sampled deterministically or stochastically
3. **Scalability**: Works from toy 2D data to high-resolution images
4. **Control**: Conditioning mechanisms enable precise control over generation

## Applications

- **Text-to-Image**: DALL-E 2, Stable Diffusion, Midjourney
- **Image Editing**: Inpainting, style transfer, super-resolution
- **3D Generation**: DreamFusion, Point-E
- **Audio/Video**: AudioLDM, Video diffusion models
- **Science**: Molecular design, protein folding

## Next Steps

1. **Experiment** with different architectures and datasets
2. **Implement** advanced sampling techniques (DPM-Solver, PNDM)
3. **Explore** conditioning mechanisms (ControlNet, IP-Adapter)
4. **Apply** to your specific domain or problem

## Resources for Further Learning

- [The Annotated Diffusion Model](https://huggingface.co/blog/annotated-diffusion)
- [Understanding Diffusion Models: A Unified Perspective](https://arxiv.org/abs/2208.11970)
- [Diffusers Library by HuggingFace](https://github.com/huggingface/diffusers)
- Original Papers: [DDPM](https://arxiv.org/abs/2006.11239), [DDIM](https://arxiv.org/abs/2010.02502), [Stable Diffusion](https://arxiv.org/abs/2112.10752)

## 🏆 Final Challenge

Create your own diffusion model for a specific task:
1. Choose a dataset (e.g., MNIST digits, simple patterns, time series)
2. Design an appropriate architecture
3. Train with different noise schedules
4. Implement both DDPM and DDIM sampling
5. Add conditioning (class labels, text, etc.)
6. Evaluate quality vs diversity trade-offs

Share your results and learnings!

In [None]:
# Your code here!
# Start by implementing a diffusion model for your chosen task
