# Portfolio Optimization - Case Study

## Business Overview
**Client**: Investment Management Firm  
**Objective**: Build optimized portfolios with superior risk-adjusted returns vs benchmark  
**Investment Horizon**: Medium to long-term (2+ years)

**Comparison Framework**: Equal-Weight vs sharpe-Optimized vs Benchmark (QQQ)

**Key Constraints**:
- No short selling allowed
- No leverage permitted  
- Monthly rebalancing frequency

## Expected Deliverables
- Executive KPI dashboard with key metrics (CAGR, sharpe, MDD, Calmar)
- Risk-return visualization with efficient frontier
- Stress test analysis across market regimes
- Reproducible results with documented assumptions

# Configuration, Global Parameters, Logging & Reproducibility Setups

In [29]:
# CONFIGURATION & SETUP

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import math
import warnings
import logging
import os
from itertools import combinations
from scipy.optimize import minimize

# Configure display and warnings
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.options.display.float_format = '{:,.6f}'.format # Consistent numeric formatting
warnings.filterwarnings('ignore') # Suppress non-critical warnings for cleaner
plt.style.use('seaborn-v0_8')
%matplotlib inline


This block sets up the analysis environment and global configuration for the portfolio optimization case study. It:

- Imports all required libraries for data handling, plotting, optimization, and statistics.
Configures display options, warning suppression, and plotting style, and ensures plots render inline in Jupyter.
- Resolves the data folder path robustly for both script and notebook contexts.
- Defines all key global parameters that govern the backtest: asset universe, date range, operating mode, risk constraints, benchmark, transaction costs, Monte Carlo settings, and walk-forward evaluation windows.

In [30]:
# GLOBAL PARAMETERS - EDIT THESE FOR DIFFERENT SCENARIOS

# Get the directory of the current notebook and navigate to the data folder
try:
    # Use __file__ when the notebook is run as a script (e.g., in a Binder or Colab environment)
    NOTEBOOK_DIR = os.path.dirname(os.path.abspath(__file__))
    DATA_FOLDER = os.path.join(NOTEBOOK_DIR, '..', 'data')
except NameError:
    # Fallback for local Jupyter environment (where __file__ may not exist)
    NOTEBOOK_DIR = os.path.dirname(os.getcwd())
    DATA_FOLDER = os.path.join(NOTEBOOK_DIR, 'data')


# Stock universe (add more tickers as needed)
STOCK_UNIVERSE = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'META', 'TSLA', 'NFLX', 'AMD', 'CRM']
N_STOCKS_AUTO = 4  # Number of stocks to select automatically
START_DATE = '2020-01-01'
END_DATE = '2024-12-31'
MODE = 'snapshot'  # 'snapshot' or 'live'

# Risk Management
MAX_WEIGHT = 0.40  # Maximum allocation per asset (40%)
REBALANCE_FREQUENCY = 'M'  # Monthly rebalancing
COST_PER_SIDE = 0.001  # 10 bps transaction cost

# Benchmark & Analysis
BENCHMARK = 'QQQ'  # NASDAQ-100 ETF
MIN_DAYS = 500  # Minimum days required for analysis

# Monte Carlo & Reproducibility
N_SCENARIOS = 20000  # Number of portfolio scenarios
RNG_SEED = 42  # For reproducible results
PORTFOLIO_VALUE = 10000  # Initial portfolio value ($10k)

# Walk-Forward Analysis Parameters
TRAIN_WINDOW = 252  # Training window in days (1 trading year)
TEST_WINDOW = 63   # Test window in days (1 trading quarter)
RISK_FREE_RATE = 0.02  # Risk-free annualized rate for Sharpe calculation# Output Directory ConfigurationOUTPUT_DIR = os.getenv('OUTPUT_DIR', os.path.join(NOTEBOOK_DIR, 'outputs'))os.makedirs(OUTPUT_DIR, exist_ok=True)

This block ensures reproducibility, sets up logging, and defines small utility functions used throughout the portfolio analysis. It:

- Seeds both NumPy’s legacy global RNG and the newer Generator API for consistent random behavior across different NumPy calls.
- Configures a module-level logger and prints a run manifest of key parameters.
- Provides helper functions for RNG access, annualization of returns/volatility, Sharpe ratio with numerical stability, maximum drawdown, weight normalization, and converting weight arrays to labeled pandas Series.

In [31]:
# Set random seeds for reproducibility

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Timestamped, leveled logs
logger = logging.getLogger(__name__) # Module-level logger

# Utility functions for reproducibility
def _rng(seed=None):
    """Return a reproducible NumPy Generator."""
    return np.random.default_rng(RNG_SEED if seed is None else seed) # Deterministic generator, optional override

def _annualize_return(mean_daily):
    return mean_daily * 252.0 # Scale daily mean return to annualized (trading days)

def _annualize_vol(std_daily):
    return std_daily * np.sqrt(252.0) # Scale daily std to annualized volatility

def _sharpe(mean, std, rf=0.0, eps=1e-12):
    return (mean - rf) / max(std, eps) # Numerical guard to avoid division by zero when std≈0

def _max_drawdown(nav: pd.Series) -> float:
    roll_max = nav.cummax() # Running peak of NAV
    dd = nav / roll_max - 1.0 # Drawdown series as percentage from peak
    return float(dd.min()) if len(dd) else 0.0 # Worst peak-to-trough loss (returns 0.0 if empty)

def _normalize_weights(w: np.ndarray, eps=1e-12):
    s = np.sum(w)
    if s <= eps:        return np.zeros_like(w) # Avoid divide-by-zero; return zeros if vector sum is too small
    return w / s
def _to_series_weights(w: np.ndarray, tickers):
    return pd.Series(w, index=list(tickers), dtype=float) # Attach tickers as index for readability

# Log key parameters
logger.info("=== PORTFOLIO OPTIMIZATION ENHANCED PARAMETERS ===")
logger.info(f"Mode: {MODE}")
logger.info(f"Stock Universe: {STOCK_UNIVERSE}")
logger.info(f"Auto-selection count: {N_STOCKS_AUTO}")
logger.info(f"Date Range: {START_DATE} to {END_DATE}")
logger.info(f"Benchmark: {BENCHMARK}")
logger.info(f"Max Weight per Asset: {MAX_WEIGHT*100}%")
logger.info(f"Transaction Cost: {COST_PER_SIDE*10000} bps per side")
logger.info(f"Monte Carlo Scenarios: {N_SCENARIOS:,}")
logger.info(f"Random Seed: {RNG_SEED}")
logger.info(f"Portfolio Value: ${PORTFOLIO_VALUE:,}")
logger.info(f"Training Window: {TRAIN_WINDOW} days")
logger.info(f"Test Window: {TEST_WINDOW} days")
logger.info(f"Risk Free Rate: {RISK_FREE_RATE*100}%")



## DATA INGESTION & VALIDATION FUNCTIONS

This block defines functions for loading, validating, and processing stock price data from CSV files. It:

- Loads stock price data for a list of tickers from individual CSV files, filtering by a specified date range.
- Performs comprehensive data validation, including checks for missing values, duplicate dates, insufficient data, and extreme outliers.
- Handles missing data using forward/backward fill and removes stocks with too many gaps or constant prices.
- Computes clean daily percentage returns from the validated price data, handling infinite or NaN values.

In [32]:
def load_prices_from_csv(tickers, start_date, end_date, data_folder='./', strict=True):
    """
    Load stock prices from CSV files with comprehensive validation

    Parameters:
    -----------
    tickers : list
        List of stock ticker symbols
    start_date : str
        Start date in 'YYYY-MM-DD' format
    end_date : str
        End date in 'YYYY-MM-DD' format
    data_folder : str
        Path to folder containing CSV files
    strict : bool
        If True, raise errors for data issues; if False, issue warnings

    Returns:
    --------
    pd.DataFrame
        DataFrame with aligned stock prices (Close Price only)
    """

    logger.info(f"Loading price data for {len(tickers)} tickers...")

    stock_data = {} # per-ticker Close Price series
    failed_tickers = [] # collect tickers that fail any step

    for ticker in tickers:
        try:
            # Construct file path
            file_path = os.path.join(data_folder, f"{ticker}.csv")

            if not os.path.exists(file_path):
                logger.warning(f"File not found: {file_path}")
                failed_tickers.append(ticker)
                continue

            # Load CSV with date parsing
            df = pd.read_csv(file_path, parse_dates=['Date'], index_col='Date')

            # Filter date range
            df = df.loc[start_date:end_date]

            if df.empty:
                logger.warning(f"No data for {ticker} in specified date range")
                failed_tickers.append(ticker)
                continue

            # Store only Close Price
            stock_data[ticker] = df['Close Price']
            logger.info(f"✅ {ticker}: {len(df)} records loaded") # keep only the Close Price

        except Exception as e:
            logger.error(f"Error loading {ticker}: {str(e)}")
            failed_tickers.append(ticker)

    if not stock_data:
        raise ValueError("No valid stock data loaded!")

    # Create aligned DataFrame
    stock_adj_close = pd.DataFrame(stock_data)

    # Data validation (shape/quality checks, NA handling, ticker filtering)
    stock_adj_close = validate_stock_data(stock_adj_close, tickers, failed_tickers, strict)

    logger.info(f"📊 Final dataset: {len(stock_adj_close.columns)} stocks, {len(stock_adj_close)} days")
    return stock_adj_close

def validate_stock_data(df, original_tickers, failed_tickers, strict=True):
    """
    Comprehensive data validation and cleaning
    """
    logger.info("Performing data validation...")

    # Report failed tickers
    if failed_tickers:
        logger.warning(f"Failed to load: {failed_tickers}")

    # Check minimum data requirements
    if len(df) < MIN_DAYS:
        message = f"Dataset has only {len(df)} days (minimum: {MIN_DAYS})"
        if strict:
            raise ValueError(message) # hard fail if strict
        else:
            logger.warning(message) # proceed with caution if not strict

    # Handle missing values
    missing_data = df.isnull().sum()
    if missing_data.any():
        logger.warning("Missing data detected:")
        for ticker, missing_count in missing_data[missing_data > 0].items():
            logger.warning(f"  {ticker}: {missing_count} missing values")

        # Forward fill missing values
        df = df.fillna(method='ffill').fillna(method='bfill') # impute gaps by forward/backward fill
        logger.info("Missing values filled using forward/backward fill")

    # Check for duplicated dates
    if df.index.duplicated().any():
        logger.warning("Duplicate dates found - removing duplicates")
        df = df[~df.index.duplicated(keep='first')] # keep first occurrence

    # Remove stocks with insufficient data
    min_valid_ratio = 0.95  # Require 95% valid data
    for ticker in df.columns.copy():
        valid_ratio = df[ticker].notna().sum() / len(df)
        if valid_ratio < min_valid_ratio:
            logger.warning(f"Removing {ticker}: only {valid_ratio:.1%} valid data")
            df = df.drop(columns=[ticker])

    # Check for extreme outliers and constant series
    returns = df.pct_change().dropna()
    for ticker in returns.columns.copy():
        # Check for constant prices (no variation)
        if returns[ticker].std() < 1e-6:
            logger.warning(f"Removing {ticker}: constant price series")
            df = df.drop(columns=[ticker])
            continue

        # Check for extreme movements (>50% daily change) for awareness
        extreme_moves = (abs(returns[ticker]) > 0.5).sum()
        if extreme_moves > 0:
            logger.warning(f"{ticker}: {extreme_moves} extreme daily moves (>50%)")

    if df.empty:
        raise ValueError("No valid stocks remaining after validation!")

    logger.info(f"✅ Validation complete: {len(df.columns)} stocks, {len(df)} days")
    return df

def compute_returns(prices_df: pd.DataFrame) -> pd.DataFrame:
    """Compute daily returns from prices (pct_change), drop first NaNs."""
    returns = prices_df.pct_change().dropna(how='all')
    return returns.replace([np.inf, -np.inf], np.nan).dropna(axis=0, how='all') # remove infinities and all-NaN rows

## ENHANCED KPI CALCULATION FUNCTIONS

This block provides enhanced KPI computation utilities for portfolio analysis. It calculates comprehensive performance and risk metrics (annual return/volatility, Sharpe, max drawdown, Sortino, 95% VaR/CVaR, alpha, beta, information ratio) and lightweight alternatives, plus benchmark-relative KPIs (correlation, tracking error, active return). It assumes daily return series and annualizes appropriately, relying on helper functions (e.g., _annualize_return, _annualize_vol, _sharpe, _max_drawdown) defined elsewhere in the project.

In [33]:
def calculate_comprehensive_kpis(returns: pd.Series, benchmark_returns: pd.Series = None, rf: float = RISK_FREE_RATE) -> dict:
    """
    Calculate comprehensive KPIs including advanced risk metrics

    Parameters:
    -----------
    returns : pd.Series
        Portfolio returns series
    benchmark_returns : pd.Series, optional
        Benchmark returns for alpha/beta calculation
    rf : float
        Risk-free rate (annualized)

    Returns:
    --------
    dict
        Dictionary with comprehensive KPIs
    """
    if len(returns) == 0:
        # Graceful fallback: return zeroed metrics if no data
        return {"ann_return": 0.0, "ann_vol": 0.0, "sharpe": 0.0, "max_dd": 0.0,
                "sortino": 0.0, "var_95": 0.0, "cvar_95": 0.0, "alpha": 0.0, "beta": 0.0, "info_ratio": 0.0}

    # Basic metrics
    mu_d = float(returns.mean())  # Daily mean return
    sd_d = float(returns.std(ddof=0))  # Daily volatility (population std)
    ann_r = _annualize_return(mu_d)  # Annualize mean (assumes daily input)
    ann_v = _annualize_vol(sd_d)     # Annualize volatility (sqrt(252) convention)
    sr = _sharpe(ann_r, ann_v, rf)   # Sharpe uses annualized inputs and annual rf

    # Maximum Drawdown
    nav = (1 + returns).cumprod()
    mdd = _max_drawdown(nav) # Peak-to-trough % drawdown

    # Sortino Ratio (downside deviation)
    downside_returns = returns[returns < 0]
    downside_std = float(downside_returns.std(ddof=0)) if len(downside_returns) > 0 else 0.0
    sortino = _sharpe(ann_r, _annualize_vol(downside_std), rf) if downside_std > 0 else 0.0 # Uses downside vol

    # Value at Risk (VaR) and Conditional VaR (CVaR) at 95% confidence
    var_95 = float(np.percentile(returns, 5)) * np.sqrt(252)  # Left-tail (5th pct), annualized
    cvar_95 = float(returns[returns <= np.percentile(returns, 5)].mean()) * np.sqrt(252)  # Tail mean, annualized

    # Alpha, Beta, and Information Ratio vs benchmark
    alpha, beta, info_ratio = 0.0, 0.0, 0.0
    if benchmark_returns is not None and len(benchmark_returns) > 0:
        # Align returns to common days
        aligned_idx = returns.index.intersection(benchmark_returns.index)
        if len(aligned_idx) > 10:  # Need sufficient data
            aligned = pd.concat([returns.rename('p'), benchmark_returns.rename('b')], axis=1).dropna()
            port_aligned = aligned['p']
            bench_aligned = aligned['b']

            # Beta calculation cov(port, bench) / var(bench)
            if bench_aligned.std() > 0:
                beta = float(np.cov(port_aligned, bench_aligned)[0, 1] / np.var(bench_aligned))

            # Alpha calculation (annualized CAPM): alpha = Rp - [rf + beta*(Rb - rf)]
            bench_ann_return = _annualize_return(bench_aligned.mean())
            alpha = ann_r - (rf + beta * (bench_ann_return - rf))

            # Information Ratio = annualized active return / annualized tracking error
            active_returns = port_aligned - bench_aligned
            tracking_error = _annualize_vol(active_returns.std(ddof=0))
            if tracking_error > 0:
                info_ratio = _annualize_return(active_returns.mean()) / tracking_error

    return {
        "ann_return": ann_r,
        "ann_vol": ann_v,
        "sharpe": sr,
        "max_dd": mdd,
        "sortino": sortino,
        "var_95": var_95,
        "cvar_95": cvar_95,
        "alpha": alpha,
        "beta": beta,
        "info_ratio": info_ratio,
        "mu_d": mu_d,
        "vol_d": sd_d
    }

def compute_kpis(returns: pd.Series, rf: float = 0.0) -> dict:
    """Compute basic KPIs using the comprehensive calculator, then subset."""
    k = calculate_comprehensive_kpis(returns=returns, benchmark_returns=None, rf=rf)
    return {k2: k.get(k2, 0.0) for k2 in ('ann_return', 'ann_vol', 'sharpe', 'max_dd')}

def kpis_vs_benchmark(port_ret: pd.Series, bench_ret: pd.Series) -> dict:
    """Compute correlation, tracking error, and active return vs benchmark."""
    idx = port_ret.index.intersection(bench_ret.index)
    aligned = pd.concat([port_ret.rename('p'), bench_ret.rename('b')], axis=1).dropna()
    pr, br = aligned['p'], aligned['b']
    corr = float(np.corrcoef(pr, br)[0,1]) if pr.std() > 0 and br.std() > 0 else 0.0 # Only if both var
    active = pr - br
    te = float(active.std(ddof=0) * np.sqrt(252.0)) # Annualized tracking error
    ar = float((_annualize_return(pr.mean()) - _annualize_return(br.mean()))) # Annualized active return
    return {"corr": corr, "tracking_error": te, "active_return": ar}

## PORTFOLIO OPTIMIZATION FUNCTIONS

This block implements core portfolio optimization utilities used throughout the case study:

- Computes average pairwise (off-diagonal) correlation across assets to gauge diversification.
- Given asset return series and weights, computes daily portfolio mean/volatility, annualizes them, and derives Sharpe ratio using helper functions.
- Optimizes portfolio weights under box constraints and a full-investment constraint using:
    - SciPy SLSQP for three objectives: max Sharpe, minimum variance, and risk parity.
    - A Monte Carlo fallback that samples feasible weights (Dirichlet), enforces bounds, optionally penalizes turnover, and selects the best according to the chosen objective.
- Supports turnover penalties (L1 difference from previous weights), risk-free rate, and reproducible randomness via seed and global settings.

**Notes**
- returns_df is assumed to be daily returns; annualization is performed via helper functions: _annualize_return, _annualize_vol, _sharpe.
- Bounds default to [0, MAX_WEIGHT] and weights sum to sum_to (usually 1.0).
- Risk parity minimizes deviations from equal risk contribution.
- Helper utilities (_normalize_weights, _to_series_weights, _rng) and globals (MAX_WEIGHT, RISK_FREE_RATE, N_SCENARIOS, logger) are expected from the broader project context.

In [34]:
def average_offdiag_correlation(returns_df: pd.DataFrame) -> float:
    """Average off-diagonal correlation among assets."""
    if returns_df.shape[1] <= 1:
        return 0.0
    corr = returns_df.corr().values
    n = corr.shape[0]
    return float((corr.sum() - np.trace(corr)) / (n * (n - 1)))

def portfolio_stats_from_weights(returns_df: pd.DataFrame, weights: np.ndarray, rf: float = 0.0) -> dict:
    """Compute mean, vol, sharpe given asset returns and weights."""
    w = np.asarray(weights).reshape(-1)  # Ensure 1D weight vector
    mu = returns_df.mean().values        # Daily expected returns per asset
    cov = returns_df.cov().values        # Daily covariance matrix
    pmu_d = float(np.dot(w, mu))         # Daily portfolio expected return
    pvol_d = float(np.sqrt(max(np.dot(w, cov @ w), 0.0)))  # Daily portfolio volatility (stdev)
    ann_r = _annualize_return(pmu_d)     # Annualized return (helper handles frequency assumption)
    ann_v = _annualize_vol(pvol_d)       # Annualized volatility
    sr = _sharpe(ann_r, ann_v, rf)       # Sharpe ratio using annualized figures and rf
    return {"ann_return": ann_r, "ann_vol": ann_v, "sharpe": sr, "mu_d": pmu_d, "vol_d": pvol_d}

def optimize_weights(
    returns_df: pd.DataFrame,
    objective: str = 'max_sharpe',
    bounds: tuple = (0.0, MAX_WEIGHT),
    sum_to: float = 1.0,
    method: str = 'scipy',
    turnover_penalty: float = 0.0,
    prev_weights: np.ndarray = None,
    risk_free: float = RISK_FREE_RATE,
    seed: int = None,
) -> dict:
    """
    Optimize portfolio weights under constraints with enhanced objectives

    Parameters:
    -----------
    returns_df : pd.DataFrame
        Asset returns DataFrame
    objective : str
        'max_sharpe', 'min_var', or 'risk_parity'
    bounds : tuple
        (lower_bound, upper_bound) for weights
    sum_to : float
        Target sum of weights (usually 1.0)
    method : str
        'scipy' or 'mc' (Monte Carlo)
    turnover_penalty : float
        Penalty for turnover (L1 norm of weight changes)
    prev_weights : np.ndarray
        Previous weights for turnover calculation
    risk_free : float
        Risk-free rate
    seed : int
        Random seed

    Returns:
    --------
    dict
        Dictionary with optimized weights, KPIs, and method used
    """
    assets = list(returns_df.columns)
    n = len(assets)
    lb, ub = bounds
    lb = max(0.0, float(lb))  # Enforce no shorting if lb < 0 is passed
    ub = float(ub)
    if ub <= 0:
        raise ValueError("Upper bound must be > 0")

    mu = returns_df.mean().values
    cov = returns_df.cov().values
    x0 = np.full(n, sum_to / n, dtype=float)  # Start from equal weights
    x0 = np.clip(x0, lb, ub)                  # Respect box constraints
    x0 = _normalize_weights(x0) * sum_to      # Re-normalize to the simplex
    prev_w = np.zeros(n) if prev_weights is None else np.asarray(prev_weights).reshape(-1)

    def obj_sharpe(w):
        w = np.asarray(w)
        pmu_d = np.dot(w, mu)
        pvar = max(np.dot(w, cov @ w), 0.0)
        pstd_d = np.sqrt(pvar)
        ann_r = _annualize_return(pmu_d)
        ann_v = _annualize_vol(pstd_d)
        sharpe = _sharpe(ann_r, ann_v, risk_free)
        # Minimize negative Sharpe; add L1 turnover penalty vs previous weights
        pen = turnover_penalty * np.sum(np.abs(w - prev_w)) if turnover_penalty > 0 else 0.0
        return -sharpe + pen

    def obj_minvar(w):
        w = np.asarray(w)
        pen = turnover_penalty * np.sum(np.abs(w - prev_w)) if turnover_penalty > 0 else 0.0
        return max(np.dot(w, cov @ w), 0.0) + pen

    def obj_risk_parity(w):
        """Risk Parity objective: minimize sum of squared deviations from equal risk contribution"""
        w = np.asarray(w)
        covw = cov @ w
        rc = w * covw  # risk contributions using variance
        total_var = max(float(w @ covw), 1e-12)
        target = total_var / n
        pen = turnover_penalty * np.sum(np.abs(w - prev_w)) if turnover_penalty > 0 else 0.0
        return float(np.sum((rc - target) ** 2)) + pen

    def project_bounds(w):
        # Enforce box constraints and renormalize to sum_to
        w = np.clip(w, lb, ub)
        w = _normalize_weights(w)
        return w * sum_to

    if method == 'scipy':
        try:
            cons = (
                {'type': 'eq', 'fun': lambda w: np.sum(w) - sum_to},
            )
            bnds = tuple((lb, ub) for _ in range(n))

            if objective == 'max_sharpe':
                fun = obj_sharpe
            elif objective == 'min_var':
                fun = obj_minvar
            elif objective == 'risk_parity':
                fun = obj_risk_parity
            else:
                raise ValueError(f"Unknown objective: {objective}")

            # SLSQP handles linear equality constraints and box bounds well
            res = minimize(fun, x0, method='SLSQP', bounds=bnds, constraints=cons,
                         options={'maxiter': 200, 'ftol': 1e-9})

            if res.success:
                w_opt = project_bounds(res.x)  # Final guard against numerical drift
                stats = portfolio_stats_from_weights(returns_df, w_opt, rf=risk_free)
                return {
                    'weights': _to_series_weights(w_opt, assets),  # Return as pd.Series with tickers
                    'kpis': stats,
                    'method_used': 'scipy',
                }
        except Exception as e:
            logger.warning(f"SciPy optimization failed: {e}, falling back to Monte Carlo")

    # Monte Carlo fallback
    logger.info("Using Monte Carlo optimization")
    rng_local = _rng(seed)  # Project helper for reproducible RNG
    W = rng_local.dirichlet(alpha=np.ones(n), size=N_SCENARIOS)  # Sample weights on the simplex
    mask = (W <= ub + 1e-12).all(axis=1)  # Enforce upper bound per asset
    Wf = W[mask]

    if Wf.shape[0] < max(1000, n * 200):
        # If too few feasible samples, clip and renormalize rather than discard
        Wc = np.clip(W, lb, ub)
        Wc = (Wc / Wc.sum(axis=1, keepdims=True)) * sum_to
        Wf = Wc
    else:
        Wf = (Wf / Wf.sum(axis=1, keepdims=True)) * sum_to

    if objective == 'max_sharpe':
        pmu_d = Wf @ mu
        pvar = np.einsum('ij,jk,ik->i', Wf, cov, Wf)  # Vectorized portfolio variance
        pstd_d = np.sqrt(np.maximum(pvar, 0.0))
        ann_r = _annualize_return(pmu_d)
        ann_v = _annualize_vol(pstd_d)
        sharpe = (ann_r - risk_free) / np.maximum(ann_v, 1e-12)
        if turnover_penalty > 0:
            pen = turnover_penalty * np.sum(np.abs(Wf - prev_w.reshape(1, -1)), axis=1)
            sharpe = sharpe - pen
        idx = int(np.argmax(sharpe))
        w_opt = Wf[idx]
    elif objective == 'min_var':
        pvar = np.einsum('ij,jk,ik->i', Wf, cov, Wf)
        if turnover_penalty > 0:
            pen = turnover_penalty * np.sum(np.abs(Wf - prev_w.reshape(1, -1)), axis=1)
            score = pvar + pen
            idx = int(np.argmin(score))
        else:
            idx = int(np.argmin(pvar))
        w_opt = Wf[idx]
    elif objective == 'risk_parity':
        covW = Wf @ cov  # (m, n)
        rc = Wf * covW   # (m, n)
        total_var = np.sum(rc, axis=1)
        target = (total_var / n).reshape(-1, 1)
        score = np.sum((rc - target) ** 2, axis=1)  # Deviation from equal risk contribution
        if turnover_penalty > 0:
            score = score + turnover_penalty * np.sum(np.abs(Wf - prev_w.reshape(1, -1)), axis=1)
        idx = int(np.argmin(score))
        w_opt = Wf[idx]
    else:
        raise ValueError(f"Unknown objective: {objective}")

    w_opt = project_bounds(w_opt)  # Final projection to satisfy bounds/sum
    stats = portfolio_stats_from_weights(returns_df, w_opt, rf=risk_free)
    return {
        'weights': _to_series_weights(w_opt, assets),
        'kpis': stats,
        'method_used': 'mc',
    }

## STOCK SELECTION FUNCTIONS

Selects an optimal subset of assets from the universe using either exhaustive search (for small universes) or a greedy forward-selection heuristic (for larger ones). Over a specified lookback window, it:

* Computes returns and evaluates candidate portfolios.  
* Optimizes weights for each candidate using a max-Sharpe objective with per-asset weight caps.  
* Scores candidates (Sharpe only for exhaustive; Sharpe penalized by correlation for greedy to encourage diversification).  
* Returns the chosen tickers, a DataFrame of evaluated combinations with metrics, and the optimal weights for the best selection.


In [35]:
def select_optimal_stocks(
    prices_df: pd.DataFrame,
    N_stocks: int,
    lookback_days: int,
    method: str = 'greedy',
    seed: int = None,
) -> dict:
    """
    Select N_stocks from the universe using advanced selection methodology

    Parameters:
    -----------
    prices_df : pd.DataFrame
        Price data for all assets
    N_stocks : int
        Number of stocks to select
    lookback_days : int
        Number of days to look back for analysis
    method : str
        'exhaustive' for small universes, 'greedy' for larger ones
    seed : int
        Random seed for reproducibility

    Returns:
    --------
    dict
        Dictionary with selected tickers, analysis DataFrame, and best weights
    """
    assert N_stocks >= 1, "N_stocks must be >= 1"
    tickers = list(prices_df.columns)  # Universe tickers
    end_idx = prices_df.index[-1]
    start_idx = prices_df.index[max(0, len(prices_df) - lookback_days)]  # Start of lookback window
    px_lb = prices_df.loc[start_idx:end_idx]  # Price slice for lookback
    rets_lb = compute_returns(px_lb)  # Return series aligned to lookback window
    combos_rows = []  # Accumulates evaluation metrics per candidate combination

    if len(tickers) <= 10 and method == 'exhaustive':
        logger.info(f"Using exhaustive search for {len(tickers)} assets")
        best = None
        for combo in combinations(tickers, N_stocks):  # Evaluate all N_stocks-size combos
            sub = rets_lb[list(combo)].dropna()  # Require overlapping, non-NA window
            if sub.shape[0] < 2:  # Guard against too few observations
                continue
            opt = optimize_weights(sub, objective='max_sharpe', bounds=(0.0, MAX_WEIGHT),
                                 sum_to=1.0, method='scipy', seed=seed)  # Constrained max-Sharpe optimization
            k = opt['kpis']
            avg_corr = average_offdiag_correlation(sub)  # Cross-asset average correlation
            score = k['sharpe']  # Score by Sharpe (no correlation penalty in exhaustive mode)
            combos_rows.append({
                'combo': combo,
                'sharpe': k['sharpe'],
                'ann_return': k['ann_return'],
                'ann_vol': k['ann_vol'],
                'avg_corr': avg_corr,
                'score': score,
                'weights': opt['weights'],
            })
            if best is None or score > best['score']:
                best = combos_rows[-1]
        combos_df = pd.DataFrame(combos_rows)
        sel = list(best['combo']) if best else tickers[:N_stocks]
        best_weights = best['weights'] if best else pd.Series(np.full(N_stocks, 1.0 / N_stocks), index=sel)
        return {'selected_tickers': sel, 'combos_df': combos_df, 'best_weights': best_weights}

    # Greedy selection for larger universes
    logger.info(f"Using greedy search for {len(tickers)} assets")
    selected = []
    remaining = set(tickers)
    best_weights = None

    while len(selected) < N_stocks and remaining:
        best_local = None
        for cand in list(remaining):
            trial = selected + [cand]  # Try adding one new candidate
            sub = rets_lb[trial].dropna()  # Work with common, valid return window
            if sub.shape[0] < 2:
                continue
            opt = optimize_weights(sub, objective='max_sharpe', bounds=(0.0, MAX_WEIGHT),
                                 sum_to=1.0, method='scipy', seed=seed)
            avg_corr = average_offdiag_correlation(sub)
            score = opt['kpis']['sharpe'] * (1.0 - avg_corr)  # Penalize high correlation to encourage diversification
            row = {
                'combo': tuple(trial),
                'sharpe': opt['kpis']['sharpe'],
                'ann_return': opt['kpis']['ann_return'],
                'ann_vol': opt['kpis']['ann_vol'],
                'avg_corr': avg_corr,
                'score': score,
                'weights': opt['weights'],
            }
            if best_local is None or score > best_local['score']:
                best_local = row

        if best_local is None:
            break

        selected = list(best_local['combo'])  # Commit the best local addition
        best_weights = best_local['weights']  # Corresponding optimal weights for the current selection
        remaining = set(tickers) - set(selected)  # Continue with unselected assets
        combos_rows.append(best_local)

    combos_df = pd.DataFrame(combos_rows)
    if len(selected) > N_stocks:
        selected = selected[:N_stocks]
        best_weights = best_weights[selected]
        best_weights = best_weights / best_weights.sum()  # Re-normalize if truncated

    logger.info(f"Selected {len(selected)} stocks: {selected}")
    return {'selected_tickers': selected, 'combos_df': combos_df, 'best_weights': best_weights}

## SIMULATION & REBALANCING FUNCTIONS

Defines two core utilities for portfolio simulation:

- build_weights_schedule creates a time-indexed schedule of constant target weights at a given frequency between two dates.
- simulate_rebalance runs a NAV simulation that rebalances to the scheduled target weights at each rebalancing date, applies transaction costs and slippage based on portfolio turnover, and returns NAV, turnover events, cumulative costs, and summary KPIs

In [36]:
def build_weights_schedule(start_date: pd.Timestamp, end_date: pd.Timestamp, weights: pd.Series, freq: str = 'M') -> pd.DataFrame:
    """Create a schedule with constant weights at a given frequency between dates (inclusive)."""
    idx = pd.date_range(start=start_date, end=end_date, freq=freq)
    if len(idx) == 0:
        idx = pd.DatetimeIndex([start_date])  # Ensure at least one date if range is empty
    ws = pd.DataFrame(index=idx, columns=weights.index, dtype=float)  # Columns align to tickers in weights
    ws.loc[:, :] = weights.values
    return ws

def simulate_rebalance(
    prices_df: pd.DataFrame,
    weights_schedule: pd.DataFrame,
    rebalance_freq: str = 'M',
    cost_per_side: float = COST_PER_SIDE,
    slippage: float = 0.0,
) -> dict:
    """
    Simulate NAV with rebalancing at close, applying costs and tracking turnover

    Parameters:
    -----------
    prices_df : pd.DataFrame
        Price data for assets
    weights_schedule : pd.DataFrame
        Rebalancing schedule (index: dates, columns: tickers)
    rebalance_freq : str
        Rebalancing frequency
    cost_per_side : float
        Transaction cost per side
    slippage : float
        Additional slippage cost

    Returns:
    --------
    dict
        Dictionary with NAV series, turnover, costs, and KPIs
    """
    px = prices_df.copy()
    px = px[sorted(px.columns)]  # Stable column order for alignment
    rets = compute_returns(px)  # Price-to-return conversion (assumed defined elsewhere)
    ws = weights_schedule.copy().reindex(rets.index).dropna(how='all').fillna(0.0)  # Align to returns calendar
    all_cols = sorted(set(px.columns) | set(ws.columns))  # Union of tickers across data and schedule
    rets = rets.reindex(columns=all_cols).fillna(0.0)  # Missing assets -> zero returns
    ws = ws.reindex(columns=all_cols).fillna(0.0)      # Missing weights -> zero weight

    nav = 1.0  # Start NAV at 1.0 (scaling to dollars happens elsewhere if needed)
    current_weights = np.zeros(len(all_cols), dtype=float)  # Current portfolio weights (post-trade)
    nav_series = pd.Series(index=rets.index, dtype=float)
    turnover_series = pd.Series(0.0, index=rets.index)
    cum_cost_value = 0.0
    cum_cost_events = 0

    for t in rets.index:
        r_t = rets.loc[t].values
        day_ret = float(np.nansum(current_weights * r_t))  # Portfolio daily return from current weights
        nav *= (1.0 + day_ret)

        if t in ws.index and not ws.loc[t].isna().all():  # Rebalance on scheduled dates
            target = ws.loc[t].values.astype(float)  # Target weights for this rebalance
            w_eod_unnorm = current_weights * (1.0 + r_t)  # Drifted weights after price move
            s = w_eod_unnorm.sum()
            w_eod = w_eod_unnorm / s if s > 0 else current_weights.copy()  # Normalize to 100%
            delta = target - w_eod
            turnover = float(np.sum(np.abs(delta)))  # Total absolute weight change (two-sided measure)
            cost = (cost_per_side + slippage) * turnover * nav  # Costs in NAV terms (per-side + slippage)
            nav -= cost
            cum_cost_value += float(cost)
            cum_cost_events += 1 if turnover > 0 else 0
            turnover_series.loc[t] = turnover
            current_weights = target.copy()  # Apply target weights post-cost

        nav_series.loc[t] = nav  # Record end-of-day NAV

    returns_net = nav_series.pct_change().fillna(0.0)  # Net returns after costs
    kpis_net = compute_kpis(returns_net)  # Summary performance metrics (assumed defined elsewhere)
    return {
        'nav': nav_series,
        'turnover': turnover_series[turnover_series > 0],
        'cum_cost': float(cum_cost_value),
        'cum_cost_events': float(cum_cost_events),
        'kpis_net': kpis_net,
    }

## WALK-FORWARD ANALYSIS FUNCTIONS

Implements the walk-forward validation pipeline for the portfolio optimization case study. For each rolling fold, it:

- Splits the historical price series into train and test windows.
- Selects a subset of tickers, optimizes portfolio weights on the training window, and carries prior weights for turnover-aware optimization.
- Simulates out-of-sample performance with rebalancing and transaction costs.
- Collects per-fold in-sample (training) and out-of-sample KPIs and aggregates OOS NAV and KPIs across all folds.
- Returns detailed fold results, aggregated out-of-sample KPIs, and the combined OOS NAV series.

In [37]:
def walk_forward_evaluate(
    prices_df: pd.DataFrame,
    train_window: int = TRAIN_WINDOW,
    test_window: int = TEST_WINDOW,
    rebalance_freq: str = REBALANCE_FREQUENCY,
    params: dict = None,
) -> dict:
    """
    Walk-forward validation pipeline with comprehensive analysis

    Parameters:
    -----------
    prices_df : pd.DataFrame
        Price data for all assets
    train_window : int
        Training window size in days
    test_window : int
        Test window size in days
    rebalance_freq : str
        Rebalancing frequency
    params : dict
        Parameters for optimization

    Returns:
    --------
    dict
        Dictionary with fold results, aggregated KPIs, and OOS NAV
    """
    if params is None:
        params = {
            'N_stocks': N_STOCKS_AUTO,
            'selection_method': 'exhaustive' if len(prices_df.columns) <= 10 else 'greedy',  # auto method by universe size
            'objective': 'max_sharpe',
            'opt_method': 'scipy',
            'turnover_penalty': 0.0,            # set >0 to penalize weight changes vs previous fold
            'cost_per_side': COST_PER_SIDE,     # transaction cost per trade side
            'max_weight': MAX_WEIGHT,           # per-asset cap for weight constraints
            'seed': RNG_SEED,
        }

    px = prices_df.copy()  # avoid mutating caller's DataFrame
    dates = px.index       # time axis used for rolling splits
    folds = []             # per-fold artifacts (tickers, weights, KPIs)
    all_nav = []           # list of OOS NAV segments to concatenate later
    i = train_window       # start at first index after initial training window
    prev_w = None          # previous fold's weights (for turnover-aware optimization)
    rng_seed = params.get('seed', RNG_SEED)

    logger.info(f"Starting walk-forward analysis: train={train_window}, test={test_window}")

    # Advance by test_window each iteration; ensure a full test window remains
    while i + test_window <= len(dates):
        train_start = dates[i - train_window]
        train_end = dates[i - 1]
        test_start = dates[i]
        test_end = dates[i + test_window - 1]

        logger.info(f"Fold: Train {train_start.date()} to {train_end.date()}, Test {test_start.date()} to {test_end.date()}")

        px_tr = px.loc[train_start:train_end]

        # Stock selection on the training window (method can be exhaustive/greedy)
        sel = select_optimal_stocks(
            px_tr,
            N_stocks=params.get('N_stocks', N_STOCKS_AUTO),
            lookback_days=train_window,
            method=params.get('selection_method', 'exhaustive' if px_tr.shape[1] <= 10 else 'greedy'),
            seed=rng_seed,
        )
        tickers = sel['selected_tickers']

        # Weight optimization on selected tickers' returns (in-sample)
        rets_tr = compute_returns(px_tr[tickers]).dropna()
        opt = optimize_weights(
            rets_tr,
            objective=params.get('objective', 'max_sharpe'),
            bounds=(0.0, params.get('max_weight', MAX_WEIGHT)),  # long-only with per-asset cap
            sum_to=1.0,                                          # fully invested
            method=params.get('opt_method', 'scipy'),
            turnover_penalty=params.get('turnover_penalty', 0.0),
            prev_weights=prev_w if prev_w is not None else None, # enable turnover penalty if provided
            seed=rng_seed,
        )
        w = opt['weights']
        # Persist current weights (ordered by tickers) for next fold's turnover calculations
        prev_w = w.reindex(tickers).fillna(0.0).values

        # Out-of-sample simulation over the test window with rebalancing and costs
        ws = build_weights_schedule(test_start, test_end, w, freq=rebalance_freq)  # schedule of target weights
        sim = simulate_rebalance(
            px[tickers].loc[dates[0]:test_end],  # include history to allow consistent NAV series construction
            ws,
            rebalance_freq=rebalance_freq,
            cost_per_side=params.get('cost_per_side', COST_PER_SIDE)
        )
        nav = sim['nav'].loc[test_start:test_end]  # slice OOS segment only
        all_nav.append(nav)

        # KPIs: in-sample from optimizer; OOS from simulated NAV
        kpis_is = opt['kpis']
        kpis_oos = compute_kpis(nav.pct_change().fillna(0.0))

        folds.append({
            'train_start': train_start, 'train_end': train_end,
            'test_start': test_start, 'test_end': test_end,
            'tickers': tickers, 'weights': w,
            'kpis_is': kpis_is, 'kpis_oos': kpis_oos,
        })
        i += test_window  # move to next non-overlapping fold

    # Aggregate out-of-sample NAV and compute overall OOS KPIs
    nav_oos = pd.concat(all_nav).sort_index() if all_nav else pd.Series(dtype=float)
    agg_kpis = compute_kpis(nav_oos.pct_change().fillna(0.0)) if len(nav_oos) else {k: 0.0 for k in ('ann_return','ann_vol','sharpe','max_dd')}

    logger.info(f"Walk-forward complete: {len(folds)} folds, OOS Sharpe: {agg_kpis.get('sharpe', 0):.3f}")

    return {'folds': folds, 'agg_kpis_oos': agg_kpis, 'nav_oos': nav_oos}

## REPORTING & SUMMARY FUNCTIONS

This block provides two reporting utilities:

- generate_exec_summary: Builds a concise, markdown-formatted executive summary from in-sample, out-of-sample, and cost KPIs, including auto-generated recommendations based on thresholds.
- load_benchmark_data: Returns a benchmark time series aligned to your universe index. It first tries to load a real benchmark from disk; if unavailable, it synthesizes an equal-weight benchmark from the universe, normalized to a base of 100.

In [38]:
def generate_exec_summary(kpis_dict: dict) -> str:
    """
    Generate executive summary with key insights

    Parameters:
    -----------
    kpis_dict : dict
        Dictionary with 'is', 'oos', and 'costs' keys

    Returns:
    --------
    str
        Markdown formatted executive summary
    """
    # Safely pull nested KPI dicts with empty defaults to avoid KeyError
    is_k = kpis_dict.get('is', {})
    oos_k = kpis_dict.get('oos', {})
    costs = kpis_dict.get('costs', {})

    # Compose markdown lines (returns/vol are assumed as decimals; multiplied by 100 for % display)
    # If costs['cost_per_side'] not provided, falls back to global COST_PER_SIDE
    lines = [
        "## 📊 Executive Summary",
        "",
        f"**Performance Overview:**",
        f"- In-sample Sharpe Ratio: {is_k.get('sharpe', 0):.2f} | Out-of-sample Sharpe: {oos_k.get('sharpe', 0):.2f}",
        f"- Out-of-sample Annual Return: {oos_k.get('ann_return', 0)*100:.2f}% | Volatility: {oos_k.get('ann_vol', 0)*100:.2f}%",
        f"- Maximum Drawdown: {oos_k.get('max_dd', 0)*100:.2f}%",
        "",
        f"**Risk Management:**",
        f"- Rebalancing events: {costs.get('events', 0):.0f}",
        f"- Transaction cost per side: {costs.get('cost_per_side', COST_PER_SIDE)*10000:.1f} bps",
        "",
        f"**Recommendations:**",
        f"- {'✅ Strong' if oos_k.get('sharpe', 0) > 1.0 else '⚠️ Moderate' if oos_k.get('sharpe', 0) > 0.5 else '❌ Weak'} risk-adjusted performance",
        f"- {'✅ Acceptable' if abs(oos_k.get('max_dd', 0)) < 0.2 else '⚠️ High'} drawdown levels",
        f"- Monitor tracking error and consider adjusting weight constraints if needed",
    ]
    return "\n".join(lines)

def load_benchmark_data(prices_df: pd.DataFrame, benchmark_ticker: str = BENCHMARK) -> pd.Series:
    """
    Load benchmark data or create synthetic benchmark

    Parameters:
    -----------
    prices_df : pd.DataFrame
        Universe price data
    benchmark_ticker : str
        Benchmark ticker symbol

    Returns:
    --------
    pd.Series
        Benchmark price series
    """
    # Try to load benchmark from data folder (expects a CSV at DATA_FOLDER/<ticker>.csv)
    benchmark_path = os.path.join(DATA_FOLDER, f"{benchmark_ticker}.csv")

    if os.path.exists(benchmark_path):
        try:
            # Load prices and align to universe index; forward-fill to cover missing dates
            benchmark_df = load_prices_from_csv([benchmark_ticker], START_DATE, END_DATE, DATA_FOLDER, strict=False)
            if not benchmark_df.empty and benchmark_ticker in benchmark_df.columns:
                benchmark_series = benchmark_df[benchmark_ticker].reindex(prices_df.index).fillna(method='ffill')
                logger.info(f"✅ Loaded {benchmark_ticker} benchmark data")
                return benchmark_series
        except Exception as e:
            logger.warning(f"Failed to load {benchmark_ticker}: {e}")

    # Fallback: build a synthetic equal-weight benchmark from the universe
    logger.info(f"Creating synthetic equal-weight benchmark from universe")
    returns = compute_returns(prices_df)
    if returns.empty:
        # Preserve index shape; return empty float series if we cannot compute returns
        return pd.Series(index=prices_df.index, dtype=float)

    # Equal-weight vector across available columns
    w = np.full(len(prices_df.columns), 1.0 / max(1, len(prices_df.columns)))
    # Aggregate cross-sectional returns into a single EW return
    ew_ret = pd.Series(np.nansum(returns.values * w.reshape(1, -1), axis=1), index=returns.index)
    # Convert returns to NAV and normalize to base 100 for comparability
    ew_nav = (1 + ew_ret).cumprod()
    return ew_nav / ew_nav.iloc[0] * 100  # Normalize to 100

## ENHANCED VISUALIZATION FUNCTIONS

This module defines three Plotly-based visualization functions for the portfolio optimization case study:

* create\_performance\_dashboard: A 2x2 dashboard showing portfolio vs. benchmark NAV, current allocation, rolling drawdowns, and a KPI comparison bar chart.  
* create\_risk\_metrics\_chart: A radar (polar) chart that summarizes key risk metrics such as Sharpe, Sortino, Information Ratio, VaR, CVaR, and Max Drawdown.  
* create\_walk\_forward\_analysis\_chart: A two-panel chart showing the out-of-sample NAV evolution and in-sample vs. out-of-sample Sharpe ratios by fold for walk-forward evaluation.

Assumptions and conventions:

* NAV inputs are pd.Series with aligned DatetimeIndex.  
* KPI values for returns/volatility/drawdowns are decimal fractions (e.g., 0.12 for 12%) and are converted to percentages for plotting.  
* Walk-forward results are provided as a dict with folds (list of dicts with kpis\_is, kpis\_oos) and nav\_oos (Series).


In [39]:
def create_performance_dashboard(portfolio_nav, benchmark_nav, portfolio_kpis, benchmark_kpis, weights_series):
    """
    Create comprehensive performance dashboard with multiple charts

    Parameters:
    -----------
    portfolio_nav : pd.Series
        Portfolio NAV series
    benchmark_nav : pd.Series
        Benchmark NAV series
    portfolio_kpis : dict
        Portfolio KPIs
    benchmark_kpis : dict
        Benchmark KPIs
    weights_series : pd.Series
        Final portfolio weights

    Returns:
    --------
    plotly.graph_objects.Figure
        Interactive dashboard figure
    """
    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'Portfolio vs Benchmark Performance',
            'Portfolio Allocation',
            'Rolling Drawdown',
            'Performance Metrics Comparison'
        ),
        specs=[[{"secondary_y": False}, {"type": "pie"}],
               [{"secondary_y": False}, {"type": "bar"}]],
        vertical_spacing=0.12,
        horizontal_spacing=0.1
    )

    # 1. Performance comparison
    fig.add_trace(
        go.Scatter(
            x=portfolio_nav.index,
            y=portfolio_nav.values,
            name='Portfolio',
            line=dict(color='#1f77b4', width=2)
        ),
        row=1, col=1
    )

    fig.add_trace(
        go.Scatter(
            x=benchmark_nav.index,
            y=benchmark_nav.values,
            name='Benchmark',
            line=dict(color='#ff7f0e', width=2, dash='dash')
        ),
        row=1, col=1
    )

    # 2. Portfolio allocation pie chart
    fig.add_trace(
        go.Pie(
            labels=weights_series.index,
            values=weights_series.values,
            name="Portfolio Weights",
            textinfo='label+percent',
            textposition='auto'
        ),
        row=1, col=2
    )

    # 3. Rolling drawdown
    portfolio_dd = (portfolio_nav / portfolio_nav.cummax() - 1) * 100  # Assumes NAV is cumulative; output in %
    benchmark_dd = (benchmark_nav / benchmark_nav.cummax() - 1) * 100  # Same scaling for benchmark

    fig.add_trace(
        go.Scatter(
            x=portfolio_dd.index,
            y=portfolio_dd.values,
            name='Portfolio DD',
            fill='tonexty',
            line=dict(color='red', width=1)
        ),
        row=2, col=1
    )

    fig.add_trace(
        go.Scatter(
            x=benchmark_dd.index,
            y=benchmark_dd.values,
            name='Benchmark DD',
            line=dict(color='orange', width=1, dash='dash')
        ),
        row=2, col=1
    )

    # 4. Performance metrics comparison
    metrics = ['ann_return', 'ann_vol', 'sharpe', 'max_dd']
    # Convert return/vol/DD from decimals to percentages for display; Sharpe remains unitless
    portfolio_values = [portfolio_kpis.get(m, 0) * (100 if m in ['ann_return', 'ann_vol', 'max_dd'] else 1) for m in metrics]
    benchmark_values = [benchmark_kpis.get(m, 0) * (100 if m in ['ann_return', 'ann_vol', 'max_dd'] else 1) for m in metrics]

    fig.add_trace(
        go.Bar(
            x=['Annual Return (%)', 'Volatility (%)', 'Sharpe Ratio', 'Max DD (%)'],
            y=portfolio_values,
            name='Portfolio',
            marker_color='#1f77b4'
        ),
        row=2, col=2
    )

    fig.add_trace(
        go.Bar(
            x=['Annual Return (%)', 'Volatility (%)', 'Sharpe Ratio', 'Max DD (%)'],
            y=benchmark_values,
            name='Benchmark',
            marker_color='#ff7f0e'
        ),
        row=2, col=2
    )

    # Update layout
    fig.update_layout(
        title={
            'text': '📊 Portfolio Optimization Dashboard',
            'x': 0.5,
            'xanchor': 'center',
            'font': {'size': 20}
        },
        height=800,
        showlegend=True,
        template='plotly_white'  # Consistent clean theme across subplots
    )

    # Update axes labels
    fig.update_xaxes(title_text="Date", row=1, col=1)
    fig.update_yaxes(title_text="NAV", row=1, col=1)
    fig.update_xaxes(title_text="Date", row=2, col=1)
    fig.update_yaxes(title_text="Drawdown (%)", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=2)

    return fig

def create_risk_metrics_chart(comprehensive_kpis):
    """
    Create advanced risk metrics visualization

    Parameters:
    -----------
    comprehensive_kpis : dict
        Dictionary with comprehensive KPIs including VaR, CVaR, etc.

    Returns:
    --------
    plotly.graph_objects.Figure
        Risk metrics chart
    """
    # Risk metrics data (VaR/CVaR/Max DD expressed as positive percentages for visualization)
    risk_metrics = {
        'Sharpe Ratio': comprehensive_kpis.get('sharpe', 0),
        'Sortino Ratio': comprehensive_kpis.get('sortino', 0),
        'Information Ratio': comprehensive_kpis.get('info_ratio', 0),
        'VaR (95%)': abs(comprehensive_kpis.get('var_95', 0)) * 100,
        'CVaR (95%)': abs(comprehensive_kpis.get('cvar_95', 0)) * 100,
        'Max Drawdown': abs(comprehensive_kpis.get('max_dd', 0)) * 100
    }

    # Create radar chart for risk metrics
    fig = go.Figure()

    # Normalize values for radar chart (cap ratio-like metrics for readability)
    normalized_values = []
    for metric, value in risk_metrics.items():
        if 'Ratio' in metric:
            normalized_values.append(max(0, min(value, 3)))  # Cap ratios at 3
        else:
            normalized_values.append(value)

    fig.add_trace(go.Scatterpolar(
        r=normalized_values,
        theta=list(risk_metrics.keys()),
        fill='toself',
        name='Risk Metrics',
        line_color='rgb(1,90,120)',
        fillcolor='rgba(1,90,120,0.2)'
    ))

    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, max(normalized_values) * 1.1]  # Dynamic radial range based on data
            )
        ),
        title={
            'text': '🎯 Advanced Risk Metrics Profile',
            'x': 0.5,
            'xanchor': 'center',
            'font': {'size': 16}
        },
        template='plotly_white',
        height=500
    )

    return fig

def create_walk_forward_analysis_chart(walk_forward_results):
    """
    Create walk-forward analysis visualization

    Parameters:
    -----------
    walk_forward_results : dict
        Results from walk_forward_evaluate function

    Returns:
    --------
    plotly.graph_objects.Figure
        Walk-forward analysis chart
    """
    folds = walk_forward_results['folds']  # List of fold dicts with 'kpis_is' and 'kpis_oos'
    nav_oos = walk_forward_results['nav_oos']  # Out-of-sample NAV as a time series

    # Create subplots
    fig = make_subplots(
        rows=2, cols=1,
        subplot_titles=(
            'Out-of-Sample NAV Evolution',
            'In-Sample vs Out-of-Sample Sharpe Ratios by Fold'
        ),
        vertical_spacing=0.15
    )

    # 1. OOS NAV evolution
    fig.add_trace(
        go.Scatter(
            x=nav_oos.index,
            y=nav_oos.values,
            name='OOS NAV',
            line=dict(color='#2E8B57', width=2)
        ),
        row=1, col=1
    )

    # 2. IS vs OOS Sharpe ratios
    fold_numbers = list(range(1, len(folds) + 1))  # Sequential fold labels starting at 1
    is_sharpe = [fold['kpis_is']['sharpe'] for fold in folds]
    oos_sharpe = [fold['kpis_oos']['sharpe'] for fold in folds]

    fig.add_trace(
        go.Bar(
            x=fold_numbers,
            y=is_sharpe,
            name='In-Sample Sharpe',
            marker_color='#4CAF50',
            opacity=0.7
        ),
        row=2, col=1
    )

    fig.add_trace(
        go.Bar(
            x=fold_numbers,
            y=oos_sharpe,
            name='Out-of-Sample Sharpe',
            marker_color='#FF9800',
            opacity=0.7
        ),
        row=2, col=1
    )

    # Update layout
    fig.update_layout(
        title={
            'text': '🔄 Walk-Forward Analysis Results',
            'x': 0.5,
            'xanchor': 'center',
            'font': {'size': 18}
        },
        height=700,
        showlegend=True,
        template='plotly_white'
    )

    # Update axes
    fig.update_xaxes(title_text="Date", row=1, col=1)
    fig.update_yaxes(title_text="NAV", row=1, col=1)
    fig.update_xaxes(title_text="Fold Number", row=2, col=1)
    fig.update_yaxes(title_text="Sharpe Ratio", row=2, col=1)

    return fig

## MAIN EXECUTION PIPELINE

### Step 1: Data Loading and Validation

* If running in a notebook/Colab environment, it clones the project repo (only if missing) and points the data path to the repo’s data folder.  
* Lists available CSV files to confirm data presence.  
* Loads the stock price data for the defined STOCK\_UNIVERSE and the benchmark series.  
* Prints basic diagnostics: number of assets, date coverage, and summary stats (average daily return/volatility and average pairwise correlation).  
* Wraps the loading in a try/except to surface failures cleanly.

In [40]:
print("🔄 Cloning repo and locating data...")

# Clone repo if needed
REPO_URL = "https://github.com/maxiveloso/python-portfolio-optimization.git"
REPO_DIR = "/content/python-portfolio-optimization"  # Colab-style working directory
DATA_FOLDER = os.path.join(REPO_DIR, "data")  # CSVs are expected under the repo's data folder

if not os.path.exists(REPO_DIR):
    print("📥 Cloning GitHub repo...")
    !git clone -q {REPO_URL} {REPO_DIR}  # Notebook shell command; requires internet access

# Confirm data files are present
files = os.listdir(DATA_FOLDER)
tickers_found = [f.replace(".csv", "") for f in files if f.endswith(".csv")]
print(f"📂 Found {len(tickers_found)} tickers in repo data folder: {', '.join(tickers_found[:10])}...")

# Load data using your existing function
try:
    stock_prices = load_prices_from_csv(
        tickers=STOCK_UNIVERSE,
        start_date=START_DATE,
        end_date=END_DATE,
        data_folder=DATA_FOLDER,
        strict=False  # tolerate missing tickers; load what’s available
    )

    benchmark_prices = load_benchmark_data(stock_prices, BENCHMARK)  # aligns to stock index; can synthesize EW if missing

    print(f"✅ Data loaded successfully:")
    print(f"   📈 Stocks: {len(stock_prices.columns)} assets, {len(stock_prices)} days")
    print(f"   🎯 Benchmark: {len(benchmark_prices)} days")
    print(f"   📅 Date range: {stock_prices.index[0].date()} to {stock_prices.index[-1].date()}")

    # Display basic statistics
    returns = compute_returns(stock_prices)  # compute daily returns from prices
    print(f"\n📊 Universe Statistics:")
    print(f"   Average daily return: {returns.mean().mean()*100:.3f}%")
    print(f"   Average daily volatility: {returns.std().mean()*100:.3f}%")
    print(f"   Average correlation: {average_offdiag_correlation(returns):.3f}")  # mean of off-diagonal correlations

except Exception as e:
    print(f"❌ Error loading data: {e}")
    raise

🔄 Cloning repo and locating data...
📂 Found 11 tickers in repo data folder: AMZN, TSLA, CRM, META, GOOGL, NVDA, AAPL, NFLX, MSFT, AMD...
✅ Data loaded successfully:
   📈 Stocks: 10 assets, 1258 days
   🎯 Benchmark: 1258 days
   📅 Date range: 2020-01-02 to 2024-12-31

📊 Universe Statistics:
   Average daily return: 0.147%
   Average daily volatility: 2.738%
   Average correlation: 0.542


### Step 2: Stock Selection and Portfolio Optimization

This pipeline segment selects a subset of stocks from the universe and optimizes portfolio weights across multiple objectives. It:

* Chooses a lookback window (up to \~2 years) and a selection method (exhaustive or greedy) based on universe size.  
* Calls a stock selection routine to pick N\_STOCKS\_AUTO tickers and reports the chosen list.  
* Computes returns for the selected tickers.  
* Optimizes portfolio weights for three objectives—maximum Sharpe, minimum variance, and risk parity—under no-short and max-weight constraints, collecting KPIs for each.  
* Selects the Max Sharpe solution as the primary strategy and prints its summary statistics and final weights.

In [41]:
print("🔄 Performing stock selection and optimization...")

# Stock selection
lookback_days = min(504, len(stock_prices))  # 2 years or available data
selection_method = 'exhaustive' if len(stock_prices.columns) <= 10 else 'greedy'  # Exhaustive search for small universes; greedy otherwise

selection_results = select_optimal_stocks(
    prices_df=stock_prices,
    N_stocks=N_STOCKS_AUTO,
    lookback_days=lookback_days,
    method=selection_method,
    seed=RNG_SEED
)

selected_tickers = selection_results['selected_tickers']  # List of chosen tickers from selection stage
print(f"✅ Selected {len(selected_tickers)} stocks: {selected_tickers}")

# Portfolio optimization with multiple objectives
selected_prices = stock_prices[selected_tickers]  # Restrict price data to selected names
selected_returns = compute_returns(selected_prices)  # Compute periodic returns used by optimizers

# Optimize for different objectives
objectives = ['max_sharpe', 'min_var', 'risk_parity']  # Three optimization targets to compare
optimization_results = {}

for objective in objectives:
    print(f"\n🎯 Optimizing for {objective}...")
    opt_result = optimize_weights(
        returns_df=selected_returns,
        objective=objective,          # Optimization objective
        bounds=(0.0, MAX_WEIGHT),     # Long-only with per-asset max weight cap
        sum_to=1.0,                   # Weights must sum to 1 (fully invested)
        method='scipy',               # Use SciPy-based solver
        risk_free=RISK_FREE_RATE,     # Used for Sharpe-based objective
        seed=RNG_SEED                 # Reproducibility for any randomized steps
    )
    optimization_results[objective] = opt_result

    kpis = opt_result['kpis']
    print(f"   📊 {objective}: Sharpe={kpis['sharpe']:.3f}, Return={kpis['ann_return']*100:.2f}%, Vol={kpis['ann_vol']*100:.2f}%")

# Use max_sharpe as primary strategy
primary_weights = optimization_results['max_sharpe']['weights']
primary_kpis = optimization_results['max_sharpe']['kpis']  # KPI snapshot for the chosen solution

print(f"\n🏆 Primary Strategy (Max Sharpe):")
print(f"   Sharpe Ratio: {primary_kpis['sharpe']:.3f}")
print(f"   Annual Return: {primary_kpis['ann_return']*100:.2f}%")
print(f"   Annual Volatility: {primary_kpis['ann_vol']*100:.2f}%")
print(f"\n💼 Portfolio Weights:")
for ticker, weight in primary_weights.items():
    print(f"   {ticker}: {weight*100:.2f}%")  # Final allocation per asset

🔄 Performing stock selection and optimization...
✅ Selected 4 stocks: ['NVDA', 'META', 'NFLX', 'CRM']

🎯 Optimizing for max_sharpe...
   📊 max_sharpe: Sharpe=1.182, Return=48.48%, Vol=39.31%

🎯 Optimizing for min_var...
   📊 min_var: Sharpe=0.759, Return=28.63%, Vol=35.11%

🎯 Optimizing for risk_parity...
   📊 risk_parity: Sharpe=1.041, Return=40.10%, Vol=36.61%

🏆 Primary Strategy (Max Sharpe):
   Sharpe Ratio: 1.182
   Annual Return: 48.48%
   Annual Volatility: 39.31%

💼 Portfolio Weights:
   NVDA: 40.00%
   META: 24.11%
   NFLX: 27.62%
   CRM: 8.28%


### Step 3: Comprehensive KPI Analysis

This block computes the portfolio’s daily returns and the benchmark’s daily returns, then produces a comprehensive set of performance and risk KPIs for the portfolio (and separately for the benchmark as a baseline). It prints a readable summary of annualized return/volatility, Sharpe/Sortino, information ratio, drawdown, VaR/CVaR, and alpha/beta (vs. the benchmark), followed by benchmark-relative diagnostics such as correlation, active return, and tracking error. It uses the global RISK_FREE_RATE and BENCHMARK for risk-adjusted metrics and labeling.

In [42]:
print("🔄 Calculating comprehensive KPIs...")

# Calculate portfolio returns
# Compute daily portfolio returns as the weighted sum of selected asset returns
portfolio_returns = (selected_returns * primary_weights).sum(axis=1)

# Calculate benchmark returns
# Convert benchmark price series to a single-column DataFrame and compute daily returns
benchmark_returns = compute_returns(benchmark_prices.to_frame('benchmark'))['benchmark']

# Calculate comprehensive KPIs
# Full KPI suite for the portfolio (ann. return/vol, Sharpe/Sortino, info ratio, max DD, VaR/CVaR, alpha/beta)
# Uses RISK_FREE_RATE for risk-adjusted metrics
comprehensive_kpis = calculate_comprehensive_kpis(
    returns=portfolio_returns,
    benchmark_returns=benchmark_returns,
    rf=RISK_FREE_RATE
)

# Same KPI set for the benchmark (for comparison baselines like alpha/beta)
benchmark_kpis = calculate_comprehensive_kpis(
    returns=benchmark_returns,
    rf=RISK_FREE_RATE
)

# Display comprehensive metrics
# Pretty-print key portfolio metrics (percentages where applicable)
print(f"\n📊 Comprehensive Portfolio Analysis:")
print(f"   Annual Return: {comprehensive_kpis['ann_return']*100:.2f}%")
print(f"   Annual Volatility: {comprehensive_kpis['ann_vol']*100:.2f}%")
print(f"   Sharpe Ratio: {comprehensive_kpis['sharpe']:.3f}")
print(f"   Sortino Ratio: {comprehensive_kpis['sortino']:.3f}")
print(f"   Information Ratio: {comprehensive_kpis['info_ratio']:.3f}")
print(f"   Maximum Drawdown: {comprehensive_kpis['max_dd']*100:.2f}%")
print(f"   VaR (95%): {comprehensive_kpis['var_95']*100:.2f}%")
print(f"   CVaR (95%): {comprehensive_kpis['cvar_95']*100:.2f}%")
print(f"   Alpha: {comprehensive_kpis['alpha']*100:.2f}%")
print(f"   Beta: {comprehensive_kpis['beta']:.3f}")

# Benchmark comparison
# Relative diagnostics vs. benchmark: correlation, active return, tracking error
vs_benchmark = kpis_vs_benchmark(portfolio_returns, benchmark_returns)
print(f"\n🎯 vs {BENCHMARK} Benchmark:")
print(f"   Correlation: {vs_benchmark['corr']:.3f}")
print(f"   Active Return: {vs_benchmark['active_return']*100:.2f}%")
print(f"   Tracking Error: {vs_benchmark['tracking_error']*100:.2f}%")

🔄 Calculating comprehensive KPIs...

📊 Comprehensive Portfolio Analysis:
   Annual Return: 48.48%
   Annual Volatility: 39.30%
   Sharpe Ratio: 1.183
   Sortino Ratio: 1.648
   Information Ratio: 1.363
   Maximum Drawdown: -62.58%
   VaR (95%): -60.50%
   CVaR (95%): -88.51%
   Alpha: 21.29%
   Beta: 1.357

🎯 vs QQQ Benchmark:
   Correlation: 0.884
   Active Return: 27.91%
   Tracking Error: 20.48%


### Step 4: Walk-Forward Analysis

This block performs the walk-forward validation step of the pipeline. It:

* Assembles walk-forward parameters (universe size, selection method, objective, optimizer, turnover cost, weight cap, transaction costs, seed).  
* Runs walk\_forward\_evaluate over rolling train/test windows with a specified rebalance frequency.  
* Extracts and prints aggregated out-of-sample (OOS) KPIs and the number of folds.  
* Computes and reports average in-sample (IS) vs OOS Sharpe across folds, plus the relative performance decay from IS to OOS.

In [43]:
print("🔄 Performing walk-forward analysis...")

# Walk-forward parameters
# Note: `selection_method` must be defined upstream (e.g., exhaustive/heuristic/MC); it drives asset selection per fold.
wf_params = {
    'N_stocks': N_STOCKS_AUTO,
    'selection_method': selection_method,
    'objective': 'max_sharpe',   # Optimize for Sharpe in each training window
    'opt_method': 'scipy',       # Use SciPy optimizer for continuous weights
    'turnover_penalty': 0.0,     # No explicit turnover regularization in objective
    'cost_per_side': COST_PER_SIDE,  # Transaction cost in bps per buy/sell side
    'max_weight': MAX_WEIGHT,    # Per-asset weight cap (risk constraint)
    'seed': RNG_SEED,            # For reproducibility across folds
}

# Perform walk-forward analysis
# Expects: `stock_prices` (price history), rolling train/test windows, and rebalance frequency.
# Returns: a dict with per-fold results and aggregate KPIs (e.g., keys: 'folds', 'agg_kpis_oos').
walk_forward_results = walk_forward_evaluate(
    prices_df=stock_prices,
    train_window=TRAIN_WINDOW,
    test_window=TEST_WINDOW,
    rebalance_freq=REBALANCE_FREQUENCY,
    params=wf_params
)

# Display walk-forward results
# Aggregate OOS KPIs summarize performance across all test windows; 'folds' holds per-window details.
oos_kpis = walk_forward_results['agg_kpis_oos']
n_folds = len(walk_forward_results['folds'])

print(f"\n🔄 Walk-Forward Analysis Results ({n_folds} folds):")
print(f"   📈 OOS Annual Return: {oos_kpis['ann_return']*100:.2f}%")
print(f"   📉 OOS Annual Volatility: {oos_kpis['ann_vol']*100:.2f}%")
print(f"   ⚡ OOS Sharpe Ratio: {oos_kpis['sharpe']:.3f}")
print(f"   🔻 OOS Maximum Drawdown: {oos_kpis['max_dd']*100:.2f}%")

# Calculate average IS vs OOS performance
# Compute mean Sharpe across folds for both in-sample (train) and out-of-sample (test) periods.
avg_is_sharpe = np.mean([fold['kpis_is']['sharpe'] for fold in walk_forward_results['folds']])
avg_oos_sharpe = np.mean([fold['kpis_oos']['sharpe'] for fold in walk_forward_results['folds']])

print(f"\n📊 Average Performance Across Folds:")
print(f"   🎯 Average IS Sharpe: {avg_is_sharpe:.3f}")
print(f"   🎯 Average OOS Sharpe: {avg_oos_sharpe:.3f}")
print(f"   📉 Performance Decay: {((avg_is_sharpe - avg_oos_sharpe) / avg_is_sharpe * 100):.1f}%")  # Note: if avg_is_sharpe == 0, this would divide by zero

🔄 Performing walk-forward analysis...

🔄 Walk-Forward Analysis Results (15 folds):
   📈 OOS Annual Return: 9.20%
   📉 OOS Annual Volatility: 38.59%
   ⚡ OOS Sharpe Ratio: 0.238
   🔻 OOS Maximum Drawdown: -36.79%

📊 Average Performance Across Folds:
   🎯 Average IS Sharpe: 1.887
   🎯 Average OOS Sharpe: 0.488
   📉 Performance Decay: 74.1%


### Step 5: Portfolio Simulation with Rebalancing

This block it simulates a rebalanced portfolio over time using a predefined weight schedule, applies transaction costs, and computes portfolio performance metrics. It:

* Builds a rebalancing schedule from the simulation start to end dates based on final/primary weights and a rebalancing frequency.  
* Runs a transaction-cost-aware backtest to get NAV, turnover per rebalance, cumulative costs, and count of cost events.  
* Summarizes portfolio results (final value, total return, rebalancing events, transaction costs, average turnover).  
* Compares the portfolio’s cumulative performance to a benchmark’s cumulative return and reports excess return.

In [44]:
print("🔄 Simulating portfolio with rebalancing...")

# Create rebalancing schedule
simulation_start = stock_prices.index[0]
simulation_end = stock_prices.index[-1]

weights_schedule = build_weights_schedule(
    start_date=simulation_start,
    end_date=simulation_end,
    weights=primary_weights,  # target weights to apply on each scheduled rebalance date
    freq=REBALANCE_FREQUENCY  # e.g., 'M' for monthly rebalancing cadence
)

# Run simulation
simulation_results = simulate_rebalance(
    prices_df=selected_prices,          # price series for selected assets
    weights_schedule=weights_schedule,  # dict/series of dates -> weights
    rebalance_freq=REBALANCE_FREQUENCY, # controls when trades occur
    cost_per_side=COST_PER_SIDE,        # per-side transaction cost (e.g., 10 bps)
    slippage=0.0                        # additional execution slippage (set to zero here)
)

portfolio_nav = simulation_results['nav']               # cumulative NAV series (starts near 1.0)
turnover_events = simulation_results['turnover']        # turnover per rebalance event
total_costs = simulation_results['cum_cost']            # cumulative transaction costs (NAV terms)
cost_events = simulation_results['cum_cost_events']     # number of rebalance events incurring costs

# Calculate final portfolio value
final_value = portfolio_nav.iloc[-1] * PORTFOLIO_VALUE  # scale NAV by initial capital
total_return = (portfolio_nav.iloc[-1] - 1) * 100       # total % return over the simulation

print(f"\n💰 Portfolio Simulation Results:")
print(f"   🏁 Final Portfolio Value: ${final_value:,.2f}")
print(f"   📈 Total Return: {total_return:.2f}%")
print(f"   🔄 Rebalancing Events: {cost_events:.0f}")
print(f"   💸 Total Transaction Costs: ${total_costs * PORTFOLIO_VALUE:.2f}")
print(f"   📊 Average Turnover per Event: {turnover_events.mean()*100:.2f}%")

# Benchmark comparison
benchmark_nav = (1 + benchmark_returns).cumprod()       # convert returns to NAV-like index via cumprod
benchmark_final_value = benchmark_nav.iloc[-1] * PORTFOLIO_VALUE
benchmark_total_return = (benchmark_nav.iloc[-1] - 1) * 100

print(f"\n🎯 vs {BENCHMARK} Benchmark:")
print(f"   📈 Portfolio Return: {total_return:.2f}%")
print(f"   📊 Benchmark Return: {benchmark_total_return:.2f}%")
print(f"   🏆 Excess Return: {(total_return - benchmark_total_return):.2f}%")

🔄 Simulating portfolio with rebalancing...

💰 Portfolio Simulation Results:
   🏁 Final Portfolio Value: $75,498.35
   📈 Total Return: 654.98%
   🔄 Rebalancing Events: 42
   💸 Total Transaction Costs: $23.85
   📊 Average Turnover per Event: 3.48%

🎯 vs QQQ Benchmark:
   📈 Portfolio Return: 654.98%
   📊 Benchmark Return: 136.51%
   🏆 Excess Return: 518.48%


### Step 6: Executive Summary Generation

This step assembles key performance metrics and costs from earlier steps, generates a concise executive summary using a helper function, and prints it neatly. Specifically:

- Announces the start of summary generation.
- Packages in-sample KPIs, out-of-sample KPIs, and transaction cost info into a single dictionary.
- Calls generate_exec_summary to produce a markdown-formatted overview with performance, risk, and recommendations.
- Prints the summary between separator lines for readability.

In [45]:
print("🔄 Generating executive summary...")

# Prepare summary data
summary_data = {
    'is': comprehensive_kpis,  # In-sample KPIs (dict), computed earlier in the pipeline
    'oos': oos_kpis,           # Out-of-sample KPIs (dict), from walk-forward/backtest
    'costs': {
        'events': cost_events,             # Number of rebalancing/trade events recorded
        'cost_per_side': COST_PER_SIDE     # Transaction cost (per side), global parameter
    }
}

# Generate executive summary
exec_summary = generate_exec_summary(summary_data)  # Returns a markdown-formatted string

print("\n" + "="*60)  # Visual separator
print(exec_summary)    # Human-readable executive summary with performance and risk insights
print("="*60)          # Closing separator

🔄 Generating executive summary...

## 📊 Executive Summary

**Performance Overview:**
- In-sample Sharpe Ratio: 1.18 | Out-of-sample Sharpe: 0.24
- Out-of-sample Annual Return: 9.20% | Volatility: 38.59%
- Maximum Drawdown: -36.79%

**Risk Management:**
- Rebalancing events: 42
- Transaction cost per side: 10.0 bps

**Recommendations:**
- ❌ Weak risk-adjusted performance
- ⚠️ High drawdown levels
- Monitor tracking error and consider adjusting weight constraints if needed


### Step 7: Enhanced Visualizations

This block generates and displays three interactive Plotly visualizations, then tries to save each as standalone HTML files:

- A performance dashboard comparing portfolio vs. benchmark, showing allocations and drawdowns.
- A risk metrics chart summarizing key KPIs.
- A walk-forward analysis chart visualizing out-of-sample results over time.

In [48]:
# --- Helpers -----------------------------------------------------------------
def _to_nav(returns: pd.Series, base: float = 100.0) -> pd.Series:
    r = pd.Series(returns).dropna()
    if r.empty:
        return pd.Series(dtype=float)
    nav = (1 + r).cumprod()
    return nav / nav.iloc[0] * base

def _rolling_sharpe(daily_ret: pd.Series, ann_rf: float = 0.0, window: int = 252) -> pd.Series:
    r = pd.Series(daily_ret).dropna()
    if r.empty:
        return pd.Series(dtype=float)
    rf_d = ann_rf / 252.0
    excess = r - rf_d
    mu = excess.rolling(window).mean()
    sd = excess.rolling(window).std(ddof=0)
    return (mu / sd) * np.sqrt(252)

def _rolling_vol(daily_ret: pd.Series, window: int = 252) -> pd.Series:
    r = pd.Series(daily_ret).dropna()
    if r.empty:
        return pd.Series(dtype=float)
    return r.rolling(window).std(ddof=0) * np.sqrt(252)  # annualized

def _var_cvar(ret: pd.Series, q: float = 0.05):
    r = pd.Series(ret).dropna()
    if r.empty:
        return np.nan, np.nan
    VaR = r.quantile(q)
    CVaR = r[r <= VaR].mean()
    return VaR, CVaR


# --- Ensure NAV/returns are available ----------------------------------------
portfolio_nav = _to_nav(portfolio_returns, base=100.0)
benchmark_nav = _to_nav(benchmark_returns, base=100.0)
p_ret = portfolio_returns.dropna()
b_ret = benchmark_returns.dropna()

# --- 1) BANS: KPI cards (2 rows x 5 cols) -----------------------------------
pk = dict(comprehensive_kpis)
bk = dict(benchmark_kpis)
oos = oos_kpis

cards = [
    {"title": "Annual Return",      "key": "ann_return",  "pct": True,  "ref": "bench"},
    {"title": "Annual Volatility",  "key": "ann_vol",     "pct": True,  "ref": "bench"},
    {"title": "Sharpe Ratio",       "key": "sharpe",      "pct": False, "ref": "bench"},
    {"title": "Sortino Ratio",      "key": "sortino",     "pct": False, "ref": "bench"},
    {"title": "Max Drawdown",       "key": "max_dd",      "pct": True,  "ref": "bench"},
    {"title": "Information Ratio",  "key": "info_ratio",  "pct": False, "ref": 0.0},
    {"title": "Alpha",              "key": "alpha",       "pct": True,  "ref": 0.0},
    {"title": "Beta",               "key": "beta",        "pct": False, "ref": 1.0},
    {"title": "OOS Annual Return",  "key": ("oos","ann_return"), "pct": True,  "ref": None},
    {"title": "OOS Sharpe",         "key": ("oos","sharpe"),     "pct": False, "ref": None},
]

cols = 5
rows = 2
bans_fig = make_subplots(rows=rows, cols=cols, specs=[[{"type": "domain"}]*cols for _ in range(rows)])

def _resolve_value(card):
    key = card["key"]
    if isinstance(key, tuple) and key[0] == "oos":
        return float(oos.get(key[1], np.nan))
    return float(pk.get(key, np.nan))

def _resolve_reference(card):
    ref = card["ref"]
    if ref == "bench":
        return float(bk.get(card["key"], np.nan))
    return ref

number_font_size = 44
title_font_size = 14
delta_font_size = 18

for i, card in enumerate(cards, start=1):
    r = (i-1)//cols + 1
    c = (i-1)%cols + 1
    val = _resolve_value(card)
    ref = _resolve_reference(card)
    pct = card["pct"]
    display_val = val * 100.0 if pct and pd.notna(val) else val
    display_ref = (ref * 100.0) if (pct and isinstance(ref, (int,float)) and pd.notna(ref)) else ref

    delta_dict = None
    if ref is not None and pd.notna(display_ref):
        delta_dict = dict(reference=display_ref, valueformat=".2f", relative=False,
                          font=dict(size=delta_font_size))

    bans_fig.add_trace(
        go.Indicator(
            mode="number+delta",
            value=0.0 if pd.isna(display_val) else display_val,
            number=dict(valueformat=".2f", suffix="%" if pct else "", font=dict(size=number_font_size)),
            delta=delta_dict,
            title=dict(text=card["title"], font=dict(size=title_font_size, family="sans-serif"))
        ),
        row=r, col=c
    )

bans_fig.update_layout(
    height=250 * rows,
    template="plotly_white",
    margin=dict(l=20, r=20, t=60, b=20),
    autosize=True
)
bans_fig.update_xaxes(showgrid=False, zeroline=False)
bans_fig.update_yaxes(showgrid=False, zeroline=False)

bans_fig.show()

# --- 2) Performance dashboard: Return (%) vs Rolling Drawdown ---------------
perf_fig = make_subplots(specs=[[{"secondary_y": True}]])

# Compute cumulative returns and drawdowns
p_cum = (1 + p_ret).cumprod()
b_cum = (1 + b_ret).cumprod()

idx = p_cum.index.union(b_cum.index).sort_values()
p_cum = p_cum.reindex(idx).ffill().fillna(1.0)
b_cum = b_cum.reindex(idx).ffill().fillna(1.0)

p_ret_cum_pct = (p_cum - 1.0) * 100.0
b_ret_cum_pct = (b_cum - 1.0) * 100.0

p_dd_pct = (p_cum / p_cum.cummax() - 1.0) * 100.0
b_dd_pct = (b_cum / b_cum.cummax() - 1.0) * 100.0

portfolio_color = "#1f77b4"
portfolio_dd_color = "crimson"
benchmark_color = "#2ca02c"

# Portfolio cumulative return (solid) - Primary Y-axis
perf_fig.add_trace(
    go.Scatter(
        x=idx, y=p_ret_cum_pct,
        name="Portfolio Return",
        line=dict(color=portfolio_color, width=2),
        hovertemplate="%{y:.2f}%<extra>Portfolio Return</extra>"
    ),
    secondary_y=False,
)

# Benchmark cumulative return (dashed) - Primary Y-axis
perf_fig.add_trace(
    go.Scatter(
        x=idx, y=b_ret_cum_pct,
        name="Benchmark Return",
        line=dict(color=benchmark_color, width=2, dash="dash"),
        hovertemplate="%{y:.2f}%<extra>Benchmark Return</extra>"
    ),
    secondary_y=False,
)

# Portfolio drawdown (filled area to 0%) - Secondary Y-axis
perf_fig.add_trace(
    go.Scatter(
        x=idx, y=p_dd_pct,
        name="Portfolio DD",
        line=dict(color=portfolio_dd_color, width=1),
        fill="tozeroy",
        fillcolor="rgba(220,20,60,0.25)",
        hovertemplate="%{y:.2f}%<extra>Portfolio DD</extra>"
    ),
    secondary_y=True,
)

# Benchmark drawdown (dashed line, filled area to 0%) - Secondary Y-axis
perf_fig.add_trace(
    go.Scatter(
        x=idx, y=b_dd_pct,
        name="Benchmark DD",
        line=dict(color=benchmark_color, width=1, dash="dash"),
        fill="tozeroy",
        fillcolor="rgba(44,160,44,0.15)",
        hovertemplate="%{y:.2f}%<extra>Benchmark DD</extra>"
    ),
    secondary_y=True,
)

perf_fig.update_layout(
    title_text="<b>Portfolio Performance: Cumulative Returns vs. Historical Drawdowns</b>",
    title_x=0.5,
    height=460,
    template="plotly_white",
    margin=dict(l=40, r=20, t=60, b=40),
    legend=dict(
        x=0.02, y=0.5, xanchor="left", yanchor="middle",
        bgcolor="rgba(255,255,255,0.75)", bordercolor="rgba(0,0,0,0)", font=dict(size=10)
    ),
        autosize=True
)

min_ret = min(p_ret_cum_pct.min(), b_ret_cum_pct.min())
max_ret = max(p_ret_cum_pct.max(), b_ret_cum_pct.max())

perf_fig.update_yaxes(
    title_text="Return (%)",
    secondary_y=False,
    showgrid=False,
    zeroline=False,
    range=[min_ret * 1.3, max_ret * 1.3] # Adjusted range to include negative values
)

min_dd = min(p_dd_pct.min(), b_dd_pct.min())
perf_fig.update_yaxes(
    title_text="Drawdown (%)",
    secondary_y=True,
    showgrid=False,
    zeroline=False,
    range=[min_dd * 2, 0],
    tickvals=[0, -20, -40, -60, -80]
)

perf_fig.update_xaxes(showgrid=False, zeroline=False)

perf_fig.add_shape(type="line", x0=idx.min(), x1=idx.max(), y0=0, y1=0,
                   line=dict(color="gray", width=1, dash="dash"), xref="x", yref="y")

perf_fig.show()

# --- 3) Diagnostics + Allocation pie in single 2x2 figure -------------
weights_series = primary_weights

# Use the exact same parameters as the working code
diag_fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=[
        'Distribution of Daily Returns',
        'Rolling Sharpe (12m)',
        'Portfolio Allocation',
        'Rolling Volatility (12m, ann.)'
    ],
    specs=[
        [{"type": "histogram"}, {"type": "scatter"}],
        [{"type": "pie"}, {"type": "scatter"}]
    ],
    vertical_spacing=0.18,
    horizontal_spacing=0.15
)

# Distribution with mean & VaR95 as legend items (row=1,col=1)
mean_r = p_ret.mean()
var95, _cvar95 = _var_cvar(p_ret, q=0.05)

# Use histogram instead of bar for better compatibility
diag_fig.add_trace(
    go.Histogram(
        x=p_ret.values*100.0,
        nbinsx=50,
        name="Returns",
        marker_color='lightblue',
        opacity=0.7,
        showlegend=False
    ),
    row=1, col=1
)

# Add VaR lines using add_shape (same as working code)
diag_fig.add_shape(
    type="line",
    x0=var95*100, x1=var95*100, y0=0, y1=210,
    line=dict(color="orange", width=2, dash="dash"),
    row=1, col=1
)
diag_fig.add_shape(
    type="line",
    x0=mean_r*100, x1=mean_r*100, y0=0, y1=210,
    line=dict(color="red", width=2, dash="dash"),
    row=1, col=1
)

# Rolling Sharpe (row=1,col=2)
rs = _rolling_sharpe(p_ret, ann_rf=0.0, window=252)
overall_sharpe = float(comprehensive_kpis.get("sharpe", 1.2))

diag_fig.add_trace(
    go.Scatter(x=rs.index, y=rs.values, mode="lines",
               line=dict(color="green", width=2), name="Rolling Sharpe",
               showlegend=False),
    row=1, col=2
)

# Add horizontal line using add_shape
diag_fig.add_shape(
    type="line",
    x0=rs.index[0], x1=rs.index[-1],
    y0=overall_sharpe, y1=overall_sharpe,
    line=dict(color="red", width=2, dash="dash"),
    row=1, col=2
)

# Portfolio Allocation Pie
diag_fig.add_trace(
    go.Pie(
        labels=weights_series.index,
        values=weights_series.values,
        name="Allocation",
        textinfo='label+percent',
        textposition='auto',
        showlegend=False
    ),
    row=2, col=1
)

# Rolling Volatility
rv = _rolling_vol(p_ret, window=252)
overall_vol = float(comprehensive_kpis.get("ann_vol", 0.18))

diag_fig.add_trace(
    go.Scatter(x=rv.index, y=rv.values*100.0, mode="lines",
               line=dict(color="blue", width=2), name="Rolling Vol",
               showlegend=False),
    row=2, col=2
)

# Add horizontal line using add_shape
diag_fig.add_shape(
    type="line",
    x0=rv.index[0], x1=rv.index[-1],
    y0=overall_vol * 100.0, y1=overall_vol * 100.0,
    line=dict(color="red", width=2, dash="dash"),
    row=2, col=2
)

# Use the exact same layout as working code
diag_fig.update_layout(
    title={
        'text': '<b>Portfolio Diagnostics Dashboard</b>',
        'y': 0.96,  # Position title very high
        'x': 0.5,
        'xanchor': 'center',
        'yanchor': 'top',
        'font': {'size': 22, 'color': 'black'}
    },
    height=900,
    autosize=True,
    showlegend=False,
    template='plotly_white',
    margin=dict(t=140, b=80, l=80, r=80)
)

# Update subplot titles - same as working code
annotations = list(diag_fig.layout.annotations)
for i, annotation in enumerate(annotations):
    annotation.update(
        font=dict(size=16, color='black'),
        y=annotation.y + 0.03  # Move titles up more
    )

# Update axes labels
diag_fig.update_xaxes(title_text="Return (%)", row=1, col=1)
diag_fig.update_yaxes(title_text="Count", row=1, col=1)

diag_fig.update_xaxes(title_text="Date", row=1, col=2)
diag_fig.update_yaxes(title_text="Sharpe", row=1, col=2)

diag_fig.update_xaxes(title_text="Date", row=2, col=2)
diag_fig.update_yaxes(title_text="Volatility (%)", row=2, col=2)

# Add custom annotations for legends
# Add custom annotation for the Distribution of Daily Returns legend
diag_fig.add_annotation(
    x=0.35, y=0.95,
    xref="paper", yref="paper",
    text=f"<b>Daily Returns</b><br>" +
         f"<span style='color:red'>┅┅</span> Mean: {mean_r * 100.0:.2f}%<br>" +
         f"<span style='color:orange'>┅┅</span> VaR 95%: {var95 * 100.0:.2f}%",
    showarrow=False,
    align="left",
    bgcolor="rgba(255,255,255,0.9)",
    bordercolor="rgba(0,0,0,0.2)",
    borderwidth=0,
    font=dict(size=10)
)

# Add custom annotation for the Rolling Sharpe
diag_fig.add_annotation(
    # Rolling Sharpe legend (bottom-right of cell 1,2)
    x=0.98, y=0.65,
    xref="paper", yref="paper",
    text=f"<b>Rolling Sharpe</b><br>" +
         f"<span style='color:green'>━━</span> Rolling Sharpe<br>" +
         f"<span style='color:red'>┅┅</span> Total Sharpe: {overall_sharpe:.2f}",
    showarrow=False,
    align="left",
    bgcolor="rgba(255,255,255,0.9)",
    bordercolor="rgba(0,0,0,0.0)",
    borderwidth=0,
    font=dict(size=10)
)
# Add custom annotation for the Rolling Volatility
diag_fig.add_annotation(
    # Rolling Volatility legend (bottom-right of cell 2,2)
    x=0.98, y=0.1,
    xref="paper", yref="paper",
    text=f"<b>Rolling Volatility</b><br>" +
         f"<span style='color:blue'>━━</span> Rolling Vol<br>" +
         f"<span style='color:red'>┅┅</span> Avg Vol: {overall_vol * 100.0:.2f}%",
    showarrow=False,
    align="left",
    bgcolor="rgba(255,255,255,0.9)",
    bordercolor="rgba(0,0,0,0.2)",
    borderwidth=0,
    font=dict(size=10)
)

diag_fig.show()

### Step 8: Export Results

This block finalizes the portfolio optimization run by exporting all key artifacts and a human-readable summary. It:

- Writes final portfolio weights, NAV time series, comprehensive KPIs, and walk-forward metrics to CSV files.
- Saves the executive summary as a Markdown file.
- Prints a concise run summary with selected assets and top-line performance metrics.
- Assumes upstream variables (e.g., primary_weights, portfolio_nav, benchmark_nav, comprehensive_kpis, benchmark_kpis, walk_forward_results, exec_summary, selected_tickers, oos_kpis) are already computed.

In [49]:
print("🔄 Exporting results...")

# --- Conditionally set OUTPUT_DIR based on environment ---
try:
    # This code runs in Google Colab
    from google.colab import drive
    drive.mount('/content/drive')

    # Define a path within your Google Drive
    OUTPUT_DIR = '/content/drive/MyDrive/Veloso_Case_Study_Portfolio_Analysis_Results'

    # Set a flag to indicate the environment
    is_colab = True

    # Create the directory if it doesn't exist
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)

except ImportError:
    # This code runs locally or in a GitHub Codespace
    OUTPUT_DIR = 'output'
    is_colab = False
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)

# Export portfolio weights
primary_weights.to_csv(os.path.join(OUTPUT_DIR, 'final_portfolio_weights.csv'), header=['Weight'])  # One-column CSV of final weights

# Export NAV series
nav_export = pd.DataFrame({
    'Date': portfolio_nav.index,
    'Portfolio_NAV': portfolio_nav.values,
    'Benchmark_NAV': benchmark_nav.reindex(portfolio_nav.index).fillna(method='ffill').values  # Align to portfolio dates; forward-fill gaps
})
nav_export.to_csv(os.path.join(OUTPUT_DIR, 'nav_series.csv'), index=False)

# Export comprehensive KPIs
kpis_export = pd.DataFrame({
    'Metric': list(comprehensive_kpis.keys()),
    'Portfolio': list(comprehensive_kpis.values()),
    'Benchmark': [benchmark_kpis.get(k, np.nan) for k in comprehensive_kpis.keys()]  # Map benchmark KPIs by metric; use NaN if missing
})
kpis_export.to_csv(os.path.join(OUTPUT_DIR, 'comprehensive_kpis.csv'), index=False)

# Export walk-forward results
wf_export = []
for i, fold in enumerate(walk_forward_results['folds']):
    wf_export.append({
        'Fold': i + 1,
        'Train_Start': fold['train_start'],
        'Train_End': fold['train_end'],
        'Test_Start': fold['test_start'],
        'Test_End': fold['test_end'],
        'IS_Sharpe': fold['kpis_is']['sharpe'],
        'IS_Return': fold['kpis_is']['ann_return'],
        'IS_Vol': fold['kpis_is']['ann_vol'],
        'OOS_Sharpe': fold['kpis_oos']['sharpe'],
        'OOS_Return': fold['kpis_oos']['ann_return'],
        'OOS_Vol': fold['kpis_oos']['ann_vol'],
        'Selected_Tickers': '|'.join(fold['tickers'])  # Pipe-delimited tickers per fold for readability
    })

wf_df = pd.DataFrame(wf_export)
wf_df.to_csv(os.path.join(OUTPUT_DIR, 'walk_forward_results.csv'), index=False)

# Export executive summary
with open(os.path.join(OUTPUT_DIR, 'executive_summary.md'), 'w') as f:
    f.write(exec_summary)  # Save Markdown-formatted executive summary

print("\n✅ Results exported successfully:")
if is_colab:
    print(f"   The results have been saved to your Google Drive in the folder: '{OUTPUT_DIR}'")
else:
    print(f"   The results have been saved to the local folder: '{OUTPUT_DIR}'")

print("   💼 final_portfolio_weights.csv")
print("   📈 nav_series.csv")
print("   📊 comprehensive_kpis.csv")
print("   🔄 walk_forward_results.csv")
print("   📝 executive_summary.md")

print("\n🎉 Portfolio optimization analysis complete!")
print(f"\n🏆 Final Results Summary:")
print(f"   Selected Assets: {', '.join(selected_tickers)}")
print(f"   Portfolio Sharpe Ratio: {comprehensive_kpis['sharpe']:.3f}")
print(f"   Annual Return: {comprehensive_kpis['ann_return']*100:.2f}%")
print(f"   Maximum Drawdown: {comprehensive_kpis['max_dd']*100:.2f}%")
print(f"   Walk-Forward OOS Sharpe: {oos_kpis['sharpe']:.3f}")

🔄 Exporting results...
Mounted at /content/drive

✅ Results exported successfully:
   The results have been saved to your Google Drive in the folder: '/content/drive/MyDrive/Veloso_Case_Study_Portfolio_Analysis_Results'
   💼 final_portfolio_weights.csv
   📈 nav_series.csv
   📊 comprehensive_kpis.csv
   🔄 walk_forward_results.csv
   📝 executive_summary.md

🎉 Portfolio optimization analysis complete!

🏆 Final Results Summary:
   Selected Assets: NVDA, META, NFLX, CRM
   Portfolio Sharpe Ratio: 1.183
   Annual Return: 48.48%
   Maximum Drawdown: -62.58%
   Walk-Forward OOS Sharpe: 0.238
