# Inspect y_logret_24h signals

This notebook loads train/val/test signal CSVs produced by `strategies.compute_signals_from_quantiles` for `BINANCE_BTCUSDT.P, 60`.



In [10]:
# Ensure project root is on sys.path for 'strategies' imports
import sys
from pathlib import Path

ROOT = Path.cwd().parents[0]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Quick import check (optional)
try:
    from strategies.quantile_signals import estimate_prob_up
except Exception as e:
    print("Import warning:", e)
    # Fallback if needed: from quantile_signals import estimate_prob_up


In [11]:
from pathlib import Path
import pandas as pd

# Paths
root = Path("/Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h")
train_csv = root / "pred_train_signals.csv"
val_csv = root / "pred_val_signals.csv"
test_csv = root / "pred_test_signals.csv"

# Load
train_df = pd.read_csv(train_csv)
val_df = pd.read_csv(val_csv)
test_df = pd.read_csv(test_csv)

for df in (train_df, val_df, test_df):
    if 'timestamp' in df.columns:
        ts = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
        df['timestamp'] = ts.dt.tz_convert('UTC').dt.tz_localize(None)

print("Loaded:")
print("train:", train_df.shape, train_csv)
print("val:", val_df.shape, val_csv)
print("test:", test_df.shape, test_csv)

train_df.head(), val_df.head(), test_df.head()


Loaded:
train: (15453, 20) /Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h/pred_train_signals.csv
val: (3311, 20) /Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h/pred_val_signals.csv
test: (3313, 20) /Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h/pred_test_signals.csv


(            timestamp    y_true  pred_q05  pred_q10  pred_q15  pred_q25  \
 0 2023-01-31 00:00:00  0.020052 -0.076704 -0.051411  0.014310  0.014310   
 1 2023-01-31 01:00:00  0.021190 -0.076704 -0.051411  0.014586  0.014586   
 2 2023-01-31 02:00:00  0.018952 -0.076704 -0.051411  0.014674  0.014674   
 3 2023-01-31 03:00:00  0.022383 -0.076704 -0.051411  0.015676  0.015676   
 4 2023-01-31 04:00:00  0.021943 -0.076704 -0.051411  0.012701  0.012701   
 
    pred_q50  pred_q75  pred_q85  pred_q90  pred_q95  cross_violations  \
 0  0.014310  0.021838  0.024044   0.08092  0.109214                 1   
 1  0.014586  0.021838  0.024044   0.08092  0.109214                 1   
 2  0.014674  0.021838  0.024044   0.08092  0.109214                 1   
 3  0.015676  0.021838  0.024044   0.08092  0.109214                 1   
 4  0.012701  0.022181  0.024044   0.08092  0.109214                 1   
 
    max_cross_gap  exp_ret_avg  exp_ret_conservative   prob_up  tail_spread  \
 0       0.032117

In [12]:
# Load original merged predictions (no signals)
orig_train_csv = root / "pred_train.csv"
orig_val_csv = root / "pred_val.csv"
orig_test_csv = root / "pred_test.csv"

orig_train_df = pd.read_csv(orig_train_csv)
orig_val_df = pd.read_csv(orig_val_csv)
orig_test_df = pd.read_csv(orig_test_csv)

for df in (orig_train_df, orig_val_df, orig_test_df):
    if 'timestamp' in df.columns:
        ts = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
        df['timestamp'] = ts.dt.tz_convert('UTC').dt.tz_localize(None)

print("Loaded originals:")
print("train:", orig_train_df.shape, orig_train_csv)
print("val:", orig_val_df.shape, orig_val_csv)
print("test:", orig_test_df.shape, orig_test_csv)

orig_train_df.head(), orig_val_df.head(), orig_test_df.head()


Loaded originals:
train: (15453, 11) /Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h/pred_train.csv
val: (3311, 11) /Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h/pred_val.csv
test: (3313, 11) /Volumes/Extreme SSD/trading_data/cex/models/BINANCE_BTCUSDT.P, 60/diagnosis/y_logret_120h/pred_test.csv


(            timestamp    y_true  pred_q05  pred_q10  pred_q15  pred_q25  \
 0 2023-01-31 00:00:00  0.020052 -0.076704 -0.051411  0.014310 -0.017807   
 1 2023-01-31 01:00:00  0.021190 -0.076704 -0.051411  0.014586 -0.017807   
 2 2023-01-31 02:00:00  0.018952 -0.076704 -0.051411  0.014674 -0.017807   
 3 2023-01-31 03:00:00  0.022383 -0.076704 -0.051411  0.015676 -0.017807   
 4 2023-01-31 04:00:00  0.021943 -0.076704 -0.051411  0.012701 -0.017807   
 
    pred_q50  pred_q75  pred_q85  pred_q90  pred_q95  
 0 -0.000115  0.021838  0.024044   0.08092  0.109214  
 1 -0.000115  0.021838  0.024044   0.08092  0.109214  
 2 -0.000530  0.021838  0.024044   0.08092  0.109214  
 3 -0.000530  0.021838  0.024044   0.08092  0.109214  
 4 -0.000530  0.022181  0.024044   0.08092  0.109214  ,
             timestamp    y_true  pred_q05  pred_q10  pred_q15  pred_q25  \
 0 2024-11-04 21:00:00  0.130250 -0.077052 -0.053408  0.074706 -0.019589   
 1 2024-11-04 22:00:00  0.119305 -0.077052 -0.053408  0.067

In [23]:
import numpy as np

def add_prob_exp_q50_indicator(
    signals_df: pd.DataFrame,
    original_df: pd.DataFrame,
    tau_long: float = 0.55,
    tau_short: float = 0.45,
    exp_col: str = "exp_ret_avg",
    out_col: str = "signal_prob_exp_q50",
) -> pd.DataFrame:
    """Add a -1/0/1 indicator based on prob_up, expected return, and original pred_q50.

    Long rule:  prob_up >= tau_long and exp_col > pred_q50_orig > 0  →  +1
    Short rule: prob_up <  tau_short and exp_col < pred_q50_orig < 0  →  -1
    Else 0.
    """
    if "prob_up" not in signals_df.columns:
        raise KeyError("signals_df must contain 'prob_up'")
    if exp_col not in signals_df.columns:
        raise KeyError(f"signals_df must contain '{exp_col}'")
    if "pred_q50" not in original_df.columns:
        raise KeyError("original_df must contain 'pred_q50'")

    merged = signals_df.copy()
    if "timestamp" in signals_df.columns and "timestamp" in original_df.columns:
        q50_map = original_df[["timestamp", "pred_q50"]].rename(columns={"pred_q50": "pred_q50_orig"})
        merged = merged.merge(q50_map, on="timestamp", how="left")
    else:
        # Fallback: align by row order
        merged["pred_q50_orig"] = original_df["pred_q50"].values[: len(merged)]

    prob = pd.to_numeric(merged["prob_up"], errors="coerce")
    expv = pd.to_numeric(merged[exp_col], errors="coerce")
    q50o = pd.to_numeric(merged["pred_q50_orig"], errors="coerce")

    cond_long = (prob >= float(tau_long)) & (expv > q50o) & (q50o > 0.0)
    cond_short = (prob < float(tau_short)) & (expv < q50o) & (q50o < 0.0)
    indicator = np.where(cond_long, 1, np.where(cond_short, -1, 0)).astype(int)

    out = merged.copy()
    out[out_col] = indicator
    return out

# Add indicator to each split
train_ind = add_prob_exp_q50_indicator(train_df, orig_train_df, tau_long=0.55, tau_short=0.48, exp_col="exp_ret_avg", out_col="signal_prob_exp_q50")
val_ind = add_prob_exp_q50_indicator(val_df, orig_val_df, tau_long=0.55, tau_short=0.48, exp_col="exp_ret_avg", out_col="signal_prob_exp_q50")
test_ind = add_prob_exp_q50_indicator(test_df, orig_test_df, tau_long=0.55, tau_short=0.48, exp_col="exp_ret_avg", out_col="signal_prob_exp_q50")

print("Indicator value counts (train/val/test):")
print(train_ind["signal_prob_exp_q50"].value_counts().sort_index())
print(val_ind["signal_prob_exp_q50"].value_counts().sort_index())
print(test_ind["signal_prob_exp_q50"].value_counts().sort_index())

train_ind.head(), val_ind.head(), test_ind.head()


Indicator value counts (train/val/test):
signal_prob_exp_q50
0    7440
1    8013
Name: count, dtype: int64
signal_prob_exp_q50
-1       1
 0    3141
 1     169
Name: count, dtype: int64
signal_prob_exp_q50
0    3282
1      31
Name: count, dtype: int64


(            timestamp    y_true  pred_q05  pred_q10  pred_q15  pred_q25  \
 0 2023-01-31 00:00:00  0.020052 -0.076704 -0.051411  0.014310  0.014310   
 1 2023-01-31 01:00:00  0.021190 -0.076704 -0.051411  0.014586  0.014586   
 2 2023-01-31 02:00:00  0.018952 -0.076704 -0.051411  0.014674  0.014674   
 3 2023-01-31 03:00:00  0.022383 -0.076704 -0.051411  0.015676  0.015676   
 4 2023-01-31 04:00:00  0.021943 -0.076704 -0.051411  0.012701  0.012701   
 
    pred_q50  pred_q75  pred_q85  pred_q90  ...  max_cross_gap  exp_ret_avg  \
 0  0.014310  0.021838  0.024044   0.08092  ...       0.032117     0.016694   
 1  0.014586  0.021838  0.024044   0.08092  ...       0.032393     0.016832   
 2  0.014674  0.021838  0.024044   0.08092  ...       0.032481     0.016876   
 3  0.015676  0.021838  0.024044   0.08092  ...       0.033483     0.017377   
 4  0.012701  0.022181  0.024044   0.08092  ...       0.030508     0.015949   
 
    exp_ret_conservative   prob_up  tail_spread  signal_exp  signa

In [26]:
import numpy as np

def evaluate_indicator(df: pd.DataFrame, name: str):
    # Signals: -1/0/1
    s = pd.to_numeric(df['signal_prob_exp_q50'], errors='coerce').fillna(0).astype(int)
    # Log returns
    y_log = pd.to_numeric(df['y_true'], errors='coerce')

    trades = (s != 0)
    num_trades = int(trades.sum())
    wins = (np.sign(y_log[trades]) == s[trades]) & (np.sign(y_log[trades]) != 0)
    win_rate = float(wins.mean()) if num_trades > 0 else float('nan')

    # Per-trade simple return (not compounded): long = exp(y)-1, short = -(exp(y)-1)
    simple_ret = np.exp(y_log) - 1.0
    pnl_pct = s * simple_ret*100  # per-row percentage PnL (not log)

    # Cumulative (non-compounded) percentage PnL
    cumulative_pct = float(pnl_pct.sum())

    if 'timestamp' in df.columns:
        ts = pd.to_datetime(df['timestamp'], errors='coerce', utc=True).dt.tz_convert('UTC').dt.tz_localize(None)
        cum_series = pnl_pct.set_axis(ts).cumsum()
    else:
        cum_series = pnl_pct.cumsum()

    print(f"{name}: trades={num_trades}, win_rate={win_rate:.3f}, cumulative_pct={cumulative_pct:.6f}")
    return cum_series

cum_train = evaluate_indicator(train_ind, 'train')
cum_val = evaluate_indicator(val_ind, 'val')
cum_test = evaluate_indicator(test_ind, 'test')

cum_train.tail(), cum_val.tail(), cum_test.tail()



train: trades=8013, win_rate=0.939, cumulative_pct=40776.119003
val: trades=170, win_rate=0.718, cumulative_pct=690.828619
test: trades=31, win_rate=0.742, cumulative_pct=149.336846


(timestamp
 2024-11-04 16:00:00    40725.304104
 2024-11-04 17:00:00    40737.687552
 2024-11-04 18:00:00    40750.550515
 2024-11-04 19:00:00    40762.857117
 2024-11-04 20:00:00    40776.119003
 dtype: float64,
 timestamp
 2025-03-22 15:00:00    690.828619
 2025-03-22 16:00:00    690.828619
 2025-03-22 17:00:00    690.828619
 2025-03-22 18:00:00    690.828619
 2025-03-22 19:00:00    690.828619
 dtype: float64,
 timestamp
 2025-08-07 16:00:00    149.336846
 2025-08-07 17:00:00    149.336846
 2025-08-07 18:00:00    149.336846
 2025-08-07 19:00:00    149.336846
 2025-08-07 20:00:00    149.336846
 dtype: float64)

In [None]:
# Ridge regression on quantile vector to predict y_true (train+val fit)
import numpy as np
import pandas as pd
import re

from sklearn.linear_model import Ridge
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler

# Select quantile columns like pred_q05, pred_q10, ..., pred_q95
qcols = [c for c in train_df.columns if re.fullmatch(r'pred_q\d{2}', str(c))]
qcols = sorted(qcols, key=lambda c: int(c[-2:]))
if not qcols:
    raise ValueError("No quantile columns found in train_df matching 'pred_q##'.")

trv_X = pd.concat([train_df[qcols], val_df[qcols]], ignore_index=True).apply(pd.to_numeric, errors='coerce')
trv_y = pd.concat([train_df['y_true'], val_df['y_true']], ignore_index=True).apply(pd.to_numeric, errors='coerce')
mask = trv_X.notna().all(axis=1) & trv_y.notna()
trv_X = trv_X.loc[mask].to_numpy(dtype=float)
trv_y = trv_y.loc[mask].to_numpy(dtype=float)

scaler = StandardScaler()
trv_Xs = scaler.fit_transform(trv_X)

# Simple time series CV to choose alpha
alphas = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0]
cv = TimeSeriesSplit(n_splits=5)

best_alpha, best_rmse = None, float('inf')
for a in alphas:
    rmses = []
    for tr_idx, va_idx in cv.split(trv_Xs):
        Xtr, Xva = trv_Xs[tr_idx], trv_Xs[va_idx]
        ytr, yva = trv_y[tr_idx], trv_y[va_idx]
        model = Ridge(alpha=a, fit_intercept=True, random_state=42)
        model.fit(Xtr, ytr)
        yhat = model.predict(Xva)
        rmses.append(np.sqrt(np.mean((yhat - yva) ** 2)))
    avg_rmse = float(np.mean(rmses))
    if avg_rmse < best_rmse:
        best_rmse, best_alpha = avg_rmse, a

model = Ridge(alpha=best_alpha, fit_intercept=True, random_state=42)
model.fit(trv_Xs, trv_y)

# Evaluate on splits
for name, d in [('train', train_df), ('val', val_df), ('test', test_df)]:
    X = d[qcols].apply(pd.to_numeric, errors='coerce')
    m = X.notna().all(axis=1) & pd.to_numeric(d['y_true'], errors='coerce').notna()
    Xs = scaler.transform(X.loc[m].to_numpy(dtype=float))
    y = pd.to_numeric(d['y_true'], errors='coerce').loc[m].to_numpy(dtype=float)
    yhat = model.predict(Xs)
    rmse_val = float(np.sqrt(np.mean((yhat - y) ** 2))) if len(y) else float('nan')
    corr_val = float(np.corrcoef(yhat, y)[0,1]) if len(y) else float('nan')
    print(f"{name}: Ridge[qs] (alpha={best_alpha}) RMSE={rmse_val:.6g}, Corr={corr_val:.4f}")


train: Ridge[qs] (alpha=10.0) RMSE=0.00959049, Corr=0.9247
val: Ridge[qs] (alpha=10.0) RMSE=0.0281981, Corr=0.1419
test: Ridge[qs] (alpha=10.0) RMSE=0.0217679, Corr=0.0150


In [12]:
import numpy as np

def rmse(a, b):
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return float(np.sqrt(np.mean((a - b) ** 2)))

def brier(p, ybin):
    p = np.asarray(p, dtype=float)
    yb = np.asarray(ybin, dtype=float)
    return float(np.mean((p - yb) ** 2))


def eval_split(df: pd.DataFrame, name: str):
    y = pd.to_numeric(df['y_true'], errors='coerce')
    exp = pd.to_numeric(df['exp_ret_avg'], errors='coerce')
    prob = pd.to_numeric(df['prob_up'], errors='coerce')

    # RMSE for expected return vs true
    m = y.notna() & exp.notna()
    rmse_val = rmse(exp[m], y[m]) if m.any() else float('nan')

    # Brier score for probability of positive
    ybin = (y > 0).astype(int)
    m2 = ybin.notna() & prob.notna()
    brier_val = brier(prob[m2], ybin[m2]) if m2.any() else float('nan')

    print(f"{name}: RMSE(exp_ret_avg,y_true)={rmse_val:.6g}, Brier(prob_up)={brier_val:.6g}")
    return rmse_val, brier_val

print("Metrics (RMSE for exp_ret_avg, Brier for prob_up):")
rmse_train, brier_train = eval_split(train_ind, 'train')
rmse_val, brier_val = eval_split(val_ind, 'val')
rmse_test, brier_test = eval_split(test_ind, 'test')


Metrics (RMSE for exp_ret_avg, Brier for prob_up):
train: RMSE(exp_ret_avg,y_true)=0.0207627, Brier(prob_up)=0.195347
val: RMSE(exp_ret_avg,y_true)=0.0271975, Brier(prob_up)=0.248301
test: RMSE(exp_ret_avg,y_true)=0.0187889, Brier(prob_up)=0.251163


In [39]:
import numpy as np
import pandas as pd

# Quick checks: normalized errors, base-rate Brier/BSS, reliability/ECE, correlations

def rmse(a, b):
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return float(np.sqrt(np.mean((a - b) ** 2)))


def corr_r2(a, b):
    a = pd.to_numeric(a, errors='coerce')
    b = pd.to_numeric(b, errors='coerce')
    m = a.notna() & b.notna()
    if not m.any():
        return float('nan'), float('nan')
    r = float(np.corrcoef(a[m], b[m])[0, 1])
    return r, r * r


def brier_score(p, ybin):
    p = np.asarray(p, dtype=float)
    yb = np.asarray(ybin, dtype=float)
    return float(np.mean((p - yb) ** 2))


def base_rate_brier(ybin):
    p_hat = float(np.mean(ybin))
    return p_hat * (1.0 - p_hat), p_hat


def reliability_bins(prob, ybin, num_bins=10):
    df = pd.DataFrame({'p': pd.to_numeric(prob, errors='coerce'), 'y': pd.to_numeric(ybin, errors='coerce')})
    df = df.dropna()
    if df.empty:
        return pd.DataFrame(), float('nan')
    edges = np.linspace(0.0, 1.0, num_bins + 1)
    df['bin'] = pd.cut(df['p'], bins=edges, include_lowest=True, right=True)
    grp = df.groupby('bin', observed=True)
    tab = grp.agg(count=('y', 'size'), mean_p=('p', 'mean'), emp_rate=('y', 'mean')).reset_index()
    N = float(tab['count'].sum())
    tab = tab[tab['count'] > 0]
    ece = float(np.sum((tab['count'] / N) * np.abs(tab['mean_p'] - tab['emp_rate']))) if N > 0 else float('nan')
    return tab, ece


def split_quick_checks(df: pd.DataFrame, name: str):
    y = pd.to_numeric(df['y_true'], errors='coerce')
    exp_avg = pd.to_numeric(df['exp_ret_avg'], errors='coerce')
    exp_con = pd.to_numeric(df['exp_ret_conservative'], errors='coerce')
    prob = pd.to_numeric(df['prob_up'], errors='coerce')
    ybin = (y > 0).astype(int)

    # RMSE and normalized RMSE
    mask_avg = y.notna() & exp_avg.notna()
    sigma_y = float(y[mask_avg].std(ddof=0)) if mask_avg.any() else float('nan')
    rmse_avg = rmse(exp_avg[mask_avg], y[mask_avg]) if mask_avg.any() else float('nan')
    nrmse_avg = (rmse_avg / sigma_y) if (sigma_y and sigma_y > 0) else float('nan')

    mask_con = y.notna() & exp_con.notna()
    rmse_con = rmse(exp_con[mask_con], y[mask_con]) if mask_con.any() else float('nan')
    nrmse_con = (rmse_con / sigma_y) if (sigma_y and sigma_y > 0) else float('nan')

    # Correlation / R^2
    r_avg, r2_avg = corr_r2(exp_avg, y)

    # Brier, base-rate Brier, BSS
    mask_prob = prob.notna() & ybin.notna()
    brier = brier_score(prob[mask_prob], ybin[mask_prob]) if mask_prob.any() else float('nan')
    brier_base, p_hat = base_rate_brier(ybin[mask_prob]) if mask_prob.any() else (float('nan'), float('nan'))
    bss = (1.0 - brier / brier_base) if (brier_base and brier_base > 0) else float('nan')

    # Reliability / ECE
    rel, ece = reliability_bins(prob[mask_prob], ybin[mask_prob], num_bins=10) if mask_prob.any() else (pd.DataFrame(), float('nan'))

    # Prob summary
    prob_desc = prob.describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9])

    print(f"{name}:\n"
          f"  RMSE(avg)={rmse_avg:.6g}, NRMSE(avg)={nrmse_avg:.6g}, RMSE(cons)={rmse_con:.6g}, NRMSE(cons)={nrmse_con:.6g}\n"
          f"  Corr(avg,y)={r_avg:.4f}, R2(avg)={r2_avg:.4f}\n"
          f"  Brier={brier:.6g}, BaseBrier={brier_base:.6g}, BSS={bss:.4f}, p_hat={p_hat:.3f}\n"
          f"  ECE={ece:.4f}\n"
          f"  prob_up summary: mean={prob_desc['mean']:.3f}, std={prob_desc['std']:.3f}, median={prob_desc['50%']:.3f}, p10={prob_desc['10%']:.3f}, p90={prob_desc['90%']:.3f}")
    return {
        'rmse_avg': rmse_avg, 'nrmse_avg': nrmse_avg, 'rmse_con': rmse_con, 'nrmse_con': nrmse_con,
        'corr_avg': r_avg, 'r2_avg': r2_avg, 'brier': brier, 'brier_base': brier_base, 'bss': bss, 'p_hat': p_hat,
        'ece': ece, 'reliability': rel,
    }

print("Quick checks:")
qc_train = split_quick_checks(train_ind, 'train')
qc_val = split_quick_checks(val_ind, 'val')
qc_test = split_quick_checks(test_ind, 'test')

# Show non-empty reliability tables for val/test
qc_val['reliability'].head(12), qc_test['reliability'].head(12)


Quick checks:
train:
  RMSE(avg)=0.0209008, NRMSE(avg)=0.839464, RMSE(cons)=0.0220408, NRMSE(cons)=0.885251
  Corr(avg,y)=0.8385, R2(avg)=0.7031
  Brier=0.23619, BaseBrier=0.24943, BSS=0.0531, p_hat=0.524
  ECE=0.0909
  prob_up summary: mean=0.518, std=0.037, median=0.518, p10=0.491, p90=0.553
val:
  RMSE(avg)=0.0270382, NRMSE(avg)=0.986567, RMSE(cons)=0.0277797, NRMSE(cons)=1.01362
  Corr(avg,y)=0.2047, R2(avg)=0.0419
  Brier=0.249018, BaseBrier=0.249879, BSS=0.0034, p_hat=0.511
  ECE=0.0127
  prob_up summary: mean=0.512, std=0.016, median=0.509, p10=0.493, p90=0.536
test:
  RMSE(avg)=0.0188572, NRMSE(avg)=1.00241, RMSE(cons)=0.020022, NRMSE(cons)=1.06432
  Corr(avg,y)=0.0525, R2(avg)=0.0028
  Brier=0.248689, BaseBrier=0.247644, BSS=-0.0042, p_hat=0.549
  ECE=0.0337
  prob_up summary: mean=0.515, std=0.011, median=0.514, p10=0.502, p90=0.527


(          bin  count    mean_p  emp_rate
 0  (0.4, 0.5]    628  0.491131  0.455414
 1  (0.5, 0.6]   2697  0.516560  0.523915,
           bin  count    mean_p  emp_rate
 0  (0.4, 0.5]    122  0.497169  0.704918
 1  (0.5, 0.6]   3205  0.515496  0.542590)

In [14]:
# Post-hoc linear mapping from exp_ret to y_true (train+val fit, test untouched)
import numpy as np
import pandas as pd

# Build train+val frame
trv = pd.concat([train_df, val_df], ignore_index=True)

# Helper: closed-form OLS with intercept for columns in Xcols
def fit_ols(df, ycol, xcols):
    y = pd.to_numeric(df[ycol], errors='coerce')
    X = df[xcols].apply(pd.to_numeric, errors='coerce')
    m = y.notna()
    for c in xcols:
        m &= X[c].notna()
    y = y[m].to_numpy(dtype=float)
    X = X.loc[m, :].to_numpy(dtype=float)
    # Add intercept
    Xw = np.column_stack([np.ones(len(X)), X])
    beta, *_ = np.linalg.lstsq(Xw, y, rcond=None)
    return beta, xcols

# Fit two variants on train+val
beta1, cols1 = fit_ols(trv, 'y_true', ['exp_ret_avg'])
beta2, cols2 = fit_ols(trv, 'y_true', ['exp_ret_avg', 'tail_spread'])

# Predict helper
def predict_ols(df, beta, xcols):
    X = df[xcols].apply(pd.to_numeric, errors='coerce').to_numpy(dtype=float)
    Xw = np.column_stack([np.ones(len(X)), X])
    return Xw @ beta

# Evaluate on splits
for name, d in [('train', train_df), ('val', val_df), ('test', test_df)]:
    y = pd.to_numeric(d['y_true'], errors='coerce').to_numpy(dtype=float)
    m1 = np.isfinite(y)
    y1 = y[m1]
    yhat1 = predict_ols(d.iloc[m1.nonzero()[0]], beta1, cols1)
    yhat2 = predict_ols(d.iloc[m1.nonzero()[0]], beta2, cols2)
    def _rmse(a,b):
        return float(np.sqrt(np.mean((a-b)**2))) if len(a)>0 else float('nan')
    def _corr(a,b):
        if len(a) == 0:
            return float('nan')
        c = np.corrcoef(a, b)[0,1]
        return float(c)
    rm1, r1 = _rmse(y1, yhat1), _corr(y1, yhat1)
    rm2, r2 = _rmse(y1, yhat2), _corr(y1, yhat2)
    print(f"{name}: OLS[exp] RMSE={rm1:.6g}, Corr={r1:.4f} | OLS[exp,tail] RMSE={rm2:.6g}, Corr={r2:.4f}")


train: OLS[exp] RMSE=0.0118507, Corr=0.8799 | OLS[exp,tail] RMSE=0.0101492, Corr=0.9160
val: OLS[exp] RMSE=0.0279753, Corr=0.1333 | OLS[exp,tail] RMSE=0.0288044, Corr=0.1192
test: OLS[exp] RMSE=0.0195037, Corr=0.0989 | OLS[exp,tail] RMSE=0.0224342, Corr=0.0767


In [17]:
# Ridge regression on quantile vector to predict y_true (train+val fit)
import numpy as np
import pandas as pd
import re

from sklearn.linear_model import Ridge
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler

# Select quantile columns like pred_q05, pred_q10, ..., pred_q95
qcols = [c for c in train_df.columns if re.fullmatch(r'pred_q\d{2}', str(c))]
qcols = sorted(qcols, key=lambda c: int(c[-2:]))
if not qcols:
    raise ValueError("No quantile columns found in train_df matching 'pred_q##'.")

trv_X = pd.concat([train_df[qcols], val_df[qcols]], ignore_index=True).apply(pd.to_numeric, errors='coerce')
trv_y = pd.concat([train_df['y_true'], val_df['y_true']], ignore_index=True).apply(pd.to_numeric, errors='coerce')
mask = trv_X.notna().all(axis=1) & trv_y.notna()
trv_X = trv_X.loc[mask].to_numpy(dtype=float)
trv_y = trv_y.loc[mask].to_numpy(dtype=float)

scaler = StandardScaler()
trv_Xs = scaler.fit_transform(trv_X)

# Simple time series CV to choose alpha
alphas = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0]
cv = TimeSeriesSplit(n_splits=5)

best_alpha, best_rmse = None, float('inf')
for a in alphas:
    rmses = []
    for tr_idx, va_idx in cv.split(trv_Xs):
        Xtr, Xva = trv_Xs[tr_idx], trv_Xs[va_idx]
        ytr, yva = trv_y[tr_idx], trv_y[va_idx]
        model = Ridge(alpha=a, fit_intercept=True, random_state=42)
        model.fit(Xtr, ytr)
        yhat = model.predict(Xva)
        rmses.append(np.sqrt(np.mean((yhat - yva) ** 2)))
    avg_rmse = float(np.mean(rmses))
    if avg_rmse < best_rmse:
        best_rmse, best_alpha = avg_rmse, a

model = Ridge(alpha=best_alpha, fit_intercept=True, random_state=42)
model.fit(trv_Xs, trv_y)

# Evaluate on splits
for name, d in [('train', train_df), ('val', val_df), ('test', test_df)]:
    X = d[qcols].apply(pd.to_numeric, errors='coerce')
    m = X.notna().all(axis=1) & pd.to_numeric(d['y_true'], errors='coerce').notna()
    Xs = scaler.transform(X.loc[m].to_numpy(dtype=float))
    y = pd.to_numeric(d['y_true'], errors='coerce').loc[m].to_numpy(dtype=float)
    yhat = model.predict(Xs)
    rmse_val = float(np.sqrt(np.mean((yhat - y) ** 2))) if len(y) else float('nan')
    corr_val = float(np.corrcoef(yhat, y)[0,1]) if len(y) else float('nan')
    print(f"{name}: Ridge[qs] (alpha={best_alpha}) RMSE={rmse_val:.6g}, Corr={corr_val:.4f}")


train: Ridge[qs] (alpha=10.0) RMSE=0.00959049, Corr=0.9247
val: Ridge[qs] (alpha=10.0) RMSE=0.0281981, Corr=0.1419
test: Ridge[qs] (alpha=10.0) RMSE=0.0217679, Corr=0.0150


In [40]:
# Isotonic regression mapping y ~ f(exp_ret_avg) (train+val fit)
import numpy as np
import pandas as pd
from sklearn.isotonic import IsotonicRegression

# Prepare train+val
trv_exp = pd.to_numeric(trv['exp_ret_avg'], errors='coerce')
trv_y = pd.to_numeric(trv['y_true'], errors='coerce')
mask = trv_exp.notna() & trv_y.notna()
X_trv = trv_exp.loc[mask].to_numpy(dtype=float)
y_trv = trv_y.loc[mask].to_numpy(dtype=float)

iso = IsotonicRegression(y_min=None, y_max=None, increasing=True, out_of_bounds='clip')
iso.fit(X_trv, y_trv)

# Evaluate
for name, d in [('train', train_df), ('val', val_df), ('test', test_df)]:
    x = pd.to_numeric(d['exp_ret_avg'], errors='coerce')
    y = pd.to_numeric(d['y_true'], errors='coerce')
    m = x.notna() & y.notna()
    x1 = x.loc[m].to_numpy(dtype=float)
    y1 = y.loc[m].to_numpy(dtype=float)
    yhat = iso.predict(x1)
    rmse_val = float(np.sqrt(np.mean((yhat - y1) ** 2))) if len(y1) else float('nan')
    corr_val = float(np.corrcoef(yhat, y1)[0,1]) if len(y1) else float('nan')
    print(f"{name}: Isotonic(exp) RMSE={rmse_val:.6g}, Corr={corr_val:.4f}")


train: Isotonic(exp) RMSE=0.0130173, Corr=0.8576
val: Isotonic(exp) RMSE=0.0273142, Corr=0.1886
test: Isotonic(exp) RMSE=0.0202509, Corr=0.0481


In [41]:
# Calibrate exp_ret_avg and prob_up via isotonic (fit on train+val), then apply to all splits
import numpy as np
import pandas as pd
from sklearn.isotonic import IsotonicRegression

# Prepare train+val data
trv_all = pd.concat([train_df, val_df], ignore_index=True)

# Calibrator for expected return: exp_ret_avg -> y_true
x_exp = pd.to_numeric(trv_all['exp_ret_avg'], errors='coerce')
y_exp = pd.to_numeric(trv_all['y_true'], errors='coerce')
mask_exp = x_exp.notna() & y_exp.notna()
iso_exp = IsotonicRegression(increasing=True, out_of_bounds='clip')
iso_exp.fit(x_exp.loc[mask_exp].to_numpy(dtype=float), y_exp.loc[mask_exp].to_numpy(dtype=float))

# Calibrator for probability: prob_up -> 1[y_true>0]
p_prob = pd.to_numeric(trv_all['prob_up'], errors='coerce')
y_bin = (pd.to_numeric(trv_all['y_true'], errors='coerce') > 0).astype(float)
mask_prob = p_prob.notna() & y_bin.notna()
iso_prob = IsotonicRegression(increasing=True, out_of_bounds='clip')
iso_prob.fit(p_prob.loc[mask_prob].to_numpy(dtype=float), y_bin.loc[mask_prob].to_numpy(dtype=float))

# Apply to splits
for name, d in [('train', train_df), ('val', val_df), ('test', test_df)]:
    d['exp_ret_cal'] = iso_exp.predict(pd.to_numeric(d['exp_ret_avg'], errors='coerce').to_numpy(dtype=float))
    d['prob_up_cal'] = iso_prob.predict(pd.to_numeric(d['prob_up'], errors='coerce').to_numpy(dtype=float))
    print(f"{name}: added exp_ret_cal, prob_up_cal; shapes now {d.shape}")

# Preview
train_df[['exp_ret_avg','exp_ret_cal','prob_up','prob_up_cal']].head(), \
val_df[['exp_ret_avg','exp_ret_cal','prob_up','prob_up_cal']].head(), \
test_df[['exp_ret_avg','exp_ret_cal','prob_up','prob_up_cal']].head()


train: added exp_ret_cal, prob_up_cal; shapes now (15521, 22)
val: added exp_ret_cal, prob_up_cal; shapes now (3325, 22)
test: added exp_ret_cal, prob_up_cal; shapes now (3327, 22)


(   exp_ret_avg  exp_ret_cal   prob_up  prob_up_cal
 0     0.003349     0.009203  0.521131     0.652327
 1     0.003528     0.009934  0.521131     0.652327
 2     0.003232     0.009203  0.521131     0.652327
 3     0.003621     0.009934  0.521131     0.652327
 4     0.003398     0.009203  0.521131     0.652327,
    exp_ret_avg  exp_ret_cal   prob_up  prob_up_cal
 0    -0.001214    -0.006704  0.535163     0.781407
 1    -0.001400    -0.007575  0.535163     0.781407
 2    -0.001160    -0.006704  0.535163     0.781407
 3    -0.001399    -0.007575  0.526237     0.693793
 4    -0.001282    -0.006704  0.526237     0.693793,
    exp_ret_avg  exp_ret_cal   prob_up  prob_up_cal
 0    -0.001586    -0.008531  0.514295     0.479282
 1    -0.001865    -0.009556  0.514295     0.479282
 2    -0.002095    -0.010184  0.514295     0.479282
 3    -0.002016    -0.010184  0.514295     0.479282
 4    -0.002201    -0.012830  0.514295     0.479282)

In [None]:
# Apply indicator/evaluation on calibrated expected return and probability
import pandas as pd

# Helper to use calibrated columns with existing indicator function
def add_indicator_with_cal(signals_df: pd.DataFrame, original_df: pd.DataFrame,
                           tau_long: float = 0.51, tau_short: float = 0.49,
                           exp_col: str = 'exp_ret_cal', prob_col: str = 'prob_up_cal',
                           out_col: str = 'signal_prob_exp_q50') -> pd.DataFrame:
    tmp = signals_df.copy()
    tmp['exp_ret_avg'] = pd.to_numeric(tmp[exp_col], errors='coerce')
    tmp['prob_up'] = pd.to_numeric(tmp[prob_col], errors='coerce')
    return add_prob_exp_q50_indicator(tmp, original_df, tau_long=tau_long, tau_short=tau_short,
                                      exp_col='exp_ret_avg', out_col=out_col)

# Build calibrated indicators
train_ind_cal = add_indicator_with_cal(train_df, orig_train_df)
val_ind_cal = add_indicator_with_cal(val_df, orig_val_df)
test_ind_cal = add_indicator_with_cal(test_df, orig_test_df)

print("Calibrated indicator value counts (train/val/test):")
print(train_ind_cal['signal_prob_exp_q50'].value_counts().sort_index())
print(val_ind_cal['signal_prob_exp_q50'].value_counts().sort_index())
print(test_ind_cal['signal_prob_exp_q50'].value_counts().sort_index())

# Evaluate calibrated indicators
cum_train_cal = evaluate_indicator(train_ind_cal, 'train_cal')
cum_val_cal = evaluate_indicator(val_ind_cal, 'val_cal')
cum_test_cal = evaluate_indicator(test_ind_cal, 'test_cal')

print(f"train_cal: {pct_from_cum_log(cum_train_cal):.2f}%")
print(f"val_cal:   {pct_from_cum_log(cum_val_cal):.2f}%")
print(f"test_cal:  {pct_from_cum_log(cum_test_cal):.2f}%")


Calibrated indicator value counts (train/val/test):
signal_prob_exp_q50
-1    1839
 0    7865
 1    5817
Name: count, dtype: int64
signal_prob_exp_q50
-1     369
 0    2378
 1     578
Name: count, dtype: int64
signal_prob_exp_q50
-1      58
 0    2596
 1     673
Name: count, dtype: int64
train_cal: trades=7656, win_rate=0.867, cumulative_return=150.599633
val_cal: trades=947, win_rate=0.582, cumulative_return=4.121872
test_cal: trades=731, win_rate=0.549, cumulative_return=2.070407
train_cal: 25385725917155634173395410323406757775886159554795107967539500548096.00%
val_cal:   6067.46%
test_cal:  692.80%


In [44]:
pct_from_cum_log

<function __main__.pct_from_cum_log(cum_series)>

In [23]:
# Per-quantile calibration (CQR-style residual shifts), then recompute prob_up
import re
import numpy as np
import pandas as pd
from strategies.quantile_signals import estimate_prob_up

# Identify quantile columns
qcols = sorted([c for c in train_df.columns if re.fullmatch(r'pred_q\d{2}', str(c))], key=lambda c: int(c[-2:]))
if not qcols:
    raise ValueError("No pred_q## columns found.")

# Train+val residual quantiles at each p
trv = pd.concat([train_df, val_df], ignore_index=True)
shifts: dict[str, float] = {}
for c in qcols:
    p = int(c[-2:]) / 100.0
    y = pd.to_numeric(trv['y_true'], errors='coerce')
    q = pd.to_numeric(trv[c], errors='coerce')
    resid = (y - q).dropna()
    shifts[c] = float(np.quantile(resid.to_numpy(), p)) if len(resid) else 0.0

print("Per-quantile residual shifts (train+val):")
for c in qcols:
    print(f"  {c}: shift={shifts[c]:+.6f}")

# Apply shifts, enforce monotonicity, recompute prob_up
for name, d in [('train', train_df), ('val', val_df), ('test', test_df)]:
    # Create calibrated quantile columns
    qcal = []
    for c in qcols:
        cc = f"{c}_qcal"
        d[cc] = pd.to_numeric(d[c], errors='coerce') + shifts[c]
        qcal.append(cc)
    # Monotone fix
    d[qcal] = d[qcal].cummax(axis=1)
    # Recompute prob_up from calibrated curve
    d['prob_up_qcal'] = d[qcal].apply(lambda row: estimate_prob_up(row, qcols=qcal), axis=1)
    # Report Brier score for calibrated prob
    ybin = (pd.to_numeric(d['y_true'], errors='coerce') > 0).astype(float)
    m = ybin.notna() & d['prob_up_qcal'].notna()
    brier = float(np.mean((d.loc[m, 'prob_up_qcal'].to_numpy() - ybin.loc[m].to_numpy())**2)) if m.any() else float('nan')
    print(f"{name}: calibrated prob_up Brier={brier:.6g} (cols added: {len(qcal)} qcal + prob_up_qcal)")


Per-quantile residual shifts (train+val):
  pred_q05: shift=+0.001509
  pred_q10: shift=-0.000529
  pred_q15: shift=-0.000030
  pred_q25: shift=+0.000630
  pred_q50: shift=-0.000044
  pred_q75: shift=-0.000278
  pred_q85: shift=-0.001768
  pred_q90: shift=-0.001677
  pred_q95: shift=-0.002197
train: calibrated prob_up Brier=0.191965 (cols added: 9 qcal + prob_up_qcal)
val: calibrated prob_up Brier=0.248301 (cols added: 9 qcal + prob_up_qcal)
test: calibrated prob_up Brier=0.25132 (cols added: 9 qcal + prob_up_qcal)


In [30]:
# Combine isotonic-calibrated exp_ret with per-quantile-calibrated prob_up and evaluate
# Uses exp_ret_cal + prob_up_qcal

# Build combined indicators
train_ind_combo = add_indicator_with_cal(train_df, orig_train_df, exp_col='exp_ret_cal', prob_col='prob_up_qcal', out_col='signal_prob_exp_q50_cal')
val_ind_combo = add_indicator_with_cal(val_df, orig_val_df, exp_col='exp_ret_cal', prob_col='prob_up_qcal', out_col='signal_prob_exp_q50_cal')
test_ind_combo = add_indicator_with_cal(test_df, orig_test_df, exp_col='exp_ret_cal', prob_col='prob_up_qcal', out_col='signal_prob_exp_q50_cal', tau_long=0.55, tau_short=0.47)

print("Combined (exp_ret_cal + prob_up_qcal) indicator counts (train/val/test):")
print(train_ind_combo['signal_prob_exp_q50_cal'].value_counts().sort_index())
print(val_ind_combo['signal_prob_exp_q50_cal'].value_counts().sort_index())
print(test_ind_combo['signal_prob_exp_q50_cal'].value_counts().sort_index())

# Evaluate
cum_train_combo = evaluate_indicator(train_ind_combo.rename(columns={'signal_prob_exp_q50_cal': 'signal_prob_exp_q50'}), 'train_combo')
cum_val_combo = evaluate_indicator(val_ind_combo.rename(columns={'signal_prob_exp_q50_cal': 'signal_prob_exp_q50'}), 'val_combo')
cum_test_combo = evaluate_indicator(test_ind_combo.rename(columns={'signal_prob_exp_q50_cal': 'signal_prob_exp_q50'}), 'test_combo')

print(f"train_combo: {pct_from_cum_log(cum_train_combo):.2f}%")
print(f"val_combo:   {pct_from_cum_log(cum_val_combo):.2f}%")
print(f"test_combo:  {pct_from_cum_log(cum_test_combo):.2f}%")


Combined (exp_ret_cal + prob_up_qcal) indicator counts (train/val/test):
signal_prob_exp_q50_cal
-1    5368
 0    5952
 1    4201
Name: count, dtype: int64
signal_prob_exp_q50_cal
-1     192
 0    2732
 1     401
Name: count, dtype: int64
signal_prob_exp_q50_cal
-1     105
 0    3171
 1      51
Name: count, dtype: int64
train_combo: trades=9569, win_rate=0.945, cumulative_return=208.857665
val_combo: trades=593, win_rate=0.578, cumulative_return=4.179296
test_combo: trades=156, win_rate=0.551, cumulative_return=0.397202
train_combo: 507845077095603046846669933891975786539291390679672293304488703376732358512808572299924996096.00%
val_combo:   6431.98%
test_combo:  48.77%


In [27]:
# Probability calibration diagnostics for prob_up variants (orig, isotonic, per-quantile)
import pandas as pd

def eval_probs(df: pd.DataFrame, name: str, columns=(('orig','prob_up'), ('iso','prob_up_cal'), ('qcal','prob_up_qcal'))):
    y = pd.to_numeric(df['y_true'], errors='coerce')
    ybin = (y > 0).astype(int)
    for label, col in columns:
        if col not in df.columns:
            print(f"{name} {label}: '{col}' missing, skip")
            continue
        p = pd.to_numeric(df[col], errors='coerce')
        m = p.notna() & ybin.notna()
        if not m.any():
            print(f"{name} {label}: no valid rows")
            continue
        br = brier_score(p[m], ybin[m])
        br_base, p_hat = base_rate_brier(ybin[m])
        bss = (1.0 - br / br_base) if (br_base and br_base > 0) else float('nan')
        rel, ece = reliability_bins(p[m], ybin[m], num_bins=10)
        desc = p[m].describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9])
        print(
            f"{name} {label}: Brier={br:.6g}, Base={br_base:.6g}, BSS={bss:.4f}, ECE={ece:.4f}, "
            f"mean={desc['mean']:.3f}, std={desc['std']:.3f}, med={desc['50%']:.3f}, "
            f"p10={desc['10%']:.3f}, p90={desc['90%']:.3f}, p_hat={p_hat:.3f}"
        )
        # Show first few reliability bins
        try:
            print(rel.head(10).to_string(index=False))
        except Exception:
            pass

print("Calibrated probability diagnostics (orig vs isotonic vs per-quantile):")
eval_probs(train_df, 'train')
eval_probs(val_df, 'val')
eval_probs(test_df, 'test')


Calibrated probability diagnostics (orig vs isotonic vs per-quantile):
train orig: Brier=0.195347, Base=0.24943, BSS=0.2168, ECE=0.2722, mean=0.492, std=0.091, med=0.521, p10=0.351, p90=0.585, p_hat=0.524
       bin  count   mean_p  emp_rate
(0.2, 0.3]    690 0.263354  0.011594
(0.3, 0.4]   2130 0.357523  0.021596
(0.4, 0.5]   3455 0.452548  0.170767
(0.5, 0.6]   8555 0.549907  0.795675
(0.6, 0.7]    691 0.608626  0.984081
train iso: Brier=0.106623, Base=0.24943, BSS=0.5725, ECE=0.0289, mean=0.519, std=0.361, med=0.558, p10=0.026, p90=0.972, p_hat=0.524
          bin  count   mean_p  emp_rate
(-0.001, 0.1]   3811 0.032168  0.031750
   (0.1, 0.2]   1083 0.154273  0.135734
   (0.2, 0.3]    323 0.262217  0.201238
   (0.3, 0.4]    375 0.337900  0.264000
   (0.4, 0.5]   1560 0.440873  0.364103
   (0.5, 0.6]   1152 0.561535  0.591146
   (0.6, 0.7]    346 0.620469  0.658960
   (0.7, 0.8]   1220 0.741029  0.780328
   (0.8, 0.9]   2309 0.832284  0.881334
   (0.9, 1.0]   3342 0.952635  0.967983


In [25]:
# Detailed trade distribution summary for indicator-based signals
import numpy as np
import pandas as pd

def summarize_trade_distribution(
    df: pd.DataFrame,
    signal_col: str = 'signal_prob_exp_q50',
    y_col: str = 'y_true',
    prob_col: str | None = 'prob_up',
    exp_col: str | None = 'exp_ret_avg',
    num_prob_bins: int = 10,
    num_exp_bins: int = 10,
):
    """
    Build a detailed summary of simulated trades induced by a {-1,0,1} signal column.

    Returns a dict of DataFrames:
      - 'overall': single-row overall stats
      - 'by_side': stats for long and short subsets
      - 'quantiles': return quantiles for all/long/short
      - 'by_prob_bin': per-bin stats by prob_col (if available)
      - 'by_exp_bin': per-bin stats by exp_col (if available)
    """
    if signal_col not in df.columns:
        raise KeyError(f"'{signal_col}' not found")
    if y_col not in df.columns:
        raise KeyError(f"'{y_col}' not found")

    s = pd.to_numeric(df[signal_col], errors='coerce').fillna(0).astype(int)
    y_log = pd.to_numeric(df[y_col], errors='coerce')
    simple_ret = np.exp(y_log) - 1.0
    trade_ret = s * simple_ret
    trades_mask = s != 0

    # Wins among trades
    wins_mask = trades_mask & y_log.notna()
    wins = ((np.sign(y_log[wins_mask]) == s[wins_mask]) & (np.sign(y_log[wins_mask]) != 0)).astype(float)

    # Helper to compute stats safely
    def _stats(ret: pd.Series, mask: pd.Series):
        rr = ret[mask].dropna()
        if rr.empty:
            return dict(count=0, win_rate=np.nan, mean=np.nan, median=np.nan, std=np.nan,
                        min=np.nan, p05=np.nan, p10=np.nan, p25=np.nan, p50=np.nan,
                        p75=np.nan, p90=np.nan, p95=np.nan, max=np.nan,
                        es_5=np.nan, top5_mean=np.nan)
        q = rr.quantile([0.05,0.10,0.25,0.50,0.75,0.90,0.95])
        k = max(int(len(rr) * 0.05), 1)
        es_5 = rr.nsmallest(k).mean()
        top5 = rr.nlargest(k).mean()
        return dict(
            count=int(len(rr)),
            win_rate=float(((np.sign(y_log[mask]) == s[mask]) & (np.sign(y_log[mask]) != 0)).mean()) if mask.any() else np.nan,
            mean=float(rr.mean()),
            median=float(rr.median()),
            std=float(rr.std(ddof=0)),
            min=float(rr.min()),
            p05=float(q.loc[0.05]),
            p10=float(q.loc[0.10]),
            p25=float(q.loc[0.25]),
            p50=float(q.loc[0.50]),
            p75=float(q.loc[0.75]),
            p90=float(q.loc[0.90]),
            p95=float(q.loc[0.95]),
            max=float(rr.max()),
            es_5=float(es_5),
            top5_mean=float(top5),
        )

    overall = {
        'total_rows': int(len(df)),
        'num_trades': int(trades_mask.sum()),
        'num_longs': int((s == 1).sum()),
        'num_shorts': int((s == -1).sum()),
        'trades_share': float(trades_mask.mean()),
        'long_share': float(((s == 1).sum()) / trades_mask.sum()) if trades_mask.sum() else np.nan,
        'short_share': float(((s == -1).sum()) / trades_mask.sum()) if trades_mask.sum() else np.nan,
        'win_rate': float(wins.mean()) if len(wins) else np.nan,
        'avg_trade_ret': float(trade_ret[trades_mask].mean()) if trades_mask.any() else np.nan,
        'median_trade_ret': float(trade_ret[trades_mask].median()) if trades_mask.any() else np.nan,
    }
    overall_df = pd.DataFrame([overall])

    by_side_rows = []
    by_side_rows.append({'side': 'long', **_stats(trade_ret, s == 1)})
    by_side_rows.append({'side': 'short', **_stats(trade_ret, s == -1)})
    by_side_df = pd.DataFrame(by_side_rows)

    # Quantiles table for all / long / short
    qs = [0.01,0.05,0.10,0.25,0.50,0.75,0.90,0.95,0.99]
    def _quantiles(series: pd.Series):
        if series.dropna().empty:
            return pd.Series({q: np.nan for q in qs})
        return series.quantile(qs)
    quantiles_df = pd.DataFrame({
        'all': _quantiles(trade_ret[trades_mask]),
        'long': _quantiles(trade_ret[s == 1]),
        'short': _quantiles(trade_ret[s == -1]),
    })
    quantiles_df.index.name = 'quantile'
    quantiles_df = quantiles_df.reset_index()

    # Per-bin by probability
    by_prob_bin = pd.DataFrame()
    if prob_col and prob_col in df.columns:
        p = pd.to_numeric(df[prob_col], errors='coerce')
        tm = trades_mask & p.notna()
        tmp = pd.DataFrame({
            'p': p[tm],
            'ret': trade_ret[tm],
            'win': ((np.sign(y_log[tm]) == s[tm]) & (np.sign(y_log[tm]) != 0)).astype(float),
            'side': s[tm],
        })
        edges = np.linspace(0.0, 1.0, int(num_prob_bins) + 1)
        tmp['bin'] = pd.cut(tmp['p'], bins=edges, include_lowest=True, right=True)
        g = tmp.groupby('bin', observed=True)
        by_prob_bin = g.agg(
            trades=('ret', 'size'),
            mean_ret=('ret', 'mean'),
            median_ret=('ret', 'median'),
            win_rate=('win', 'mean'),
            long_share=('side', lambda a: float((a == 1).mean()) if len(a) else np.nan),
            short_share=('side', lambda a: float((a == -1).mean()) if len(a) else np.nan),
            p05=('ret', lambda a: float(np.quantile(a, 0.05)) if len(a) else np.nan),
            p95=('ret', lambda a: float(np.quantile(a, 0.95)) if len(a) else np.nan),
        ).reset_index()

    # Per-bin by expected return (quantile bins)
    by_exp_bin = pd.DataFrame()
    if exp_col and exp_col in df.columns:
        x = pd.to_numeric(df[exp_col], errors='coerce')
        tm = trades_mask & x.notna()
        tmp = pd.DataFrame({
            'x': x[tm],
            'ret': trade_ret[tm],
            'win': ((np.sign(y_log[tm]) == s[tm]) & (np.sign(y_log[tm]) != 0)).astype(float),
            'side': s[tm],
        })
        try:
            tmp['bin'] = pd.qcut(tmp['x'], q=min(int(num_exp_bins), max(1, tmp['x'].nunique())), duplicates='drop')
            g = tmp.groupby('bin', observed=True)
            by_exp_bin = g.agg(
                trades=('ret', 'size'),
                mean_ret=('ret', 'mean'),
                median_ret=('ret', 'median'),
                win_rate=('win', 'mean'),
                long_share=('side', lambda a: float((a == 1).mean()) if len(a) else np.nan),
                short_share=('side', lambda a: float((a == -1).mean()) if len(a) else np.nan),
                p05=('ret', lambda a: float(np.quantile(a, 0.05)) if len(a) else np.nan),
                p95=('ret', lambda a: float(np.quantile(a, 0.95)) if len(a) else np.nan),
            ).reset_index()
        except Exception:
            by_exp_bin = pd.DataFrame()

    return {
        'overall': overall_df,
        'by_side': by_side_df,
        'quantiles': quantiles_df,
        'by_prob_bin': by_prob_bin,
        'by_exp_bin': by_exp_bin,
    }

# Example: summarize distribution for test_ind if available
try:
    trade_summary_test = summarize_trade_distribution(
        test_ind,
        signal_col='signal_prob_exp_q50',
        y_col='y_true',
        prob_col='prob_up' if 'prob_up' in test_ind.columns else None,
        exp_col='exp_ret_avg' if 'exp_ret_avg' in test_ind.columns else None,
    )
    print("Trade summary (test_ind): overall")
    print(trade_summary_test['overall'].to_string(index=False))
    print("\nQuantiles (first 9 rows):")
    print(trade_summary_test['quantiles'].to_string(index=False))
except Exception as e:
    print("Trade summary (test_ind) skipped:", e)

summarize_trade_distribution(test_ind)

Trade summary (test_ind): overall
 total_rows  num_trades  num_longs  num_shorts  trades_share  long_share  short_share  win_rate  avg_trade_ret  median_trade_ret
       3313          31         31           0      0.009357         1.0          0.0  0.741935       0.048173          0.072114

Quantiles (first 9 rows):
 quantile       all      long  short
     0.01 -0.041478 -0.041478    NaN
     0.05 -0.033795 -0.033795    NaN
     0.10 -0.025776 -0.025776    NaN
     0.25 -0.000186 -0.000186    NaN
     0.50  0.072114  0.072114    NaN
     0.75  0.083455  0.083455    NaN
     0.90  0.089075  0.089075    NaN
     0.95  0.094328  0.094328    NaN
     0.99  0.096032  0.096032    NaN


{'overall':    total_rows  num_trades  num_longs  num_shorts  trades_share  long_share  \
 0        3313          31         31           0      0.009357         1.0   
 
    short_share  win_rate  avg_trade_ret  median_trade_ret  
 0          0.0  0.741935       0.048173          0.072114  ,
 'by_side':     side  count  win_rate      mean    median       std       min       p05  \
 0   long     31  0.741935  0.048173  0.072114  0.045769 -0.043192 -0.033795   
 1  short      0       NaN       NaN       NaN       NaN       NaN       NaN   
 
         p10       p25       p50       p75       p90       p95       max  \
 0 -0.025776 -0.000186  0.072114  0.083455  0.089075  0.094328  0.096368   
 1       NaN       NaN       NaN       NaN       NaN       NaN       NaN   
 
        es_5  top5_mean  
 0 -0.043192   0.096368  
 1       NaN        NaN  ,
 'quantiles':    quantile       all      long  short
 0      0.01 -0.041478 -0.041478    NaN
 1      0.05 -0.033795 -0.033795    NaN
 2      0.1