# Diffusion Model Fundamentals

This notebook introduces the core concepts of diffusion models for time series:

1. Forward diffusion process (adding noise)
2. Reverse diffusion process (denoising)
3. Noise schedules (linear, cosine, sigmoid)
4. Loss functions and training objectives
5. Simple 1D example with synthetic financial return data

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Fix the global PyTorch and NumPy RNG seeds so reruns draw the same numbers
torch.manual_seed(42)
np.random.seed(42)

# Run on CUDA when PyTorch reports an available GPU, otherwise on the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

## 1. The Intuition Behind Diffusion Models

Diffusion models work by:
1. **Forward process**: Gradually add noise to data until it becomes pure Gaussian noise
2. **Reverse process**: Learn to undo each noise step, recovering original data

The key insight is that it's easier to learn many small denoising steps than to generate data in one shot.

In [None]:
# Generate sample financial time series
def generate_sample_returns(n_samples=1000, n_timesteps=100, mu=0.0001, sigma=0.02):
    """Generate synthetic return series with simple volatility clustering.

    Each series starts with volatility ``sigma``. From the second step on the
    volatility is updated as
    ``vol = 0.9 * vol + 0.1 * sigma * (1 + |r_prev| / sigma)``,
    so a large absolute return raises the volatility of the next step.
    Every return is ``mu + vol * z`` with ``z`` drawn from NumPy's global
    standard-normal generator (seed it with ``np.random.seed`` for
    reproducibility).

    Args:
        n_samples: Number of independent series (rows).
        n_timesteps: Length of each series (columns).
        mu: Per-step drift added to every return.
        sigma: Baseline per-step volatility; must be positive because the
            volatility update divides by it.

    Returns:
        np.ndarray of shape ``(n_samples, n_timesteps)``.

    Raises:
        ValueError: If ``sigma`` is not strictly positive.
    """
    if sigma <= 0:
        raise ValueError(f"sigma must be positive, got {sigma}")

    returns = np.zeros((n_samples, n_timesteps))
    # Draws are taken one at a time, sample by sample, so that the random
    # stream (and therefore the output for a given seed) matches the
    # original hard-coded version when the defaults are used.
    for i in range(n_samples):
        vol = sigma
        for t in range(n_timesteps):
            # Simple volatility clustering driven by the previous return
            if t > 0:
                vol = 0.9 * vol + 0.1 * sigma * (1 + abs(returns[i, t-1]) / sigma)
            returns[i, t] = mu + vol * np.random.randn()

    return returns

# Generate data
returns = generate_sample_returns(n_samples=5000, n_timesteps=50)
print(f"Generated returns shape: {returns.shape}")

# One figure, two panels: example paths (left) and pooled histogram (right)
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
paths_ax, hist_ax = axes

# Cumulative sum of the first five return series
for i, series in enumerate(returns[:5]):
    paths_ax.plot(np.cumsum(series), alpha=0.7, label=f'Sample {i+1}')
paths_ax.set(title='Sample Cumulative Returns', xlabel='Time', ylabel='Cumulative Return')
paths_ax.legend()

# Histogram of every return across all series
hist_ax.hist(returns.flatten(), bins=100, density=True, alpha=0.7)
hist_ax.set(title='Distribution of Returns', xlabel='Return', ylabel='Density')

plt.tight_layout()
plt.show()

## 2. Forward Diffusion Process

The forward process is defined as:

$$q(x_t | x_{t-1}) = \mathcal{N}(x_t; \sqrt{1-\beta_t} x_{t-1}, \beta_t I)$$

Key property - we can sample $x_t$ directly from $x_0$:

$$q(x_t | x_0) = \mathcal{N}(x_t; \sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t) I)$$

where $\bar{\alpha}_t = \prod_{s=1}^{t} (1-\beta_s)$

In [None]:
class NoiseSchedule:
    """Variance (beta) schedules for the forward diffusion process.

    Builds the per-step noise variances ``betas`` for one of three schedule
    types and precomputes the quantities needed to sample ``x_t`` directly
    from ``x_0``.

    Attributes:
        num_timesteps: Number of diffusion steps.
        schedule_type: One of ``'linear'``, ``'cosine'``, ``'sigmoid'``.
        betas: Array of shape ``(num_timesteps,)``.
        alphas: ``1 - betas``.
        alphas_cumprod: Cumulative product of ``alphas``.
        sqrt_alphas_cumprod: ``sqrt(alphas_cumprod)``.
        sqrt_one_minus_alphas_cumprod: ``sqrt(1 - alphas_cumprod)``.

    Raises:
        ValueError: If ``num_timesteps`` is smaller than 1 or
            ``schedule_type`` is not recognised.
    """

    def __init__(self, num_timesteps=1000, schedule_type='linear'):
        if num_timesteps < 1:
            # The cosine schedule divides by num_timesteps, and an empty
            # schedule is unusable for every type.
            raise ValueError(f"num_timesteps must be at least 1, got {num_timesteps}")

        self.num_timesteps = num_timesteps
        self.schedule_type = schedule_type

        # Compute betas based on schedule type
        if schedule_type == 'linear':
            self.betas = self._linear_schedule()
        elif schedule_type == 'cosine':
            self.betas = self._cosine_schedule()
        elif schedule_type == 'sigmoid':
            self.betas = self._sigmoid_schedule()
        else:
            raise ValueError(f"Unknown schedule: {schedule_type}")

        # Compute derived quantities
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = np.cumprod(self.alphas)
        self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = np.sqrt(1.0 - self.alphas_cumprod)

    def _linear_schedule(self, beta_start=0.0001, beta_end=0.02):
        """Betas evenly spaced from ``beta_start`` to ``beta_end``."""
        return np.linspace(beta_start, beta_end, self.num_timesteps)

    def _cosine_schedule(self, s=0.008):
        """Betas derived from a squared-cosine curve for the cumulative alpha.

        The cumulative alpha is ``cos(((t/T + s) / (1 + s)) * pi/2) ** 2``,
        normalised to 1 at ``t = 0``; betas are recovered from the ratio of
        consecutive values and clipped to ``[0.0001, 0.9999]``.
        """
        steps = self.num_timesteps + 1
        t = np.linspace(0, self.num_timesteps, steps) / self.num_timesteps
        alphas_cumprod = np.cos((t + s) / (1 + s) * np.pi / 2) ** 2
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        # The unclipped final beta is 1 (cumulative alpha reaches 0); the
        # upper clip keeps every alpha strictly positive.
        return np.clip(betas, 0.0001, 0.9999)

    def _sigmoid_schedule(self, beta_start=0.0001, beta_end=0.02):
        """Betas following a logistic curve between ``beta_start`` and ``beta_end``."""
        t = np.linspace(-6, 6, self.num_timesteps)
        sigmoid = 1 / (1 + np.exp(-t))
        return sigmoid * (beta_end - beta_start) + beta_start

    @staticmethod
    def _expand_coef(coef, ndim):
        """Give a per-sample coefficient trailing axes so it broadcasts over ``ndim`` dims."""
        coef = np.asarray(coef)
        if coef.ndim == 0:
            # Scalar timestep: hand back a plain NumPy scalar
            return coef[()]
        return coef.reshape(coef.shape + (1,) * max(ndim - coef.ndim, 0))

    def add_noise(self, x_0, t, noise=None):
        """Sample ``x_t`` from ``x_0`` in closed form.

        Computes ``sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * noise``.

        Args:
            x_0: Clean data, either a single series or a batch whose first
                axis indexes samples.
            t: Integer timestep index, or an integer array with one index per
                sample in the batch. Per-sample coefficients are broadcast
                over the remaining axes of ``x_0``, matching
                ``DiffusionTrainer.get_noisy_sample``.
            noise: Optional noise with the shape of ``x_0``; drawn from
                NumPy's global standard-normal generator when omitted.

        Returns:
            The noised data, same shape as ``x_0``.
        """
        if noise is None:
            noise = np.random.randn(*x_0.shape)

        ndim = np.ndim(x_0)
        sqrt_alpha = self._expand_coef(self.sqrt_alphas_cumprod[t], ndim)
        sqrt_one_minus_alpha = self._expand_coef(self.sqrt_one_minus_alphas_cumprod[t], ndim)

        return sqrt_alpha * x_0 + sqrt_one_minus_alpha * noise

In [None]:
# Build one 1000-step schedule per type so they can be compared side by side
schedules = {
    kind: NoiseSchedule(1000, kind)
    for kind in ('linear', 'cosine', 'sigmoid')
}

fig, axes = plt.subplots(1, 3, figsize=(15, 4))
beta_ax, alpha_bar_ax, snr_ax = axes

# Draw each schedule on all three panels in one pass
for name, schedule in schedules.items():
    # Per-step noise variance
    beta_ax.plot(schedule.betas, label=name)
    # Remaining signal fraction after t steps
    alpha_bar_ax.plot(schedule.alphas_cumprod, label=name)
    # Signal-to-noise ratio, shown on a log scale
    snr = schedule.alphas_cumprod / (1 - schedule.alphas_cumprod)
    snr_ax.plot(np.log(snr), label=name)

beta_ax.set(title='Noise Schedule (β_t)', xlabel='Timestep', ylabel='β')
alpha_bar_ax.set(title='Cumulative Alpha (ᾱ_t)', xlabel='Timestep', ylabel='ᾱ')
snr_ax.set(title='Log Signal-to-Noise Ratio', xlabel='Timestep', ylabel='log(SNR)')
for ax in axes:
    ax.legend()

plt.tight_layout()
plt.show()

In [None]:
# Show how one return series degrades as forward-diffusion noise is added
schedule = NoiseSchedule(1000, 'cosine')

# The first generated series serves as the clean example x_0
x_0 = returns[0]

# Timestep indices to display, one per panel
timesteps = [0, 100, 250, 500, 750, 999]

fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.flatten()

for i, t in enumerate(timesteps):
    # The t = 0 panel shows the clean series; every other panel draws fresh noise
    x_t = x_0 if t == 0 else schedule.add_noise(x_0, t)

    ax = axes[i]
    ax.plot(np.cumsum(x_t), color='blue', alpha=0.8)
    # The small epsilon guards the SNR denominator against zero
    ax.set_title(f't = {t} (SNR = {schedule.alphas_cumprod[t]/(1-schedule.alphas_cumprod[t]+1e-8):.2f})')
    ax.set(xlabel='Time', ylabel='Cumulative Return')
    ax.axhline(y=0, color='gray', linestyle='--', alpha=0.5)

plt.suptitle('Forward Diffusion Process: Adding Noise', fontsize=14)
plt.tight_layout()
plt.show()

## 3. Reverse Diffusion Process (Denoising)

The reverse process learns to denoise:

$$p_\theta(x_{t-1} | x_t) = \mathcal{N}(x_{t-1}; \mu_\theta(x_t, t), \Sigma_\theta(x_t, t))$$

We train a neural network to predict the noise $\epsilon_\theta(x_t, t)$ added at each step.

In [None]:
class SinusoidalPositionEmbeddings(nn.Module):
    """Sinusoidal embeddings for timestep encoding.

    Maps a 1-D tensor of timesteps of length ``B`` to a ``(B, dim)`` tensor.
    The first ``dim // 2`` columns are sines and the next ``dim // 2`` are
    cosines of the timestep multiplied by geometrically spaced frequencies
    running from 1 down to 1/10000. When ``dim`` is odd, one zero column is
    appended so the output width always equals ``dim``.

    Args:
        dim: Width of the embedding; must be at least 2.

    Raises:
        ValueError: If ``dim`` is smaller than 2.
    """

    def __init__(self, dim):
        super().__init__()
        if dim < 2:
            raise ValueError(f"dim must be at least 2, got {dim}")
        self.dim = dim

    def forward(self, time):
        """Return the ``(len(time), dim)`` embedding of the timesteps in ``time``."""
        device = time.device
        half_dim = self.dim // 2
        # With a single frequency (dim 2 or 3) `half_dim - 1` is zero; the
        # max() avoids the division by zero that used to produce NaN.
        scale = float(np.log(10000)) / max(half_dim - 1, 1)
        freqs = torch.exp(torch.arange(half_dim, device=device) * -scale)
        angles = time[:, None] * freqs[None, :]
        embeddings = torch.cat([torch.sin(angles), torch.cos(angles)], dim=-1)
        if self.dim % 2 == 1:
            # sin/cos halves give only dim - 1 columns for odd dim; pad one
            # zero column so downstream layers sized for `dim` still fit.
            embeddings = F.pad(embeddings, (0, 1))
        return embeddings


class SimpleDenoiser(nn.Module):
    """MLP noise predictor for fixed-length 1D series.

    The timestep is embedded by ``time_mlp`` and concatenated to the noisy
    series before passing through a four-layer GELU MLP that outputs one
    value per time step.

    Args:
        seq_length: Number of values in each input/output series.
        hidden_dim: Width of the hidden layers and of the time embedding
            fed to the main network.
        time_emb_dim: Width of the sinusoidal timestep embedding.
    """

    def __init__(self, seq_length, hidden_dim=256, time_emb_dim=64):
        super().__init__()
        self.seq_length = seq_length

        # Timestep branch: sinusoidal features -> two-layer MLP
        time_layers = [
            SinusoidalPositionEmbeddings(time_emb_dim),
            nn.Linear(time_emb_dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, hidden_dim),
        ]
        self.time_mlp = nn.Sequential(*time_layers)

        # Main branch: three Linear+GELU stages followed by the output layer
        widths = [seq_length + hidden_dim, hidden_dim, hidden_dim, hidden_dim]
        body = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            body.append(nn.Linear(fan_in, fan_out))
            body.append(nn.GELU())
        body.append(nn.Linear(hidden_dim, seq_length))
        self.net = nn.Sequential(*body)

    def forward(self, x, t):
        """Return the predicted noise for noisy series ``x`` at timesteps ``t``."""
        # Join the series and its timestep embedding along the feature axis
        features = torch.cat([x, self.time_mlp(t)], dim=-1)
        return self.net(features)

## 4. Training the Diffusion Model

Training objective (simplified):

$$\mathcal{L} = \mathbb{E}_{t, x_0, \epsilon} \left[ \| \epsilon - \epsilon_\theta(x_t, t) \|^2 \right]$$

Algorithm:
1. Sample $x_0$ from data
2. Sample $t \sim \text{Uniform}(\{1, \dots, T\})$ (the code uses 0-based indices $0, \dots, T-1$)
3. Sample $\epsilon \sim \mathcal{N}(0, I)$
4. Compute $x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t} \epsilon$
5. Minimize $\| \epsilon - \epsilon_\theta(x_t, t) \|^2$

In [None]:
class DiffusionTrainer:
    """Noise-prediction training loop for a diffusion model.

    Args:
        model: Module called as ``model(x_t, t)`` that returns predicted
            noise with the shape of ``x_t``.
        schedule: Object exposing ``num_timesteps``, ``sqrt_alphas_cumprod``
            and ``sqrt_one_minus_alphas_cumprod`` (e.g. ``NoiseSchedule``).
        lr: Learning rate for the Adam optimizer.

    Attributes:
        device: Device of the model's parameters; schedule tensors, sampled
            timesteps and batches are placed there, so the trainer does not
            depend on a notebook-global ``device`` variable.
    """

    def __init__(self, model, schedule, lr=1e-4):
        self.model = model
        self.schedule = schedule
        self.optimizer = torch.optim.Adam(model.parameters(), lr=lr)

        # Follow the model's own device rather than a global variable
        self.device = next(model.parameters()).device

        # Convert schedule to tensors
        self.sqrt_alphas_cumprod = torch.tensor(
            schedule.sqrt_alphas_cumprod, dtype=torch.float32
        ).to(self.device)
        self.sqrt_one_minus_alphas_cumprod = torch.tensor(
            schedule.sqrt_one_minus_alphas_cumprod, dtype=torch.float32
        ).to(self.device)

    def get_noisy_sample(self, x_0, t, noise=None):
        """Return ``x_t`` for a batch ``x_0`` of shape (batch, length).

        Args:
            x_0: Clean batch.
            t: Integer tensor with one timestep index per batch element.
            noise: Optional noise shaped like ``x_0``; drawn with
                ``torch.randn_like`` when omitted.
        """
        if noise is None:
            noise = torch.randn_like(x_0)

        # [:, None] makes the per-sample coefficients broadcast over time
        sqrt_alpha = self.sqrt_alphas_cumprod[t][:, None]
        sqrt_one_minus_alpha = self.sqrt_one_minus_alphas_cumprod[t][:, None]

        return sqrt_alpha * x_0 + sqrt_one_minus_alpha * noise

    def train_step(self, x_0):
        """Run one optimizer step on batch ``x_0`` and return the loss as a float."""
        # Sampling switches the model to eval mode; make sure training
        # always runs in train mode regardless of what happened before.
        self.model.train()

        x_0 = x_0.to(self.device)
        batch_size = x_0.shape[0]

        # Sample random timesteps
        t = torch.randint(0, self.schedule.num_timesteps, (batch_size,), device=self.device)

        # Sample noise
        noise = torch.randn_like(x_0)

        # Get noisy samples
        x_t = self.get_noisy_sample(x_0, t, noise)

        # Predict noise (the model receives the timestep as float)
        noise_pred = self.model(x_t, t.float())

        # Compute loss
        loss = F.mse_loss(noise_pred, noise)

        # Backprop
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def train(self, dataloader, epochs=100):
        """Train for ``epochs`` passes over ``dataloader``.

        Each batch is expected to be a sequence whose first element is the
        data tensor. Returns the list of mean losses, one per epoch; the
        mean loss is printed every 20 epochs.
        """
        losses = []

        for epoch in tqdm(range(epochs), desc="Training"):
            epoch_losses = []
            for batch in dataloader:
                x_0 = batch[0].to(self.device)
                loss = self.train_step(x_0)
                epoch_losses.append(loss)

            avg_loss = np.mean(epoch_losses)
            losses.append(avg_loss)

            if (epoch + 1) % 20 == 0:
                print(f"Epoch {epoch+1}: Loss = {avg_loss:.6f}")

        return losses

In [None]:
# Wrap the returns as float32 tensors served in shuffled mini-batches of 64.
# NOTE(review): the returns are fed in unscaled while the diffusion noise has
# unit variance; standardizing the data (and un-scaling generated samples)
# may be worth considering - confirm.
X = torch.tensor(returns, dtype=torch.float32)
dataset = TensorDataset(X)
dataloader = DataLoader(dataset, shuffle=True, batch_size=64)

# The denoiser takes one input per time step of a series
seq_length = returns.shape[1]

# Model, 1000-step cosine schedule and trainer
model = SimpleDenoiser(
    seq_length,
    hidden_dim=256,
    time_emb_dim=64,
).to(device)
schedule = NoiseSchedule(1000, 'cosine')
trainer = DiffusionTrainer(model, schedule, lr=1e-4)

print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

In [None]:
# Run 100 training epochs and keep the per-epoch mean loss
losses = trainer.train(dataloader, epochs=100)

# Loss curve on a logarithmic y-axis
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(losses)
ax.set(xlabel='Epoch', ylabel='Loss', title='Training Loss', yscale='log')
ax.grid(True, alpha=0.3)
plt.show()

## 5. Sampling from the Diffusion Model

To generate new samples, we start from pure noise and iteratively denoise:

$$x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1-\bar{\alpha}_t}} \epsilon_\theta(x_t, t) \right) + \sigma_t z$$

where $\alpha_t = 1 - \beta_t$, $z \sim \mathcal{N}(0, I)$ and $\sigma_t = \sqrt{\beta_t}$; no noise is added at the final denoising step.

In [None]:
@torch.no_grad()
def sample(model, schedule, n_samples, seq_length, device):
    """Generate samples using DDPM ancestral sampling.

    Starts from standard-normal noise and applies the reverse update for
    every timestep from ``num_timesteps - 1`` down to 0, adding noise with
    standard deviation ``sqrt(beta_t)`` on every step except the last.

    The model is put in eval mode while sampling and restored to whatever
    mode (train/eval) it was in before the call, so sampling in the middle
    of training does not silently leave the model in eval mode.

    Args:
        model: Module called as ``model(x, t)`` returning predicted noise.
        schedule: Object exposing ``betas``, ``alphas``, ``alphas_cumprod``
            and ``num_timesteps``.
        n_samples: Number of series to generate.
        seq_length: Length of each generated series.
        device: Device on which sampling runs.

    Returns:
        np.ndarray of shape ``(n_samples, seq_length)``.
    """
    was_training = model.training
    model.eval()

    try:
        # Convert schedule parameters to tensors
        betas = torch.tensor(schedule.betas, dtype=torch.float32, device=device)
        alphas = torch.tensor(schedule.alphas, dtype=torch.float32, device=device)
        alphas_cumprod = torch.tensor(schedule.alphas_cumprod, dtype=torch.float32, device=device)

        # Per-timestep coefficients do not depend on x, so compute them once
        inv_sqrt_alphas = 1 / torch.sqrt(alphas)
        noise_pred_coefs = betas / torch.sqrt(1 - alphas_cumprod)
        sigmas = torch.sqrt(betas)

        # Start from pure noise
        x = torch.randn(n_samples, seq_length, device=device)

        # Iteratively denoise
        for t in tqdm(reversed(range(schedule.num_timesteps)), desc="Sampling", total=schedule.num_timesteps):
            t_tensor = torch.full((n_samples,), t, device=device, dtype=torch.float32)

            # Predict noise
            noise_pred = model(x, t_tensor)

            # Posterior mean of x_{t-1} given x_t and the predicted noise
            x = inv_sqrt_alphas[t] * (x - noise_pred_coefs[t] * noise_pred)

            # Add noise (except for last step)
            if t > 0:
                x = x + sigmas[t] * torch.randn_like(x)
    finally:
        # Hand the model back in the mode the caller left it in
        model.train(was_training)

    return x.cpu().numpy()

In [None]:
# Draw 100 synthetic series from the trained model, using the schedule it was
# trained with and the device it lives on
generated_samples = sample(
    model,
    schedule,
    100,
    seq_length,
    device,
)
print(f"Generated samples shape: {generated_samples.shape}")

In [None]:
# 2x3 grid of diagnostics comparing real and generated series
fig, axes = plt.subplots(2, 3, figsize=(15, 8))

# Top-left / top-middle: cumulative paths of the first five series of each set
path_panels = (
    (axes[0, 0], returns, 'Real: Sample Trajectories'),
    (axes[0, 1], generated_samples, 'Generated: Sample Trajectories'),
)
for panel, series_set, panel_title in path_panels:
    for i in range(5):
        panel.plot(np.cumsum(series_set[i]), alpha=0.7)
    panel.set_title(panel_title)
    panel.set_xlabel('Time')
    panel.set_ylabel('Cumulative Return')

# Top-right: overlaid histograms of all individual returns
dist_ax = axes[0, 2]
dist_ax.hist(returns.flatten(), bins=100, density=True, alpha=0.5, label='Real')
dist_ax.hist(generated_samples.flatten(), bins=100, density=True, alpha=0.5, label='Generated')
dist_ax.set_title('Return Distribution')
dist_ax.legend()

# Autocorrelation comparison
def compute_autocorr(data, max_lag=20):
    """Pooled autocorrelation along the time axis for lags ``0..max_lag-1``.

    Time is taken to be the last axis of ``data``. For a 2-D array of shape
    (n_series, n_timesteps) every series is shifted against itself by
    ``lag`` steps and all (value, lagged value) pairs are pooled into one
    correlation coefficient. 1-D input is treated as a single series.

    Args:
        data: Array of one or more series, time on the last axis.
        max_lag: Number of lags to compute, starting at lag 0.

    Returns:
        List of ``max_lag`` correlation values; lag 0 is 1.0 by definition.
    """
    data = np.asarray(data)
    autocorrs = [1.0] if max_lag > 0 else []
    for lag in range(1, max_lag):
        # Slice the LAST axis: slicing the first axis of a 2-D batch would
        # correlate different series with each other, not a series with its
        # own past.
        earlier = data[..., :-lag].ravel()
        later = data[..., lag:].ravel()
        autocorrs.append(np.corrcoef(earlier, later)[0, 1])
    return autocorrs

# Bottom-left: autocorrelation of real vs generated returns
real_acf = compute_autocorr(returns)
gen_acf = compute_autocorr(generated_samples)

axes[1, 0].bar(range(len(real_acf)), real_acf, alpha=0.5, label='Real')
axes[1, 0].bar(range(len(gen_acf)), gen_acf, alpha=0.5, label='Generated')
axes[1, 0].set_title('Autocorrelation')
axes[1, 0].set_xlabel('Lag')
axes[1, 0].legend()

# Volatility comparison: standard deviation of each whole series (one value
# per series, not a rolling window)
real_vol = np.std(returns, axis=1)
gen_vol = np.std(generated_samples, axis=1)

axes[1, 1].hist(real_vol, bins=50, density=True, alpha=0.5, label='Real')
axes[1, 1].hist(gen_vol, bins=50, density=True, alpha=0.5, label='Generated')
axes[1, 1].set_title('Volatility Distribution')
axes[1, 1].legend()

# QQ plot
# NOTE(review): `stats` is not referenced anywhere in this cell; the import
# is kept only in case other cells rely on it - confirm and remove.
from scipy import stats
real_sorted = np.sort(returns.flatten())
gen_sorted = np.sort(generated_samples.flatten())
# The real and generated sets have different sizes, so each sorted array
# gets its own evenly spaced indices; reusing the real-data indices on the
# smaller generated array would index past its end.
n_quantiles = min(1000, len(real_sorted), len(gen_sorted))
idx = np.linspace(0, len(real_sorted) - 1, n_quantiles).astype(int)
gen_idx = np.linspace(0, len(gen_sorted) - 1, n_quantiles).astype(int)
axes[1, 2].scatter(real_sorted[idx], gen_sorted[gen_idx], alpha=0.3, s=10)
# Reference line y = x across the range of the real data
axes[1, 2].plot([real_sorted.min(), real_sorted.max()],
                [real_sorted.min(), real_sorted.max()], 'r--')
axes[1, 2].set_title('Q-Q Plot')
axes[1, 2].set_xlabel('Real Quantiles')
axes[1, 2].set_ylabel('Generated Quantiles')

plt.tight_layout()
plt.show()

## 6. Key Takeaways

1. **Diffusion models** learn to generate data by reversing a gradual noising process
2. **Forward process** adds Gaussian noise according to a schedule (linear, cosine, sigmoid)
3. **Reverse process** learns to denoise using a neural network that predicts the added noise
4. **Training** minimizes the MSE between predicted and actual noise
5. **Sampling** iteratively denoises from pure noise to generate new samples

### Advantages over GANs:
- More stable training (no adversarial dynamics)
- Better mode coverage
- Natural uncertainty quantification

### Limitations:
- Slow sampling (1000 denoising steps)
- Computationally expensive

### Next Steps:
- See notebook 02 for full DDPM implementation with U-Net
- See notebook 03 for TimeGrad applied to cryptocurrency forecasting