In [1]:
import os
from pathlib import Path
import pandas as pd
import numpy as np

In [2]:
# ---- Inputs: everything lives under ./data relative to the current working directory ----
DATA_DIR = Path.cwd().resolve() / "data"
DAILY_RET_PATH = DATA_DIR.joinpath("prices", "derived", "daily_returns.parquet")
MKT_VOL_PATH = DATA_DIR.joinpath("market", "market_vol_monthly.parquet")
RF_PATH = DATA_DIR.joinpath("risk_free", "rf_monthly.parquet")

# ---- Outputs written by this notebook ----
RET_MONTHLY_OUT = DATA_DIR.joinpath("returns", "monthly_asset_returns.parquet")
COMMON_SIG_OUT = DATA_DIR.joinpath("signals", "common_vol_signal.parquet")
ONEFACTOR_SIG_OUT = DATA_DIR.joinpath("signals", "onefactor_signals.parquet")
STRATS_OUT = DATA_DIR.joinpath("strategies", "strategy_returns.parquet")

# ---- Tunable parameters ----
# Last date of the calibration window: k is fitted on every month up to and
# including this date (intended window: 2023–2024).
TRAIN_END = pd.Timestamp("2024-12-31")
# Base weighting scheme; "equal" means equal weight across the ticker columns in the data.
BASE_WEIGHTS = "equal"
# Optional upper cap on exposure as a float (e.g., 3.0). None = uncapped.
MAX_LEVERAGE = None


In [None]:
# Some util functions

def ensure_dirs():
    """
    Create the parent folder of every output file this notebook writes.

    Missing intermediate folders are created as well, and folders that
    already exist are left untouched.
    """
    output_files = (RET_MONTHLY_OUT, COMMON_SIG_OUT, ONEFACTOR_SIG_OUT, STRATS_OUT)
    for out_file in output_files:
        out_file.parent.mkdir(parents=True, exist_ok=True)

def compound_monthly_returns(daily_rets: pd.DataFrame) -> pd.DataFrame:
    """Compound daily simple returns to monthly simple returns."""

    # aggregate daily returns within each calendar month to get a monthly return
    m = (1.0 + daily_rets).groupby(pd.Grouper(freq="ME")).prod() - 1.0 

    m.index = pd.to_datetime(m.index)  # timestamped at that month’s last calendar day
    return m

def monthly_realized_var_per_asset(daily_rets: pd.DataFrame) -> pd.DataFrame:
    """
    Monthly realized variance (i.e. sample variance over daily returns) per asset 
    from daily returns within each month.
    Uses sample variance (ddof=1).
    """
    sigma2 = daily_rets.groupby(pd.Grouper(freq="ME")).var(ddof=1)
    sigma2.index = pd.to_datetime(sigma2.index)
    return sigma2

def get_weights(columns, mode="equal"):
    """
    Build the base portfolio weights for the given asset labels.

    Parameters
    ----------
    columns : sized collection of labels
        Asset identifiers; they become the index of the result.
    mode : str
        Weighting scheme. Only "equal" is supported, which assigns
        1 / len(columns) to every asset.

    Returns
    -------
    pd.Series
        Weights indexed by `columns`.

    Raises
    ------
    ValueError
        If `mode` is anything other than "equal".
    """
    if mode != "equal":
        raise ValueError(f"Unsupported weight mode: {mode}")
    n_assets = len(columns)
    return pd.Series(1.0 / n_assets, index=columns)

def calibrate_k(signal_base: pd.Series, train_mask: pd.Series) -> float:
    """
    Scale factor that normalises a base signal over the training window.

    With base signal s_base_t (e.g., 1/sigma2), k is chosen so that the mean
    of k * s_base_t over the TRAIN window equals 1:
        k = 1 / mean_t(s_base_t), NaN observations excluded.

    `train_mask` is a boolean mask aligned with `signal_base` and is passed
    straight to `.loc`. When the training mean is not finite or not strictly
    positive, the function falls back to k = 1.0.
    """
    train_mean = signal_base.loc[train_mask].dropna().mean()
    usable = np.isfinite(train_mean) and train_mean > 0
    if not usable:
        return 1.0
    return float(1.0 / train_mean)

def clamp_exposure(x, max_leverage=None):
    """
    Cap exposures from above at `max_leverage`.

    `x` is any object with a pandas-style `.clip` method. When `max_leverage`
    is None the input is returned unchanged (same object); otherwise values
    above float(max_leverage) are replaced by that cap. No lower bound is
    applied.
    """
    if max_leverage is not None:
        cap = float(max_leverage)
        return x.clip(upper=cap)
    return x

In [None]:
# Output folders must exist before any parquet file is written.
ensure_dirs()

# ---------- Load inputs ----------
# daily_rets: wide table, columns = tickers
# mkt_vol:    columns sigma2, sigma, inv_sigma, inv_sigma2
# rf_m:       column rf_month
daily_rets, mkt_vol, rf_m = (
    pd.read_parquet(src) for src in (DAILY_RET_PATH, MKT_VOL_PATH, RF_PATH)
)

# Daily data keeps its trading-day stamps; only the index type is normalised.
daily_rets.index = pd.to_datetime(daily_rets.index)

# The monthly tables are re-stamped to calendar month-end so that they line up
# with the monthly returns computed below.
rf_m.index = pd.to_datetime(rf_m.index).to_period("M").to_timestamp("M")
mkt_vol.index = pd.to_datetime(mkt_vol.index).to_period("M").to_timestamp("M")

# ---------- Build monthly asset returns ----------
mret = compound_monthly_returns(daily_rets)

# Per-asset monthly realized variance (input to the one-factor baseline)
asset_sigma2 = monthly_realized_var_per_asset(daily_rets)

mret.to_parquet(RET_MONTHLY_OUT)

# ---------- Common-volatility signal (market-based) ----------
# Base signal: s_base_t = 1 / sigma2_market_t. A zero variance would produce
# +/-inf, which is mapped to NaN.
s_base = mkt_vol["sigma2"].rdiv(1.0).replace([np.inf, -np.inf], np.nan).rename("s_base")

# k_common is fitted on every month up to and including TRAIN_END.
train_mask = s_base.index <= TRAIN_END
k_common = calibrate_k(s_base, train_mask)

# clamp_exposure returns its input untouched when MAX_LEVERAGE is None, so it
# can be applied unconditionally.
exposure_common = clamp_exposure((k_common * s_base).rename("exposure_common"), MAX_LEVERAGE)

# shift(1) moves each exposure one row later, so the value stored at row t is
# the exposure computed at the previous row.
exposure_common_lag = exposure_common.shift(1).rename("exposure_common_lag")

common_df = pd.concat(
    [
        mkt_vol["sigma2"].rename("sigma2_mkt"),
        s_base,
        pd.Series(k_common, index=s_base.index, name="k_common"),
        exposure_common,
        exposure_common_lag,
    ],
    axis=1,
)

common_df.to_parquet(COMMON_SIG_OUT)

# ---------- One-factor (per-asset) signals ----------
# For each asset i: s_base_i,t = 1/sigma2_i,t, with k_i chosen so that the
# average exposure over TRAIN equals 1.
#
# The inf -> NaN replacement has to be applied to the RESULT of the division:
# a month with zero realized variance gives 1/0 = inf, which would otherwise
# make calibrate_k fall back to k = 1.0 and yield an infinite exposure.
# Non-finite variances are also dropped before dividing.
s_base_i = (
    1.0 / asset_sigma2.replace([np.inf, -np.inf], np.nan)
).replace([np.inf, -np.inf], np.nan)

# Calibrate one k_i per column, each on the months up to and including TRAIN_END
k_i = {
    col: calibrate_k(s_base_i[col], s_base_i.index <= TRAIN_END)
    for col in s_base_i.columns
}
k_i_ser = pd.Series(k_i)

# Scale every column by its own k_i (k_i_ser is aligned on the column labels)
exposure_i = s_base_i.mul(k_i_ser, axis=1)
if MAX_LEVERAGE is not None:
    exposure_i = exposure_i.apply(lambda s: clamp_exposure(s, MAX_LEVERAGE))

# shift(1): the exposure stored at row t is the one computed at the previous row
exposure_i_lag = exposure_i.shift(1)

# Dict keys become the top level of a two-level column index: (field, ticker)
onefactor_df = pd.concat(
    {
        "sigma2_i": asset_sigma2,
        "s_base_i": s_base_i,
        # constant k_i broadcast down every row so it is stored next to the signals
        "k_i": pd.DataFrame({c: k_i[c] for c in s_base_i.columns}, index=s_base_i.index),
        "exposure_i": exposure_i,
        "exposure_i_lag": exposure_i_lag,
    },
    axis=1
)
onefactor_df.to_parquet(ONEFACTOR_SIG_OUT)

# ---------- Strategy returns ----------
# Keep only the months present in the monthly returns, the market-based
# exposure and the risk-free table, so that all inputs share one index.
idx = mret.index.intersection(exposure_common_lag.index).intersection(rf_m.index)
mret = mret.loc[idx]
exposure_common_lag = exposure_common_lag.loc[idx]
exposure_i_lag = exposure_i_lag.loc[idx]
rf = rf_m.loc[idx, "rf_month"].rename("rf")

# Base weights
tickers = list(mret.columns)   # should be MAG7
w = get_weights(tickers, BASE_WEIGHTS)

# Per-asset excess returns r_i - rf, shared by both managed strategies
asset_excess = mret.sub(rf, axis=0)

# Equal-weight portfolio (unmanaged)
ew = (mret * w).sum(axis=1).rename("ew")
ew_excess = (ew - rf).rename("ew_excess")

# Common-vol managed (same exposure applied to the whole risky sleeve)
# r_excess = s_{t-1} * (w' * (r - rf)) ; total = r_excess + rf
common_excess = (exposure_common_lag * (asset_excess * w).sum(axis=1)).rename("common_excess")
common_total  = (common_excess + rf).rename("common_total") # total return of the Common-vol managed portfolio

# One-factor baseline (per-asset exposures)
# r_excess = sum_i w_i * s_{i,t-1} * (r_i - rf) ; total = r_excess + rf
# Align exposure_i_lag columns to returns
exposure_i_lag = exposure_i_lag.reindex(columns=tickers)
# min_count=1: a month in which no asset has a lagged exposure (e.g. the first
# row after shift(1)) must be NaN, exactly like common_excess, instead of the
# 0.0 that a plain NaN-skipping sum would report.
onefactor_excess = (
    (asset_excess * (w * exposure_i_lag)).sum(axis=1, min_count=1).rename("onefactor_excess")
)
onefactor_total  = (onefactor_excess + rf).rename("onefactor_total") # total return of the one-factor managed portfolio

strat = pd.concat([rf, ew, ew_excess, common_excess, common_total, onefactor_excess, onefactor_total], axis=1)
strat.to_parquet(STRATS_OUT)

# ---------- Quick console summary ----------
print("\nCalibrated exposure scalars:")
print(f"  k_common: {k_common:.6f}")
print("  k_i (per asset):")
print(k_i_ser.to_frame("k_i"))
print("\nSaved:")
# Show each output relative to the folder that contains DATA_DIR, i.e. as
# "data/...". Searching the path parts for the first component named "data"
# would pick the wrong anchor whenever an ancestor directory is also called
# "data".
for label, out_path in [
    ("Monthly asset returns", RET_MONTHLY_OUT),
    ("Common-vol signal", COMMON_SIG_OUT),
    ("One-factor signals", ONEFACTOR_SIG_OUT),
    ("Strategy returns", STRATS_OUT),
]:
    # labels are left-padded to 21 characters so the arrows stay aligned
    print(f"  {label:<21} -> {out_path.relative_to(DATA_DIR.parent)}")


Calibrated exposure scalars:
  k_common: 0.000051
  k_i (per asset):
            k_i
AAPL   0.000140
AMZN   0.000289
GOOGL  0.000255
META   0.000330
MSFT   0.000165
NVDA   0.000655
TSLA   0.001009

Saved:
  Monthly asset returns -> data\returns\monthly_asset_returns.parquet
  Common-vol signal     -> data\signals\common_vol_signal.parquet
  One-factor signals    -> data\signals\onefactor_signals.parquet
  Strategy returns      -> data\strategies\strategy_returns.parquet
