Autocovariances and an automatic Newey–West lag rule

In [4]:
from __future__ import annotations
import numpy as np
from typing import Tuple, Dict, Any

def _autocovariances(x: Array, max_lag: int) -> Array:
    """Sample autocovariances gamma_k = cov(x_t, x_{t-k}) for k=0..max_lag (denominator n)."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    n = x.size
    gammas = np.empty(max_lag + 1, dtype=float)
    for k in range(max_lag + 1):
        gammas[k] = np.dot(x[k:], x[:n - k]) / n
    return gammas

def _nw_default_lag(n: int) -> int:
    """Andrews-style automatic bandwidth: floor(4 * (n/100)^(2/9)), at least 1."""
    L = int(np.floor(4.0 * (n / 100.0) ** (2.0 / 9.0)))
    return max(L, 1)

def hac_se_mean(x: Array, L: int | None = None) -> Tuple[float, float, float]:
    """
    Newey–West (HAC) standard error of the sample mean.

    The long-run variance is gamma_0 + 2 * sum_{k=1..L} w_k * gamma_k with
    Bartlett weights w_k = 1 - k / (L + 1); dividing by n gives the variance
    of the mean. When L is None the lag comes from _nw_default_lag(n).

    Returns (se_mean, var_mean, L). A negative variance estimate is clipped
    to zero before the square root, but var_mean itself is returned unclipped.
    """
    series = np.asarray(x, dtype=float)
    n_obs = series.size
    if L is None:
        L = _nw_default_lag(n_obs)

    gamma = _autocovariances(series, L)   # gamma_0..gamma_L
    lags = np.arange(L + 1)
    bartlett = 1.0 - (lags / (L + 1))
    long_run = gamma[0] + 2.0 * np.sum(bartlett[1:] * gamma[1:])
    var_mean = long_run / n_obs
    se_mean = np.sqrt(max(var_mean, 0.0))
    return se_mean, var_mean, L

def lo_corrected_sharpe(x: Array, q: int) -> float:
    """
    Autocorrelation-adjusted annualized Sharpe ratio (Lo, 2002).

    x: per-bar net returns (after frictions)
    q: bars per year at the sampling frequency of x

    The per-bar Sharpe (mean / sample std, ddof=1) is scaled by
    sqrt(q / (1 + 2 * sum_{k=1..K} ((q - k) / q) * rho_k)), where rho_k are
    sample autocorrelations and K = min(q - 1, n - 1). With no usable lags
    the plain sqrt(q) scaling is applied. Returns NaN when the sample
    standard deviation is exactly zero.
    """
    returns = np.asarray(x, dtype=float)
    n_obs = returns.size
    mean_ret = returns.mean()
    vol = returns.std(ddof=1)
    if vol == 0.0:
        return np.nan
    per_bar_sr = mean_ret / vol

    # Number of autocorrelation lags that can enter the correction.
    max_lag = 0
    if q > 1:
        max_lag = min(q - 1, n_obs - 1)
    if max_lag <= 0:
        return per_bar_sr * np.sqrt(max(q, 1))

    gamma = _autocovariances(returns, max_lag)
    if gamma[0] <= 0:
        return per_bar_sr * np.sqrt(q)
    rho = gamma[1:] / gamma[0]

    lags = np.arange(1, max_lag + 1)
    adjustment = 1.0 + 2.0 * np.sum(((q - lags) / q) * rho)
    # Floor keeps the square root defined under strong negative autocorrelation.
    adjustment = max(adjustment, 1e-12)
    return per_bar_sr * np.sqrt(q / adjustment)

Stationary block bootstrap indices

In [5]:
def _stationary_bootstrap_indices(n: int, b: int, rng: np.random.Generator) -> Array:
    """
    Stationary bootstrap (Politis & Romano) indices with expected block length b.
    Geometric block lengths with parameter p=1/b; blocks start uniformly at random.
    """
    p = 1.0 / float(b)
    idx = np.empty(n, dtype=int)
    t = 0
    while t < n:
        start = rng.integers(0, n)
        L = 1
        while (t + L < n) and (rng.random() > p):
            L += 1
        jmax = min(L, n - t)
        idx[t:t + jmax] = (start + np.arange(jmax)) % n
        t += jmax
    return idx

def lo_sharpe_bootstrap_ci(
    x: Array, q: int, B: int = 2000, b: int | None = None,
    alpha: float = 0.05, random_state: int = 123
) -> Tuple[float, float]:
    """
    Percentile confidence interval for the Lo-corrected annualized Sharpe.

    Draws B stationary-bootstrap resamples of x (expected block length b,
    defaulting to max(5, round(n ** (1/3)))), evaluates lo_corrected_sharpe
    on each, and returns the (alpha/2, 1 - alpha/2) quantiles as
    (lower, upper).
    """
    sample = np.asarray(x, dtype=float)
    n_obs = sample.size
    if b is None:
        b = max(5, int(round(n_obs ** (1.0 / 3.0))))

    rng = np.random.default_rng(random_state)
    draws = np.empty(B, dtype=float)
    for rep in range(B):
        resampled = sample[_stationary_bootstrap_indices(n_obs, b, rng)]
        draws[rep] = lo_corrected_sharpe(resampled, q)

    tail = alpha / 2.0
    lower, upper = np.quantile(draws, [tail, 1.0 - tail])
    return float(lower), float(upper)

Robustness to outliers

In [6]:
import numpy as np

def mse_mae_comparison(x: np.ndarray, reference: float = 0.0) -> dict:
    """
    Mean squared and mean absolute deviation of x from a reference level.

    Returns a dict with keys "MSE" and "MAE", both computed from
    x - reference (reference defaults to 0.0).
    """
    deviations = np.asarray(x, dtype=float) - reference
    return {
        "MSE": np.mean(deviations ** 2),
        "MAE": np.mean(np.abs(deviations)),
    }

Asymmetric penalties

In [7]:
import numpy as np

def quantile_loss(y_true: np.ndarray, y_pred: np.ndarray, tau: float) -> float:
    """
    Average quantile (pinball) loss at level tau.

    With e = y_true - y_pred, each observation costs tau * e when e >= 0
    and (1 - tau) * (-e) otherwise; the mean over all observations is
    returned as a Python float.
    """
    diff = np.asarray(y_true) - np.asarray(y_pred)
    under_cost = tau * diff            # applies where diff >= 0
    over_cost = (1 - tau) * -diff      # applies where diff < 0
    pinball = np.where(diff >= 0, under_cost, over_cost)
    return float(np.mean(pinball))


Dynamic evaluation and model stability

In [8]:
import numpy as np
from typing import Literal, Optional, Tuple

LossName = Literal["mse", "mae", "quantile"]

def _loss_vector(y_true: np.ndarray,
                 y_pred: np.ndarray,
                 loss: LossName = "mse",
                 tau: Optional[float] = None) -> np.ndarray:
    """Pointwise loss ℓ_t = ℓ(y_t, ŷ_t)."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    e = y_true - y_pred
    if loss == "mse":
        return e**2
    elif loss == "mae":
        return np.abs(e)
    elif loss == "quantile":
        if tau is None or not (0.0 < tau < 1.0):
            raise ValueError("For quantile loss, provide tau in (0,1).")
        return np.where(e >= 0.0, tau*e, (1.0 - tau)*(-e))
    else:
        raise ValueError("loss must be 'mse', 'mae', or 'quantile'.")

def rolling_error(y_true: np.ndarray,
                  y_pred: np.ndarray,
                  window: int,
                  loss: LossName = "mse",
                  tau: Optional[float] = None) -> np.ndarray:
    """
    Rolling error series: average loss over a moving window of size 'window'.

    Returns an array of length n. The first window-1 entries are NaN
    (warm-up); if window exceeds n every entry is NaN. A window that contains
    a non-finite loss yields NaN for that window only; later windows that no
    longer cover the bad observation are unaffected.

    Raises ValueError if window < 1 (or whatever _loss_vector raises for an
    invalid loss/tau combination).
    """
    # A zero window fails with an obscure broadcast error and a negative one
    # silently writes garbage, so reject both up front.
    if window < 1:
        raise ValueError("window must be a positive integer.")

    l = _loss_vector(y_true, y_pred, loss=loss, tau=tau)
    n = l.size
    out = np.full(n, np.nan)
    if window > n:
        return out

    # Rolling mean via cumulative sums (O(n)). Non-finite losses are zeroed
    # in the running sum and tracked by a separate running count; otherwise a
    # single NaN/inf would contaminate every subsequent window.
    finite = np.isfinite(l)
    cs = np.insert(np.cumsum(np.where(finite, l, 0.0)), 0, 0.0)
    bad = np.insert(np.cumsum(~finite), 0, 0)
    vals = (cs[window:] - cs[:-window]) / float(window)
    vals[(bad[window:] - bad[:-window]) > 0] = np.nan
    out[window-1:] = vals
    return out

def ew_error(y_true: np.ndarray,
             y_pred: np.ndarray,
             lam: float = 0.97,
             loss: LossName = "mse",
             tau: Optional[float] = None) -> np.ndarray:
    """
    Exponentially weighted loss series.

    Runs the recursion EW_t = λ * EW_{t-1} + (1 - λ) * ℓ_t starting from
    EW_{-1} = 0, i.e. EW_t = (1 - λ) * Σ_{j>=0} λ^j ℓ_{t-j} over the observed
    history. λ closer to 1 means slower decay (longer memory).
    """
    losses = _loss_vector(y_true, y_pred, loss=loss, tau=tau)
    smoothed = np.empty_like(losses)
    gain = 1.0 - lam
    state = 0.0
    for idx, value in enumerate(losses):
        state = lam*state + gain*value
        smoothed[idx] = state
    return smoothed

def cusum_alarm(series: np.ndarray,
                mu0: Optional[float] = None,
                k: float = 0.0,
                h: float = 3.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    One-sided CUSUM (increase detection) on a loss series.

    g_t = max(0, g_{t-1} + (x_t - mu0 - k)); an alarm is raised when
    g_t > h·σ0, after which the statistic is reset to zero (so the returned
    g never shows the value that triggered the alarm).

    Baseline: the finite values among the first max(5, ceil(0.1·n))
    observations. mu0 defaults to their mean; σ0 is their sample standard
    deviation (ddof=1). When fewer than two finite baseline values exist the
    scale cannot be estimated and σ0 falls back to 1.0. If mu0 is not given
    and the baseline has no finite value, mu0 is NaN and no alarm can fire.

    Non-finite observations leave the statistic unchanged.

    Returns (g, threshold, alarms_mask), each of length n; threshold is the
    constant h·σ0 repeated.
    """
    x = np.asarray(series, dtype=float)
    n = x.size

    # Baseline mean/scale from an initial (assumed stable) segment.
    m = max(5, int(np.ceil(0.1*n)))
    x0 = x[:m][np.isfinite(x[:m])]

    if mu0 is None:
        mu0 = float(np.nanmean(x0)) if x0.size else float("nan")
    else:
        mu0 = float(mu0)

    # ddof=1 needs at least two points; with one point the estimate is NaN,
    # which would make the threshold NaN and silently disable every alarm.
    sigma0 = float(np.nanstd(x0, ddof=1)) if x0.size >= 2 else 1.0
    if not np.isfinite(sigma0):
        sigma0 = 1.0
    thr = h * sigma0

    g = np.zeros(n, dtype=float)
    alarms = np.zeros(n, dtype=bool)
    prev = 0.0
    for t, xt in enumerate(x):
        if np.isfinite(xt):
            cur = max(0.0, prev + (xt - mu0 - k))
            if cur > thr:
                alarms[t] = True
                cur = 0.0  # restart accumulation after an alarm
            prev = cur
        g[t] = prev
    thresh_series = np.full(n, thr)
    return g, thresh_series, alarms

Python illustration for a scaled Beta daily-range model

In [9]:
import numpy as np
from typing import Tuple, Optional, Dict

def fit_beta_mom(u: np.ndarray, eps: float = 1e-8) -> Tuple[float, float]:
    """
    Method-of-moments estimate of Beta(alpha, beta) from data in (0, 1).

    Non-finite values and values outside the open interval are dropped; at
    least 5 observations must remain, otherwise ValueError is raised.

    With sample mean m and population variance v (floored at eps), the
    concentration is t = m * (1 - m) / v - 1 and the estimates are
    alpha = m * t, beta = (1 - m) * t, each floored at eps. If t is not
    positive, a fixed concentration of 1000 is used instead, which gives a
    tightly peaked Beta centered at m.
    """
    sample = np.asarray(u, dtype=float)
    interior = np.isfinite(sample) & (sample > 0.0) & (sample < 1.0)
    sample = sample[interior]
    if sample.size < 5:
        raise ValueError("Need at least 5 observations strictly inside (0,1).")

    mean = float(np.mean(sample))
    var = max(float(np.var(sample)), eps)   # population variance, floored
    concentration = mean * (1.0 - mean) / var - 1.0
    if concentration <= 0.0:
        # Near-degenerate case: switch to a high fixed concentration.
        concentration = 1e3

    return max(mean * concentration, eps), max((1.0 - mean) * concentration, eps)


def fit_scaled_beta_from_hlc(
    high: np.ndarray,
    low: np.ndarray,
    x_in_range: np.ndarray,
    eps: float = 1e-12
) -> Dict[str, float]:
    """
    Fit a Beta distribution to positions within the daily range.

    Each observation is normalized as u = (x - low) / (high - low). Rows are
    kept only if all three inputs are finite, the range exceeds eps, and x
    lies within [low, high] up to a 1e-12 tolerance.

    "mean_u", "var_u" and "n_used" describe all kept rows, including those on
    the bounds; "alpha" and "beta" come from fit_beta_mom applied to the rows
    strictly inside (0, 1).

    Raises ValueError on mismatched input lengths, when no row survives the
    filter, or when fewer than 5 interior points remain.
    """
    hi = np.asarray(high, dtype=float)
    lo = np.asarray(low,  dtype=float)
    pos = np.asarray(x_in_range, dtype=float)

    if not (len(hi) == len(lo) == len(pos)):
        raise ValueError("high, low, and x_in_range must be the same length.")

    span = hi - lo
    all_finite = np.isfinite(hi) & np.isfinite(lo) & np.isfinite(pos)
    inside_band = (pos >= lo - 1e-12) & (pos <= hi + 1e-12)
    keep = all_finite & (span > eps) & inside_band
    if not np.any(keep):
        raise ValueError("No valid observations after filtering.")

    u_all = (pos[keep] - lo[keep]) / span[keep]

    # Only strictly interior points feed the moment fit.
    interior = u_all[(u_all > 0.0) & (u_all < 1.0)]
    if interior.size < 5:
        raise ValueError("Not enough interior points in (0,1) to fit Beta.")

    alpha, beta = fit_beta_mom(interior)

    summary = {
        "alpha": float(alpha),
        "beta":  float(beta),
        "mean_u": float(np.mean(u_all)),
        "var_u":  float(np.var(u_all)),
        "n_used": int(u_all.size),
    }
    return summary

Python illustration for L-moments

In [None]:
import numpy as np
from math import comb

def l_moments_from_scratch(returns: np.ndarray) -> dict:
    """
    Sample L-moments computed from unbiased Probability Weighted Moments.

    NaN values are dropped first. With fewer than 4 remaining observations
    every entry of the result is NaN.

    For the sorted sample x_(1) <= ... <= x_(n) the PWM estimators are
        b_r = (1/n) * sum_i [(i-1)(i-2)...(i-r)] / [(n-1)(n-2)...(n-r)] * x_(i)
    and the L-moments follow as
        l1 = b0
        l2 = 2 b1 - b0
        l3 = 6 b2 - 6 b1 + b0
        l4 = 20 b3 - 30 b2 + 12 b1 - b0.

    Returns a dict with keys "L-location (mean)", "L-scale (dispersion)",
    "L-skewness" (l3 / l2) and "L-kurtosis" (l4 / l2); the two ratios are NaN
    when l2 is exactly zero.
    """
    # 1. --- Data preparation ---
    returns = np.asarray(returns, dtype=float)
    returns = returns[~np.isnan(returns)]
    n = len(returns)

    if n < 4:
        return {
            "L-location (mean)": np.nan,
            "L-scale (dispersion)": np.nan,
            "L-skewness": np.nan,
            "L-kurtosis": np.nan,
        }

    # Order statistics
    x_sorted = np.sort(returns)

    # 2. --- Probability Weighted Moments ---
    # The weights are built as running products of float ratios in [0, 1]
    # instead of integer products (i-1)(i-2)(i-3): the integer form silently
    # wraps around once it exceeds the platform integer range (n of roughly
    # two million for int64, far less for 32-bit integers).
    ranks = np.arange(n, dtype=float)            # i - 1 for i = 1..n
    b0 = np.mean(x_sorted)

    w = ranks / (n - 1)                          # (i-1)/(n-1)
    b1 = np.dot(w, x_sorted) / n

    w *= (ranks - 1.0) / (n - 2)                 # ... * (i-2)/(n-2)
    b2 = np.dot(w, x_sorted) / n

    w *= (ranks - 2.0) / (n - 3)                 # ... * (i-3)/(n-3)
    b3 = np.dot(w, x_sorted) / n

    # 3. --- Convert PWMs to L-moments ---
    l1 = b0
    l2 = 2 * b1 - b0
    l3 = 6 * b2 - 6 * b1 + b0
    l4 = 20 * b3 - 30 * b2 + 12 * b1 - b0

    # 4. --- L-moment ratios (dimensionless) ---
    if l2 == 0:
        l_skew = np.nan
        l_kurt = np.nan
    else:
        l_skew = l3 / l2
        l_kurt = l4 / l2

    return {
        "L-location (mean)": l1,
        "L-scale (dispersion)": l2,
        "L-skewness": l_skew,
        "L-kurtosis": l_kurt}

Python illustration for GMM

In [None]:
import numpy as np
from scipy.optimize import minimize

def gmm_objective(theta, X, y, weight_matrix):
    """
    GMM criterion for the linear moment condition E[(y - X*theta) * X] = 0.

    Forms the sample moment vector g_n(theta) = X' (y - X theta) / n and
    returns the quadratic form g_n' W g_n for the supplied weighting matrix.
    """
    n_obs = len(y)
    resid = y - X @ theta
    g_bar = X.T @ resid / n_obs
    weighted = g_bar.T @ weight_matrix
    return weighted @ g_bar

# --- Example usage ---
# Simulate a linear model y = X @ true_theta + noise with an intercept column
# and one standard-normal regressor, then recover theta by minimizing the GMM
# objective. Seeding the global NumPy RNG makes the draws reproducible.
np.random.seed(42)
n, k = 200, 2
X = np.column_stack((np.ones(n), np.random.randn(n)))
true_theta = np.array([1.0, 2.0])
y = X @ true_theta + 0.5 * np.random.randn(n)

# Identity matrix as initial weight
W = np.eye(k)

# Initial guess for theta
theta0 = np.zeros(k)

# Numerical minimization of the quadratic form; X, y and W are forwarded to
# gmm_objective through `args`.
res = minimize(gmm_objective, theta0, args=(X, y, W), method="BFGS")
theta_hat = res.x
print("GMM estimates:", theta_hat)

Python illustration for confidence interval

In [None]:
import numpy as np
from scipy.stats import binom

def median_ci_sign_test(data, alpha=0.05, grid_points=200):
    """
    Grid-based confidence interval for the median by inverting the sign test.

    Candidate medians are grid_points equally spaced values between the
    sample minimum and maximum. For each candidate, the number of
    observations strictly above it is compared with Binomial(N, 0.5) using a
    two-sided p-value (twice the smaller tail). Candidates with p-value above
    alpha are kept.

    Returns (lowest kept candidate, highest kept candidate), or None when no
    candidate is kept.
    """
    sample = np.asarray(data)
    n_obs = len(sample)
    grid = np.linspace(np.min(sample), np.max(sample), grid_points)

    def _two_sided_p(candidate):
        above = np.sum(sample > candidate)
        lower_tail = binom.cdf(above, n_obs, 0.5)
        upper_tail = 1 - binom.cdf(above - 1, n_obs, 0.5)
        return 2 * min(lower_tail, upper_tail)

    accepted = [candidate for candidate in grid if _two_sided_p(candidate) > alpha]
    if not accepted:
        return None
    return min(accepted), max(accepted)

Python illustration of intervals for risk-adjusted performance metrics

In [None]:
import numpy as np
import pandas as pd

def calculate_sortino(returns, target=0, periods=252):
    """
    Annualized Sortino ratio.

    Parameters
    ----------
    returns : per-period returns (pandas Series, array or sequence);
        NaN entries are ignored.
    target : per-period minimum acceptable return.
    periods : number of periods per year used for annualization.

    The numerator is the mean per-period excess over `target`, scaled by
    `periods`. The denominator is the target downside deviation,
    sqrt(mean(min(r - target, 0) ** 2)) taken over ALL observations, scaled
    by sqrt(periods). Using the standard deviation of only the below-target
    returns would measure their dispersion around their own mean rather than
    the shortfall against the target, and is undefined with a single
    downside observation.

    Returns NaN for empty input, +inf when there is no shortfall and the
    mean excess is positive, and NaN when both are zero.
    """
    r = np.asarray(returns, dtype=float)
    r = r[~np.isnan(r)]
    if r.size == 0:
        return np.nan

    # Excess is measured per period so it shares units with `target`.
    excess = r - target
    shortfall = np.minimum(excess, 0.0)
    downside_dev = np.sqrt(np.mean(shortfall ** 2))
    mean_excess = excess.mean()

    if downside_dev == 0:
        return np.inf if mean_excess > 0 else np.nan
    return (mean_excess * periods) / (downside_dev * np.sqrt(periods))

def calculate_calmar(returns, periods=252):
    """
    Annualized Calmar ratio: compound annual growth rate / maximum drawdown.

    Parameters
    ----------
    returns : per-period simple returns (pandas Series, array or sequence);
        NaN entries are ignored.
    periods : number of periods per year.

    The equity curve starts from a capital of 1.0, and that starting value
    counts as the first peak, so a loss in the very first period is part of
    the drawdown. The growth rate is the geometric one,
    final_equity ** (periods / n) - 1 (or -1 if equity is wiped out).

    Returns NaN for empty input and +inf when there is no drawdown.
    """
    r = np.asarray(returns, dtype=float)
    r = r[~np.isnan(r)]
    if r.size == 0:
        return np.nan

    equity_curve = np.cumprod(1.0 + r)
    # Prepend the initial capital so the running peak is never below 1.0.
    running_max = np.maximum.accumulate(np.concatenate(([1.0], equity_curve)))[1:]
    drawdown = equity_curve / running_max - 1.0
    mdd = abs(drawdown.min())
    if mdd == 0:
        return np.inf

    final_equity = equity_curve[-1]
    if final_equity <= 0:
        cagr = -1.0  # total loss; a fractional power of a non-positive base is undefined
    else:
        cagr = final_equity ** (periods / r.size) - 1.0
    return cagr / mdd

def block_bootstrap(data, block_size, rng=None):
    """
    Generate one bootstrap sample using the moving block bootstrap.

    Parameters
    ----------
    data : pandas object (indexed positionally via .iloc) or array-like.
    block_size : length of each contiguous block; values larger than
        len(data) are reduced to len(data), values below 1 are raised to 1.
    rng : optional numpy.random.Generator. When omitted, the global NumPy
        random state is used.

    Enough blocks are drawn to cover the whole sample (ceil(n / block_size))
    and the concatenation is truncated to n, so the result always has the
    same length as the input. Empty input is returned unchanged.
    """
    n = len(data)
    if n == 0:
        return data

    block_size = int(min(max(block_size, 1), n))
    # Ceiling division: flooring here would leave the sample short of n
    # whenever n is not a multiple of block_size.
    num_blocks = -(-n // block_size)
    n_starts = n - block_size + 1

    if rng is None:
        block_starts = np.random.randint(0, n_starts, size=num_blocks)
    else:
        block_starts = rng.integers(0, n_starts, size=num_blocks)

    offsets = np.arange(block_size)
    bootstrap_indices = (block_starts[:, None] + offsets).ravel()[:n]

    if hasattr(data, "iloc"):
        return data.iloc[bootstrap_indices]
    return np.asarray(data)[bootstrap_indices]

def bootstrap_distribution(returns, metric_func, block_size=22, n_bootstrap=5000):
    """
    Bootstrap distribution of a performance metric.

    Draws n_bootstrap samples with block_bootstrap(returns, block_size),
    applies metric_func to each, and returns the results as a list.
    """
    return [
        metric_func(block_bootstrap(returns, block_size))
        for _ in range(n_bootstrap)
    ]

Python illustration of potential drawdowns

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple, Dict
from math import ceil

def stationary_bootstrap(returns: np.ndarray, L: int, n_paths: int, rng: np.random.Generator) -> np.ndarray:
    """
    Politis & Romano stationary bootstrap.

    L: mean block length (restart probability is 1 / max(1, L)).

    Each path starts at a uniformly drawn position. At every later step one
    uniform is drawn: below the restart probability the position jumps to a
    new uniform index, otherwise it advances by one with wrap-around.

    Returns an array of shape (n_paths, len(returns)).
    """
    n_obs = len(returns)
    restart_prob = 1.0 / max(1, L)
    paths = np.empty((n_paths, n_obs), dtype=float)
    for row in range(n_paths):
        pos = rng.integers(0, n_obs)
        paths[row, 0] = returns[pos]
        for step in range(1, n_obs):
            if rng.random() < restart_prob:
                pos = rng.integers(0, n_obs)   # start a new block
            else:
                pos = (pos + 1) % n_obs        # continue current block
            paths[row, step] = returns[pos]
    return paths

def moving_block_bootstrap(returns: np.ndarray, L: int, n_paths: int, rng: np.random.Generator) -> np.ndarray:
    """
    Overlapping moving-block bootstrap (MBB).

    L: block length. Values larger than len(returns) are reduced to
       len(returns), in which case every path reproduces the original series.

    For each path, ceil(n / L) block start positions are drawn uniformly from
    the n - L + 1 overlapping blocks, the blocks are concatenated and the
    result is truncated to n observations.

    Returns an array of shape (n_paths, n).
    Raises ValueError for empty input or L < 1.
    """
    data = np.asarray(returns, dtype=float)
    n = len(data)
    if n == 0:
        raise ValueError("returns must be non-empty.")
    if L < 1:
        raise ValueError("L must be a positive block length.")
    L = min(int(L), n)

    n_starts = n - L + 1
    n_blocks_needed = ceil(n / L)
    # Blocks are addressed through start + offset index arithmetic rather
    # than materializing all n - L + 1 overlapping blocks.
    offsets = np.arange(L)
    out = np.empty((n_paths, n), dtype=float)
    for k in range(n_paths):
        starts = rng.integers(0, n_starts, size=n_blocks_needed)
        idx = (starts[:, None] + offsets).ravel()[:n]
        out[k] = data[idx]
    return out

def equity_from_returns(ret: np.ndarray, initial: float = 1.0) -> np.ndarray:
    """
    Compound simple returns into an equity curve.

    Entry t equals initial * prod_{s<=t} (1 + ret_s); the starting value
    itself is not included as a separate point.
    """
    growth_factors = 1.0 + ret
    compounded = np.cumprod(growth_factors)
    return initial * compounded

def max_drawdown_and_duration(equity: np.ndarray) -> Tuple[float, int, bool]:
    """
    Maximum drawdown of an equity curve and how long it lasted.

    Returns:
      mdd_pct      largest peak-to-trough decline, in percent of the peak,
      mdd_duration number of periods from that peak until equity first gets
                   back to the peak level (within 1e-12); if it never does,
                   the number of periods from the peak to the last point,
      censored     True when the recovery was not observed in the sample.
    """
    n_pts = len(equity)
    peaks = np.maximum.accumulate(equity)
    # A zero running peak is replaced by 1 in the denominator only.
    safe_peaks = np.where(peaks == 0, 1, peaks)
    depth_series = (peaks - equity) / safe_peaks

    trough = int(np.argmax(depth_series))
    depth = float(depth_series[trough])
    # Highest point at or before the trough.
    peak = int(np.argmax(equity[:trough+1]))

    recovery_level = equity[peak] - 1e-12
    recovered_at = next(
        (t for t in range(trough, n_pts) if equity[t] >= recovery_level),
        None,
    )
    censored = recovered_at is None
    end = (n_pts - 1) if censored else recovered_at
    return depth * 100.0, int(end - peak), censored

def simulate_drawdown_distribution(
    returns: np.ndarray,
    n_paths: int = 5000,
    L: int = None,
    method: str = "stationary",
    seed: int = 42
) -> Dict[str, np.ndarray]:
    """
    Bootstrap synthetic return paths and compute max drawdown and its
    duration on each.

    Parameters
    ----------
    returns : historical per-period returns.
    n_paths : number of synthetic paths.
    L : block length ("mbb") or mean block length ("stationary");
        defaults to max(2, round((3 n) ** (1/3))).
    method : "stationary" or "mbb".
    seed : seed for numpy.random.default_rng.

    Each path's equity curve is evaluated together with its starting capital
    of 1.0 as the first point. Without that point a path that opens with
    losses would have its first post-loss value treated as the initial peak
    and the opening decline would be left out of the drawdown.

    Returns a dict with arrays "mdd_pct", "duration", "censored" (one entry
    per path) plus the settings "L", "method" and "n_paths".
    Raises ValueError for empty input or an unknown method.
    """
    rng = np.random.default_rng(seed)
    n = len(returns)
    if n == 0:
        raise ValueError("returns must be non-empty.")
    if L is None:
        # rule of thumb for time-series bootstrap
        L = max(2, int(round((3 * n) ** (1 / 3))))
    if method == "stationary":
        paths = stationary_bootstrap(returns, L, n_paths, rng)
    elif method == "mbb":
        paths = moving_block_bootstrap(returns, L, n_paths, rng)
    else:
        raise ValueError("method must be 'stationary' or 'mbb'")

    mdds = np.empty(n_paths, dtype=float)
    durs = np.empty(n_paths, dtype=int)
    cens = np.zeros(n_paths, dtype=bool)
    for i in range(n_paths):
        # Prepend the starting capital so it can act as the first peak.
        eq = np.concatenate(([1.0], equity_from_returns(paths[i])))
        mdds[i], durs[i], cens[i] = max_drawdown_and_duration(eq)
    return {"mdd_pct": mdds, "duration": durs, "censored": cens, "L": L, "method": method, "n_paths": n_paths}

def tail_expected_shortfall(x: np.ndarray, alpha: float = 0.95) -> float:
    """
    Average of the observations at or above the alpha-quantile of x.

    Larger values are treated as worse, so this is the mean of the worst
    (1 - alpha) tail. Returns NaN if no observation reaches the quantile.
    """
    threshold = np.quantile(x, alpha)
    worst = x[x >= threshold]
    if len(worst) == 0:
        return float('nan')
    return float(worst.mean())