In [8]:
# portable_alpha_model.py

import csv
import numpy as np
import pandas as pd
from pathlib import Path
import tkinter as tk
from tkinter import filedialog
import openpyxl

# =============================================================================
# 1. MAPPING: User-friendly labels → Internal variable names
# =============================================================================

LABEL_MAP = {
    "Number of simulations": "N_SIMULATIONS",
    "Simulation horizon (months)": "N_MONTHS",
    "Annual financing mean (%)": "financing_mean_annual",
    "Annual financing vol (%)": "financing_vol_annual",
    "Monthly spike probability": "spike_prob",
    "Spike size (σ × multiplier)": "spike_factor",
    "In-House annual return (%)": "mu_H",
    "In-House annual vol (%)": "sigma_H",
    "Alpha-Extension annual return (%)": "mu_E",
    "Alpha-Extension annual vol (%)": "sigma_E",
    "External annual return (%)": "mu_M",
    "External annual vol (%)": "sigma_M",
    "Corr index–In-House": "rho_idx_H",
    "Corr index–Alpha-Extension": "rho_idx_E",
    "Corr index–External": "rho_idx_M",
    "Corr In-House–Alpha-Extension": "rho_H_E",
    "Corr In-House–External": "rho_H_M",
    "Corr Alpha-Extension–External": "rho_E_M",
    "Buffer multiple": "buffer_multiple",
    # Continuous‐range parameters
    "X min (mm)": "X_min",
    "X max (mm)": "X_max",
    "X step (mm)": "X_step",
    "EM theta min": "EM_theta_min",
    "EM theta max": "EM_theta_max",
    "EM theta step": "EM_theta_step",
    # Backward compatibility if user still provides lists
    "X grid (mm)": "X_grid_list",
    "External manager α fractions": "EM_thetas_list",
    "In-House β": "w_beta_H",
    "In-House α": "w_alpha_H",
}

# =============================================================================
# 2. FILE-PICKER FOR CSV SELECTION
# =============================================================================

def select_csv_file():
    """
    Pop up a file-picker dialog so the user can choose a CSV file.
    Returns a pathlib.Path to the selected file.
    Raises FileNotFoundError if the user cancels.
    """
    root = tk.Tk()
    root.withdraw()
    file_path = filedialog.askopenfilename(
        title="Select CSV File",
        filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
    )
    root.destroy()
    if not file_path:
        raise FileNotFoundError("No file selected.")
    return Path(file_path)

# =============================================================================
# 3. LOAD PARAMETERS USING MAPPING
# =============================================================================

def load_parameters(csv_filepath, label_map):
    """
    Read a CSV that may have leading instruction rows, then a header row "Parameter,Value".
    Skip all rows until the header, then parse friendly labels → internal names via label_map.
    Returns a dict {internal_var_name: parsed_value}.
    """
    params = {}

    # Read all lines
    lines = Path(csv_filepath).read_text(encoding="utf-8").splitlines()

    # Find header row that starts with "Parameter,"
    header_idx = None
    for i, line in enumerate(lines):
        if line.strip().startswith("Parameter,"):
            header_idx = i
            break

    if header_idx is None:
        raise ValueError(f"No header row starting with 'Parameter,' found in {csv_filepath}")

    # Only use header + following data lines
    header_and_data = lines[header_idx:]
    reader = csv.DictReader(header_and_data)

    for row in reader:
        friendly_key = row.get("Parameter", "").strip()
        if not friendly_key or friendly_key not in label_map:
            continue

        internal_key = label_map[friendly_key]
        raw_val = row.get("Value", "").strip()

        # If semicolon present, parse as list
        if ";" in raw_val:
            parts = [p.strip() for p in raw_val.split(";") if p.strip() != ""]
            parsed_list = []
            for p in parts:
                try:
                    if "." in p:
                        parsed_list.append(float(p))
                    else:
                        parsed_list.append(int(p))
                except ValueError:
                    parsed_list.append(p)
            params[internal_key] = parsed_list
        else:
            # Try int → float → string
            try:
                params[internal_key] = int(raw_val)
            except ValueError:
                try:
                    params[internal_key] = float(raw_val)
                except ValueError:
                    params[internal_key] = raw_val

    return params

# =============================================================================
# 4. HELPER TO LOAD INDEX RETURNS
# =============================================================================

def load_index_returns(csv_path):
    """
    Load a CSV of monthly index returns into a pandas Series.
    Expects a "Date" column and either "Monthly_TR" or "Return" column.
    Returns a pd.Series indexed by Date.
    """
    csv_path = Path(csv_path)
    if not csv_path.exists() or not csv_path.is_file():
        raise FileNotFoundError(f"Index CSV not found at {csv_path}")
    df = pd.read_csv(csv_path, parse_dates=["Date"])
    if "Date" not in df.columns:
        raise ValueError(f"'Date' column is missing from {csv_path}")
    if "Monthly_TR" in df.columns:
        col = "Monthly_TR"
    elif "Return" in df.columns:
        col = "Return"
    else:
        raise ValueError(f"CSV must contain a 'Monthly_TR' or 'Return' column: found {df.columns.tolist()}")

    df = df.sort_values("Date").reset_index(drop=True)
    df.set_index("Date", inplace=True)
    series = df[col].dropna().copy()
    series.index = pd.to_datetime(series.index)
    return series

# =============================================================================
# 5. SIMULATION AND UTILITY FUNCTIONS
# =============================================================================

def simulate_financing(T, financing_mean, financing_sigma, spike_prob, spike_factor):
    """
    Simulate a series of financing spreads f_t for T months,
    using a Normal + occasional jump model.
    """
    f = np.zeros(T)
    for t in range(T):
        base = financing_mean + np.random.normal(0, financing_sigma)
        jump = 0.0
        if np.random.rand() < spike_prob:
            jump = spike_factor * financing_sigma
        f[t] = max(base + jump, 0.0)
    return f

def build_cov_matrix(rho_idx_H, rho_idx_E, rho_idx_M,
                     rho_H_E, rho_H_M, rho_E_M,
                     idx_sigma, sigma_H, sigma_E, sigma_M):
    """
    Build the 4×4 covariance matrix for (Index, H, E, M).
    """
    sds = np.array([idx_sigma, sigma_H, sigma_E, sigma_M])
    rho = np.array([
        [1.0,       rho_idx_H, rho_idx_E, rho_idx_M],
        [rho_idx_H, 1.0,       rho_H_E,   rho_H_M],
        [rho_idx_E, rho_H_E,   1.0,       rho_E_M],
        [rho_idx_M, rho_H_M,   rho_E_M,   1.0    ]
    ])
    return np.outer(sds, sds) * rho

def simulate_alpha_streams(T, cov, mu_idx, mu_H, mu_E, mu_M):
    """
    Simulate T joint observations of (Index_return, H, E, M)
    from a multivariate Normal with given means and covariance.
    Returns an array of shape (T, 4).
    """
    means = np.array([mu_idx, mu_H, mu_E, mu_M])
    return np.random.multivariate_normal(means, cov, size=T)

def export_to_excel(inputs_dict, summary_df, raw_returns_dict, filename="Outputs.xlsx"):
    """
    Write inputs, summary, and raw returns into an Excel workbook.
    """
    with pd.ExcelWriter(filename, engine="openpyxl") as writer:
        # 1) Inputs sheet
        df_inputs = pd.DataFrame.from_dict(inputs_dict, orient="index", columns=["Value"])
        df_inputs.index.name = "Parameter"
        df_inputs.reset_index(inplace=True)
        df_inputs.to_excel(writer, sheet_name="Inputs", index=False)

        # 2) Summary sheet
        summary_df.to_excel(writer, sheet_name="Summary", index=False)

        # 3) Raw Returns sheets
        for sheet_name, df in raw_returns_dict.items():
            df.to_excel(writer, sheet_name=sheet_name, index=True)

    print(f"Exported results to {filename}")

# =============================================================================
# 6. MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    # 6.1) Prompt user to select the parameters CSV
    try:
        params_csv_path = select_csv_file()
        print(f"Parameters CSV selected: {params_csv_path}")
    except FileNotFoundError:
        raise RuntimeError("No parameter CSV selected; exiting.")

    # 6.2) Load and unpack parameters
    raw_params = load_parameters(params_csv_path, LABEL_MAP)

    # Unpack scalar parameters (with defaults if missing)
    N_SIMULATIONS         = raw_params.get("N_SIMULATIONS", 5000)
    N_MONTHS              = raw_params.get("N_MONTHS", 12)
    financing_mean_annual = raw_params.get("financing_mean_annual", 0.005)
    financing_vol_annual  = raw_params.get("financing_vol_annual", 0.001)
    spike_prob            = raw_params.get("spike_prob", 0.02)
    spike_factor          = raw_params.get("spike_factor", 2.25)

    mu_H    = raw_params.get("mu_H", 0.04)
    sigma_H = raw_params.get("sigma_H", 0.01)
    mu_E    = raw_params.get("mu_E", 0.05)
    sigma_E = raw_params.get("sigma_E", 0.02)
    mu_M    = raw_params.get("mu_M", 0.03)
    sigma_M = raw_params.get("sigma_M", 0.02)

    rho_idx_H = raw_params.get("rho_idx_H", 0.05)
    rho_idx_E = raw_params.get("rho_idx_E", 0.00)
    rho_idx_M = raw_params.get("rho_idx_M", 0.00)
    rho_H_E   = raw_params.get("rho_H_E", 0.10)
    rho_H_M   = raw_params.get("rho_H_M", 0.10)
    rho_E_M   = raw_params.get("rho_E_M", 0.00)

    buffer_multiple = raw_params.get("buffer_multiple", 3.0)
    w_beta_H        = raw_params.get("w_beta_H", 0.50)
    w_alpha_H       = raw_params.get("w_alpha_H", 0.50)

    # Unpack continuous-range parameters (with defaults)
    X_min   = raw_params.get("X_min", 0)
    X_max   = raw_params.get("X_max", 1000)
    X_step  = raw_params.get("X_step", 100)

    EM_min  = raw_params.get("EM_theta_min", 0.50)
    EM_max  = raw_params.get("EM_theta_max", 0.75)
    EM_step = raw_params.get("EM_theta_step", 0.25)

    # Validate ranges
    if X_step <= 0:
        raise ValueError(f"X step must be positive; got {X_step}")
    if EM_step <= 0:
        raise ValueError(f"EM theta step must be positive; got {EM_step}")
    if X_max < X_min:
        raise ValueError(f"X max ({X_max}) < X min ({X_min})")
    if EM_max < EM_min:
        raise ValueError(f"EM theta max ({EM_max}) < EM theta min ({EM_min})")

    # Build continuous grids (inclusive of endpoint)
    X_grid = np.arange(X_min, X_max + X_step/2, X_step)
    EM_thetas = np.arange(EM_min, EM_max + EM_step/2, EM_step)

    # Warn if grid is too large
    total_combinations = len(X_grid) * len(EM_thetas)
    if total_combinations > 1000:
        print(
            f"WARNING: You will simulate {total_combinations} (X, θ) combinations. "
            "This may be computationally intensive. "
            "Consider increasing step sizes for a reasonable runtime."
        )

    # Backward-compatibility: if user provided explicit lists, use them instead
    if "X_grid_list" in raw_params:
        X_grid = raw_params["X_grid_list"]
    if "EM_thetas_list" in raw_params:
        EM_thetas = raw_params["EM_thetas_list"]

    # Convert annual percentages to monthly decimals
    financing_mean  = financing_mean_annual / 12
    financing_sigma = financing_vol_annual / 12
    mu_H_month      = mu_H / 12
    sigma_H_month   = sigma_H / 12
    mu_E_month      = mu_E / 12
    sigma_E_month   = sigma_E / 12
    mu_M_month      = mu_M / 12
    sigma_M_month   = sigma_M / 12

    # 6.3) Prompt user to select the index CSV
    print("Please select the INDEX CSV (monthly total returns).")
    try:
        INDEX_CSV_PATH = select_csv_file()
        print(f"Index CSV selected: {INDEX_CSV_PATH}")
    except FileNotFoundError:
        raise RuntimeError("Index CSV was not selected; exiting.")

    # 6.4) Load index returns
    try:
        idx_series = load_index_returns(INDEX_CSV_PATH)
        print(
            f"Loaded {len(idx_series)} months, "
            f"from {idx_series.index[0].date()} through {idx_series.index[-1].date()}."
        )
    except Exception as e:
        raise RuntimeError(f"Failed to load index returns: {e}")

    # Compute index mean and sigma (monthly)
    mu_idx    = idx_series.mean()
    idx_sigma = idx_series.std(ddof=1)

    # 6.5) Prepare inputs dictionary for Excel export
    inputs_dict = {
        "N_SIMULATIONS":          N_SIMULATIONS,
        "N_MONTHS":               N_MONTHS,
        "Financing Mean (annual)": financing_mean_annual,
        "Financing Vol  (annual)": financing_vol_annual,
        "Spike Probability (month)": spike_prob,
        "Spike Size (σ‐multiplier)": spike_factor,
        "μ_H (annual)":           mu_H,
        "σ_H (annual)":           sigma_H,
        "μ_E (annual)":           mu_E,
        "σ_E (annual)":           sigma_E,
        "μ_M (annual)":           mu_M,
        "σ_M (annual)":           sigma_M,
        "ρ_idx_H":                rho_idx_H,
        "ρ_idx_E":                rho_idx_E,
        "ρ_idx_M":                rho_idx_M,
        "ρ_H_E":                  rho_H_E,
        "ρ_H_M":                  rho_H_M,
        "ρ_E_M":                  rho_E_M,
        "Buffer multiple (m)":    buffer_multiple,
        "In‐House w_beta":        w_beta_H,
        "In‐House w_alpha":       w_alpha_H,
        "X_min (mm)":             X_min,
        "X_max (mm)":             X_max,
        "X_step (mm)":            X_step,
        "EM theta_min":           EM_min,
        "EM theta_max":           EM_max,
        "EM theta_step":          EM_step
    }

    # 6.6) Run Monte Carlo simulation for all scenarios
    all_summaries = []
    all_raw_returns = {}

    for X in X_grid:
        # 6.6.1) Simulate financing spreads
        f_series = simulate_financing(N_MONTHS, financing_mean, financing_sigma, spike_prob, spike_factor)
        dates_sim = pd.date_range(
            start=idx_series.index[-1] + pd.DateOffset(months=1),
            periods=N_MONTHS, freq="ME"
        )

        # 6.6.2) Pre-allocate one-year returns
        results = {"Base": np.zeros(N_SIMULATIONS)}
        for theta in EM_thetas:
            results[f"EM_X={X}_θ={theta}"] = np.zeros(N_SIMULATIONS)
        results[f"EXT_X={X}"] = np.zeros(N_SIMULATIONS)

        # 6.6.3) Build covariance matrix for this run
        cov_mat = build_cov_matrix(
            rho_idx_H, rho_idx_E, rho_idx_M,
            rho_H_E, rho_H_M, rho_E_M,
            idx_sigma, sigma_H_month, sigma_E_month, sigma_M_month
        )

        # 6.6.4) Draw all streams for all sims at once
        sims = np.random.multivariate_normal(
            [mu_idx, mu_H_month, mu_E_month, mu_M_month],
            cov_mat,
            size=(N_SIMULATIONS, N_MONTHS)
        )  # shape: (N_SIMULATIONS, N_MONTHS, 4)

        # Precompute one-year index returns for each simulation
        idx_one_year = np.prod(1 + sims[:, :, 0], axis=1) - 1

        # 6.6.5) Tile financing series
        f_matrix = np.tile(f_series, (N_SIMULATIONS, 1))

        # 6.6.6) Prepare raw_returns DataFrames for first simulation paths
        raw_returns = {
            "Base": pd.DataFrame(index=dates_sim),
            f"EXT_X={X}": pd.DataFrame(index=dates_sim),
        }
        for theta in EM_thetas:
            raw_returns[f"EM_X={X}_θ={theta}"] = pd.DataFrame(index=dates_sim)

        # 6.6.7) Loop over simulations
        for sim in range(N_SIMULATIONS):
            r_beta = sims[sim, :, 0]
            r_H    = sims[sim, :, 1]
            r_E    = sims[sim, :, 2]
            r_M    = sims[sim, :, 3]
            f_t    = f_matrix[sim, :]

            # Base strategy
            R_base = (r_beta - f_t) * w_beta_H + r_H * w_alpha_H
            results["Base"][sim] = np.prod(1 + R_base) - 1

            # EXT strategy
            inhouse_beta_factor  = (1.0 - X/1000.0) / 2.0
            inhouse_alpha_factor = (1.0 - X/1000.0) / 2.0
            ext_beta_factor      = X / 1000.0
            ext_alpha_factor     = X / 1000.0
            R_ext = (
                (r_beta - f_t) * (inhouse_beta_factor + ext_beta_factor)
                + r_H * inhouse_alpha_factor
                + r_E * ext_alpha_factor
            )
            results[f"EXT_X={X}"][sim] = np.prod(1 + R_ext) - 1

            # EM scenarios
            for theta in EM_thetas:
                em_beta_factor  = X / 1000.0
                em_alpha_factor = theta * (X / 1000.0)
                R_em = (
                    (r_beta - f_t) * (inhouse_beta_factor + em_beta_factor)
                    + r_H * inhouse_alpha_factor
                    + r_M * em_alpha_factor
                )
                results[f"EM_X={X}_θ={theta}"][sim] = np.prod(1 + R_em) - 1

            # Save monthly path for first simulation
            if sim == 0:
                raw_returns["Base"] = pd.DataFrame({"Base": R_base}, index=dates_sim)
                raw_returns[f"EXT_X={X}"] = pd.DataFrame({f"EXT_X={X}": R_ext}, index=dates_sim)
                for theta in EM_thetas:
                    raw_returns[f"EM_X={X}_θ={theta}"] = pd.DataFrame(
                        {f"EM_X={X}_θ={theta}": R_em}, index=dates_sim
                    )

        # 6.6.8) Build yearly results DataFrame
        df_yearly = pd.DataFrame(results)

        # 6.6.9) Compute summary metrics
        summary_rows = []
        for config, arr in df_yearly.items():
            ann_ret = np.mean(arr)
            ann_vol = np.std(arr, ddof=1)
            var_95  = np.percentile(arr, 5)
            # Tracking error = std of (strategy return – index return)
            active_returns = arr - idx_one_year
            te = np.std(active_returns, ddof=1)

            if config not in raw_returns:
                raise KeyError(f"Config '{config}' not found. Keys: {list(raw_returns.keys())}")
            mr_series = raw_returns[config].iloc[:, 0]
            threshold = -buffer_multiple * idx_sigma
            breach_pct = np.mean(mr_series < threshold) * 100

            summary_rows.append({
                "Config": config,
                "X (mm)": X,
                "Ann Return": ann_ret,
                "Ann Vol": ann_vol,
                "VaR 95": var_95,
                "TE (est.)": te,
                "Breach %": breach_pct
            })

        summary_df = pd.DataFrame(summary_rows)
        all_summaries.append(summary_df)

        # 6.6.10) Collect raw returns for Excel sheets
        for key, df in raw_returns.items():
            all_raw_returns[key] = df

    # 6.7) Combine all summaries and export to Excel
    final_summary = pd.concat(all_summaries, ignore_index=True)
    export_to_excel(inputs_dict, final_summary, all_raw_returns)

    # ─── HUMAN‐FRIENDLY POST‐PROCESSING ───────────────────────────────────────────

    # 1) Make a copy so we don’t overwrite the original
    display_df = final_summary.copy()

    # 2) Rename columns to more explicit, human‐readable labels
    display_df = display_df.rename(columns={
        "X (mm)":        "Cash Buffer (USD mm)",
        "Ann Return":    "Annual Return (%)",
        "Ann Vol":       "Annual Volatility (%)",
        "VaR 95":        "95%-VaR (%)",
        "TE (est.)":     "Tracking Error (%)",
        "Breach %":      "Breach Probability (%)"
    })

    # 3) Convert numeric columns into percentage strings (except Cash Buffer)
    pct_cols = [
        "Annual Return (%)",
        "Annual Volatility (%)",
        "95%-VaR (%)",
        "Tracking Error (%)",
        "Breach Probability (%)"
    ]

    for col in pct_cols:
        display_df[col] = (display_df[col]).map("{:.1f}%".format)

    # 4) Show the nicely formatted table
    print("\n=== Summary Table (Human‐Friendly) ===\n")
    print(display_df.to_string(index=False))

    # 5) Print a short English description for each row
    print("\n=== Narrative Summaries ===\n")
    for _, row in display_df.iterrows():
        cfg   = row["Config"]
        x_mm  = row["Cash Buffer (USD mm)"]
        ret   = row["Annual Return (%)"]
        vol   = row["Annual Volatility (%)"]
        var95 = row["95%-VaR (%)"]
        te    = row["Tracking Error (%)"]
        br    = row["Breach Probability (%)"]

        print(
            f"For configuration '{cfg}' with a cash buffer of ${x_mm:.0f} mm:\n"
            f"  • Expected annual return: {ret}\n"
            f"  • Annual volatility: {vol}\n"
            f"  • 95% Value‐at‐Risk: {var95}\n"
            f"  • Estimated tracking error: {te}\n"
            f"  • Probability of breaching the buffer: {br}\n"
        )


Parameters CSV selected: /Users/teacher/Library/CloudStorage/Dropbox/Learning/Code/Portable Alpha-Extension Model/parameters.csv
Please select the INDEX CSV (monthly total returns).
Index CSV selected: /Users/teacher/Library/CloudStorage/Dropbox/Learning/Code/Portable Alpha-Extension Model/sp500tr_fred_divyield.csv
Loaded 663 months, from 1970-01-01 through 2025-03-01.
Exported results to Outputs.xlsx

=== Summary Table (Human‐Friendly) ===

          Config  Cash Buffer (USD mm) Annual Return (%) Annual Volatility (%) 95%-VaR (%) Tracking Error (%) Breach Probability (%)
            Base                     0              4.4%                  0.8%        3.1%               0.7%                   0.0%
    EM_X=0_θ=0.5                     0              4.4%                  0.8%        3.1%               0.7%                   0.0%
   EM_X=0_θ=0.75                     0              4.4%                  0.8%        3.1%               0.7%                   0.0%
         EXT_X=0      