In [1]:
import yfinance as yf
import pandas as pd
import numpy as np

from risk_engine.data import to_returns
from risk_engine.market import (
    var_parametric, var_historical, var_es_monte_carlo, backtest_var_historical
)

tickers = ["AAPL", "MSFT", "TLT"]

# Download – choose ONE of these two lines:
# 1) adjusted-prices workflow (provides 'Adj Close'):
# df_raw = yf.download(tickers, start="2020-01-01", end="2025-01-01", auto_adjust=False, progress=False)
# 2) auto-adjusted (default) workflow (use 'Close'):
df_raw = yf.download(tickers, start="2020-01-01", end="2025-01-01", auto_adjust=True, progress=False)

# Extract a clean wide price table
if isinstance(df_raw.columns, pd.MultiIndex):
    main = "Close" if "Close" in df_raw.columns.levels[0] else "Adj Close"
    prices = df_raw[main]
else:
    main = "Close" if "Close" in df_raw.columns else "Adj Close"
    prices = df_raw[main]

prices = prices.dropna(how="all").astype(float)

# Compute returns and weights
returns = to_returns(prices)
w = np.ones(len(returns.columns)) / len(returns.columns)

# Risk metrics
exposure = 1_000_000
alpha = 0.99
print("Parametric VaR:", var_parametric(returns, w, alpha=alpha, exposure=exposure))
print("Historical VaR:", var_historical(returns, w, alpha=alpha, exposure=exposure))
print("MC VaR, ES:", var_es_monte_carlo(returns, w, alpha=alpha, exposure=exposure))

# Backtest (needs enough data vs window)
window = 250
bt = backtest_var_historical(returns, w, alpha=alpha, window=window)
print("Backtest sample size T:", bt["T"])
print("Exceedances:", bt["exceedances"])
print("Hit rate (should be ~1-alpha):", bt["hit_rate"])
print("Kupiec p-value:", bt["kupiec_pvalue"])


Parametric VaR: 28396.881636590162
Historical VaR: 31645.208994779823
MC VaR, ES: (28490.740468379478, 32450.642510054084)
Backtest sample size T: 1007
Exceedances: 15
Hit rate (should be ~1-alpha): 0.014895729890764648
Kupiec p-value: 0.1454715447424322


In [2]:
import pandas as pd
import numpy as np
from math import sqrt, erfc
from risk_engine.market import backtest_var_historical, var_parametric, var_historical, var_es_monte_carlo

def kupiec_pval(x, T, alpha):
    # LR_uc from our implementation; p = erfc(sqrt(LR/2))
    p = 1 - alpha
    eps = 1e-12
    p = min(max(p, eps), 1-eps)
    pi_hat = min(max((x/T) if T else 0.0, eps), 1-eps)
    ll0 = (T-x)*np.log(1-p) + x*np.log(p)
    ll1 = (T-x)*np.log(1-pi_hat) + x*np.log(pi_hat)
    LR = float(-2*(ll0-ll1))
    return float(erfc(sqrt(LR/2)))

def calibrate_all(returns, weights, exposure=1_000_000, alphas=(0.95, 0.975, 0.99, 0.995), window=250):
    rows = []
    for a in alphas:
        # point estimates (not “accuracy”, but informative)
        var_p = var_parametric(returns, weights, alpha=a, exposure=exposure)
        var_h = var_historical(returns, weights, alpha=a, exposure=exposure)
        var_mc, es_mc = var_es_monte_carlo(returns, weights, alpha=a, exposure=exposure, n_sims=100_000, seed=7)

        # backtest (historical VaR only)
        if len(returns) > window:
            bt = backtest_var_historical(returns, weights, alpha=a, window=window)
            T, x = bt["T"], bt["exceedances"]
            hit = bt["hit_rate"]
            kup = kupiec_pval(x, T, a) if T > 0 else np.nan
        else:
            T = x = hit = kup = np.nan

        rows.append({
            "alpha": a,
            "expected_exceed_rate": 1-a,
            "VaR_Param": var_p,
            "VaR_Hist": var_h,
            "VaR_MC": var_mc,
            "ES_MC": es_mc,
            "BT_T": T,
            "BT_exceed": x,
            "BT_hit_rate": hit,
            "BT_Kupiec_p": kup,
        })
    return pd.DataFrame(rows)

calib = calibrate_all(returns, w, exposure=1_000_000, alphas=(0.95, 0.975, 0.99, 0.995), window=250)
calib_style = calib.copy()
calib_style["expected_exceed_rate"] = (calib_style["expected_exceed_rate"]*100).round(2)
calib_style["BT_hit_rate_%"] = (calib_style["BT_hit_rate"]*100).round(2)
calib_style = calib_style.drop(columns=["BT_hit_rate"])
calib_style


Unnamed: 0,alpha,expected_exceed_rate,VaR_Param,VaR_Hist,VaR_MC,ES_MC,BT_T,BT_exceed,BT_Kupiec_p,BT_hit_rate_%
0,0.95,5.0,19928.81214,19903.216179,19804.09406,24929.449507,1007,51,0.925274,5.06
1,0.975,2.5,23844.291131,25262.362378,23679.24132,28289.291719,1007,23,0.65603,2.28
2,0.99,1.0,28396.881637,31645.208995,28235.616738,32262.122475,1007,15,0.145472,1.49
3,0.995,0.5,31496.872711,41324.745076,31358.835428,34941.105035,1007,13,0.00302,1.29


In [3]:

import pandas as pd
from pathlib import Path

# assumes you already have: `returns` (DataFrame of daily returns) and `w` (weights array)
from risk_engine.market import backtest_var_historical

alpha = 0.99
window = 250

# 1) If not enough data, show a friendly message and skip saving
if len(returns) <= window:
    print(f"Not enough data: have {len(returns)} rows of returns, need > {window} for backtest.")
else:
    # 2) Run backtest
    bt = backtest_var_historical(returns, w, alpha=alpha, window=window)

    # 3) Build exceptions dataframe
    ex_df = pd.DataFrame({
        "date": bt["r_p"].index,
        "return": bt["r_p"].values,
        "VaR_threshold": bt["VaR_threshold"].values,
        "exception": bt["exceptions"].values
    })

    # 4) Ensure output folder exists and save
    outdir = Path("notebooks")
    outdir.mkdir(parents=True, exist_ok=True)
    ex_path = outdir / f"exceptions_{int(alpha*100)}pct_window{window}.csv"
    ex_df.to_csv(ex_path, index=False)
    print("Saved to:", ex_path.resolve())



Saved to: /Users/amandaachiangia/integrated-risk/notebooks/notebooks/notebooks/exceptions_99pct_window250.csv


In [4]:
from math import sqrt, erfc
import numpy as np

def christoffersen_independence_p(except_series: pd.Series):
    # except_series: 0/1 indexed by date, drop NaN at start
    x = except_series.dropna().astype(int).values
    if len(x) < 3: 
        return np.nan
    # transitions
    n00 = n01 = n10 = n11 = 0
    for i in range(1, len(x)):
        a,b = x[i-1], x[i]
        if a==0 and b==0: n00 += 1
        elif a==0 and b==1: n01 += 1
        elif a==1 and b==0: n10 += 1
        else: n11 += 1

    # likelihoods
    def l2(n0, n1):  # for a Bernoulli with p = n1/(n0+n1)
        n = n0 + n1
        if n == 0 or n1 == 0 or n0 == 0:
            # handle edges softly
            from math import log
            p = (n1 + 1e-12)/(n + 1e-12)
            return n0*np.log(1-p) + n1*np.log(p)
        p = n1/n
        return n0*np.log(1-p) + n1*np.log(p)

    L_ind = l2(n00+n01, n10+n11)   # one p for whole chain
    L_dep = l2(n00, n01) + l2(n10, n11)  # separate p_0->1 and p_1->1
    LR = -2*(L_ind - L_dep)
    pval = erfc(sqrt(LR/2.0))
    return pval, {"n00":n00,"n01":n01,"n10":n10,"n11":n11, "LR":LR}

p_ind, counts = christoffersen_independence_p(bt["exceptions"][bt["VaR_threshold"].notna()])
print("Christoffersen independence p-value:", p_ind)
print("Transition counts:", counts)


Christoffersen independence p-value: 0.21667583639016583
Transition counts: {'n00': 977, 'n01': 14, 'n10': 14, 'n11': 1, 'LR': np.float64(1.5262519271321935)}


In [5]:
import yfinance as yf
import pandas as pd

tickers = ["AAPL","MSFT","TLT"]
df = yf.download(tickers, start="2020-01-01", end="2025-01-01", auto_adjust=True)["Close"]

# Ensure the folder exists
import os
os.makedirs("data", exist_ok=True)

# Save CSV inside it
df.to_csv("data/market_data.csv")
print("Saved to data/market_data.csv")


[*********************100%***********************]  3 of 3 completed

Saved to data/market_data.csv



