# FWER control under different processes

In [None]:
import math
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, recall_score, f1_score
from functions import generate_autocorrelated_non_gaussian_data
from functions import expected_maximum_sharpe_ratio,variance_of_the_maximum_of_k_Sharpe_ratios

import logging
# Root-logger setup: INFO level, each record prefixed with a timestamp.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)-15s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)


def LOG(*message_and_args) -> None:
    """Emit a progress message at INFO level.

    All positional arguments are forwarded unchanged to ``logging.info``,
    i.e. a message optionally followed by %-style format arguments.
    """
    logging.info(*message_and_args)

In [None]:
# Experiment configuration
# ------------------------
MODELS = ["gaussian", "mild", "moderate", "severe"]  # `name` values handed to the data generator
RHOs = [0, 0.2]        # `rho` values handed to the data generator
SR0 = 0                # Sharpe ratio under the null hypothesis (H0)
SR1_list = [0.5]       # Sharpe ratio(s) under the alternative (H1)
T = 60                 # first generator argument -- presumably observations per trial (confirm in `functions`)
TRIALS = 10            # number of trials (strategies) per repetition
REPS_H0 = 10_000       # null-calibration repetitions
REPS_MIX = 10_000      # mixed H0/H1 repetitions
P_H1 = 0.1             # probability that an individual trial is drawn under H1
ALPHA = 0.05           # significance level used for the critical max-SR threshold

In [None]:
if False:
    # Debugging shortcut: change the condition to True to restrict the run
    # to a single model and a single rho value.
    MODELS = ["gaussian"]
    RHOs = [0]

In [None]:
# Stage 1: Null calibration (P_H1 = 0, global null)
# ---------------------------------------------------------------------
# For every (rho, model) pair, simulate REPS_H0 panels in which all trials
# are generated with SR = SR0, pool the per-trial Sharpe ratios, and turn
# their empirical variance into a critical value SR_c for the maximum
# Sharpe ratio:
#     SR_c = SR0 + E[max SR] + z_alpha * sigma[max SR]

LOG("Starting null calibration (global H0)")

# One (initially empty) pool of Sharpe ratios per (rho, model) pair.
null_srs = {}
for rho in RHOs:
    for name in MODELS:
        null_srs[(rho, name)] = []

for rho in RHOs:
    for name in MODELS:
        LOG(f"[H0 calibration] rho={rho}, model={name}")
        pooled = null_srs[(rho, name)]
        for _ in range(REPS_H0):
            panel = generate_autocorrelated_non_gaussian_data(
                T, TRIALS, rho=rho, SR0=SR0, name=name
            )
            # Column-wise mean over standard deviation: one Sharpe ratio per trial.
            pooled.extend(panel.mean(axis=0) / panel.std(axis=0))

# Standard-normal quantile at level 1 - ALPHA.
z_alpha = scipy.stats.norm.ppf(1 - ALPHA)

# Variance of SR under H0 and the resulting critical threshold, per (rho, model).
calib = {}
for key, pooled_srs in null_srs.items():
    var_SR0 = np.var(np.asarray(pooled_srs), ddof=1)  # empirical variance under pure H0

    # Both helpers receive the same number of trials and the same variance.
    trial_kwargs = dict(number_of_trials=TRIALS, variance=var_SR0)
    E_max_SR0 = expected_maximum_sharpe_ratio(**trial_kwargs)
    sigma_max = math.sqrt(variance_of_the_maximum_of_k_Sharpe_ratios(**trial_kwargs))

    SR0_adj = SR0 + E_max_SR0               # null level shifted by the expected maximum
    SR_c = SR0_adj + sigma_max * z_alpha    # critical value for max(SR)

    calib[key] = {
        "var_SR0": var_SR0,
        "E_max_SR0": E_max_SR0,
        "sigma_max": sigma_max,
        "SR0_adj": SR0_adj,
        "SR_c": SR_c,
    }

# Tabular view of the calibration, one row per (rho, model) pair.
calib_df = pd.DataFrame(
    [{"rho": r, "name": model, **stats} for (r, model), stats in calib.items()]
)
LOG("Null calibration completed")
display(calib_df)

In [None]:
# Long-running cell (the original note here read "10 minutes").

# Stage 2: Mixed H0 / H1 experiment, using fixed SR_c from calibration
# ---------------------------------------------------------------------
# For every (rho, model, SR1) configuration, run REPS_MIX iterations. In each
# iteration every trial is independently assigned to H1 with probability P_H1,
# a panel is generated, and the global null is rejected when the largest
# per-trial Sharpe ratio exceeds the calibrated threshold SR_c.
#
# The output `d` has one row per (iteration, trial). Per-iteration results are
# collected in plain lists and turned into ONE DataFrame per configuration;
# building a small DataFrame inside the innermost loop and concatenating tens
# of thousands of them produces the same table but is far slower.

LOG("Starting mixed H0/H1 experiment")

rows = []   # one DataFrame per (rho, model, SR1) configuration

for rho in RHOs:
    for name in MODELS:
        SR_c_info = calib[(rho, name)]
        SR_c      = SR_c_info["SR_c"]
        E_max_SR0 = SR_c_info["E_max_SR0"]
        sigma_max = SR_c_info["sigma_max"]
        SR0_adj   = SR_c_info["SR0_adj"]

        for SR1 in SR1_list:
            LOG(f"[Mixed] rho={rho}, model={name}, SR1={SR1}")

            # Per-trial arrays (one entry of length TRIALS per iteration) ...
            sr_blocks, h1_blocks = [], []
            # ... and per-iteration scalars.
            sr_max_by_it, var_by_it = [], []
            gamma3_by_it, gamma4_by_it, reject_by_it = [], [], []

            for it in range(REPS_MIX):

                # Assign H1 / H0 per trial.
                H1 = np.random.uniform(size=TRIALS) < P_H1
                # NOT cosmetic: sorting puts the H0 flags (False) first and the
                # H1 flags (True) last, which is exactly the column order of the
                # panel built below (H0 columns, then H1 columns). Without the
                # sort the labels would not line up with the columns of X.
                H1.sort()

                K1 = H1.sum()
                K0 = TRIALS - K1

                # Generate the H0 columns first, then the H1 columns.
                panels = []
                if K0 > 0:
                    panels.append(generate_autocorrelated_non_gaussian_data(
                        T, K0, rho=rho, SR0=SR0, name=name
                    ))
                if K1 > 0:
                    panels.append(generate_autocorrelated_non_gaussian_data(
                        T, K1, rho=rho, SR0=SR1, name=name
                    ))
                X = panels[0] if len(panels) == 1 else np.concatenate(panels, axis=1)

                # Moment diagnostics of the combined panel (flattened once).
                flat = X.ravel()
                gamma3 = scipy.stats.skew(flat)
                gamma4 = scipy.stats.kurtosis(flat, fisher=False)

                # Sharpe ratios per trial
                SR = X.mean(axis=0) / X.std(axis=0)
                sr_max = np.max(SR)
                var_SR_emp = np.var(SR, ddof=1)  # empirical within-iteration variance (diagnostic)

                # Global decision at iteration level (FWER test)
                reject = sr_max > SR_c

                sr_blocks.append(SR)
                h1_blocks.append(H1)
                sr_max_by_it.append(sr_max)
                var_by_it.append(var_SR_emp)
                gamma3_by_it.append(gamma3)
                gamma4_by_it.append(gamma4)
                reject_by_it.append(reject)

            # One row per (iteration, trial): per-trial arrays are stacked in
            # iteration order, iteration-level values are repeated TRIALS
            # times, and configuration-level scalars are broadcast.
            rows.append(pd.DataFrame({
                "SR":         np.concatenate(sr_blocks),
                "H1":         np.concatenate(h1_blocks),
                "rho":        rho,
                "name":       name,
                "SR1":        SR1,
                "iteration":  np.repeat(np.arange(REPS_MIX, dtype=np.int64), TRIALS),
                "Max(SR)":    np.repeat(sr_max_by_it, TRIALS),
                "Var[SR]":    np.repeat(var_by_it, TRIALS),
                "gamma3":     np.repeat(gamma3_by_it, TRIALS),
                "gamma4":     np.repeat(gamma4_by_it, TRIALS),
                "SR_c":       SR_c,
                "SR0_adj":    SR0_adj,         # same for this (rho,name)
                "E[Max(SR)]": E_max_SR0,
                "sigma_max":  sigma_max,
                "Reject":     np.repeat(reject_by_it, TRIALS),  # same for all trials in an iteration
            }))

d = pd.concat(rows, ignore_index=True)

In [None]:
# Aggregation: performance metrics
# ---------------------------------------------------------------------
# One summary row per (rho, model, SR1) configuration, computed from the
# per-(iteration, trial) table `d`. "Reject" holds the iteration-level decision
# replicated on every trial row, so the strategy-level metrics below treat
# every strategy of a rejected iteration as a predicted positive.

results = []

for rho in RHOs:
    for name in MODELS:
        for SR1 in SR1_list:
            in_config = (d["rho"] == rho) & (d["name"] == name) & (d["SR1"] == SR1)
            tmp = d[in_config]

            # Ground truth per strategy and the replicated iteration-level decision.
            y_true_strat = tmp["H1"].values.astype(bool)
            y_pred_iter = tmp["Reject"].values.astype(bool)

            # FPP: share of H0 strategies that sit in a rejected iteration.
            mask_H0 = ~y_true_strat
            n_H0 = mask_H0.sum()
            FPP = np.sum(y_pred_iter & mask_H0) / n_H0 if n_H0 > 0 else np.nan

            # Precision / recall / F1 with per-strategy labels. They stay NaN in
            # the degenerate cases (nothing rejected, or no H1 strategy at all).
            precision = recall = f1 = np.nan
            if y_pred_iter.any() and y_true_strat.any():
                precision = precision_score(y_true_strat, y_pred_iter)
                recall = recall_score(y_true_strat, y_pred_iter)
                f1 = f1_score(y_true_strat, y_pred_iter)

            # Empirical FWER = P(reject | all H0): rejection rate over the
            # iterations that contain no H1 strategy.
            by_iteration = tmp.groupby("iteration")
            has_H1 = by_iteration["H1"].any().values
            rejected = by_iteration["Reject"].first().values

            all_H0 = ~has_H1
            FWER_emp = np.mean(rejected[all_H0]) if all_H0.sum() > 0 else np.nan

            summary = {
                "name": name,
                "rho": rho,
                "SR1": SR1,
                "T": T,
                "P_H1": P_H1,
                "gamma3": tmp["gamma3"].mean(),
                "gamma4": tmp["gamma4"].mean(),
                "SR_c": tmp["SR_c"].mean(),
                "H1_mean": tmp["H1"].mean(),
                "precision": precision,
                "recall": recall,
                "f1": f1,
                "FPP": FPP,
                "FWER_emp": FWER_emp,
                "alpha": ALPHA,
            }
            results.append(summary)

results = pd.DataFrame(results)
results = results.sort_values(["name", "rho", "SR1"]).reset_index(drop=True)
results_rounded = results.round(3)

# The CSV keeps full precision; only the displayed table is rounded.
results.to_csv("exhibit_4_corrected.csv", index=False)

display(results_rounded)
