# Homework: DDPM for Covariance Shrinkage in Portfolio Optimization

## Overview

In this homework, you will implement a **Denoising Diffusion Probabilistic Model (DDPM)** and use it for covariance matrix estimation in portfolio optimization. You will compare three covariance estimation methods:

1. **Empirical Covariance**: Sample covariance (baseline)
2. **Ledoit-Wolf Shrinkage**: Classic shrinkage estimator
3. **DDPM Diffusion**: Train a diffusion model, generate samples, compute covariance

For each method, we compute both **Minimum Variance (MVP)** and **Maximum Sharpe** portfolios.

## Learning Objectives

By completing this homework, you will:
- Understand the forward and reverse diffusion processes in DDPM
- Implement key components of a diffusion model
- Apply diffusion models to financial covariance estimation
- Compare different covariance estimators in a portfolio optimization context

## Instructions

- Fill in the code where you see `### YOUR CODE HERE ###`
- Answer the conceptual and interpretation questions in the markdown cells
- Run all cells to verify your implementation
- The backtest may take 10-30 minutes depending on your hardware

## Grading Rubric

| Section | Points |
|---------|--------|
| Exercise 1 (ScoreNet) | 10 |
| Exercise 2 (add_noise) | 15 |
| Exercise 3 (sample) | 15 |
| Exercise 4 (MVP weights) | 10 |
| Exercise 5 (MaxSharpe) | 10 |
| Exercise 6 (DiffusionCov) | 15 |
| Conceptual Q1-Q3 | 10 |
| Interpretation Q4-Q7 | 15 |
| **Total** | **100** |

---

## Part 1: Setup and Data Loading

Run the following cells to set up the environment and load the ETF data.

In [None]:
%matplotlib inline

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from typing import Tuple, Optional

# PyTorch
import torch
import torch.nn as nn

# Sklearn covariance estimators
from sklearn.covariance import LedoitWolf, EmpiricalCovariance

# Scipy for portfolio optimization
from scipy.optimize import minimize

# Plot settings
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

# Device setup
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
print(f"PyTorch version: {torch.__version__}")

In [None]:
# Load and clean ETF data
df_raw = pd.read_csv('etf_db.csv')
print(f"Raw data shape: {df_raw.shape}")
print(f"ETFs in dataset: {df_raw['etf'].unique().tolist()}")

# Select ETFs (excluding SPY as benchmark)
etf_names = ['XLK', 'XLI', 'XLV', 'XLE', 'XLP', 'XLY', 'XTL', 'XLRE', 'XLB', 'XLF', 'XLU']
df_raw = df_raw[df_raw['etf'].isin(etf_names)].copy()

# Parse dates and clean data
df_raw['Date'] = pd.to_datetime(df_raw['Date'])
df_raw = df_raw[df_raw['Date'].dt.dayofweek < 5]  # Remove weekends

# Remove March 2020 (COVID crash)
march_2020_mask = (df_raw['Date'].dt.year == 2020) & (df_raw['Date'].dt.month == 3)
df_raw = df_raw[~march_2020_mask]

# Compute returns
df_raw = df_raw.sort_values(['etf', 'Date']).reset_index(drop=True)
df_raw['pct_return'] = df_raw.groupby('etf')['Adj Close'].transform(lambda x: x.pct_change())
df_raw = df_raw.dropna(subset=['pct_return'])

# Pivot to returns matrix
returns_df = df_raw.pivot(index='Date', columns='etf', values='pct_return')
returns_df = returns_df.sort_index().dropna()

print(f"\nReturns matrix shape: {returns_df.shape}")
print(f"Date range: {returns_df.index.min().date()} to {returns_df.index.max().date()}")
print(f"Number of assets: {returns_df.shape[1]}")

---

## Part 2: Understanding Diffusion Models (Conceptual Questions)

Before implementing, let's make sure you understand the key concepts behind DDPM.

### Question 1 (3 points)

In the **forward diffusion process** of DDPM, which of the following is TRUE?

A) We gradually remove noise from data to recover the original signal  
B) We gradually add Gaussian noise to data until it becomes pure noise  
C) We train a neural network to predict the next timestep  
D) We use a deterministic transformation to encode data  

**Your Answer:** [Write A, B, C, or D]

**Explanation (1-2 sentences):**

[Your explanation here]

### Question 2 (3 points)

The noise schedule in DDPM is defined by $\beta_t$ values that typically increase from $\beta_1 \approx 10^{-4}$ to $\beta_T \approx 0.02$.

**Why do we use small $\beta_t$ values at the beginning and larger values at the end?**

[Your answer here - 2-3 sentences]

### Question 3 (4 points)

In DDPM, the neural network is trained to predict the **noise** $\epsilon$ that was added, rather than predicting the clean data $x_0$ directly.

**What is the advantage of predicting noise instead of the clean data?**

[Your answer here - 2-3 sentences]

---

## Part 3: Implement the Score Network

The ScoreNet is a neural network that predicts the noise added at each timestep. It takes:
- `x`: The noisy data at timestep t
- `t`: The timestep

And outputs the predicted noise.

### Exercise 1 (10 points)

Complete the `forward()` method of the ScoreNet class.

**Steps:**
1. Normalize the timestep `t` by dividing by 1000.0 and add a dimension
2. Pass the normalized timestep through `self.time_embed` to get the time embedding
3. Concatenate `x` and the time embedding along the last dimension
4. Pass the concatenated tensor through `self.net`

In [None]:
class ScoreNet(nn.Module):
    """
    Simple MLP for noise prediction in DDPM.
    
    Takes noisy data x_t and timestep t, predicts the noise that was added.
    """
    
    def __init__(self, input_dim: int = 2, hidden_dim: int = 128, time_dim: int = 32):
        super().__init__()
        
        # Time embedding network
        self.time_embed = nn.Sequential(
            nn.Linear(1, time_dim),
            nn.ReLU(),
            nn.Linear(time_dim, time_dim)
        )
        
        # Main network: takes concatenated [x, time_embedding]
        self.net = nn.Sequential(
            nn.Linear(input_dim + time_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )
    
    def forward(self, x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
        """
        Forward pass.
        
        Args:
            x: Noisy data [batch_size, input_dim]
            t: Timesteps [batch_size] (integer values from 0 to T-1)
            
        Returns:
            Predicted noise [batch_size, input_dim]
        """
        ### YOUR CODE HERE ###
        # Step 1: Normalize timestep and add dimension for the linear layer
        # Hint: t is shape [batch_size], we need [batch_size, 1]
        t_normalized = None  # TODO
        
        # Step 2: Get time embedding
        t_emb = None  # TODO
        
        # Step 3: Concatenate x and time embedding
        # Hint: use torch.cat with dim=-1
        x_with_time = None  # TODO
        
        # Step 4: Pass through main network
        output = None  # TODO
        
        return output
        ### END YOUR CODE ###

In [None]:
# Test your ScoreNet implementation
test_model = ScoreNet(input_dim=11, hidden_dim=64, time_dim=16).to(device)
test_x = torch.randn(32, 11).to(device)
test_t = torch.randint(0, 1000, (32,)).to(device)

try:
    test_output = test_model(test_x, test_t)
    assert test_output.shape == (32, 11), f"Expected shape (32, 11), got {test_output.shape}"
    print("ScoreNet test PASSED!")
    print(f"Output shape: {test_output.shape}")
except Exception as e:
    print(f"ScoreNet test FAILED: {e}")

---

## Part 4: Implement the DDPM Forward Process

The forward process gradually adds noise to the data according to:

$$q(x_t | x_0) = \mathcal{N}(x_t; \sqrt{\bar{\alpha}_t} x_0, (1 - \bar{\alpha}_t) I)$$

This can be reparameterized as:

$$x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon$$

where $\epsilon \sim \mathcal{N}(0, I)$ and $\bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s = \prod_{s=1}^{t} (1 - \beta_s)$

### Exercise 2 (15 points)

Complete the `add_noise()` method of the DDPM class.

**Steps:**
1. Sample random noise $\epsilon$ with the same shape as $x_0$
2. Get $\sqrt{\bar{\alpha}_t}$ for the given timesteps (already precomputed)
3. Get $\sqrt{1 - \bar{\alpha}_t}$ for the given timesteps
4. Compute $x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon$

**Hint:** Use `unsqueeze(-1)` to add a dimension for broadcasting.

In [None]:
class DDPM:
    """
    Denoising Diffusion Probabilistic Model.
    
    Implements the forward (noising) and reverse (denoising) diffusion processes.
    """
    
    def __init__(self, timesteps: int = 1000, beta_start: float = 1e-4, beta_end: float = 0.02):
        self.timesteps = timesteps
        
        # Linear noise schedule: beta_t increases linearly from beta_start to beta_end
        self.betas = torch.linspace(beta_start, beta_end, timesteps).to(device)
        
        # alpha_t = 1 - beta_t
        self.alphas = 1.0 - self.betas
        
        # alpha_bar_t = cumulative product of alphas
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0)
        
        # Precomputed values for the forward process
        self.sqrt_alpha_cumprod = torch.sqrt(self.alpha_cumprod)
        self.sqrt_one_minus_alpha_cumprod = torch.sqrt(1.0 - self.alpha_cumprod)
    
    def add_noise(self, x_0: torch.Tensor, t: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward diffusion: add noise to data.
        
        Implements: x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * noise
        
        Args:
            x_0: Clean data [batch_size, dim]
            t: Timesteps [batch_size] (integer indices)
            
        Returns:
            x_t: Noisy data [batch_size, dim]
            noise: The noise that was added [batch_size, dim]
        """
        ### YOUR CODE HERE ###
        # Step 1: Sample random Gaussian noise with same shape as x_0
        noise = None  # TODO
        
        # Step 2: Get sqrt(alpha_bar_t) for the given timesteps
        # Hint: Index into self.sqrt_alpha_cumprod using t, then unsqueeze for broadcasting
        sqrt_alpha_cumprod_t = None  # TODO
        
        # Step 3: Get sqrt(1 - alpha_bar_t) for the given timesteps
        sqrt_one_minus_alpha_cumprod_t = None  # TODO
        
        # Step 4: Compute x_t using the reparameterization
        x_t = None  # TODO
        
        return x_t, noise
        ### END YOUR CODE ###
    
    def sample(self, model: nn.Module, n_samples: int, input_dim: int, 
               show_progress: bool = True) -> torch.Tensor:
        """
        Reverse diffusion: generate samples from noise.
        This method will be completed in Exercise 3.
        """
        # Placeholder - will be implemented in Exercise 3
        pass

In [None]:
# Test your add_noise implementation
test_ddpm = DDPM(timesteps=1000)
test_x0 = torch.randn(32, 11).to(device)
test_t = torch.randint(0, 1000, (32,)).to(device)

try:
    x_t, noise = test_ddpm.add_noise(test_x0, test_t)
    assert x_t.shape == test_x0.shape, f"x_t shape mismatch: {x_t.shape} vs {test_x0.shape}"
    assert noise.shape == test_x0.shape, f"noise shape mismatch: {noise.shape} vs {test_x0.shape}"
    
    # Check that noise is approximately standard normal
    noise_mean = noise.mean().item()
    noise_std = noise.std().item()
    assert abs(noise_mean) < 0.5, f"Noise mean too far from 0: {noise_mean}"
    assert 0.5 < noise_std < 1.5, f"Noise std unexpected: {noise_std}"
    
    print("add_noise test PASSED!")
    print(f"x_t shape: {x_t.shape}")
    print(f"Noise mean: {noise_mean:.4f}, std: {noise_std:.4f}")
except Exception as e:
    print(f"add_noise test FAILED: {e}")

---

## Part 5: Implement the DDPM Reverse Process

The reverse process iteratively denoises from $x_T$ (pure noise) back to $x_0$ (clean data).

At each step, we compute:

$$\mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}} \epsilon_\theta(x_t, t) \right)$$

Then sample:
$$x_{t-1} = \mu_\theta(x_t, t) + \sqrt{\beta_t} \cdot z$$

where $z \sim \mathcal{N}(0, I)$ (except for $t=0$ where we don't add noise).

### Exercise 3 (15 points)

Complete the `sample()` method of the DDPM class.

The skeleton is provided. You need to fill in the computation of `x_mean` (the denoised estimate at each step).

In [None]:
class DDPM:
    """
    Denoising Diffusion Probabilistic Model - Complete Implementation.
    """
    
    def __init__(self, timesteps: int = 1000, beta_start: float = 1e-4, beta_end: float = 0.02):
        self.timesteps = timesteps
        
        # Linear noise schedule
        self.betas = torch.linspace(beta_start, beta_end, timesteps).to(device)
        self.alphas = 1.0 - self.betas
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0)
        
        # Precomputed values
        self.sqrt_alpha_cumprod = torch.sqrt(self.alpha_cumprod)
        self.sqrt_one_minus_alpha_cumprod = torch.sqrt(1.0 - self.alpha_cumprod)
    
    def add_noise(self, x_0: torch.Tensor, t: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Forward diffusion - copy your implementation from Exercise 2."""
        ### YOUR CODE HERE (copy from Exercise 2) ###
        noise = None  # TODO
        sqrt_alpha_cumprod_t = None  # TODO
        sqrt_one_minus_alpha_cumprod_t = None  # TODO
        x_t = None  # TODO
        return x_t, noise
        ### END YOUR CODE ###
    
    def sample(self, model: nn.Module, n_samples: int, input_dim: int, 
               show_progress: bool = True) -> torch.Tensor:
        """
        Reverse diffusion: generate samples from noise.
        
        Args:
            model: Trained noise prediction model (ScoreNet)
            n_samples: Number of samples to generate
            input_dim: Dimension of each sample
            show_progress: Whether to show progress bar
            
        Returns:
            Generated samples [n_samples, input_dim]
        """
        model.eval()
        
        # Start from pure noise x_T ~ N(0, I)
        x = torch.randn(n_samples, input_dim).to(device)
        
        # Iterate from t = T-1 down to t = 0
        iterator = reversed(range(self.timesteps))
        if show_progress:
            iterator = tqdm(iterator, desc="Sampling", total=self.timesteps)
        
        with torch.no_grad():
            for t in iterator:
                # Create timestep tensor
                t_tensor = torch.full((n_samples,), t, dtype=torch.long).to(device)
                
                # Predict the noise using the model
                predicted_noise = model(x, t_tensor)
                
                # Get the coefficients for this timestep
                alpha_t = self.alphas[t]
                alpha_cumprod_t = self.alpha_cumprod[t]
                beta_t = self.betas[t]
                
                ### YOUR CODE HERE ###
                # Compute x_mean (the denoised estimate)
                # Formula: x_mean = (1/sqrt(alpha_t)) * (x - (beta_t/sqrt(1-alpha_bar_t)) * predicted_noise)
                #
                # Hint: Use torch.sqrt() for square roots
                x_mean = None  # TODO
                ### END YOUR CODE ###
                
                if t > 0:
                    # Add noise for all steps except the last
                    noise = torch.randn_like(x)
                    x = x_mean + torch.sqrt(beta_t) * noise
                else:
                    # Last step: no noise added
                    x = x_mean
        
        model.train()
        return x

---

## Part 6: Training Function

This function is provided for you. It trains the DDPM by:
1. Sampling random timesteps
2. Adding noise to the data
3. Predicting the noise with the model
4. Computing MSE loss between predicted and actual noise

In [None]:
def train_ddpm(
    model: nn.Module,
    data: np.ndarray,
    ddpm: DDPM,
    epochs: int = 200,
    batch_size: int = 64,
    lr: float = 1e-3,
    verbose: bool = False
) -> list:
    """
    Train the DDPM model.
    
    Args:
        model: ScoreNet model
        data: Training data [n_samples, dim]
        ddpm: DDPM instance
        epochs: Number of training epochs
        batch_size: Batch size
        lr: Learning rate
        verbose: Print progress
        
    Returns:
        List of losses per epoch
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model.train()
    
    # Convert to tensor
    data_tensor = torch.from_numpy(data.astype(np.float32)).to(device)
    n_samples = len(data_tensor)
    
    losses = []
    
    for epoch in range(epochs):
        epoch_losses = []
        
        # Mini-batch training
        indices = torch.randperm(n_samples)
        for i in range(0, n_samples, batch_size):
            batch_idx = indices[i:i+batch_size]
            batch = data_tensor[batch_idx]
            
            # Sample random timesteps
            t = torch.randint(0, ddpm.timesteps, (len(batch),)).to(device)
            
            # Add noise
            x_t, noise = ddpm.add_noise(batch, t)
            
            # Predict noise
            predicted_noise = model(x_t, t)
            
            # MSE loss
            loss = nn.MSELoss()(predicted_noise, noise)
            
            # Backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())
        
        avg_loss = np.mean(epoch_losses)
        losses.append(avg_loss)
        
        if verbose and (epoch + 1) % 50 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}")
    
    return losses

---

## Part 7: Portfolio Optimization

Now let's implement the portfolio optimization functions.

### Exercise 4 (10 points)

Implement the **Minimum Variance Portfolio** weights.

The analytical solution is:
$$w_{MVP} = \frac{\Sigma^{-1} \mathbf{1}}{\mathbf{1}^T \Sigma^{-1} \mathbf{1}}$$

where $\Sigma$ is the covariance matrix and $\mathbf{1}$ is a vector of ones.

In [None]:
def get_minimum_variance_weights(cov_matrix: np.ndarray) -> np.ndarray:
    """
    Compute minimum variance portfolio weights.
    
    Args:
        cov_matrix: Covariance matrix [n_assets, n_assets]
        
    Returns:
        Portfolio weights [n_assets, 1]
    """
    n_assets = cov_matrix.shape[0]
    ones = np.ones((n_assets, 1))
    
    # Add small regularization for numerical stability
    cov_reg = cov_matrix + np.eye(n_assets) * 1e-8
    
    ### YOUR CODE HERE ###
    # Step 1: Compute the inverse of the covariance matrix
    # Hint: Use np.linalg.inv() or np.linalg.pinv() for pseudo-inverse
    try:
        cov_inv = None  # TODO
    except np.linalg.LinAlgError:
        cov_inv = np.linalg.pinv(cov_reg)
    
    # Step 2: Compute unnormalized weights: Sigma^{-1} @ ones
    weights_unnorm = None  # TODO
    
    # Step 3: Normalize so weights sum to 1
    # Formula: weights = weights_unnorm / (ones.T @ Sigma^{-1} @ ones)
    weights = None  # TODO
    
    return weights
    ### END YOUR CODE ###

In [None]:
# Test MVP weights
test_cov = np.array([[0.04, 0.01], [0.01, 0.09]])  # 2-asset example
mvp_weights = get_minimum_variance_weights(test_cov)

try:
    assert mvp_weights.shape == (2, 1), f"Shape mismatch: {mvp_weights.shape}"
    assert abs(mvp_weights.sum() - 1.0) < 1e-6, f"Weights don't sum to 1: {mvp_weights.sum()}"
    # Expected: higher weight on lower variance asset
    assert mvp_weights[0] > mvp_weights[1], "Lower variance asset should have higher weight"
    print("MVP weights test PASSED!")
    print(f"Weights: {mvp_weights.flatten()}")
except Exception as e:
    print(f"MVP weights test FAILED: {e}")

### Exercise 5 (10 points)

Implement the **Maximum Sharpe Portfolio** optimization.

We maximize the Sharpe ratio:
$$\text{Sharpe} = \frac{w^T \mu - r_f}{\sqrt{w^T \Sigma w}}$$

Subject to: $\sum_i w_i = 1$

We'll use scipy.optimize.minimize to minimize the **negative** Sharpe ratio.

In [None]:
def get_maximum_sharpe_weights(
    cov_matrix: np.ndarray, 
    expected_returns: np.ndarray,
    risk_free_rate: float = 0.0
) -> np.ndarray:
    """
    Compute maximum Sharpe ratio portfolio weights using optimization.
    
    Args:
        cov_matrix: Covariance matrix [n_assets, n_assets]
        expected_returns: Expected returns [n_assets, 1] or [n_assets,]
        risk_free_rate: Annual risk-free rate (daily scale will be computed)
        
    Returns:
        Portfolio weights [n_assets, 1]
    """
    n_assets = cov_matrix.shape[0]
    expected_returns = expected_returns.flatten()
    
    ### YOUR CODE HERE ###
    def neg_sharpe(weights):
        """
        Compute negative Sharpe ratio (we minimize this).
        
        Args:
            weights: Portfolio weights [n_assets,]
        Returns:
            Negative Sharpe ratio (scalar)
        """
        # Step 1: Compute portfolio return: w^T @ mu
        port_return = None  # TODO
        
        # Step 2: Compute portfolio volatility: sqrt(w^T @ Sigma @ w)
        # Hint: Use np.dot for matrix multiplication
        port_vol = None  # TODO
        
        # Handle edge case of zero volatility
        if port_vol < 1e-10:
            return 1e10
        
        # Step 3: Compute Sharpe ratio and return its negative
        # Note: risk_free_rate is annual, divide by 252 for daily
        sharpe = None  # TODO
        
        return -sharpe  # Return negative because we're minimizing
    ### END YOUR CODE ###
    
    # Constraint: weights sum to 1
    constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    
    # Bounds: allow short selling (-1 to 1)
    bounds = [(-1, 1) for _ in range(n_assets)]
    
    # Initial guess: equal weights
    w0 = np.ones(n_assets) / n_assets
    
    result = minimize(
        neg_sharpe, 
        w0, 
        method='SLSQP',
        bounds=bounds,
        constraints=constraints,
        options={'maxiter': 1000}
    )
    
    return result.x.reshape(-1, 1)

In [None]:
# Test MaxSharpe weights
test_cov = np.array([[0.04, 0.01], [0.01, 0.09]])
test_returns = np.array([[0.10], [0.05]])  # First asset has higher return

try:
    ms_weights = get_maximum_sharpe_weights(test_cov, test_returns, risk_free_rate=0.02)
    assert ms_weights.shape == (2, 1), f"Shape mismatch: {ms_weights.shape}"
    assert abs(ms_weights.sum() - 1.0) < 1e-4, f"Weights don't sum to 1: {ms_weights.sum()}"
    # Higher return & lower vol asset should have higher weight
    assert ms_weights[0] > ms_weights[1], "Better risk-adjusted asset should have higher weight"
    print("MaxSharpe weights test PASSED!")
    print(f"Weights: {ms_weights.flatten()}")
except Exception as e:
    print(f"MaxSharpe weights test FAILED: {e}")

---

## Part 8: Diffusion Covariance Estimator

Now let's put it all together into a covariance estimator that:
1. Trains a DDPM on return data
2. Generates synthetic samples
3. Computes covariance from the generated samples

### Exercise 6 (15 points)

Complete the `fit()` method of the DiffusionCovarianceEstimator class.

In [None]:
class DiffusionCovarianceEstimator:
    """
    Covariance estimator using DDPM diffusion model.
    
    The approach:
    1. Train a generative diffusion model on historical returns
    2. Generate many synthetic samples from the learned distribution
    3. Compute covariance from the generated samples
    
    This can help "denoise" the sample covariance by learning the underlying distribution.
    """
    
    def __init__(
        self, 
        timesteps: int = 1000,
        hidden_dim: int = 128,
        time_dim: int = 32,
        epochs: int = 200,
        batch_size: int = 64,
        lr: float = 1e-3,
        n_gen_samples: int = 1000
    ):
        self.timesteps = timesteps
        self.hidden_dim = hidden_dim
        self.time_dim = time_dim
        self.epochs = epochs
        self.batch_size = batch_size
        self.lr = lr
        self.n_gen_samples = n_gen_samples
        
        self.model = None
        self.ddpm = None
        self.covariance_ = None
        self.input_dim = None
        self.training_mean = None
        self.training_std = None
    
    def fit(self, X: np.ndarray):
        """
        Fit the diffusion model on returns data.
        
        Args:
            X: Returns data [n_samples, n_assets]
            
        Returns:
            self
        """
        self.input_dim = X.shape[1]
        
        ### YOUR CODE HERE ###
        # Step 1: Standardize the data for better training
        # Compute mean and std, then standardize: X_std = (X - mean) / std
        self.training_mean = None  # TODO: X.mean(axis=0)
        self.training_std = None  # TODO: X.std(axis=0) + 1e-8 (add small epsilon)
        X_standardized = None  # TODO
        
        # Step 2: Initialize the ScoreNet model
        self.model = None  # TODO: Create ScoreNet with appropriate dimensions
        
        # Step 3: Initialize the DDPM
        self.ddpm = None  # TODO: Create DDPM with self.timesteps
        
        # Step 4: Train the model (use train_ddpm function)
        # TODO: Call train_ddpm with appropriate arguments
        
        # Step 5: Generate samples using the trained model
        # TODO: Use self.ddpm.sample() to generate self.n_gen_samples samples
        generated = None  # TODO
        
        # Step 6: Convert to numpy and unstandardize
        generated_np = None  # TODO: Convert to numpy
        generated_unstd = None  # TODO: Reverse the standardization
        
        # Step 7: Compute covariance from generated samples
        self.covariance_ = None  # TODO: Use np.cov with rowvar=False
        ### END YOUR CODE ###
        
        return self

In [None]:
# Test the DiffusionCovarianceEstimator with a small example
print("Testing DiffusionCovarianceEstimator (this may take a minute)...")

# Use a small subset of data for testing
test_returns = returns_df.iloc[:100].values  # 100 days, 11 assets

try:
    estimator = DiffusionCovarianceEstimator(
        timesteps=100,  # Reduced for testing
        hidden_dim=64,
        epochs=50,      # Reduced for testing
        n_gen_samples=200
    )
    estimator.fit(test_returns)
    
    assert estimator.covariance_ is not None, "Covariance not computed"
    assert estimator.covariance_.shape == (11, 11), f"Wrong shape: {estimator.covariance_.shape}"
    
    # Check symmetry
    assert np.allclose(estimator.covariance_, estimator.covariance_.T), "Covariance not symmetric"
    
    print("DiffusionCovarianceEstimator test PASSED!")
    print(f"Covariance shape: {estimator.covariance_.shape}")
except Exception as e:
    print(f"DiffusionCovarianceEstimator test FAILED: {e}")
    import traceback
    traceback.print_exc()

---

## Part 9: Run the Backtest

Now we'll run the full backtest comparing all three covariance estimation methods. This may take 10-30 minutes.

In [None]:
# Backtest configuration
LOOKBACK_WINDOW = 252  # 1 year of trading days
REBALANCE_FREQ = 21    # Monthly rebalancing
RISK_FREE_RATE = 0.02  # Annual risk-free rate

# Weight constraints
MAX_WEIGHT = 0.30
MIN_WEIGHT = -0.10

# DDPM parameters (reduced for faster execution)
DDPM_TIMESTEPS = 500
DDPM_HIDDEN_DIM = 128
DDPM_TIME_DIM = 32
DDPM_EPOCHS = 100
DDPM_BATCH_SIZE = 64
DDPM_LR = 1e-3
DDPM_GEN_SAMPLES = 500

print(f"Lookback window: {LOOKBACK_WINDOW} trading days")
print(f"Rebalance frequency: {REBALANCE_FREQ} trading days")
print(f"Weight bounds: [{MIN_WEIGHT:.0%}, {MAX_WEIGHT:.0%}]")
print(f"\nBacktest period: {returns_df.index[LOOKBACK_WINDOW].date()} to {returns_df.index[-1].date()}")

In [None]:
def run_backtest(
    returns_df: pd.DataFrame,
    lookback_window: int,
    rebalance_freq: int,
    max_weight: float = 0.5,
    min_weight: float = -0.5,
    ddpm_config: dict = None
):
    """
    Run backtest comparing Empirical, Ledoit-Wolf, and Diffusion covariance estimators.
    """
    n_assets = returns_df.shape[1]
    dates = returns_df.index[lookback_window:]
    returns_array = returns_df.values
    
    # Methods and strategies
    methods = ['Empirical', 'Ledoit-Wolf', 'Diffusion']
    strategies = ['MVP', 'MaxSharpe']
    portfolio_names = [f"{m}_{s}" for m in methods for s in strategies]
    
    # Initialize
    portfolio_values = {name: [1.0] for name in portfolio_names}
    portfolio_values['EqualWeight'] = [1.0]
    
    equal_weights = np.ones((n_assets, 1)) / n_assets
    current_weights = {name: equal_weights.copy() for name in portfolio_names}
    
    weights_history = {name: [] for name in portfolio_names}
    rebalance_dates = []
    
    days_since_rebalance = rebalance_freq
    
    def clip_and_normalize(weights, min_w, max_w):
        clipped = np.clip(weights, min_w, max_w)
        return clipped / clipped.sum()
    
    empirical_est = EmpiricalCovariance()
    lw_est = LedoitWolf()
    
    if ddpm_config is None:
        ddpm_config = {}
    
    for t in tqdm(range(lookback_window, len(returns_df)), desc='Backtesting'):
        if days_since_rebalance >= rebalance_freq:
            trailing_returns = returns_array[t - lookback_window:t, :]
            mean_returns = np.mean(trailing_returns, axis=0).reshape(-1, 1)
            
            # Empirical
            empirical_est.fit(trailing_returns)
            cov_empirical = empirical_est.covariance_
            
            # Ledoit-Wolf
            lw_est.fit(trailing_returns)
            cov_lw = lw_est.covariance_
            
            # Diffusion
            diffusion_est = DiffusionCovarianceEstimator(**ddpm_config)
            diffusion_est.fit(trailing_returns)
            cov_diffusion = diffusion_est.covariance_
            
            covariances = {
                'Empirical': cov_empirical,
                'Ledoit-Wolf': cov_lw,
                'Diffusion': cov_diffusion
            }
            
            for method, cov_matrix in covariances.items():
                try:
                    mvp_weights = get_minimum_variance_weights(cov_matrix)
                    current_weights[f"{method}_MVP"] = clip_and_normalize(
                        mvp_weights, min_weight, max_weight
                    )
                except Exception:
                    pass
                
                try:
                    ms_weights = get_maximum_sharpe_weights(cov_matrix, mean_returns)
                    current_weights[f"{method}_MaxSharpe"] = clip_and_normalize(
                        ms_weights, min_weight, max_weight
                    )
                except Exception:
                    pass
            
            for name in portfolio_names:
                weights_history[name].append(current_weights[name].flatten())
            
            rebalance_dates.append(returns_df.index[t])
            days_since_rebalance = 0
        
        today_returns = returns_array[t, :].reshape(-1, 1)
        
        for name in portfolio_names:
            port_return = (current_weights[name].T @ today_returns).item()
            portfolio_values[name].append(
                portfolio_values[name][-1] * (1 + port_return)
            )
        
        eq_return = (equal_weights.T @ today_returns).item()
        portfolio_values['EqualWeight'].append(
            portfolio_values['EqualWeight'][-1] * (1 + eq_return)
        )
        
        days_since_rebalance += 1
    
    all_dates = [returns_df.index[lookback_window - 1]] + list(dates)
    portfolio_df = pd.DataFrame(portfolio_values, index=all_dates)
    
    weights_dfs = {}
    for name in portfolio_names:
        weights_dfs[name] = pd.DataFrame(
            weights_history[name],
            index=rebalance_dates,
            columns=returns_df.columns
        )
    
    return {
        'portfolio_values': portfolio_df,
        'weights': weights_dfs
    }

In [None]:
# Run the backtest
ddpm_config = {
    'timesteps': DDPM_TIMESTEPS,
    'hidden_dim': DDPM_HIDDEN_DIM,
    'time_dim': DDPM_TIME_DIM,
    'epochs': DDPM_EPOCHS,
    'batch_size': DDPM_BATCH_SIZE,
    'lr': DDPM_LR,
    'n_gen_samples': DDPM_GEN_SAMPLES
}

n_rebalances = (len(returns_df) - LOOKBACK_WINDOW) // REBALANCE_FREQ + 1
print(f"Running backtest...")
print(f"Number of rebalancing periods: {n_rebalances}")
print(f"Estimated time: 10-30 minutes\n")

results = run_backtest(
    returns_df,
    LOOKBACK_WINDOW,
    REBALANCE_FREQ,
    max_weight=MAX_WEIGHT,
    min_weight=MIN_WEIGHT,
    ddpm_config=ddpm_config
)

portfolio_values = results['portfolio_values']
weights_dfs = results['weights']

print(f"\nBacktest complete!")

---

## Part 10: Performance Analysis

In [None]:
def compute_performance_metrics(portfolio_values: pd.DataFrame, risk_free_rate: float = 0.02):
    """Compute performance metrics for all portfolios."""
    metrics = {}
    
    for col in portfolio_values.columns:
        values = portfolio_values[col]
        returns = values.pct_change().dropna()
        
        total_return = values.iloc[-1] / values.iloc[0] - 1
        n_years = len(returns) / 252
        ann_return = (1 + total_return) ** (1 / n_years) - 1
        ann_vol = returns.std() * np.sqrt(252)
        sharpe = (ann_return - risk_free_rate) / ann_vol
        
        cummax = values.cummax()
        drawdown = (values - cummax) / cummax
        max_drawdown = drawdown.min()
        calmar = ann_return / abs(max_drawdown) if max_drawdown != 0 else np.nan
        
        metrics[col] = {
            'Total Return': total_return,
            'Ann. Return': ann_return,
            'Ann. Volatility': ann_vol,
            'Sharpe Ratio': sharpe,
            'Max Drawdown': max_drawdown,
            'Calmar Ratio': calmar
        }
    
    return pd.DataFrame(metrics)

metrics_df = compute_performance_metrics(portfolio_values, RISK_FREE_RATE)

In [None]:
# Display results
mvp_cols = [c for c in metrics_df.columns if 'MVP' in c]
maxsharpe_cols = [c for c in metrics_df.columns if 'MaxSharpe' in c]

print("\n" + "=" * 80)
print("MINIMUM VARIANCE PORTFOLIOS")
print("=" * 80)
display_df = metrics_df[mvp_cols + ['EqualWeight']].copy()
for col in display_df.columns:
    display_df[col] = display_df[col].apply(lambda x: f"{x:.2%}" if abs(x) < 10 else f"{x:.2f}")
print(display_df)

print("\n" + "=" * 80)
print("MAXIMUM SHARPE PORTFOLIOS")
print("=" * 80)
display_df = metrics_df[maxsharpe_cols + ['EqualWeight']].copy()
for col in display_df.columns:
    display_df[col] = display_df[col].apply(lambda x: f"{x:.2%}" if abs(x) < 10 else f"{x:.2f}")
print(display_df)

In [None]:
# Plot cumulative returns
fig, axes = plt.subplots(2, 1, figsize=(14, 12))

# MVP portfolios
ax = axes[0]
colors = {'Empirical_MVP': 'blue', 'Ledoit-Wolf_MVP': 'green', 'Diffusion_MVP': 'red', 'EqualWeight': 'gray'}
for col in mvp_cols + ['EqualWeight']:
    style = '--' if col == 'EqualWeight' else '-'
    lw = 2.5 if 'Diffusion' in col else 1.5
    portfolio_values[col].plot(ax=ax, label=col, linestyle=style, linewidth=lw, color=colors.get(col))
ax.set_title('Minimum Variance Portfolios', fontsize=14)
ax.set_ylabel('Portfolio Value')
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

# MaxSharpe portfolios
ax = axes[1]
colors = {'Empirical_MaxSharpe': 'blue', 'Ledoit-Wolf_MaxSharpe': 'green', 'Diffusion_MaxSharpe': 'red', 'EqualWeight': 'gray'}
for col in maxsharpe_cols + ['EqualWeight']:
    style = '--' if col == 'EqualWeight' else '-'
    lw = 2.5 if 'Diffusion' in col else 1.5
    portfolio_values[col].plot(ax=ax, label=col, linestyle=style, linewidth=lw, color=colors.get(col))
ax.set_title('Maximum Sharpe Portfolios', fontsize=14)
ax.set_ylabel('Portfolio Value')
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Sharpe ratio ranking
print("=" * 60)
print("RANKING BY SHARPE RATIO")
print("=" * 60)

print("\nMinimum Variance Portfolios:")
print("-" * 40)
mvp_sharpe = metrics_df.loc['Sharpe Ratio', mvp_cols].sort_values(ascending=False)
for i, (name, sharpe) in enumerate(mvp_sharpe.items(), 1):
    print(f"{i}. {name}: {sharpe:.3f}")

print("\nMaximum Sharpe Portfolios:")
print("-" * 40)
maxsharpe_sharpe = metrics_df.loc['Sharpe Ratio', maxsharpe_cols].sort_values(ascending=False)
for i, (name, sharpe) in enumerate(maxsharpe_sharpe.items(), 1):
    print(f"{i}. {name}: {sharpe:.3f}")

print(f"\nEqual Weight: {metrics_df.loc['Sharpe Ratio', 'EqualWeight']:.3f}")

---

## Part 11: Interpretation Questions

Based on the backtest results above, answer the following questions.

### Question 4 (4 points)

Compare the performance of the **Diffusion** method between MVP and MaxSharpe strategies.

**Questions:**
1. Which strategy performed better with Diffusion covariance estimation?
2. What is the Sharpe ratio difference between the two strategies?

**Your Answer:**

[Your answer here - 2-3 sentences]

### Question 5 (4 points)

The Diffusion method shows different relative performance for MVP vs MaxSharpe compared to traditional methods.

**Why might the Diffusion-based covariance perform differently for these two optimization objectives?**

Hint: Think about what each optimization objective is sensitive to (variance structure vs. mean-variance tradeoff).

**Your Answer:**

[Your answer here - 3-4 sentences]

### Question 6 (4 points)

Look at the weight evolution plots for the different methods.

**What do you observe about the stability of weights over time for each covariance estimation method? Which method produces more stable allocations?**

**Your Answer:**

[Your answer here - 2-3 sentences]

In [None]:
# Plot weight evolution for comparison
fig, axes = plt.subplots(2, 3, figsize=(18, 10))

comparisons = [
    ('Empirical_MVP', 'Empirical - MVP'),
    ('Ledoit-Wolf_MVP', 'Ledoit-Wolf - MVP'),
    ('Diffusion_MVP', 'Diffusion - MVP'),
    ('Empirical_MaxSharpe', 'Empirical - MaxSharpe'),
    ('Ledoit-Wolf_MaxSharpe', 'Ledoit-Wolf - MaxSharpe'),
    ('Diffusion_MaxSharpe', 'Diffusion - MaxSharpe'),
]

for ax, (key, title) in zip(axes.flat, comparisons):
    weights_dfs[key].plot(ax=ax, linewidth=1, legend=False)
    ax.set_title(title, fontsize=11)
    ax.set_ylabel('Weight')
    ax.axhline(y=0, color='black', linestyle='-', linewidth=0.5)
    ax.grid(True, alpha=0.3)

axes[1, 2].legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=8)

plt.tight_layout()
plt.show()

### Question 7 (3 points)

**What are the main trade-offs of using diffusion-based covariance estimation compared to traditional methods like Ledoit-Wolf shrinkage?**

Consider: computational cost, sample efficiency, assumptions, interpretability.

**Your Answer:**

[Your answer here - 3-4 sentences]

---

## Part 12: Bonus Challenge (Optional - 10 extra points)

Experiment with different DDPM hyperparameters and analyze their impact on portfolio performance.

**Choose ONE of the following experiments:**

A) Increase `n_gen_samples` from 500 to 2000. Does generating more samples improve covariance estimation?

B) Increase `epochs` from 100 to 300. Does longer training help?

C) Try a different `timesteps` value (e.g., 200 vs 1000). How does the noise schedule length affect results?

**Document your findings below:**

In [None]:
# BONUS: Your experiment code here
# Example: Re-run with different parameters

# ddpm_config_bonus = {
#     'timesteps': DDPM_TIMESTEPS,
#     'hidden_dim': DDPM_HIDDEN_DIM,
#     'time_dim': DDPM_TIME_DIM,
#     'epochs': 300,  # Changed
#     'batch_size': DDPM_BATCH_SIZE,
#     'lr': DDPM_LR,
#     'n_gen_samples': DDPM_GEN_SAMPLES
# }

# results_bonus = run_backtest(...)

**Bonus Findings:**

[Describe your experiment and findings here]

---

## Summary

In this homework, you implemented:

1. **ScoreNet**: A neural network for noise prediction in DDPM
2. **Forward diffusion**: Adding noise to data according to a schedule
3. **Reverse diffusion**: Iteratively denoising to generate samples
4. **Portfolio optimization**: MVP and Maximum Sharpe portfolios
5. **Diffusion covariance estimator**: Using generated samples for covariance estimation

### Key Takeaways:

- Diffusion models can learn the underlying distribution of financial returns
- Generated samples can be used to compute "denoised" covariance estimates
- The effectiveness depends on the optimization objective (MVP vs MaxSharpe)
- There are computational trade-offs compared to analytical shrinkage methods