# Advanced Portfolio Optimization

**Addressing Limitations of Classical Mean-Variance Optimization (MVO)**

This notebook implements modern portfolio optimization techniques that address the well-documented theoretical limitations of classical MVO:

1. **Black-Litterman Model** - Fixes the "Estimation Error Maximizer" problem
2. **Ledoit-Wolf Shrinkage Estimator** - Fixes sample covariance matrix instability  
3. **EWMA/Dynamic Volatility** - Provides forward-looking risk estimates
4. **Downside Risk Optimization (CVaR/Sortino)** - Handles non-normal return distributions
5. **Hierarchical Risk Parity (HRP)** - Machine learning-based portfolio construction
6. **Walk-Forward Optimization** - Proper out-of-sample backtesting

---

*IAPM Portfolio Project - Bharath, Chelsea, Gaurav, Vikram, Utkarsh, Yash*

In [None]:
# ============================================================
# IMPORTS AND CONFIGURATION
# ============================================================

import os
import io
import zipfile
import warnings
import itertools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import glob
from typing import Optional, Dict, List, Tuple, Union
from scipy.optimize import minimize
from scipy.cluster.hierarchy import linkage, leaves_list, dendrogram
from scipy.spatial.distance import squareform

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Configuration Constants
TRADING_DAYS = 252
RISK_FREE_RATE = 0.06 / TRADING_DAYS  # Daily risk-free rate (6% annual / 252 trading days)

print("Advanced Portfolio Optimization - Setup Complete")

## Data Loading Utilities

Functions to load and prepare stock data from the existing data structure.

In [None]:
# ============================================================
# DATA LOADING UTILITIES
# ============================================================

def find_data_root(base_folder: str) -> Optional[str]:
    """
    Recursively search for Companies_list.csv to detect valid data root.
    """
    for root, dirs, files in os.walk(base_folder):
        if "Companies_list.csv" in files and "HISTORICAL_DATA" in dirs:
            return root
    return None


def load_company_csv_strict(path: str, symbol: str) -> Optional[pd.DataFrame]:
    """
    Load single CSV only if it contains 'Date' and a close-like column.
    """
    try:
        raw = pd.read_csv(path, low_memory=False)
        raw.columns = [str(c).strip() for c in raw.columns]
        date_col = next((c for c in raw.columns if c.lower() == "date"), None)
        if date_col is None:
            return None
        raw[date_col] = pd.to_datetime(raw[date_col], errors="coerce")
        raw = raw.dropna(subset=[date_col])
        close_candidates = [c for c in raw.columns if c.lower() in 
                          ("adj_close", "adj close", "close", "close_price", "close price")]
        if not close_candidates:
            return None
        close_col = close_candidates[0]
        df = raw[[date_col, close_col]].rename(columns={date_col: "Date", close_col: symbol})
        df = df.dropna().drop_duplicates(subset=["Date"]).sort_values("Date").set_index("Date")
        return df
    except Exception:
        return None


def read_stock_data_from_folder(folder_path: str) -> pd.DataFrame:
    """
    Reads stock dataset from folder structure with Companies_list.csv
    and HISTORICAL_DATA/*.csv files.
    
    Returns DataFrame with date index and stock symbols as columns.
    """
    companies_csv = os.path.join(folder_path, "Companies_list.csv")
    hist_dir = os.path.join(folder_path, "HISTORICAL_DATA")
    
    if not os.path.exists(companies_csv):
        raise Exception("Companies_list.csv not found")
    if not os.path.exists(hist_dir):
        raise Exception("HISTORICAL_DATA folder not found")
    
    companies_df = pd.read_csv(companies_csv)
    companies_df.columns = [c.strip() for c in companies_df.columns]
    symbol_col = next(
        (c for c in companies_df.columns
         if c.lower() in ("symbol", "ticker", "company", "stock", "code")),
        companies_df.columns[0]
    )
    symbols = companies_df[symbol_col].astype(str).tolist()
    
    files = glob.glob(os.path.join(hist_dir, "*_data.csv"))
    file_map = {}
    for f in files:
        name = os.path.basename(f)
        prefix = name.replace("_data.csv", "")
        file_map[prefix.upper()] = f
    
    frames = []
    loaded_symbols = []
    for sym in symbols:
        path = file_map.get(sym.upper())
        if path:
            df = load_company_csv_strict(path, sym)
            if df is not None:
                frames.append(df)
                loaded_symbols.append(sym)
    
    if not frames:
        raise Exception("No valid stock data found")
    
    combined = pd.concat(frames, axis=1)
    combined = combined.dropna(how='all')
    combined = combined.reset_index().rename(columns={'index': 'date', 'Date': 'date'})
    
    return combined


def filter_dataset(
    df: pd.DataFrame,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    company_codes: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Filters DataFrame by date range and/or company codes.
    """
    filtered = df.copy()
    filtered['date'] = pd.to_datetime(filtered['date'])
    
    if start_date:
        filtered = filtered[filtered['date'] >= pd.to_datetime(start_date)]
    if end_date:
        filtered = filtered[filtered['date'] <= pd.to_datetime(end_date)]
    if company_codes:
        cols = ['date'] + [c for c in company_codes if c in filtered.columns]
        filtered = filtered[cols]
    
    return filtered.dropna()


def compute_returns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Computes daily returns from price DataFrame.
    """
    price_cols = [c for c in df.columns if c != 'date']
    prices = df[price_cols].copy()
    returns = prices.pct_change().dropna()
    return returns


def generate_sample_data(
    n_assets: int = 10,
    n_days: int = 500,
    seed: int = 42
) -> pd.DataFrame:
    """
    Generates synthetic stock price data for demonstration.
    
    Parameters
    ----------
    n_assets : int
        Number of synthetic stocks to generate.
    n_days : int
        Number of trading days.
    seed : int
        Random seed for reproducibility.
    
    Returns
    -------
    pd.DataFrame
        DataFrame with 'date' column and stock price columns.
    """
    np.random.seed(seed)
    
    # Create date range
    dates = pd.date_range(start='2020-01-01', periods=n_days, freq='B')
    
    # Generate correlated returns using Cholesky decomposition
    # Create a random correlation matrix
    A = np.random.randn(n_assets, n_assets)
    corr_matrix = A @ A.T
    d = np.sqrt(np.diag(corr_matrix))
    corr_matrix = corr_matrix / np.outer(d, d)
    
    # Generate daily volatilities (1.5-3.5% daily std, roughly 24-56% annualized)
    daily_vols = np.random.uniform(0.015, 0.035, n_assets)
    
    # Generate daily expected returns (0.02-0.06% daily, roughly 5-15% annualized)
    daily_means = np.random.uniform(0.0002, 0.0006, n_assets)
    
    # Generate correlated returns
    L = np.linalg.cholesky(corr_matrix)
    uncorrelated = np.random.randn(n_days, n_assets)
    correlated = uncorrelated @ L.T
    returns = correlated * daily_vols + daily_means
    
    # Convert to prices
    prices = np.zeros((n_days, n_assets))
    prices[0] = 100  # Initial price of $100
    for t in range(1, n_days):
        prices[t] = prices[t-1] * (1 + returns[t])
    
    # Create DataFrame
    symbols = [f'STOCK_{i+1:02d}' for i in range(n_assets)]
    df = pd.DataFrame(prices, columns=symbols)
    df['date'] = dates
    df = df[['date'] + symbols]
    
    return df


print("Data loading utilities defined.")

---

## 1. Black-Litterman Model

### The Problem: "Estimation Error Maximizer"

Classical MVO uses historical mean returns to estimate future expected returns. This approach:
- Heavily overweights assets with abnormally high historical returns
- Underweights assets with poor past performance
- Blindly assumes the past perfectly dictates the future

### The Solution: Black-Litterman Model

The Black-Litterman model:
1. Starts with market equilibrium returns implied by market cap weights
2. Allows blending with investor views (absolute or relative forecasts)
3. Produces more stable and intuitive portfolio weights

In [None]:
# ============================================================
# BLACK-LITTERMAN MODEL
# ============================================================

def compute_implied_equilibrium_returns(
    Sigma: np.ndarray,
    market_weights: np.ndarray,
    risk_aversion: float = 2.5,
    risk_free_rate: float = 0.0
) -> np.ndarray:
    """
    Compute implied equilibrium returns using reverse optimization.
    
    The market portfolio is assumed to be optimal, so we back out
    what expected returns would make these weights optimal.
    
    Parameters
    ----------
    Sigma : np.ndarray
        Covariance matrix of returns (n x n).
    market_weights : np.ndarray
        Market capitalization weights (n,).
    risk_aversion : float
        Risk aversion parameter (lambda). Higher = more risk averse.
    risk_free_rate : float
        Risk-free rate.
    
    Returns
    -------
    np.ndarray
        Implied equilibrium expected returns (n,).
    """
    # Pi = lambda * Sigma * w_mkt
    pi = risk_aversion * (Sigma @ market_weights)
    return pi


def black_litterman_posterior(
    Sigma: np.ndarray,
    equilibrium_returns: np.ndarray,
    P: Optional[np.ndarray] = None,
    Q: Optional[np.ndarray] = None,
    omega: Optional[np.ndarray] = None,
    tau: float = 0.05
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute Black-Litterman posterior expected returns and covariance.
    
    Parameters
    ----------
    Sigma : np.ndarray
        Covariance matrix of returns (n x n).
    equilibrium_returns : np.ndarray
        Implied equilibrium returns from market cap weights (n,).
    P : np.ndarray, optional
        View matrix (k x n) where k is number of views.
        Each row represents one view on assets.
    Q : np.ndarray, optional
        View returns (k,). Expected returns for each view.
    omega : np.ndarray, optional
        View uncertainty matrix (k x k). Confidence in each view.
        If None, uses proportional to implied variance.
    tau : float
        Scaling parameter for prior uncertainty. Typically 0.01-0.05.
    
    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        (posterior_returns, posterior_covariance)
    """
    n = len(equilibrium_returns)
    tau_Sigma = tau * Sigma
    
    # If no views provided, return equilibrium
    if P is None or Q is None:
        return equilibrium_returns, Sigma + tau_Sigma
    
    P = np.atleast_2d(P)
    Q = np.atleast_1d(Q)
    k = P.shape[0]
    
    # Default omega: proportional to view portfolio variance
    if omega is None:
        omega = np.diag(np.diag(P @ tau_Sigma @ P.T))
    
    # Black-Litterman formula for posterior mean
    # E[R] = [(tau*Sigma)^-1 + P'*Omega^-1*P]^-1 * [(tau*Sigma)^-1*Pi + P'*Omega^-1*Q]
    tau_Sigma_inv = np.linalg.inv(tau_Sigma)
    omega_inv = np.linalg.inv(omega)
    
    M = tau_Sigma_inv + P.T @ omega_inv @ P
    posterior_returns = np.linalg.solve(M, tau_Sigma_inv @ equilibrium_returns + P.T @ omega_inv @ Q)
    
    # Posterior covariance
    posterior_cov = Sigma + np.linalg.inv(M)
    
    return posterior_returns, posterior_cov


def black_litterman_portfolio(
    returns_df: pd.DataFrame,
    market_caps: Optional[np.ndarray] = None,
    views: Optional[List[Dict]] = None,
    risk_aversion: float = 2.5,
    tau: float = 0.05,
    risk_free_rate: float = 0.0
) -> Dict:
    """
    Complete Black-Litterman portfolio optimization.
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame with assets as columns.
    market_caps : np.ndarray, optional
        Market capitalizations. If None, uses equal weights.
    views : List[Dict], optional
        List of views. Each view is a dict with:
        - 'assets': list of asset names or single asset
        - 'return': expected return
        - 'type': 'absolute' or 'relative'
        - 'confidence': optional confidence level (default 1.0)
    risk_aversion : float
        Risk aversion parameter.
    tau : float
        Prior uncertainty parameter.
    risk_free_rate : float
        Risk-free rate.
    
    Returns
    -------
    Dict
        Portfolio results including weights, expected returns, risk.
    
    Examples
    --------
    >>> views = [
    ...     {'assets': 'AAPL', 'return': 0.10, 'type': 'absolute'},
    ...     {'assets': ['GOOG', 'MSFT'], 'return': 0.02, 'type': 'relative'},  # GOOG outperforms MSFT by 2%
    ... ]
    """
    returns = returns_df.values
    n = returns.shape[1]
    asset_names = list(returns_df.columns)
    
    # Compute sample covariance
    Sigma = np.cov(returns, rowvar=False)
    
    # Market weights (equal if not provided)
    if market_caps is None:
        market_weights = np.ones(n) / n
    else:
        market_weights = market_caps / market_caps.sum()
    
    # Implied equilibrium returns
    pi = compute_implied_equilibrium_returns(
        Sigma, market_weights, risk_aversion, risk_free_rate
    )
    
    # Process views into P and Q matrices
    P = None
    Q = None
    omega = None
    
    if views:
        P_list = []
        Q_list = []
        conf_list = []
        
        for view in views:
            assets = view['assets']
            ret = view['return']
            view_type = view.get('type', 'absolute')
            confidence = view.get('confidence', 1.0)
            
            p_row = np.zeros(n)
            
            if view_type == 'absolute':
                if isinstance(assets, str):
                    assets = [assets]
                for asset in assets:
                    if asset in asset_names:
                        idx = asset_names.index(asset)
                        p_row[idx] = 1.0 / len(assets)
            elif view_type == 'relative':
                # First asset outperforms second by 'return'
                if len(assets) >= 2:
                    idx1 = asset_names.index(assets[0]) if assets[0] in asset_names else -1
                    idx2 = asset_names.index(assets[1]) if assets[1] in asset_names else -1
                    if idx1 >= 0 and idx2 >= 0:
                        p_row[idx1] = 1.0
                        p_row[idx2] = -1.0
            
            if np.any(p_row != 0):
                P_list.append(p_row)
                Q_list.append(ret)
                conf_list.append(confidence)
        
        if P_list:
            P = np.array(P_list)
            Q = np.array(Q_list)
            # Omega scaled by confidence
            tau_Sigma = tau * Sigma
            view_vars = np.diag(P @ tau_Sigma @ P.T)
            omega = np.diag(view_vars / np.array(conf_list))
    
    # Compute posterior
    posterior_returns, posterior_cov = black_litterman_posterior(
        Sigma, pi, P, Q, omega, tau
    )
    
    # Optimize portfolio using posterior estimates
    # Mean-variance optimization with posterior estimates
    def neg_sharpe(w):
        ret = w @ posterior_returns
        risk = np.sqrt(w @ posterior_cov @ w)
        return -(ret - risk_free_rate) / risk if risk > 0 else 0
    
    constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    bounds = [(0, 1) for _ in range(n)]
    w0 = np.ones(n) / n
    
    result = minimize(neg_sharpe, w0, method='SLSQP', bounds=bounds, constraints=constraints)
    weights = result.x
    
    # Compute portfolio metrics
    port_return = weights @ posterior_returns
    port_risk = np.sqrt(weights @ posterior_cov @ weights)
    sharpe = (port_return - risk_free_rate) / port_risk if port_risk > 0 else 0
    
    return {
        'weights': weights,
        'asset_names': asset_names,
        'return': port_return,
        'risk': port_risk,
        'sharpe': sharpe,
        'equilibrium_returns': pi,
        'posterior_returns': posterior_returns,
        'posterior_covariance': posterior_cov,
        'market_weights': market_weights
    }


print("Black-Litterman model defined.")

---

## 2. Ledoit-Wolf Shrinkage Estimator

### The Problem: Sample Covariance Instability

The standard sample covariance matrix (`np.cov()`):
- Becomes ill-conditioned when N (assets) is large relative to T (time periods)
- The optimizer exploits spurious correlations
- Results in extreme, unstable portfolio weights

### The Solution: Ledoit-Wolf Shrinkage

Shrinkage "pulls" the empirical covariance toward a structured target (constant correlation model), reducing the impact of extreme, likely false, covariance values.

In [None]:
# ============================================================
# LEDOIT-WOLF SHRINKAGE ESTIMATOR
# ============================================================

def ledoit_wolf_shrinkage(
    returns: np.ndarray,
    shrinkage_target: str = 'constant_correlation'
) -> Tuple[np.ndarray, float]:
    """
    Compute Ledoit-Wolf shrinkage covariance estimator.
    
    The shrinkage estimator combines the sample covariance matrix with
    a structured target to produce a more stable estimate.
    
    Parameters
    ----------
    returns : np.ndarray
        T x N matrix of returns.
    shrinkage_target : str
        'constant_correlation' or 'identity'.
    
    Returns
    -------
    Tuple[np.ndarray, float]
        (shrunk_covariance, shrinkage_intensity)
    """
    T, N = returns.shape
    
    # De-mean returns
    returns_centered = returns - returns.mean(axis=0)
    
    # Sample covariance
    sample_cov = (returns_centered.T @ returns_centered) / T
    
    # Sample variances and standard deviations
    sample_var = np.diag(sample_cov)
    sample_std = np.sqrt(sample_var)
    
    # Compute shrinkage target
    if shrinkage_target == 'constant_correlation':
        # Constant correlation model: same average correlation everywhere
        std_outer = np.outer(sample_std, sample_std)
        sample_corr = sample_cov / np.where(std_outer > 0, std_outer, 1)
        np.fill_diagonal(sample_corr, 1.0)
        
        # Average off-diagonal correlation
        off_diag_mask = ~np.eye(N, dtype=bool)
        avg_corr = np.mean(sample_corr[off_diag_mask])
        
        # Target: constant correlation matrix scaled by variances
        F = avg_corr * std_outer
        np.fill_diagonal(F, sample_var)
    else:
        # Identity (scaled by average variance)
        avg_var = np.mean(sample_var)
        F = avg_var * np.eye(N)
    
    # Compute optimal shrinkage intensity using Ledoit-Wolf formula
    # Simplified version - computes sample quantities
    
    # pi: sum of asymptotic variances of scaled sample covariances
    X = returns_centered
    pi_sum = 0.0
    for i in range(N):
        for j in range(N):
            term = X[:, i] * X[:, j] - sample_cov[i, j]
            pi_sum += np.sum(term ** 2) / T
    
    # gamma: misspecification (Frobenius norm of S - F)
    gamma = np.sum((sample_cov - F) ** 2)
    
    # Optimal shrinkage intensity
    kappa = (pi_sum - gamma) / gamma if gamma > 0 else 0
    shrinkage_intensity = max(0, min(1, kappa / T))
    
    # Shrunk covariance
    shrunk_cov = shrinkage_intensity * F + (1 - shrinkage_intensity) * sample_cov
    
    return shrunk_cov, shrinkage_intensity


def portfolio_optimizer_shrinkage(
    returns_df: pd.DataFrame,
    risk_free_rate: float = 0.0,
    shrinkage_target: str = 'constant_correlation'
) -> Dict:
    """
    Portfolio optimization using Ledoit-Wolf shrinkage covariance.
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame.
    risk_free_rate : float
        Risk-free rate.
    shrinkage_target : str
        Shrinkage target type.
    
    Returns
    -------
    Dict
        Portfolio results.
    """
    returns = returns_df.values
    n = returns.shape[1]
    
    # Mean returns
    mu = np.mean(returns, axis=0)
    
    # Shrunk covariance
    Sigma, shrinkage_intensity = ledoit_wolf_shrinkage(returns, shrinkage_target)
    
    # GMVP
    Sigma_inv = np.linalg.inv(Sigma)
    ones = np.ones(n)
    w_gmvp = Sigma_inv @ ones / (ones @ Sigma_inv @ ones)
    w_gmvp = np.maximum(w_gmvp, 0)
    w_gmvp /= w_gmvp.sum()
    
    # MRR (Max Sharpe)
    excess = mu - risk_free_rate
    w_mrr = Sigma_inv @ excess
    w_mrr = np.maximum(w_mrr, 0)
    if w_mrr.sum() > 0:
        w_mrr /= w_mrr.sum()
    else:
        w_mrr = np.ones(n) / n
    
    # Compute metrics
    gmvp_ret = w_gmvp @ mu
    gmvp_risk = np.sqrt(w_gmvp @ Sigma @ w_gmvp)
    
    mrr_ret = w_mrr @ mu
    mrr_risk = np.sqrt(w_mrr @ Sigma @ w_mrr)
    mrr_sharpe = (mrr_ret - risk_free_rate) / mrr_risk if mrr_risk > 0 else 0
    
    return {
        'gmvp': {'weights': w_gmvp, 'return': gmvp_ret, 'risk': gmvp_risk},
        'mrr': {'weights': w_mrr, 'return': mrr_ret, 'risk': mrr_risk, 'sharpe': mrr_sharpe},
        'shrinkage_intensity': shrinkage_intensity,
        'shrunk_covariance': Sigma,
        'mean_returns': mu
    }


print("Ledoit-Wolf shrinkage estimator defined.")

---

## 3. Dynamic Volatility (EWMA)

### The Problem: Static Covariance Assumptions

Returns are heteroskedastic (volatility clusters over time). A simple equally-weighted historical covariance matrix doesn't capture recent volatility changes.

### The Solution: Exponentially Weighted Moving Average (EWMA)

EWMA gives more weight to recent observations, providing a more forward-looking risk estimate.

In [None]:
# ============================================================
# DYNAMIC VOLATILITY (EWMA)
# ============================================================

def ewma_covariance(
    returns: np.ndarray,
    lambda_decay: float = 0.94
) -> np.ndarray:
    """
    Compute EWMA (Exponentially Weighted Moving Average) covariance matrix.
    
    More recent observations receive higher weight, making this
    a more responsive estimate of current market conditions.
    
    Parameters
    ----------
    returns : np.ndarray
        T x N matrix of returns.
    lambda_decay : float
        Decay factor (0 < lambda < 1). Higher = more weight to older obs.
        RiskMetrics uses 0.94 for daily data.
    
    Returns
    -------
    np.ndarray
        EWMA covariance matrix (N x N).
    """
    T, N = returns.shape
    
    # De-mean returns
    returns_centered = returns - returns.mean(axis=0)
    
    # Initialize with sample covariance
    cov = np.zeros((N, N))
    
    # Compute EWMA iteratively
    for t in range(T):
        r = returns_centered[t:t+1].T  # Column vector
        if t == 0:
            cov = r @ r.T
        else:
            cov = lambda_decay * cov + (1 - lambda_decay) * (r @ r.T)
    
    return cov


def portfolio_optimizer_ewma(
    returns_df: pd.DataFrame,
    risk_free_rate: float = 0.0,
    lambda_decay: float = 0.94
) -> Dict:
    """
    Portfolio optimization using EWMA covariance matrix.
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame.
    risk_free_rate : float
        Risk-free rate.
    lambda_decay : float
        EWMA decay factor.
    
    Returns
    -------
    Dict
        Portfolio results.
    """
    returns = returns_df.values
    n = returns.shape[1]
    
    # Mean returns (can also use EWMA mean if desired)
    mu = np.mean(returns, axis=0)
    
    # EWMA covariance
    Sigma = ewma_covariance(returns, lambda_decay)
    
    # Ensure positive definiteness
    eigvals = np.linalg.eigvalsh(Sigma)
    if eigvals.min() <= 0:
        # Add small regularization
        Sigma += np.eye(n) * abs(eigvals.min()) * 1.1
    
    # GMVP
    Sigma_inv = np.linalg.inv(Sigma)
    ones = np.ones(n)
    w_gmvp = Sigma_inv @ ones / (ones @ Sigma_inv @ ones)
    w_gmvp = np.maximum(w_gmvp, 0)
    w_gmvp /= w_gmvp.sum()
    
    # MRR
    excess = mu - risk_free_rate
    w_mrr = Sigma_inv @ excess
    w_mrr = np.maximum(w_mrr, 0)
    if w_mrr.sum() > 0:
        w_mrr /= w_mrr.sum()
    else:
        w_mrr = np.ones(n) / n
    
    # Metrics
    gmvp_ret = w_gmvp @ mu
    gmvp_risk = np.sqrt(w_gmvp @ Sigma @ w_gmvp)
    
    mrr_ret = w_mrr @ mu
    mrr_risk = np.sqrt(w_mrr @ Sigma @ w_mrr)
    mrr_sharpe = (mrr_ret - risk_free_rate) / mrr_risk if mrr_risk > 0 else 0
    
    return {
        'gmvp': {'weights': w_gmvp, 'return': gmvp_ret, 'risk': gmvp_risk},
        'mrr': {'weights': w_mrr, 'return': mrr_ret, 'risk': mrr_risk, 'sharpe': mrr_sharpe},
        'ewma_covariance': Sigma,
        'lambda_decay': lambda_decay
    }


print("Dynamic volatility (EWMA) defined.")

---

## 4. Downside Risk Optimization (CVaR / Sortino)

### The Problem: Normal Distribution Assumptions

MVO defines risk as variance (σ²), assuming normal distributions. In reality:
- Stock returns exhibit "fat tails" (kurtosis)
- Negative skewness means extreme losses happen more often than predicted

### The Solution: Downside Risk Measures

- **CVaR (Conditional Value at Risk)**: Expected loss given we're in the tail
- **Sortino Ratio**: Like Sharpe, but only penalizes downside volatility

In [None]:
# ============================================================
# DOWNSIDE RISK OPTIMIZATION (CVaR / SORTINO)
# ============================================================

def compute_var_cvar(
    returns: np.ndarray,
    weights: np.ndarray,
    confidence: float = 0.95
) -> Tuple[float, float]:
    """
    Compute Value at Risk (VaR) and Conditional VaR (Expected Shortfall).
    
    Parameters
    ----------
    returns : np.ndarray
        T x N matrix of returns.
    weights : np.ndarray
        Portfolio weights (N,).
    confidence : float
        Confidence level (e.g., 0.95 for 95% VaR).
    
    Returns
    -------
    Tuple[float, float]
        (VaR, CVaR) as positive numbers representing loss.
    """
    # Portfolio returns
    port_returns = returns @ weights
    
    # VaR: quantile of the loss distribution
    alpha = 1 - confidence
    var = -np.percentile(port_returns, alpha * 100)
    
    # CVaR: expected loss given we're beyond VaR
    losses = -port_returns
    cvar = np.mean(losses[losses >= var])
    
    return var, cvar


def compute_sortino_ratio(
    returns: np.ndarray,
    weights: np.ndarray,
    risk_free_rate: float = 0.0,
    target_return: float = 0.0
) -> float:
    """
    Compute Sortino ratio (return over downside deviation).
    
    Parameters
    ----------
    returns : np.ndarray
        T x N matrix of returns.
    weights : np.ndarray
        Portfolio weights.
    risk_free_rate : float
        Risk-free rate.
    target_return : float
        Minimum acceptable return (MAR).
    
    Returns
    -------
    float
        Sortino ratio.
    """
    port_returns = returns @ weights
    excess_return = np.mean(port_returns) - risk_free_rate
    
    # Downside deviation
    downside_returns = np.minimum(port_returns - target_return, 0)
    downside_std = np.sqrt(np.mean(downside_returns ** 2))
    
    if downside_std > 0:
        return excess_return / downside_std
    return 0.0


def min_cvar_portfolio(
    returns_df: pd.DataFrame,
    confidence: float = 0.95,
    target_return: Optional[float] = None
) -> Dict:
    """
    Optimize portfolio to minimize CVaR (Conditional Value at Risk).
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame.
    confidence : float
        Confidence level.
    target_return : float, optional
        Target portfolio return constraint.
    
    Returns
    -------
    Dict
        Portfolio results.
    """
    returns = returns_df.values
    T, n = returns.shape
    
    def cvar_objective(w):
        _, cvar = compute_var_cvar(returns, w, confidence)
        return cvar
    
    constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]
    if target_return is not None:
        mu = np.mean(returns, axis=0)
        constraints.append({'type': 'eq', 'fun': lambda w: w @ mu - target_return})
    
    bounds = [(0, 1) for _ in range(n)]
    w0 = np.ones(n) / n
    
    result = minimize(
        cvar_objective, w0, method='SLSQP',
        bounds=bounds, constraints=constraints,
        options={'ftol': 1e-8, 'maxiter': 500}
    )
    
    weights = result.x
    port_returns = returns @ weights
    var, cvar = compute_var_cvar(returns, weights, confidence)
    
    return {
        'weights': weights,
        'return': np.mean(port_returns),
        'risk': np.std(port_returns),
        'var': var,
        'cvar': cvar,
        'confidence': confidence
    }


def max_sortino_portfolio(
    returns_df: pd.DataFrame,
    risk_free_rate: float = 0.0,
    target_return: float = 0.0
) -> Dict:
    """
    Optimize portfolio to maximize Sortino ratio.
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame.
    risk_free_rate : float
        Risk-free rate.
    target_return : float
        Minimum acceptable return.
    
    Returns
    -------
    Dict
        Portfolio results.
    """
    returns = returns_df.values
    n = returns.shape[1]
    
    def neg_sortino(w):
        return -compute_sortino_ratio(returns, w, risk_free_rate, target_return)
    
    constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    bounds = [(0, 1) for _ in range(n)]
    w0 = np.ones(n) / n
    
    result = minimize(
        neg_sortino, w0, method='SLSQP',
        bounds=bounds, constraints=constraints,
        options={'ftol': 1e-8, 'maxiter': 500}
    )
    
    weights = result.x
    port_returns = returns @ weights
    sortino = compute_sortino_ratio(returns, weights, risk_free_rate, target_return)
    
    return {
        'weights': weights,
        'return': np.mean(port_returns),
        'risk': np.std(port_returns),
        'sortino_ratio': sortino,
        'sharpe_ratio': (np.mean(port_returns) - risk_free_rate) / np.std(port_returns)
    }


print("Downside risk optimization (CVaR/Sortino) defined.")

---

## 5. Hierarchical Risk Parity (HRP)

### The Problem: Traditional Optimization Instability

Standard MVO requires inverting the covariance matrix, which:
- Can be numerically unstable
- Doesn't account for hierarchical structure in markets
- Requires return predictions

### The Solution: Hierarchical Risk Parity (HRP)

Developed by Marcos Lopez de Prado, HRP:
1. Uses agglomerative hierarchical clustering on correlation distance
2. Groups stocks based on similarity
3. Allocates risk equally across clusters
4. **Avoids covariance matrix inversion entirely**

In [None]:
# ============================================================
# HIERARCHICAL RISK PARITY (HRP)
# ============================================================

def correlation_distance(corr_matrix: np.ndarray) -> np.ndarray:
    """
    Convert correlation matrix to distance matrix.
    
    Distance = sqrt(0.5 * (1 - correlation))
    
    Parameters
    ----------
    corr_matrix : np.ndarray
        Correlation matrix.
    
    Returns
    -------
    np.ndarray
        Distance matrix.
    """
    return np.sqrt(0.5 * (1 - corr_matrix))


def quasi_diagonalize(link: np.ndarray) -> List[int]:
    """
    Quasi-diagonalize the covariance matrix by reordering according to
    hierarchical clustering dendrogram.
    
    Parameters
    ----------
    link : np.ndarray
        Linkage matrix from hierarchical clustering.
    
    Returns
    -------
    List[int]
        Sorted list of original indices.
    """
    return list(leaves_list(link))


def cluster_variance(
    cov_matrix: np.ndarray,
    cluster_items: List[int]
) -> float:
    """
    Compute variance of an inverse-variance weighted cluster.
    
    Parameters
    ----------
    cov_matrix : np.ndarray
        Full covariance matrix.
    cluster_items : List[int]
        Indices of items in the cluster.
    
    Returns
    -------
    float
        Cluster variance.
    """
    # Extract sub-covariance matrix
    sub_cov = cov_matrix[np.ix_(cluster_items, cluster_items)]
    
    # Inverse-variance weights within cluster
    ivp = 1.0 / np.diag(sub_cov)
    ivp /= ivp.sum()
    
    # Cluster variance
    return float(ivp @ sub_cov @ ivp)


def recursive_bisection(
    cov_matrix: np.ndarray,
    sorted_indices: List[int]
) -> np.ndarray:
    """
    Recursive bisection for HRP allocation.
    
    Recursively splits the sorted asset list and allocates weights
    based on inverse cluster variance.
    
    Parameters
    ----------
    cov_matrix : np.ndarray
        Covariance matrix.
    sorted_indices : List[int]
        Quasi-diagonalized asset indices.
    
    Returns
    -------
    np.ndarray
        HRP weights for each asset.
    """
    n = cov_matrix.shape[0]
    weights = np.ones(n)
    cluster_items = [sorted_indices]
    
    while len(cluster_items) > 0:
        # Split each cluster into two sub-clusters
        new_clusters = []
        for cluster in cluster_items:
            if len(cluster) > 1:
                # Split in half
                mid = len(cluster) // 2
                left = cluster[:mid]
                right = cluster[mid:]
                
                # Compute cluster variances
                var_left = cluster_variance(cov_matrix, left)
                var_right = cluster_variance(cov_matrix, right)
                
                # Allocation factor (inverse variance weighted)
                alpha = 1 - var_left / (var_left + var_right)
                
                # Scale weights
                for i in left:
                    weights[i] *= alpha
                for i in right:
                    weights[i] *= (1 - alpha)
                
                new_clusters.append(left)
                new_clusters.append(right)
        
        cluster_items = [c for c in new_clusters if len(c) > 1]
    
    return weights / weights.sum()


def hierarchical_risk_parity(
    returns_df: pd.DataFrame,
    linkage_method: str = 'single'
) -> Dict:
    """
    Compute Hierarchical Risk Parity (HRP) portfolio.
    
    HRP uses machine learning clustering to build portfolios that:
    - Don't require covariance matrix inversion
    - Don't need expected return estimates
    - Are more stable and diversified
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame.
    linkage_method : str
        Hierarchical clustering linkage method.
        Options: 'single', 'complete', 'average', 'ward'
    
    Returns
    -------
    Dict
        Portfolio results including weights, clustering info.
    """
    returns = returns_df.values
    n = returns.shape[1]
    asset_names = list(returns_df.columns)
    
    # Compute correlation and covariance matrices
    cov_matrix = np.cov(returns, rowvar=False)
    std = np.sqrt(np.diag(cov_matrix))
    corr_matrix = cov_matrix / np.outer(std, std)
    np.fill_diagonal(corr_matrix, 1.0)
    
    # Convert correlation to distance
    dist_matrix = correlation_distance(corr_matrix)
    
    # Hierarchical clustering
    # Convert distance matrix to condensed form for linkage
    condensed_dist = squareform(dist_matrix, checks=False)
    link = linkage(condensed_dist, method=linkage_method)
    
    # Quasi-diagonalize (get sorted order)
    sorted_indices = quasi_diagonalize(link)
    
    # Recursive bisection for weight allocation
    weights = recursive_bisection(cov_matrix, sorted_indices)
    
    # Portfolio metrics
    port_returns = returns @ weights
    port_return = np.mean(port_returns)
    port_risk = np.std(port_returns)
    
    return {
        'weights': weights,
        'asset_names': asset_names,
        'return': port_return,
        'risk': port_risk,
        'sharpe': port_return / port_risk if port_risk > 0 else 0,
        'sorted_indices': sorted_indices,
        'linkage_matrix': link,
        'correlation_matrix': corr_matrix,
        'distance_matrix': dist_matrix
    }


def plot_hrp_dendrogram(
    hrp_result: Dict,
    title: str = 'HRP Clustering Dendrogram'
) -> None:
    """
    Plot the hierarchical clustering dendrogram.
    
    Parameters
    ----------
    hrp_result : Dict
        Result from hierarchical_risk_parity().
    title : str
        Plot title.
    """
    plt.figure(figsize=(12, 6))
    dendrogram(
        hrp_result['linkage_matrix'],
        labels=hrp_result['asset_names'],
        leaf_rotation=45
    )
    plt.title(title)
    plt.xlabel('Assets')
    plt.ylabel('Distance')
    plt.tight_layout()
    plt.show()


print("Hierarchical Risk Parity (HRP) defined.")

---

## 6. Walk-Forward Optimization (Rolling Backtest)

### The Problem: In-Sample Overfitting

Computing optimal weights over the entire dataset means the model has "future knowledge" of how stocks perform. This leads to overfitting and unrealistic expectations.

### The Solution: Walk-Forward Optimization

Use a rolling window approach:
1. Train on historical data (e.g., 2018-2020)
2. Test on out-of-sample data (e.g., 2021)
3. Roll the window forward and repeat

This is the **only way** to validate a portfolio strategy's true predictive power.

In [None]:
# ============================================================
# WALK-FORWARD OPTIMIZATION (ROLLING BACKTEST)
# ============================================================

def walk_forward_backtest(
    price_df: pd.DataFrame,
    optimizer_func,
    train_window: int = 252,
    test_window: int = 21,
    rebalance_freq: int = 21,
    risk_free_rate: float = 0.0,
    verbose: bool = True
) -> Dict:
    """
    Perform walk-forward (out-of-sample) backtesting.
    
    This function rolls through time, using past data to compute
    weights and then testing on future data.
    
    Parameters
    ----------
    price_df : pd.DataFrame
        Price DataFrame with 'date' column and stock price columns.
    optimizer_func : callable
        Function that takes returns_df and returns a dict with 'weights'.
    train_window : int
        Number of trading days for training.
    test_window : int
        Number of trading days for testing.
    rebalance_freq : int
        Days between rebalancing.
    risk_free_rate : float
        Daily risk-free rate.
    verbose : bool
        Print progress information.
    
    Returns
    -------
    Dict
        Backtest results including returns, metrics, weights history.
    """
    # Prepare data
    df = price_df.copy()
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values('date').reset_index(drop=True)
    
    price_cols = [c for c in df.columns if c != 'date']
    prices = df[price_cols].values
    dates = df['date'].values
    
    # Compute returns
    returns = prices[1:] / prices[:-1] - 1
    return_dates = dates[1:]
    
    T = len(returns)
    n_assets = len(price_cols)
    
    # Results storage
    portfolio_returns = []
    portfolio_dates = []
    weights_history = []
    rebalance_dates = []
    
    # Walk forward
    current_weights = np.ones(n_assets) / n_assets  # Start equal weighted
    last_rebalance = 0
    
    start_idx = train_window
    
    if verbose:
        print(f"Walk-Forward Backtest: {len(price_cols)} assets")
        print(f"Train window: {train_window} days, Test window: {test_window} days")
        print(f"Rebalance frequency: {rebalance_freq} days")
        print("-" * 50)
    
    for t in range(start_idx, T):
        # Check if we need to rebalance
        if t - last_rebalance >= rebalance_freq or t == start_idx:
            # Use training window to compute weights
            train_start = max(0, t - train_window)
            train_returns = returns[train_start:t]
            
            # Create DataFrame for optimizer
            train_df = pd.DataFrame(train_returns, columns=price_cols)
            
            try:
                result = optimizer_func(train_df)
                if isinstance(result, dict) and 'weights' in result:
                    current_weights = result['weights']
                elif isinstance(result, dict):
                    # Handle nested results (e.g., {'mrr': {'weights': ...}})
                    for key in ['mrr', 'gmvp', 'portfolio']:
                        if key in result and 'weights' in result[key]:
                            current_weights = result[key]['weights']
                            break
            except Exception as e:
                if verbose:
                    print(f"Optimization failed at {return_dates[t]}: {e}")
                # Keep previous weights
            
            last_rebalance = t
            rebalance_dates.append(return_dates[t])
            weights_history.append(current_weights.copy())
            
            if verbose and len(rebalance_dates) % 10 == 1:
                print(f"Rebalanced at {return_dates[t]}")
        
        # Compute portfolio return for day t
        day_return = returns[t] @ current_weights
        portfolio_returns.append(day_return)
        portfolio_dates.append(return_dates[t])
    
    # Convert to arrays
    portfolio_returns = np.array(portfolio_returns)
    
    # Compute metrics
    cumulative_return = np.prod(1 + portfolio_returns) - 1
    annual_return = (1 + cumulative_return) ** (252 / len(portfolio_returns)) - 1
    annual_volatility = np.std(portfolio_returns) * np.sqrt(252)
    sharpe_ratio = (np.mean(portfolio_returns) - risk_free_rate) / np.std(portfolio_returns) * np.sqrt(252)
    
    # Maximum drawdown
    cumulative = np.cumprod(1 + portfolio_returns)
    running_max = np.maximum.accumulate(cumulative)
    drawdowns = (cumulative - running_max) / running_max
    max_drawdown = np.min(drawdowns)
    
    # Sortino ratio
    downside_returns = np.minimum(portfolio_returns, 0)
    downside_std = np.sqrt(np.mean(downside_returns ** 2)) * np.sqrt(252)
    sortino_ratio = annual_return / downside_std if downside_std > 0 else 0
    
    if verbose:
        print("-" * 50)
        print("\nBacktest Results:")
        print(f"  Cumulative Return: {cumulative_return * 100:.2f}%")
        print(f"  Annual Return: {annual_return * 100:.2f}%")
        print(f"  Annual Volatility: {annual_volatility * 100:.2f}%")
        print(f"  Sharpe Ratio: {sharpe_ratio:.4f}")
        print(f"  Sortino Ratio: {sortino_ratio:.4f}")
        print(f"  Max Drawdown: {max_drawdown * 100:.2f}%")
        print(f"  Number of Rebalances: {len(rebalance_dates)}")
    
    return {
        'returns': portfolio_returns,
        'dates': portfolio_dates,
        'cumulative_return': cumulative_return,
        'annual_return': annual_return,
        'annual_volatility': annual_volatility,
        'sharpe_ratio': sharpe_ratio,
        'sortino_ratio': sortino_ratio,
        'max_drawdown': max_drawdown,
        'weights_history': weights_history,
        'rebalance_dates': rebalance_dates
    }


def compare_strategies(
    price_df: pd.DataFrame,
    strategies: Dict[str, callable],
    train_window: int = 252,
    rebalance_freq: int = 21,
    risk_free_rate: float = 0.0
) -> pd.DataFrame:
    """
    Compare multiple portfolio strategies using walk-forward testing.
    
    Parameters
    ----------
    price_df : pd.DataFrame
        Price DataFrame.
    strategies : Dict[str, callable]
        Dictionary mapping strategy names to optimizer functions.
    train_window : int
        Training window in days.
    rebalance_freq : int
        Rebalancing frequency in days.
    risk_free_rate : float
        Daily risk-free rate.
    
    Returns
    -------
    pd.DataFrame
        Comparison metrics for all strategies.
    """
    results = {}
    
    for name, optimizer in strategies.items():
        print(f"\n{'=' * 60}")
        print(f"Testing Strategy: {name}")
        print('=' * 60)
        
        result = walk_forward_backtest(
            price_df,
            optimizer,
            train_window=train_window,
            rebalance_freq=rebalance_freq,
            risk_free_rate=risk_free_rate,
            verbose=True
        )
        results[name] = result
    
    # Create comparison DataFrame
    comparison = pd.DataFrame({
        name: {
            'Cumulative Return (%)': res['cumulative_return'] * 100,
            'Annual Return (%)': res['annual_return'] * 100,
            'Annual Volatility (%)': res['annual_volatility'] * 100,
            'Sharpe Ratio': res['sharpe_ratio'],
            'Sortino Ratio': res['sortino_ratio'],
            'Max Drawdown (%)': res['max_drawdown'] * 100
        }
        for name, res in results.items()
    }).T
    
    return comparison, results


def plot_backtest_results(
    results: Dict[str, Dict],
    title: str = 'Strategy Comparison'
) -> None:
    """
    Plot cumulative returns for multiple strategies.
    
    Parameters
    ----------
    results : Dict[str, Dict]
        Dictionary of strategy results from walk_forward_backtest.
    title : str
        Plot title.
    """
    plt.figure(figsize=(12, 6))
    
    for name, res in results.items():
        cumulative = np.cumprod(1 + res['returns'])
        plt.plot(res['dates'], cumulative, label=name, linewidth=1.5)
    
    plt.xlabel('Date')
    plt.ylabel('Cumulative Return')
    plt.title(title)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


print("Walk-Forward Optimization (Rolling Backtest) defined.")

---

## Visualization Utilities

In [None]:
# ============================================================
# VISUALIZATION UTILITIES
# ============================================================

def plot_portfolio_weights(
    weights: np.ndarray,
    asset_names: List[str],
    title: str = 'Portfolio Weights'
) -> None:
    """
    Plot portfolio weights as a bar chart.
    
    Parameters
    ----------
    weights : np.ndarray
        Portfolio weights.
    asset_names : List[str]
        Asset names.
    title : str
        Plot title.
    """
    plt.figure(figsize=(max(8, len(asset_names) * 0.5), 5))
    x = np.arange(len(asset_names))
    colors = ['steelblue' if w >= 0 else 'red' for w in weights]
    plt.bar(x, weights * 100, color=colors)
    plt.xticks(x, asset_names, rotation=45, ha='right')
    plt.ylabel('Weight (%)')
    plt.title(title)
    plt.axhline(0, color='black', linewidth=0.8)
    plt.tight_layout()
    plt.show()


def plot_efficient_frontier_comparison(
    returns_df: pd.DataFrame,
    portfolios: Dict[str, Dict],
    n_random: int = 2000,
    title: str = 'Efficient Frontier with Optimized Portfolios'
) -> None:
    """
    Plot efficient frontier with multiple portfolio strategies.
    
    Parameters
    ----------
    returns_df : pd.DataFrame
        Returns DataFrame.
    portfolios : Dict[str, Dict]
        Dictionary mapping portfolio names to results with 'return' and 'risk'.
    n_random : int
        Number of random portfolios to plot.
    title : str
        Plot title.
    """
    returns = returns_df.values
    n = returns.shape[1]
    mu = np.mean(returns, axis=0)
    Sigma = np.cov(returns, rowvar=False)
    
    # Generate random portfolios
    random_returns = []
    random_risks = []
    for _ in range(n_random):
        w = np.random.dirichlet(np.ones(n))
        r = w @ mu
        risk = np.sqrt(w @ Sigma @ w)
        random_returns.append(r)
        random_risks.append(risk)
    
    plt.figure(figsize=(10, 6))
    
    # Plot random portfolios
    plt.scatter(
        np.array(random_risks) * np.sqrt(TRADING_DAYS) * 100,
        np.array(random_returns) * TRADING_DAYS * 100,
        c='lightgray', alpha=0.3, s=10, label='Random'
    )
    
    # Plot optimized portfolios
    colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown']
    markers = ['o', 's', '^', 'D', 'v', 'p']
    
    for i, (name, port) in enumerate(portfolios.items()):
        if 'return' in port and 'risk' in port:
            plt.scatter(
                port['risk'] * np.sqrt(TRADING_DAYS) * 100,
                port['return'] * TRADING_DAYS * 100,
                c=colors[i % len(colors)],
                marker=markers[i % len(markers)],
                s=150, edgecolors='black', linewidth=1.5,
                label=name, zorder=5
            )
    
    plt.xlabel('Annual Risk (%)')
    plt.ylabel('Annual Return (%)')
    plt.title(title)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


def print_portfolio_summary(name: str, result: Dict) -> None:
    """
    Print a formatted portfolio summary.
    
    Parameters
    ----------
    name : str
        Portfolio name.
    result : Dict
        Portfolio result dictionary.
    """
    print(f"\n{'=' * 60}")
    print(f"{name}")
    print('=' * 60)
    
    if 'weights' in result:
        weights = result['weights']
        if 'asset_names' in result:
            print("\nPortfolio Weights:")
            for name, w in zip(result['asset_names'], weights):
                if abs(w) > 0.001:
                    print(f"  {name:15s}: {w * 100:7.3f}%")
        else:
            print(f"\nWeights: {weights}")
    
    print("\nPerformance Metrics:")
    if 'return' in result:
        print(f"  Daily Return:   {result['return'] * 100:.4f}%")
        print(f"  Annual Return:  {result['return'] * TRADING_DAYS * 100:.2f}%")
    if 'risk' in result:
        print(f"  Daily Risk:     {result['risk'] * 100:.4f}%")
        print(f"  Annual Risk:    {result['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")
    if 'sharpe' in result:
        print(f"  Sharpe Ratio:   {result['sharpe'] * np.sqrt(TRADING_DAYS):.4f}")
    if 'sortino_ratio' in result:
        print(f"  Sortino Ratio:  {result['sortino_ratio']:.4f}")
    if 'cvar' in result:
        print(f"  CVaR ({result.get('confidence', 0.95)*100:.0f}%):    {result['cvar'] * 100:.4f}%")
    if 'shrinkage_intensity' in result:
        print(f"  Shrinkage Int.: {result['shrinkage_intensity']:.4f}")


print("Visualization utilities defined.")

---

## Example Usage and Demonstration

In [None]:
# ============================================================
# EXAMPLE: GENERATE SAMPLE DATA
# ============================================================

# Generate synthetic data for demonstration
# (Replace this with real data loading for actual analysis)

print("Generating sample data for demonstration...")
sample_data = generate_sample_data(n_assets=10, n_days=500, seed=42)
print(f"Sample data shape: {sample_data.shape}")
print(f"Date range: {sample_data['date'].min()} to {sample_data['date'].max()}")
print(f"Assets: {[c for c in sample_data.columns if c != 'date']}")
sample_data.head()

In [None]:
# ============================================================
# EXAMPLE: COMPARE ALL OPTIMIZATION METHODS
# ============================================================

# Compute returns
returns_df = compute_returns(sample_data)
print(f"Returns shape: {returns_df.shape}")

# 1. Black-Litterman Portfolio (with sample views)
print("\n" + "="*60)
print("1. BLACK-LITTERMAN MODEL")
print("="*60)

# Example views: STOCK_01 will outperform, STOCK_02 will underperform STOCK_03
views = [
    {'assets': 'STOCK_01', 'return': 0.0005, 'type': 'absolute', 'confidence': 0.8},
    {'assets': ['STOCK_02', 'STOCK_03'], 'return': 0.0002, 'type': 'relative', 'confidence': 0.6}
]

bl_result = black_litterman_portfolio(
    returns_df,
    views=views,
    risk_aversion=2.5,
    tau=0.05
)
print_portfolio_summary("Black-Litterman Portfolio", bl_result)

In [None]:
# 2. Ledoit-Wolf Shrinkage
print("\n" + "="*60)
print("2. LEDOIT-WOLF SHRINKAGE")
print("="*60)

lw_result = portfolio_optimizer_shrinkage(
    returns_df,
    risk_free_rate=RISK_FREE_RATE,
    shrinkage_target='constant_correlation'
)
print(f"\nShrinkage Intensity: {lw_result['shrinkage_intensity']:.4f}")
print(f"(0 = pure sample cov, 1 = pure structured target)")

print("\nGMVP Portfolio (Ledoit-Wolf):")
print(f"  Return: {lw_result['gmvp']['return'] * TRADING_DAYS * 100:.2f}%")
print(f"  Risk: {lw_result['gmvp']['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")

print("\nMax Sharpe Portfolio (Ledoit-Wolf):")
print(f"  Return: {lw_result['mrr']['return'] * TRADING_DAYS * 100:.2f}%")
print(f"  Risk: {lw_result['mrr']['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")
print(f"  Sharpe: {lw_result['mrr']['sharpe'] * np.sqrt(TRADING_DAYS):.4f}")

In [None]:
# 3. EWMA Dynamic Volatility
print("\n" + "="*60)
print("3. EWMA DYNAMIC VOLATILITY")
print("="*60)

ewma_result = portfolio_optimizer_ewma(
    returns_df,
    risk_free_rate=RISK_FREE_RATE,
    lambda_decay=0.94
)

print("\nGMVP Portfolio (EWMA):")
print(f"  Return: {ewma_result['gmvp']['return'] * TRADING_DAYS * 100:.2f}%")
print(f"  Risk: {ewma_result['gmvp']['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")

print("\nMax Sharpe Portfolio (EWMA):")
print(f"  Return: {ewma_result['mrr']['return'] * TRADING_DAYS * 100:.2f}%")
print(f"  Risk: {ewma_result['mrr']['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")
print(f"  Sharpe: {ewma_result['mrr']['sharpe'] * np.sqrt(TRADING_DAYS):.4f}")

In [None]:
# 4. Downside Risk Optimization
print("\n" + "="*60)
print("4. DOWNSIDE RISK OPTIMIZATION")
print("="*60)

# Min CVaR Portfolio
cvar_result = min_cvar_portfolio(returns_df, confidence=0.95)
print("\nMinimum CVaR Portfolio:")
print(f"  Return: {cvar_result['return'] * TRADING_DAYS * 100:.2f}%")
print(f"  Risk: {cvar_result['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")
print(f"  VaR (95%): {cvar_result['var'] * 100:.4f}%")
print(f"  CVaR (95%): {cvar_result['cvar'] * 100:.4f}%")

# Max Sortino Portfolio
sortino_result = max_sortino_portfolio(returns_df, risk_free_rate=RISK_FREE_RATE)
print("\nMaximum Sortino Portfolio:")
print(f"  Return: {sortino_result['return'] * TRADING_DAYS * 100:.2f}%")
print(f"  Risk: {sortino_result['risk'] * np.sqrt(TRADING_DAYS) * 100:.2f}%")
print(f"  Sharpe Ratio: {sortino_result['sharpe_ratio'] * np.sqrt(TRADING_DAYS):.4f}")
print(f"  Sortino Ratio: {sortino_result['sortino_ratio'] * np.sqrt(TRADING_DAYS):.4f}")

In [None]:
# 5. Hierarchical Risk Parity
print("\n" + "="*60)
print("5. HIERARCHICAL RISK PARITY (HRP)")
print("="*60)

hrp_result = hierarchical_risk_parity(returns_df, linkage_method='single')
print_portfolio_summary("HRP Portfolio", hrp_result)

# Plot dendrogram
print("\nHRP Clustering Dendrogram:")
plot_hrp_dendrogram(hrp_result, title='HRP Asset Clustering')

In [None]:
# ============================================================
# COMPARE ALL METHODS ON EFFICIENT FRONTIER
# ============================================================

# Collect all portfolio results
all_portfolios = {
    'Black-Litterman': bl_result,
    'Ledoit-Wolf MRR': lw_result['mrr'],
    'EWMA MRR': ewma_result['mrr'],
    'Min CVaR': cvar_result,
    'Max Sortino': sortino_result,
    'HRP': hrp_result
}

plot_efficient_frontier_comparison(
    returns_df,
    all_portfolios,
    n_random=3000,
    title='Efficient Frontier: Comparison of Advanced Optimization Methods'
)

In [None]:
# ============================================================
# 6. WALK-FORWARD BACKTESTING
# ============================================================

print("\n" + "="*60)
print("6. WALK-FORWARD BACKTESTING")
print("="*60)

# Define strategies to compare
def equal_weight_optimizer(returns_df):
    n = returns_df.shape[1]
    return {'weights': np.ones(n) / n}

def hrp_optimizer(returns_df):
    return hierarchical_risk_parity(returns_df)

def lw_optimizer(returns_df):
    result = portfolio_optimizer_shrinkage(returns_df, shrinkage_target='constant_correlation')
    return result['mrr']

def sortino_optimizer(returns_df):
    return max_sortino_portfolio(returns_df)

strategies = {
    'Equal Weight': equal_weight_optimizer,
    'HRP': hrp_optimizer,
    'Ledoit-Wolf': lw_optimizer,
    'Max Sortino': sortino_optimizer
}

# Run comparison
comparison_df, backtest_results = compare_strategies(
    sample_data,
    strategies,
    train_window=126,  # 6 months
    rebalance_freq=21,  # Monthly
    risk_free_rate=RISK_FREE_RATE
)

print("\n" + "="*60)
print("STRATEGY COMPARISON SUMMARY")
print("="*60)
print(comparison_df.round(4))

In [None]:
# Plot cumulative returns comparison
plot_backtest_results(
    backtest_results,
    title='Walk-Forward Backtest: Strategy Comparison'
)

---

## Summary

This notebook implements six advanced portfolio optimization techniques that address the limitations of classical Mean-Variance Optimization:

| Problem | Classical MVO Limitation | Solution Implemented |
|---------|-------------------------|---------------------|
| Estimation Error | Uses historical means blindly | **Black-Litterman Model** |
| Covariance Instability | Sample covariance is noisy | **Ledoit-Wolf Shrinkage** |
| Static Risk | Assumes constant volatility | **EWMA Dynamic Volatility** |
| Normal Assumptions | Ignores fat tails/skewness | **CVaR/Sortino Optimization** |
| Matrix Inversion | Numerical instability | **Hierarchical Risk Parity** |
| In-Sample Bias | Overfits to historical data | **Walk-Forward Backtesting** |

### Key Takeaways

1. **Black-Litterman** provides more intuitive and stable weights by starting from market equilibrium
2. **Shrinkage estimators** reduce the impact of spurious correlations
3. **EWMA** gives more weight to recent market conditions
4. **CVaR/Sortino** focus on actual downside risk, not symmetric variance
5. **HRP** uses machine learning clustering and doesn't require matrix inversion
6. **Walk-forward testing** is essential for validating any portfolio strategy

### Usage with Real Data

To use these methods with real stock data:
1. Load data using `read_stock_data_from_folder()` or your own data source
2. Filter to desired date range and stocks using `filter_dataset()`
3. Apply any of the optimization methods
4. Validate with `walk_forward_backtest()` before deploying