In [34]:
from pyspark.sql.window import Window
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, lead, log

In [35]:
# Initializes Spark session with MongoDB connector
jar_files_path = "file:///C:/Users/llucp/spark_jars/"

jar_files = [
    "mongo-spark-connector_2.12-10.1.1.jar",
    "mongodb-driver-core-4.10.1.jar",
    "mongodb-driver-sync-4.10.1.jar",
    "bson-4.10.1.jar"
]

MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "tfg"
FORMATTING_TRADE = "formatting_trade"
FORMATTING_LOB = "formatting_lob"
EXPLOITATION = "exploitation"

spark = (
    SparkSession.builder
    .appName("FormattedZone")
    .config("spark.jars", ",".join([jar_files_path + jar for jar in jar_files]))
    .config("spark.mongodb.read.connection.uri", MONGO_URI)
    .config("spark.mongodb.write.connection.uri", MONGO_URI)
    .config("spark.mongodb.read.database", DB_NAME)
    .config("spark.mongodb.write.database", DB_NAME)
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

In [36]:
def load_trade() -> DataFrame:
    """
    Loads formatting zone trade data and returns it as a dataframe.
    """
    trades = (
        spark.read.format("mongodb")
        .option("database", DB_NAME)
        .option("collection", FORMATTING_TRADE)
        .load()
    )

    return trades

trades = load_trade()

In [37]:
def calculate_forward_log_returns(df: DataFrame, lag: int, N: int) -> DataFrame:
    """
    Calculates forward log-returns of last traded price over N periods, accounting for a decision lag.
    """
    w = Window.orderBy("timestamp")
    base = lead(col("last_traded_price"), lag).over(w)
    future = lead(col("last_traded_price"), lag + N).over(w)

    return df.withColumn(f"fwd_log_return_{N}", log(future) - log(base))

# Given a list of forward log-return horizons, calculates and adds them to the dataframe
fwd_log_return_horizons = [2, 3, 4, 5, 10, 20, 40, 60, 120, 240]
lag = 1  # Decision lag of 1 period

for N in fwd_log_return_horizons:
    trades = calculate_forward_log_returns(trades, lag, N)

In [38]:
import re
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Union

In [39]:
# Selects only the forward log-return columns and converts to Pandas dataframe for analysis
cols = [f"fwd_log_return_{N}" for N in fwd_log_return_horizons]
fwd_log_returns = trades.select(*cols).dropna().toPandas()

In [40]:
def extract_fwd_log_return_horizon(colname: str) -> int:
    """
    Given the column name of a forward log-return, extracts and returns the horizon (N).
    """
    m = re.search(r"(\d+)$", colname)
    
    return int(m.group(1))

In [41]:
def apply_stride_fwd_log_returns(df: pd.DataFrame, cols: list[str]) -> dict[str, pd.Series]:
    """
    Returns dict of stride-sampled series per forward log-return.
    """
    out = {}

    for c in cols:
        N = extract_fwd_log_return_horizon(c)
        out[c] = df[c].iloc[::N].dropna().reset_index(drop=True)

    return out

fwd_log_returns_dict = apply_stride_fwd_log_returns(fwd_log_returns, cols)

In [42]:
from scipy import stats
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, kpss
from sklearn.feature_selection import mutual_info_regression
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch

In [43]:
# Optional tests / models
RUNS_AVAILABLE = False

try:
    from statsmodels.sandbox.stats.runs import runstest_1samp as runs_test
    RUNS_AVAILABLE = True

except Exception:
    runs_test = None

BDS_AVAILABLE = False
bds_impl = None

try:
    # Prefer arch.unitroot implementation
    from arch.unitroot import BDS as BDS_cls
    BDS_AVAILABLE = True
    bds_impl = "arch"

except Exception:
    try:
        from statsmodels.sandbox.stats.runs import bds as bds_sm
        BDS_AVAILABLE = True
        bds_impl = "sm"

    except Exception:
        pass

MS_AVAILABLE = False

try:
    from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression
    MS_AVAILABLE = True
    
except Exception:
    pass

In [44]:
# Utility functions
def _safe_float(x):
    try:
        return float(x)
    
    except Exception:
        return None

def _series_clean(s: pd.Series) -> pd.Series:
    return pd.Series(s).dropna().astype(float)

In [45]:
def summarize_series(s: pd.Series) -> Dict[str, Any]:
    s = _series_clean(s)
    out = {
        "n": int(s.size),
        "mean": _safe_float(s.mean()),
        "std": _safe_float(s.std(ddof=1)),
        "skew": _safe_float(s.skew()),
        "kurt": _safe_float(s.kurt()),
        "min": _safe_float(s.min()),
        "max": _safe_float(s.max()),
        "median": _safe_float(s.median()),
    }
    
    return out

In [46]:
def normality_tests(s: pd.Series, shapiro_max_n: int = 5000) -> Dict[str, Any]:
    s = _series_clean(s)
    out = {}

    # Shapiro–Wilk (only up to ~5k recommended)
    try:
        if s.size <= shapiro_max_n:
            sh_stat, sh_p = stats.shapiro(s.values)
            out["shapiro_stat"] = _safe_float(sh_stat)
            out["shapiro_p"] = _safe_float(sh_p)

        else:
            out["shapiro_skipped"] = f"n={s.size} > {shapiro_max_n}"

    except Exception as e:
        out["shapiro_error"] = str(e)

    # Jarque–Bera
    try:
        jb_stat, jb_p = stats.jarque_bera(s.values)
        out["jb_stat"] = _safe_float(jb_stat)
        out["jb_p"] = _safe_float(jb_p)

    except Exception as e:
        out["jb_error"] = str(e)

    # Anderson–Darling for normal (gives critical values)
    try:
        ad_res = stats.anderson(s.values, dist="norm")
        out["anderson_stat"] = _safe_float(ad_res.statistic)
        out["anderson_crit_vals"] = [ _safe_float(x) for x in ad_res.critical_values.tolist() ]
        out["anderson_signif"] = [ _safe_float(x) for x in ad_res.significance_level.tolist() ]
        
    except Exception as e:
        out["anderson_error"] = str(e)

    return out

In [47]:
def autocorr_tests(s: pd.Series, lb_lags: List[int] = [10, 20, 40]) -> Dict[str, Any]:
    s = _series_clean(s)
    out = {"ljung_box_returns": {}, "ljung_box_squared": {}}

    for L in lb_lags:
        try:
            lb_r = acorr_ljungbox(s, lags=[L], return_df=True)
            p_r = lb_r["lb_pvalue"].iloc[-1]
            out["ljung_box_returns"][f"lags_{L}"] = _safe_float(p_r)

        except Exception as e:
            out["ljung_box_returns"][f"lags_{L}"] = {"error": str(e)}

        try:
            lb_r2 = acorr_ljungbox(s**2, lags=[L], return_df=True)
            p_r2 = lb_r2["lb_pvalue"].iloc[-1]
            out["ljung_box_squared"][f"lags_{L}"] = _safe_float(p_r2)

        except Exception as e:
            out["ljung_box_squared"][f"lags_{L}"] = {"error": str(e)}
            
    return out

In [48]:
def arch_lm_tests(s: pd.Series, lags: List[int] = [5, 10, 20]) -> Dict[str, Any]:
    s = _series_clean(s)
    out = {}

    for L in lags:
        try:
            lm_stat, lm_p, f_stat, f_p = het_arch(s, nlags=L)
            out[f"lags_{L}"] = {"LM_p": _safe_float(lm_p), "F_p": _safe_float(f_p)}

        except Exception as e:
            out[f"lags_{L}"] = {"error": str(e)}
            
    return out

In [49]:
def stationarity_tests(s: pd.Series) -> Dict[str, Any]:
    s = _series_clean(s)
    out = {}

    try:
        adf_stat, adf_p, *_ = adfuller(s, autolag="AIC")
        out["adf_stat"] = _safe_float(adf_stat)
        out["adf_p"] = _safe_float(adf_p)

    except Exception as e:
        out["adf_error"] = str(e)

    try:
        kpss_stat, kpss_p, *_ = kpss(s, regression="c", nlags="auto")
        out["kpss_stat"] = _safe_float(kpss_stat)
        out["kpss_p"] = _safe_float(kpss_p)
        
    except Exception as e:
        out["kpss_error"] = str(e)

    return out

In [50]:
def bds_test(s: pd.Series, max_dim: int = 5) -> Dict[str, Any]:
    s = _series_clean(s)

    if not BDS_AVAILABLE:
        return {"available": False, "error": "BDS not available (need arch or statsmodels sandbox)."}
    
    out = {"available": True, "impl": bds_impl, "results": {}}

    if bds_impl == "arch":
        try:
            res = BDS_cls(s, max_dim=max_dim)

            # Attempt to extract per-dimension stats/p-values if exposed
            for m in range(2, max_dim + 1):
                try:
                    stat = res.stat[m]
                    pval = res.pvalue[m]

                except Exception:
                    stat, pval = None, None
                out["results"][f"m_{m}"] = {"stat": _safe_float(stat), "p": _safe_float(pval)}

        except Exception as e:
            out["error"] = str(e)

    elif bds_impl == "sm":
        try:
            for m in range(2, max_dim + 1):
                try:
                    stat, pval = bds_sm(s.values, m=m, eps=None)

                except Exception:
                    stat, pval = None, None
                out["results"][f"m_{m}"] = {"stat": _safe_float(stat), "p": _safe_float(pval)}
                
        except Exception as e:
            out["error"] = str(e)

    return out

In [51]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from numpy.linalg import matrix_rank
from statsmodels.stats.diagnostic import linear_reset

def robust_tsay_style_test(y: pd.Series, p: int = 5):
    """
    Tsay-style nonlinearity check:
    AR(p) vs AR(p)+quadratic(lags^2). Handles rank deficiency and provides fallbacks.
    Returns dict with 'method' in {'F-nested','RESET','unavailable'}.
    """
    y = pd.Series(y).dropna().astype(float)

    # Build lag matrix
    X_lin = pd.concat([y.shift(i) for i in range(1, p+1)], axis=1)
    X_lin.columns = [f"L{i}" for i in range(1, p+1)]
    df = pd.concat([y, X_lin], axis=1).dropna()
    if df.empty or df.shape[0] < (p + 10):
        return {"method": "unavailable", "error": "not enough data after lags"}

    y0 = df.iloc[:, 0].values
    Xl = df.loc[:, X_lin.columns].values

    # Standardize lags to mitigate collinearity
    Xl = (Xl - Xl.mean(axis=0, keepdims=True)) / (Xl.std(axis=0, keepdims=True) + 1e-12)

    # Quadratic terms
    Xq = Xl**2

    # Drop zero-variance columns
    keep_lin = Xl.std(axis=0) > 1e-12
    keep_quad = Xq.std(axis=0) > 1e-12
    Xl = Xl[:, keep_lin]
    Xq = Xq[:, keep_quad]

    # If no valid quadratic terms remain, bail out
    if Xq.shape[1] == 0:
        # Fall back to RESET (general nonlinearity)
        X1 = sm.add_constant(Xl, has_constant='add')
        m1 = sm.OLS(y0, X1).fit()
        try:
            reset = linear_reset(m1, power=2, use_f=True)
            return {"method": "RESET", "F": float(reset.fvalue), "p": float(reset.pvalue)}
        except Exception as e:
            return {"method": "unavailable", "error": f"No quadratic terms; RESET failed: {e}"}

    # Remove duplicate columns (within tol) to increase effective rank
    def unique_cols(X, tol=1e-10):
        cols = []
        idx = []
        for j in range(X.shape[1]):
            x = X[:, [j]]
            if not cols:
                cols.append(x)
                idx.append(j)
            else:
                M = np.hstack(cols + [x])
                if matrix_rank(M) > matrix_rank(np.hstack(cols)) + 0:  # added rank?
                    cols.append(x)
                    idx.append(j)
        return X[:, idx]

    Xl_u = unique_cols(Xl)
    Xq_u = unique_cols(Xq)

    # Design matrices
    X1 = sm.add_constant(Xl_u, has_constant='add')
    X2 = sm.add_constant(np.hstack([Xl_u, Xq_u]), has_constant='add')

    # Check ranks
    r1 = matrix_rank(X1)
    r2 = matrix_rank(X2)
    if r2 <= r1:
        # No added rank → cannot form nested F; fall back to RESET
        m1 = sm.OLS(y0, X1).fit()
        try:
            reset = linear_reset(m1, power=2, use_f=True)
            return {"method": "RESET", "F": float(reset.fvalue), "p": float(reset.pvalue)}
        except Exception as e:
            return {"method": "unavailable", "error": f"Rank did not increase; RESET failed: {e}"}

    # Fit and compare nested F
    m1 = sm.OLS(y0, X1).fit()
    m2 = sm.OLS(y0, X2).fit()
    try:
        F, pval, df_diff = m2.compare_f_test(m1)  # robust nested F
        return {"method": "F-nested", "F": float(F), "p": float(pval), "df_diff": int(df_diff)}
    except Exception as e:
        # Final fallback: RESET on the linear model
        try:
            reset = linear_reset(m1, power=2, use_f=True)
            return {"method": "RESET", "F": float(reset.fvalue), "p": float(reset.pvalue), "note": str(e)}
        except Exception as e2:
            return {"method": "unavailable", "error": f"compare_f_test failed: {e}; RESET failed: {e2}"}


In [52]:
def lagged_mutual_information(s: pd.Series, max_lag: int = 30, n_neighbors: int = 5, random_state: int = 0) -> pd.DataFrame:
    y = _series_clean(s)
    rows = []

    for L in range(1, max_lag + 1):
        x = y.shift(L).dropna()
        z = y.loc[x.index]

        if len(x) < 50:
            rows.append((L, np.nan))
            continue

        try:
            mi = mutual_info_regression(x.values.reshape(-1, 1), z.values,
                                        n_neighbors=n_neighbors, random_state=random_state)[0]
            
        except Exception:
            mi = np.nan

        rows.append((L, mi))
        
    return pd.DataFrame(rows, columns=["lag", "mutual_information"])

In [53]:
def quantile_acf(s: pd.Series, tau: float = 0.05, max_lag: int = 20) -> pd.DataFrame:
    y = _series_clean(s)
    thr = y.quantile(tau)
    I = (y <= thr).astype(int)
    rows = []

    for L in range(1, max_lag + 1):
        v1 = I[L:].values
        v2 = I.shift(L)[L:].values

        if len(v1) < 10:
            rows.append((L, np.nan))
            continue

        try:
            c = np.corrcoef(v1, v2)[0, 1]

        except Exception:
            c = np.nan

        rows.append((L, c))
        
    return pd.DataFrame(rows, columns=["lag", "qacf"])

In [54]:
def signs_runs_test(s: pd.Series) -> Dict[str, Any]:
    if not RUNS_AVAILABLE:
        return {"available": False, "error": "runs_test not available"}
    
    y = _series_clean(s)
    signs = np.sign(y.values)
    signs = signs[signs != 0]

    if len(signs) < 50:
        return {"available": True, "error": "not enough non-zero signs"}
    
    try:
        z_stat, pval = runs_test(signs)
        return {"available": True, "z": _safe_float(z_stat), "p": _safe_float(pval)}
    
    except Exception as e:
        return {"available": True, "error": str(e)}

In [55]:
def markov_switching_fit(s: pd.Series) -> Dict[str, Any]:
    if not MS_AVAILABLE:
        return {"available": False, "error": "MarkovRegression not available"}
    
    y = _series_clean(s)

    try:
        mod = MarkovRegression(y, k_regimes=2, trend="c", switching_variance=True)
        res = mod.fit(disp=False, maxiter=200)
        out = {
            "available": True,
            "llf": _safe_float(res.llf),
            "params": [ _safe_float(v) for v in res.params.tolist() ],
            "avg_regime_probs": [ _safe_float(v) for v in res.smoothed_marginal_probabilities.mean(axis=0).tolist() ]
        }

        return out
    
    except Exception as e:
        return {"available": True, "error": str(e)}

In [56]:
def run_battery_for_series(s: pd.Series, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Run a comprehensive set of tests on a single time series.
    Returns a nested dict of results.
    """
    cfg = {
        "lb_lags": [10, 20, 40],
        "arch_lags": [5, 10, 20],
        "bds_max_dim": 5,
        "tsay_p": 5,
        "mi_max_lag": 20,
        "mi_neighbors": 5,
        "qacf_tau_list": [0.01, 0.05, 0.10],
        "qacf_max_lag": 20,
        "shapiro_max_n": 5000,
        "with_markov_switching": True
    }

    if config:
        cfg.update(config)

    results = {}

    # Descriptives & normality
    results["summary"] = summarize_series(s)
    results["normality"] = normality_tests(s, shapiro_max_n=cfg["shapiro_max_n"])

    # Linear dependence & heteroskedasticity
    results["autocorr"] = autocorr_tests(s, lb_lags=cfg["lb_lags"])
    results["arch_lm"] = arch_lm_tests(s, lags=cfg["arch_lags"])

    # Stationarity
    results["stationarity"] = stationarity_tests(s)

    # Nonlinear / information tests
    results["bds"] = bds_test(s, max_dim=cfg["bds_max_dim"])
    results["tsay"] = robust_tsay_style_test(s, p=cfg["tsay_p"])
    results["runs"] = signs_runs_test(s)

    # Mutual information & quantile ACF (store as dict for JSON-ability)
    mi_df = lagged_mutual_information(s, max_lag=cfg["mi_max_lag"], n_neighbors=cfg["mi_neighbors"])
    results["mutual_information"] = mi_df.to_dict(orient="list")

    qacf_by_tau = {}

    for tau in cfg["qacf_tau_list"]:
        qacf_df = quantile_acf(s, tau=tau, max_lag=cfg["qacf_max_lag"])
        qacf_by_tau[str(tau)] = qacf_df.to_dict(orient="list")
        
    results["qacf"] = qacf_by_tau

    # Optional regime switching
    if cfg.get("with_markov_switching", True):
        results["markov_switching"] = markov_switching_fit(s)

    return results

In [57]:
def run_battery_over_stride_dict(stride_dict: Dict[Union[str, int], pd.Series], config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Apply the battery to each horizon (key) in a dict of pandas Series.
    """
    out = {}

    for key, series in stride_dict.items():
        try:
            out[str(key)] = run_battery_for_series(series, config=config)

        except Exception as e:
            out[str(key)] = {"error": str(e)}
            
    return out

In [58]:
def flatten_results_to_dataframe(results: Dict[str, Any]) -> pd.DataFrame:
    """
    Flatten the nested results dict into a DataFrame for quick filtering/comparison.
    Each top-level key is a horizon; columns are hierarchical paths.
    """
    rows = []

    for horizon, res in results.items():
        flat = {}

        def _flatten(prefix, obj):
            if isinstance(obj, dict):
                for k, v in obj.items():
                    _flatten(f"{prefix}.{k}" if prefix else str(k), v)

            elif isinstance(obj, list):
                flat[prefix] = str(obj)  # keep as string for overview

            else:
                flat[prefix] = v if (v := obj) is not None else np.nan

        _flatten("", res)
        flat["horizon"] = horizon
        rows.append(flat)

    df = pd.DataFrame(rows).set_index("horizon")

    # Move a few key p-values (if present) up front for convenience
    preferred_cols = [c for c in [
        "normality.jb_p",
        "autocorr.ljung_box_returns.lags_20",
        "autocorr.ljung_box_squared.lags_20",
        "arch_lm.lags_10.LM_p",
        "stationarity.adf_p",
        "stationarity.kpss_p",
        "tsay.p",
        "runs.p",
        "bds.results.m_2.p",
        "bds.results.m_3.p"
    ] if c in df.columns]

    others = [c for c in df.columns if c not in preferred_cols]
    
    return df[preferred_cols + others]

In [59]:
results = run_battery_over_stride_dict(
    fwd_log_returns_dict, config={
        "lb_lags": [10, 20, 40],
        "arch_lags": [5, 10, 20],
        "bds_max_dim": 5,
        "tsay_p": 5,
        "mi_max_lag": 20,
        "mi_neighbors": 5,
        "qacf_tau_list": [0.01, 0.05, 0.10],
        "qacf_max_lag": 20,
        "with_markov_switching": True,   # set False if needed
    }
)

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

look-up table. The actual p-value is greater than the p-value returned.

  c /= stddev[:, None]
  c /= stddev[None, :]
look-up table. The actual p-value is greater than the p-value returned.

  c /= stddev[:, None]
  c /= stddev[None, :]
  c /= stddev[:, None]
  c /= stddev[None, :]
look-up table. The actual p-value is greater than the p-value returned.

  c /= stddev[:, None]
  c /= stddev[None, :]
  c /= stddev[:, None]
  c /= stddev[None, :]
look-up table. The actual p-value is greater than the p-value returned.

  c /= stddev[:, None]
  c /= stddev[None, :]
  c /= stddev[:, None]
  c /= stddev[None, :]
  c /= stddev[:, None

In [60]:
df_overview = flatten_results_to_dataframe(results)
df_overview.to_csv("results_overview.csv")