
# Nexus Data Preprocessing

This notebook documents the Nexus equity preprocessing workflow using a staged structure inspired by the *Advanced Data Preprocessing Techniques for Financial* playbook.



## Define: Preprocessing Blueprint
- **Stage 1 – Data Cleaning & Alignment:** Detect schemas, repair gaps, and harmonise timestamps before any modelling.
- **Stage 2 – Noise Reduction & Signal Extraction:** Apply smoothing, frequency-domain filters, and latent-state models to dampen noise.
- **Stage 3 – Stationarity & Transformation:** Enforce statistical consistency via differencing, scaling, and variance-stabilising transforms.
- **Stage 4 – Feature Engineering & Enrichment:** Generate technical, macro, and alternative signals aligned with the 2025 Nexus objectives.
- **Stage 5 – Outlier & Anomaly Handling:** Tame extreme events with robust scaling and anomaly flags.
- **Stage 6 – Feature Selection & Dimensionality Reduction:** Reduce redundancy and highlight predictive structure.
- **Stage 7 – Temporal & Structural Adjustments:** Respect regime shifts, rolling dynamics, and event-driven structures.
- **Stage 8 – Cross-Sectional vs. Time-Series Treatment:** Differentiate per-asset normalisation from panel-level adjustments.
- **Stage 9 – Data Augmentation & Synthetic Data:** Expand scenario coverage through resampling and generative models.
- **Stage 10 – Preprocessing Pipelines (Automation):** Assemble modular layers into a reusable ETL-to-model input flow.


In [None]:
from __future__ import annotations

import argparse
from typing import List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd


## 1. Data Cleaning & Alignment
**Handling Missing Data**
- Forward/Backward fill for time-series gaps.
- Model-based imputation (Kalman filters, EM for state-space models).
- Multiple imputation (Bayesian approaches for macroeconomic series).

**Timestamp Alignment**
- Align heterogeneous data frequencies (e.g., daily stock prices with monthly macro indicators).
- As-of joins for tick-by-tick vs. daily aggregates.

**Corporate Actions Adjustments**
- Price adjustment for dividends, stock splits, mergers, ticker changes.
- Rolling adjustment factors for continuity in historical series.


In [None]:
def _first_present(cols: Sequence[str], *candidates_groups: Sequence[str]) -> Optional[str]:
    # Normalise via str() so non-string headers (e.g. integers read from Excel) do not raise.
    cols_l = [str(c).lower().strip() for c in cols]
    for candidates in candidates_groups:
        for cand in candidates:
            if cand in cols_l:
                return cols[cols_l.index(cand)]
    return None

**_first_present** – Locates the first matching alias in the schema so downstream steps can rely on canonical column names.

In [None]:
def _coerce_datetime(series: pd.Series) -> pd.Series:
    return pd.to_datetime(series, errors="coerce", utc=False)

**_coerce_datetime** – Converts timestamp-like inputs into pandas datetimes without forcing UTC, coercing unparseable entries to `NaT` so they can be dropped downstream.

In [None]:
def forward_backward_fill(frame: pd.DataFrame, columns: Optional[Sequence[str]] = None) -> pd.DataFrame:
    columns = list(columns) if columns is not None else list(frame.columns)
    out = frame.copy()
    out[columns] = out[columns].ffill().bfill()
    return out

**forward_backward_fill** – Forward-fills and then backward-fills the selected columns of a time-series frame, so leading gaps are closed as well; note that the backward fill borrows values from later dates.
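As a quick illustration of the fill order, the sketch below restates the same `ffill().bfill()` chain on a made-up four-day series. The values and the `from_future` name are assumptions for this example only. It also marks which cells could only be filled from a later date, which matters when the output feeds a backtest.

```python
import numpy as np
import pandas as pd

# Toy series with a leading gap and an interior gap (illustrative values).
s = pd.Series([np.nan, 10.0, np.nan, 12.0],
              index=pd.date_range("2024-01-01", periods=4, freq="D"))

filled = s.ffill().bfill()

# A cell is filled from the future if it is missing and forward fill cannot reach it.
from_future = s.isna() & s.ffill().isna()

print(filled.tolist())       # [10.0, 10.0, 10.0, 12.0]
print(from_future.tolist())  # [True, False, False, False]
```

Only the first cell relies on a later observation; the interior gap is closed with past data.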

In [None]:
def kalman_impute(series: pd.Series, transition_variance: float = 1e-5, observation_variance: float = 1e-2) -> pd.Series:
    try:
        from pykalman import KalmanFilter  # type: ignore
    except Exception:
        return series.interpolate(method="linear").ffill().bfill()
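    # pykalman treats masked entries as missing; raw NaNs would propagate through the filter.
    obs = np.ma.masked_invalid(series.to_numpy(dtype=float))
    if obs.count() == 0:
        return series.copy()
    initial_state_mean = float(obs.compressed()[0])
    kf = KalmanFilter(transition_matrices=[1.0], observation_matrices=[1.0],
                      transition_covariance=[[transition_variance]],
                      observation_covariance=[[observation_variance]],
                      initial_state_mean=initial_state_mean)
    state_means, _ = kf.em(obs, n_iter=5).smooth(obs)
    smoothed = pd.Series(state_means[:, 0], index=series.index)
    # Keep observed values; use the smoothed state only where data were missing.
    return series.fillna(smoothed)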

**kalman_impute** – Uses a 1D Kalman smoother (with linear interpolation fallback) to estimate missing observations in noisy economic series.

In [None]:
def multiple_imputation_draws(df: pd.DataFrame, n_draws: int = 5, random_state: Optional[int] = None) -> List[pd.DataFrame]:
    if n_draws <= 0:
        return []
    rng = np.random.default_rng(random_state)
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    # Per-column noise scale; constant or all-NaN columns get zero noise.
    scale = df[numeric_cols].std().fillna(0.0).to_numpy()
    missing = df[numeric_cols].isna()
    base = df.ffill().bfill()  # deterministic fill that every draw starts from
    draws: List[pd.DataFrame] = []
    for _ in range(n_draws):
        draw = base.copy()
        if numeric_cols:
            noise = rng.normal(loc=0.0, scale=scale, size=(len(df), len(numeric_cols)))
            noise_df = pd.DataFrame(noise, index=df.index, columns=numeric_cols)
            # Perturb only the imputed cells; observed values are left untouched.
            draw[numeric_cols] = base[numeric_cols] + noise_df.where(missing, 0.0)
        draws.append(draw)
    return draws

**multiple_imputation_draws** – Generates stochastic imputations by filling gaps and adding Gaussian noise (scaled to each column's standard deviation) to the imputed cells only, as a rough stand-in for Bayesian multiple imputation.

In [None]:
def align_to_frequency(df: pd.DataFrame, date_col: str, freq: str, method: str = "ffill") -> pd.DataFrame:
    out = df.set_index(date_col).sort_index()
    if method == "ffill":
        out = out.resample(freq).ffill()
    elif method == "bfill":
        out = out.resample(freq).bfill()
    else:
        out = out.resample(freq).interpolate(method=method)
    return out.reset_index()

**align_to_frequency** – Resamples a time series onto a target frequency using configurable forward/backward/interpolated fills.
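A minimal sketch of the forward-fill path, assuming a hypothetical two-row monthly policy-rate table (the column names `date` and `rate` are made up for the example). It upsamples month-end observations to a daily grid so each day carries the most recent published value.

```python
import pandas as pd

# Hypothetical month-end observations.
monthly = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-31", "2024-02-29"]),
    "rate": [5.25, 5.50],
})

# Same chain as the ffill branch: index by date, sort, upsample, forward-fill.
daily = monthly.set_index("date").sort_index().resample("D").ffill().reset_index()

print(len(daily))                 # 30 calendar days from 31 Jan to 29 Feb inclusive
print(daily["rate"].iloc[[0, -2, -1]].tolist())
```

The January value persists until the day before the February observation arrives, so no row sees a value before its date.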

In [None]:
def asof_join(left: pd.DataFrame, right: pd.DataFrame, on: str, suffix: str = "_rhs",
              tolerance: Optional[pd.Timedelta] = None, direction: str = "backward") -> pd.DataFrame:
    merged = pd.merge_asof(left.sort_values(on), right.sort_values(on), on=on,
                            suffixes=("", suffix), tolerance=tolerance, direction=direction)
    return merged

**asof_join** – Performs an as-of join so high-frequency series can align with lower-frequency macro indicators; with the default `direction="backward"` each row only sees values dated at or before it, which avoids look-ahead bias.
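To make the backward-looking behaviour concrete, here is a small sketch that calls `pd.merge_asof` directly on two made-up frames. The `close` and `cpi` columns and all values are assumptions. It treats the macro `date` as the date the figure became available, which is itself an assumption that real release calendars may violate.

```python
import pandas as pd

prices = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-31", "2024-02-15", "2024-03-01"]),
    "close": [100.0, 102.0, 101.0],
})
macro = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-31", "2024-02-29"]),
    "cpi": [3.1, 3.2],
})

# Both frames must be sorted on the key; backward picks the latest macro row at or before each price date.
merged = pd.merge_asof(prices, macro, on="date", direction="backward")
print(merged["cpi"].tolist())  # [3.1, 3.1, 3.2]
```

The 15 February price row still carries the January reading because the February figure is dated later.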

In [None]:
def adjust_ohlc_with_adjclose(df: pd.DataFrame,
                              col_open: Optional[str],
                              col_high: Optional[str],
                              col_low: Optional[str],
                              col_close: Optional[str],
                              col_adjclose: Optional[str]) -> pd.DataFrame:
    out = df.copy()
    if col_adjclose is None or col_close is None:
        return out
    with np.errstate(divide="ignore", invalid="ignore"):
        factor = out[col_adjclose] / out[col_close]
    factor = factor.replace([np.inf, -np.inf], np.nan)
    factor = factor.where(factor > 0, np.nan).ffill().bfill()
    if col_open is not None:
        out["adj_open"] = out[col_open] * factor
    if col_high is not None:
        out["adj_high"] = out[col_high] * factor
    if col_low is not None:
        out["adj_low"] = out[col_low] * factor
    out["adj_close"] = out[col_close] * factor
    return out

**adjust_ohlc_with_adjclose** – Constructs split/dividend-adjusted OHLC stacks using the adjusted-close factor to maintain historical continuity.
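The adjustment factor is easiest to see around a split. The sketch below uses invented prices for a hypothetical 2-for-1 split taking effect on the third row; the column names are illustrative and the arithmetic is restated inline rather than calling the function above.

```python
import pandas as pd

# Hypothetical prices: the first two rows are pre-split, the third is post-split.
df = pd.DataFrame({
    "open":      [200.0, 204.0, 103.0],
    "close":     [202.0, 206.0, 104.0],
    "adj close": [101.0, 103.0, 104.0],
})

# Ratio of adjusted to raw close; pre-split rows are scaled by 0.5.
factor = df["adj close"] / df["close"]
df["adj_open"] = df["open"] * factor

print(factor.tolist())          # [0.5, 0.5, 1.0]
print(df["adj_open"].tolist())  # [100.0, 102.0, 103.0]
```

After scaling, the open series no longer shows an artificial 50% drop at the split date.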


## 2. Noise Reduction & Signal Extraction
**Smoothing Filters**
- Moving average, Savitzky–Golay filters for trend preservation.

**Fourier & Wavelet Transforms**
- Multi-resolution decomposition for denoising price series.

**Empirical Mode Decomposition (EMD)**
- Decompose non-linear/non-stationary signals into Intrinsic Mode Functions.

**Kalman Filtering**
- Real-time noise reduction and latent state estimation.


In [None]:
def moving_average_filter(series: pd.Series, window: int = 5) -> pd.Series:
    return series.rolling(window=window, min_periods=1, center=False).mean()

**moving_average_filter** – Applies a trailing simple moving average (no look-ahead) to emphasise the underlying trend in noisy signals.

In [None]:
def savitzky_golay_filter(series: pd.Series, window_length: int = 7, polyorder: int = 3) -> pd.Series:
    min_length = polyorder + 2
    if min_length % 2 == 0:
        min_length += 1
    window_length = max(window_length, min_length)
    if window_length % 2 == 0:
        window_length += 1
    try:
        from scipy.signal import savgol_filter  # type: ignore
    except Exception:
        return series.rolling(window=window_length, min_periods=1, center=True).mean()
    filtered = savgol_filter(series.to_numpy(), window_length=window_length, polyorder=polyorder, mode="interp")
    return pd.Series(filtered, index=series.index)

**savitzky_golay_filter** – Uses a polynomial smoothing filter (with rolling-mean fallback) to retain trend curvature while denoising.

In [None]:
def fourier_lowpass_filter(series: pd.Series, cutoff_ratio: float = 0.1) -> pd.Series:
    values = series.to_numpy()
    n = values.shape[0]
    if n == 0:
        return series.copy()
    fft = np.fft.rfft(values)
    cutoff = int(len(fft) * cutoff_ratio)
    filtered_fft = np.zeros_like(fft)
    filtered_fft[:max(cutoff, 1)] = fft[:max(cutoff, 1)]
    filtered = np.fft.irfft(filtered_fft, n=n)
    return pd.Series(filtered, index=series.index)

**fourier_lowpass_filter** – Keeps only the low-frequency Fourier coefficients to suppress high-frequency noise in price series.
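A short sketch of the same truncation idea on a synthetic signal, assuming a slow sine wave (2 cycles) contaminated by a fast one (60 cycles). The frequencies and the 10% cutoff are illustrative choices, picked so both components sit exactly on FFT bins.

```python
import numpy as np

n = 256
t = np.arange(n)
slow = np.sin(2 * np.pi * 2 * t / n)          # component we want to keep
fast = 0.5 * np.sin(2 * np.pi * 60 * t / n)   # component treated as noise
signal = slow + fast

spectrum = np.fft.rfft(signal)
cutoff = max(int(len(spectrum) * 0.1), 1)     # keep the lowest 10% of frequency bins
kept = np.zeros_like(spectrum)
kept[:cutoff] = spectrum[:cutoff]
filtered = np.fft.irfft(kept, n=n)

max_err = np.max(np.abs(filtered - slow))
print(cutoff, max_err)  # the error is at floating-point level for this idealised case
```

Real price series are not periodic, so a hard cutoff like this can introduce edge effects and ringing; the clean recovery here is a property of the idealised input.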

In [None]:
def wavelet_denoise(series: pd.Series, wavelet: str = "db4", level: int = 2) -> pd.Series:
    try:
        import pywt  # type: ignore
    except Exception:
        return series.copy()
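    values = series.to_numpy(dtype=float)
    # Cap the requested depth at what the series length supports.
    level = min(level, pywt.dwt_max_level(len(values), pywt.Wavelet(wavelet).dec_len))
    if level < 1:
        return series.copy()
    coeffs = pywt.wavedec(values, wavelet, mode="smooth", level=level)
    # Universal threshold; noise scale estimated from the finest detail coefficients (MAD / 0.6745).
    sigma = np.median(np.abs(coeffs[-1])) / 0.6745
    threshold = sigma * np.sqrt(2 * np.log(len(values))) if sigma > 0 else 0.0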
    denoised_coeffs = [coeffs[0]]
    for c in coeffs[1:]:
        denoised_coeffs.append(pywt.threshold(c, threshold, mode="soft"))
    reconstructed = pywt.waverec(denoised_coeffs, wavelet, mode="smooth")
    return pd.Series(reconstructed[: len(series)], index=series.index)

**wavelet_denoise** – Performs soft-threshold wavelet shrinkage (with identity fallback) to suppress small-amplitude noise while keeping structure across scales.

In [None]:
def empirical_mode_decompose(series: pd.Series) -> List[pd.Series]:
    try:
        from PyEMD import EMD  # type: ignore
    except Exception:
        return [series.copy()]
    emd = EMD()
    imfs = emd(series.to_numpy())
    return [pd.Series(imf, index=series.index) for imf in imfs]

**empirical_mode_decompose** – Decomposes non-linear signals into intrinsic mode functions when the PyEMD library is available.

In [None]:
def kalman_smoother(series: pd.Series, transition_variance: float = 1e-5, observation_variance: float = 1e-2) -> pd.Series:
    try:
        from pykalman import KalmanFilter  # type: ignore
    except Exception:
        return series.ewm(alpha=0.2, adjust=False).mean()
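    # pykalman treats masked entries as missing; raw NaNs would propagate through the filter.
    obs = np.ma.masked_invalid(series.to_numpy(dtype=float))
    if obs.count() == 0:
        return series.copy()
    initial_state_mean = float(obs.compressed()[0])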
    kf = KalmanFilter(transition_matrices=[1.0], observation_matrices=[1.0],
                      transition_covariance=[[transition_variance]],
                      observation_covariance=[[observation_variance]],
                      initial_state_mean=initial_state_mean)
    state_means, _ = kf.em(obs, n_iter=5).smooth(obs)
    return pd.Series(state_means[:, 0], index=series.index)

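**kalman_smoother** – Applies a Kalman smoother (with exponentially weighted fallback) for offline noise reduction. Because smoothing conditions on the full sample, use filtered estimates instead when causal, real-time values are required.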


## 3. Stationarity & Transformation
**Detrending & Differencing**
- Log returns (instead of raw prices).
- Seasonal decomposition for macroeconomic data.

**Normalization & Scaling**
- Volatility scaling (returns divided by realized volatility).
- Z-score scaling for features with different magnitudes (e.g., rates vs. sentiment).

**Box-Cox / Yeo-Johnson Transforms**
- Variance-stabilizing transformations for skewed economic data.


In [None]:
def compute_log_returns(series: pd.Series) -> pd.Series:
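    # Log is undefined for non-positive prices, so mask them to NaN first.
    return np.log(series.where(series > 0)).diff()

**compute_log_returns** – Transforms price levels into log returns, which are typically far closer to stationary than raw prices; non-positive prices yield NaN.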
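One reason log returns are convenient is that they add up across time. The sketch below checks this on four made-up prices: the sum of the log returns equals the log of the total price ratio, whereas simple returns must be compounded.

```python
import numpy as np
import pandas as pd

prices = pd.Series([100.0, 110.0, 99.0, 108.9])  # illustrative values

log_ret = np.log(prices).diff()
total_log = log_ret.sum()                           # the leading NaN is skipped
total_simple = prices.iloc[-1] / prices.iloc[0] - 1

# Summed log returns convert back to the compounded simple return.
print(total_log, np.expm1(total_log), total_simple)
```

Here `np.expm1(total_log)` and `total_simple` agree up to floating-point error, at roughly 8.9%.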

In [None]:
def seasonal_difference(series: pd.Series, period: int) -> pd.Series:
    return series.diff(periods=period)

**seasonal_difference** – Applies seasonal differencing to remove periodic structure from macroeconomic indicators.

In [None]:
def seasonal_adjust(series: pd.Series, period: int = 12) -> pd.Series:
    try:
        from statsmodels.tsa.seasonal import STL  # type: ignore
    except Exception:
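        # Crude fallback: remove a centred rolling trend, then average the remainder by seasonal phase.
        trend = series.rolling(window=period, min_periods=1, center=True).mean()
        phase = np.arange(len(series)) % period
        seasonal = (series - trend).groupby(phase).transform("mean")
        return series - seasonal
    stl = STL(series, period=period, robust=True)
    result = stl.fit()
    return series - result.seasonal

**seasonal_adjust** – Removes the seasonal component using STL (falling back to a rolling-trend plus per-phase-mean estimate) to stabilise macro features.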

In [None]:
def volatility_scale(returns: pd.Series, vol: pd.Series) -> pd.Series:
    return returns / vol.replace(0.0, np.nan)

**volatility_scale** – Divides returns by realised volatility to normalise distributions across market regimes.

In [None]:
def zscore_normalize(series: pd.Series) -> pd.Series:
    mean = series.mean()
    std = series.std()
    if pd.isna(std) or std == 0:  # constant or single-observation input
        return series - mean
    return (series - mean) / std

**zscore_normalize** – Standardises a feature to zero mean and unit variance for cross-feature comparability.

In [None]:
def boxcox_transform(series: pd.Series, lmbda: Optional[float] = None) -> pd.Series:
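    # Shift so the minimum equals 1; Box-Cox is only defined for positive inputs.
    shifted = series + 1 - series.min()
    try:
        from scipy import stats  # type: ignore
    except Exception:
        return np.log(shifted)  # Box-Cox with lambda = 0
    if lmbda is None:
        transformed, _ = stats.boxcox(shifted)  # lambda fitted by maximum likelihood
    else:
        transformed = stats.boxcox(shifted, lmbda=lmbda)  # only the array is returned here
    return pd.Series(transformed, index=series.index)

**boxcox_transform** – Shifts the series to be strictly positive, then applies Box-Cox (with log fallback) to stabilise variance.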

In [None]:
def yeojohnson_transform(series: pd.Series, lmbda: Optional[float] = None) -> pd.Series:
    try:
        from scipy import stats  # type: ignore
    except Exception:
        return zscore_normalize(series)
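    if lmbda is None:
        transformed, _ = stats.yeojohnson(series)  # lambda fitted by maximum likelihood
    else:
        transformed = stats.yeojohnson(series, lmbda=lmbda)  # only the array is returned here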
    return pd.Series(transformed, index=series.index)

**yeojohnson_transform** – Uses Yeo-Johnson (with z-score fallback) to stabilise skewed series that include negatives.


## 4. Feature Engineering & Enrichment
**Market Features**
- Volatility clustering (e.g., GARCH residuals as features).
- Higher-order moments (skewness, kurtosis of returns).
- Technical indicators (RSI, MACD, Bollinger Bands).

**Macroeconomic Features**
- Lagged effects (e.g., interest rates, CPI lags).
- Cyclical indicators (yield curve slope, credit spreads).

**Alternative Data Integration**
- Social sentiment (NLP preprocessing: embeddings, topic modeling).
- Satellite, ESG, shipping flows, Google Trends (scaled to market calendars).

**Market Regimes**
- Hidden Markov Models (HMM) or Bayesian Change Point Detection to label bull/bear/stagnant phases.


In [None]:
def compute_rsi(prices: pd.Series, window: int = 14) -> pd.Series:
    delta = prices.diff()
    gain = delta.clip(lower=0).ewm(alpha=1 / window, adjust=False, min_periods=window).mean()
    loss = -delta.clip(upper=0).ewm(alpha=1 / window, adjust=False, min_periods=window).mean()
    rs = gain / loss.replace(0, np.nan)
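    rsi = 100 - (100 / (1 + rs))
    # With gains but no losses in the lookback, RS is unbounded and RSI is 100 by convention.
    rsi = rsi.mask((loss == 0) & (gain > 0), 100.0)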
    return rsi

**compute_rsi** – Computes the Relative Strength Index to capture short-horizon momentum in adjusted closing prices.

In [None]:
def compute_macd(prices: pd.Series, span_fast: int = 12, span_slow: int = 26, span_signal: int = 9) -> pd.DataFrame:
    ema_fast = prices.ewm(span=span_fast, adjust=False).mean()
    ema_slow = prices.ewm(span=span_slow, adjust=False).mean()
    macd_line = ema_fast - ema_slow
    signal = macd_line.ewm(span=span_signal, adjust=False).mean()
    hist = macd_line - signal
    return pd.DataFrame({"macd": macd_line, "macd_signal": signal, "macd_hist": hist})

**compute_macd** – Generates MACD, signal, and histogram series to summarise trend shifts via dual exponential averages.

In [None]:
def compute_bbands(prices: pd.Series, window: int = 20, n_std: float = 2.0) -> pd.DataFrame:
    ma = prices.rolling(window=window, min_periods=window).mean()
    sd = prices.rolling(window=window, min_periods=window).std()
    upper = ma + n_std * sd
    lower = ma - n_std * sd
    return pd.DataFrame({"bb_mid": ma, "bb_upper": upper, "bb_lower": lower})

**compute_bbands** – Builds Bollinger Bands around a rolling mean to contextualise prices against recent volatility.

In [None]:
def compute_garch_residuals(returns: pd.Series, p: int = 1, q: int = 1) -> Optional[pd.Series]:
    try:
        from arch import arch_model  # type: ignore
    except Exception:
        return None
    clean = returns.dropna()
    if clean.empty:
        return None
    model = arch_model(clean, vol="Garch", p=p, q=q, dist="normal")
    fit = model.fit(disp="off")
    resid = fit.resid / fit.conditional_volatility
    return resid.reindex(returns.index)

**compute_garch_residuals** – Fits a GARCH model (when `arch` is available) and returns standardised residuals, i.e. residuals divided by the fitted conditional volatility.

In [None]:
def rolling_higher_moments(returns: pd.Series, window: int = 60) -> pd.DataFrame:
    skew = returns.rolling(window=window, min_periods=window).skew()
    kurt = returns.rolling(window=window, min_periods=window).kurt()
    return pd.DataFrame({"rolling_skew": skew, "rolling_kurt": kurt})

**rolling_higher_moments** – Computes rolling skewness and kurtosis to quantify distributional shape changes.

In [None]:
def build_macro_lags(df: pd.DataFrame, date_col: str, columns: Sequence[str], lags: Sequence[int]) -> pd.DataFrame:
    out = df.sort_values(date_col).set_index(date_col)
    for col in columns:
        for lag in lags:
            out[f"{col}_lag{lag}"] = out[col].shift(lag)
    return out.reset_index()

**build_macro_lags** – Generates lagged macroeconomic predictors to capture delayed market reactions.
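A tiny sketch of the lagging step, assuming an unsorted three-row CPI table with made-up values. It shows why the sort matters: `shift` works on row position, so rows must be in date order before lagging.

```python
import pandas as pd

# Deliberately unsorted input (illustrative values).
macro = pd.DataFrame({
    "date": pd.to_datetime(["2024-03-31", "2024-01-31", "2024-02-29"]),
    "cpi": [3.5, 3.1, 3.2],
})

out = macro.sort_values("date").set_index("date")
out["cpi_lag1"] = out["cpi"].shift(1)   # previous period's reading
out = out.reset_index()

print(out)
```

The first row has no predecessor, so its lag is `NaN`; downstream steps need to drop or fill it deliberately.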

In [None]:
def integrate_alternative_signals(price_df: pd.DataFrame, alt_df: pd.DataFrame, date_col: str,
                                  tolerance: Optional[str] = "1D") -> pd.DataFrame:
    tol_td = pd.to_timedelta(tolerance) if tolerance is not None else None
    merged = asof_join(price_df, alt_df, on=date_col, suffix="_alt", tolerance=tol_td)
    return merged

**integrate_alternative_signals** – Adds alternative datasets (sentiment, ESG, etc.) via tolerant as-of joins aligned to the market calendar.

In [None]:
def detect_market_regimes(series: pd.Series, fast_window: int = 20, slow_window: int = 100) -> pd.Series:
    fast_ma = series.rolling(window=fast_window, min_periods=1).mean()
    slow_ma = series.rolling(window=slow_window, min_periods=1).mean()
    regime = pd.Series("stagnant", index=series.index)
    regime = regime.mask(fast_ma > slow_ma, "bull")
    regime = regime.mask(fast_ma < slow_ma, "bear")
    return regime

**detect_market_regimes** – Labels simple bull/bear/stagnant regimes from a fast/slow moving-average crossover (a placeholder that an HMM could replace).


## 5. Outlier & Anomaly Handling
**Winsorization / Clipping**
- Reduce impact of extreme tail events.

**Robust Scaling**
- Median & interquartile-based scaling (instead of mean/variance).

**Anomaly Detection**
- Isolation forests, robust PCA, or autoencoders for unusual price/volume/macro shocks.


In [None]:
def winsorize(series: pd.Series, lower_q: float = 0.01, upper_q: float = 0.99) -> pd.Series:
    lo, hi = series.quantile([lower_q, upper_q])
    return series.clip(lower=lo, upper=hi)

**winsorize** – Clips series tails at configurable quantiles to reduce outsized influence from market shocks.
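The clipping step can be sketched on an eleven-point series with one extreme value in each tail. The data and the 10%/90% quantiles are assumptions chosen so the cut points fall exactly on observations.

```python
import pandas as pd

s = pd.Series([-50.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 60.0])

lo, hi = s.quantile([0.10, 0.90])      # linear-interpolated quantiles
clipped = s.clip(lower=lo, upper=hi)

print(lo, hi)
print(clipped.tolist())
```

Only the two extremes move (to about 1 and 9); interior observations are unchanged. The quantiles here come from the whole sample, so in a backtest they would need to be estimated on past data only.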

In [None]:
def robust_scale(series: pd.Series) -> pd.Series:
    median = series.median()
    iqr = series.quantile(0.75) - series.quantile(0.25)
    if iqr == 0:
        return series - median
    return (series - median) / iqr

**robust_scale** – Normalises a feature using median and IQR for resilience to outliers.
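To show why median/IQR scaling is more resilient, the sketch below compares it with a z-score on five made-up points, one of which is an obvious outlier. Both formulas are restated inline.

```python
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0, 4.0, 100.0])  # illustrative values

z = (s - s.mean()) / s.std()
iqr = s.quantile(0.75) - s.quantile(0.25)
r = (s - s.median()) / iqr

print(z.round(2).tolist())
print(r.tolist())
```

The outlier inflates the mean and standard deviation enough that its own z-score stays below 2, while the robust score leaves the four ordinary points evenly spaced and puts the outlier at 48.5.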

In [None]:
def detect_anomalies_zscore(series: pd.Series, z_thresh: float = 3.0) -> pd.Series:
    z = zscore_normalize(series)
    return (np.abs(z) > z_thresh).astype(int)

**detect_anomalies_zscore** – Flags anomalies via absolute z-score thresholds as a lightweight alternative to heavier models.


## 6. Feature Selection & Dimensionality Reduction
**Filter & Wrapper Methods**
- Mutual information, stability selection for economic variables.

**PCA / ICA / RPCA**
- Remove redundancy from correlated indicators.

**Manifold Learning**
- t-SNE, UMAP for clustering latent regimes.

**Sparse Methods**
- Lasso/ElasticNet to select predictive macro factors.


In [None]:
def mutual_information_ranking(features: pd.DataFrame, target: pd.Series) -> pd.Series:
    try:
        from sklearn.feature_selection import mutual_info_regression  # type: ignore
    except Exception:
        corr = features.corrwith(target)
        return corr.abs().sort_values(ascending=False)
    mi = mutual_info_regression(features.fillna(0.0), target.fillna(0.0))
    return pd.Series(mi, index=features.columns).sort_values(ascending=False)

**mutual_information_ranking** – Scores features by mutual information (or correlation fallback) to prioritise informative signals.

In [None]:
def principal_component_features(features: pd.DataFrame, n_components: int = 3) -> pd.DataFrame:
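    # Centre once and fill gaps with 0 (the column mean after centring) so covariance and projection agree.
    centered = (features - features.mean()).fillna(0.0).to_numpy()
    cov = np.atleast_2d(np.cov(centered.T))
    eigvals, eigvecs = np.linalg.eigh(cov)
    order = np.argsort(eigvals)[::-1][:n_components]
    pcs = centered @ eigvecs[:, order]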
    columns = [f"pc{i+1}" for i in range(len(order))]
    return pd.DataFrame(pcs, index=features.index, columns=columns)

**principal_component_features** – Produces principal components via eigen-decomposition to summarise correlated inputs.
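A self-contained sketch of the eigen-decomposition route on simulated data, assuming three features of which two share a common driver. The data-generating choices and variable names are illustrative only.

```python
import numpy as np

rng = np.random.default_rng(0)
common = rng.normal(size=500)
X = np.column_stack([
    common + 0.1 * rng.normal(size=500),   # two nearly collinear features
    common + 0.1 * rng.normal(size=500),
    rng.normal(size=500),                  # one independent feature
])

Xc = X - X.mean(axis=0)                            # centre before estimating covariance
eigvals, eigvecs = np.linalg.eigh(np.cov(Xc.T))    # eigh returns ascending eigenvalues
order = np.argsort(eigvals)[::-1]                  # reorder to descending variance
explained = eigvals[order] / eigvals.sum()
pcs = Xc @ eigvecs[:, order]

print(explained.round(3))
```

The first component absorbs most of the variance because it captures the shared driver, and the resulting component scores are mutually uncorrelated.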

In [None]:
def independent_component_features(features: pd.DataFrame, n_components: int = 3) -> Optional[pd.DataFrame]:
    try:
        from sklearn.decomposition import FastICA  # type: ignore
    except Exception:
        return None
    ica = FastICA(n_components=n_components, random_state=42, whiten="unit-variance")
    comps = ica.fit_transform(features.fillna(0.0))
    columns = [f"ica{i+1}" for i in range(comps.shape[1])]
    return pd.DataFrame(comps, index=features.index, columns=columns)

**independent_component_features** – Runs FastICA (if available) to uncover statistically independent drivers.

In [None]:
def sparse_feature_selection(features: pd.DataFrame, target: pd.Series, alpha: float = 0.001) -> Optional[pd.Series]:
    try:
        from sklearn.linear_model import Lasso  # type: ignore
    except Exception:
        return None
    model = Lasso(alpha=alpha, max_iter=5000)
    model.fit(features.fillna(0.0), target.fillna(0.0))
    return pd.Series(model.coef_, index=features.columns)

**sparse_feature_selection** – Applies Lasso (when scikit-learn is installed) to highlight sparse predictive macro factors.


## 7. Temporal & Structural Adjustments
**Regime-Specific Preprocessing**
- Normalize/scale separately within regimes (bull vs. bear).

**Time-Varying Correlations**
- Dynamic Conditional Correlation (DCC-GARCH).

**Rolling Window Features**
- Adaptive statistics (mean/volatility/correlation over rolling periods).

**Event-Time Alignment**
- Align around earnings announcements, FOMC dates, recessions, policy changes.


In [None]:
def regime_specific_normalize(series: pd.Series, regimes: pd.Series) -> pd.Series:
    # Group by regime label; transform preserves the original index, so any index type works.
    return series.groupby(regimes).transform(zscore_normalize)

**regime_specific_normalize** – Applies z-score scaling within each detected regime to respect structural breaks.
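As a worked illustration of per-regime scaling, the sketch below applies a group-wise z-score to six made-up returns on a date index. The `zscore` helper is a local stand-in defined for the example, and the regime labels are assumed rather than detected.

```python
import pandas as pd

idx = pd.date_range("2024-01-01", periods=6, freq="D")
ret = pd.Series([1.0, 2.0, 3.0, 10.0, 20.0, 30.0], index=idx)
regime = pd.Series(["bull"] * 3 + ["bear"] * 3, index=idx)

def zscore(x: pd.Series) -> pd.Series:
    # Sample standard deviation (ddof=1), matching the pandas default.
    return (x - x.mean()) / x.std()

scaled = ret.groupby(regime).transform(zscore)
print(scaled.tolist())  # [-1.0, 0.0, 1.0, -1.0, 0.0, 1.0]
```

Although the second regime's raw values are ten times larger, both regimes map to the same scaled pattern.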

In [None]:
def dynamic_correlation(series_a: pd.Series, series_b: pd.Series, window: int = 60) -> pd.Series:
    return series_a.rolling(window=window, min_periods=window).corr(series_b)

**dynamic_correlation** – Tracks rolling correlation to proxy time-varying dependence (DCC-ready placeholder).

In [None]:
def rolling_window_features(series: pd.Series, window: int = 20) -> pd.DataFrame:
    roll_mean = series.rolling(window=window, min_periods=window).mean()
    roll_std = series.rolling(window=window, min_periods=window).std()
    return pd.DataFrame({"rolling_mean": roll_mean, "rolling_std": roll_std})

**rolling_window_features** – Generates rolling mean and volatility as adaptive statistics for downstream scaling.

In [None]:
def align_to_events(df: pd.DataFrame, date_col: str, events: Sequence[pd.Timestamp], window: int = 5) -> pd.DataFrame:
    frames = []
    df = df.sort_values(date_col)
    for event in events:
        mask = (df[date_col] >= event - pd.Timedelta(days=window)) & (df[date_col] <= event + pd.Timedelta(days=window))
        window_df = df.loc[mask].copy()
        window_df["event_date"] = event
        window_df["t_minus_event"] = (window_df[date_col] - event).dt.days
        frames.append(window_df)
    if not frames:
        return pd.DataFrame(columns=df.columns.tolist() + ["event_date", "t_minus_event"])
    return pd.concat(frames, ignore_index=True)

**align_to_events** – Builds event-time panels around key dates (earnings, policy) for structural analysis.


## 8. Cross-Sectional vs. Time-Series Treatment
**Cross-Sectional Normalization**
- Rank or z-score standardization across assets each day.

**Panel Data Handling**
- Fixed effects (sector/country dummies).
- Random effects for heterogeneous asset panels.


In [None]:
def cross_sectional_rank_normalize(df: pd.DataFrame, date_col: str, value_col: str) -> pd.Series:
    ranks = df.groupby(date_col)[value_col].rank(method="average")
    group_sizes = df.groupby(date_col)[value_col].transform("count")
    return (ranks - 0.5) / group_sizes

**cross_sectional_rank_normalize** – Produces daily fractional ranks for cross-sectional standardisation.
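The rank formula can be checked by hand on a small panel. The sketch below assumes a hypothetical momentum column `mom` for four tickers on one day and two on the next; all names and values are invented.

```python
import pandas as pd

panel = pd.DataFrame({
    "date": ["2024-01-02"] * 4 + ["2024-01-03"] * 2,
    "ticker": ["A", "B", "C", "D", "A", "B"],
    "mom": [0.05, -0.02, 0.10, 0.01, 0.03, 0.07],
})

ranks = panel.groupby("date")["mom"].rank(method="average")
counts = panel.groupby("date")["mom"].transform("count")
panel["mom_rank"] = (ranks - 0.5) / counts   # maps ranks into the open interval (0, 1)

print(panel["mom_rank"].tolist())  # [0.625, 0.125, 0.875, 0.375, 0.25, 0.75]
```

Subtracting 0.5 centres each rank within its bucket, so scores are symmetric around 0.5 regardless of how many assets trade that day.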

In [None]:
def panel_fixed_effects_encode(df: pd.DataFrame, categories: Sequence[str]) -> pd.DataFrame:
    out = df.copy()
    for cat in categories:
        dummies = pd.get_dummies(out[cat], prefix=cat, drop_first=True)
        out = pd.concat([out, dummies], axis=1)
    return out

**panel_fixed_effects_encode** – Adds fixed-effect dummies (e.g., sector, country) to absorb persistent panel biases.

In [None]:
def panel_random_effects_placeholder():
    raise NotImplementedError("Random effects estimators require specialised econometrics libraries.")

**panel_random_effects_placeholder** – Signals where to plug in a random-effects estimator when econometrics tooling is available.


## 9. Data Augmentation & Synthetic Data
**Bootstrapping & Block Bootstraps**
- Preserve autocorrelation in returns for resampling.

**Generative Models**
- GANs, VAEs for synthetic return/macro series.

**Backtesting Augmentation**
- Scenario generation (stress events, fat-tail shocks).


In [None]:
def block_bootstrap(series: pd.Series, block_size: int = 5, n_samples: int = 100, random_state: Optional[int] = None) -> np.ndarray:
    rng = np.random.default_rng(random_state)
    values = series.dropna().to_numpy()
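    n = len(values)
    if n == 0:
        return np.empty((n_samples, 0))
    block_size = max(1, min(block_size, n))
    n_blocks = -(-n // block_size)  # ceiling division
    # Moving-block bootstrap: draw start positions of overlapping, equal-length blocks,
    # which avoids the ragged final block that breaks rng.choice on a list of arrays.
    starts = rng.integers(0, n - block_size + 1, size=(n_samples, n_blocks))
    idx = (starts[:, :, None] + np.arange(block_size)).reshape(n_samples, -1)[:, :n]
    return values[idx]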

**block_bootstrap** – Samples contiguous blocks to retain autocorrelation when generating synthetic return paths.
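To see what block resampling buys, the sketch below simulates an AR(1) series and compares the lag-1 autocorrelation of an ordinary i.i.d. resample with that of a block resample. The AR coefficient, series length, block size, and helper name `lag1_autocorr` are assumptions chosen for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
n, phi = 2000, 0.8

# Simulate an AR(1) process with strong positive autocorrelation.
x = np.zeros(n)
for t in range(1, n):
    x[t] = phi * x[t - 1] + rng.normal()

def lag1_autocorr(v: np.ndarray) -> float:
    return float(np.corrcoef(v[:-1], v[1:])[0, 1])

# Ordinary bootstrap: draws individual observations, destroying time ordering.
iid = rng.choice(x, size=n, replace=True)

# Block bootstrap: draws contiguous windows, keeping short-range dependence.
block_size = 50
starts = rng.integers(0, n - block_size + 1, size=n // block_size)
blocked = np.concatenate([x[s:s + block_size] for s in starts])

print(lag1_autocorr(x), lag1_autocorr(iid), lag1_autocorr(blocked))
```

The i.i.d. resample should show autocorrelation near zero, while the block resample stays close to the original series; dependence is only broken at the block joins.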

In [None]:
def generate_synthetic_scenarios(mean: pd.Series, cov: pd.DataFrame, n_scenarios: int = 100, random_state: Optional[int] = None) -> pd.DataFrame:
    rng = np.random.default_rng(random_state)
    draws = rng.multivariate_normal(mean.fillna(0.0), cov.fillna(0.0), size=n_scenarios)
    return pd.DataFrame(draws, columns=mean.index)

**generate_synthetic_scenarios** – Creates Gaussian scenarios from an estimated mean and covariance as a lightweight stand-in for GANs/VAEs.

In [None]:
def stress_event_shocks(series: pd.Series, shock_factor: float = 3.0) -> pd.Series:
    shocks = series.copy()
    tail_mask = detect_anomalies_zscore(series, z_thresh=2.0).astype(bool)
    shocks.loc[tail_mask] = shocks.loc[tail_mask] * shock_factor
    return shocks

**stress_event_shocks** – Amplifies detected tail events to craft stress scenarios for backtesting robustness.


## 10. Preprocessing Pipelines (Automation)
**Modular pipelines with:**
- ETL Layer: Raw data ingestion & corporate actions adjustment.
- Preprocessing Layer: Cleaning, scaling, alignment.
- Feature Layer: Market/macroeconomic/alternative features.
- Regime Layer: State-dependent preprocessing.
- Model Input Layer: Final normalized dataset for ML/RL agents.


In [None]:
def build_preprocessing_pipeline(df: pd.DataFrame,
                                 date_col: str,
                                 open_col: Optional[str],
                                 high_col: Optional[str],
                                 low_col: Optional[str],
                                 close_col: Optional[str],
                                 adj_close_col: Optional[str],
                                 volume_col: Optional[str]) -> pd.DataFrame:
    df = df.sort_values(date_col).drop_duplicates(subset=[date_col]).reset_index(drop=True)
    numeric_cols = [c for c in [open_col, high_col, low_col, close_col, adj_close_col, volume_col] if c is not None]
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    df[numeric_cols] = df[numeric_cols].ffill().bfill()
    df = adjust_ohlc_with_adjclose(df, open_col, high_col, low_col, close_col, adj_close_col)
    if volume_col is not None:
        df.loc[df[volume_col] == 0, volume_col] = np.nan
        df[volume_col] = df[volume_col].ffill().bfill()
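    if "adj_close" not in df.columns:
        # Fall back to whichever price column exists, preferring the adjusted close.
        fallback_col = adj_close_col if adj_close_col is not None else close_col
        if fallback_col is None:
            raise ValueError("No usable close or adjusted close column.")
        df["adj_close"] = df[fallback_col]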
    df["ret"] = compute_log_returns(df["adj_close"])
    df["ret_w"] = winsorize(df["ret"])
    df["vol_20"] = df["ret"].rolling(window=20, min_periods=20).std()
    df["vol_60"] = df["ret"].rolling(window=60, min_periods=60).std()
    df["rv_20_annual"] = df["vol_20"] * np.sqrt(252.0)
    df["rv_60_annual"] = df["vol_60"] * np.sqrt(252.0)
    df["ret_vol_scaled"] = volatility_scale(df["ret"], df["vol_20"])
    df["ret_vol_scaled_w"] = winsorize(df["ret_vol_scaled"])
    rsi14 = compute_rsi(df["adj_close"], window=14)
    macd_df = compute_macd(df["adj_close"], span_fast=12, span_slow=26, span_signal=9)
    bb_df = compute_bbands(df["adj_close"], window=20, n_std=2.0)
    df = pd.concat([df, rsi14.rename("rsi14"), macd_df, bb_df], axis=1)
    higher_moments = rolling_higher_moments(df["ret"], window=60)
    df = pd.concat([df, higher_moments], axis=1)
    regimes = detect_market_regimes(df["adj_close"], fast_window=20, slow_window=100)
    df["regime"] = regimes
    df["ret_regime_scaled"] = regime_specific_normalize(df["ret"], regimes)
    # Flag on the unclipped series: winsorised values rarely exceed a 3-sigma threshold.
    df["anomaly_flag"] = detect_anomalies_zscore(df["ret_vol_scaled"], z_thresh=3.0)
    return df

**build_preprocessing_pipeline** – Chains the cleaning, feature, regime, and anomaly steps into a single reusable data frame transformation.
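The `rv_20_annual` and `rv_60_annual` columns rely on square-root-of-time scaling. A worked example, assuming roughly 252 trading days per year and serially uncorrelated daily returns (both are conventions rather than facts about any particular asset):

```python
import numpy as np

daily_vol = 0.01                           # 1% daily standard deviation (illustrative)
annual_vol = daily_vol * np.sqrt(252.0)    # variance scales with time, so std scales with sqrt(time)

print(round(annual_vol, 4))  # 0.1587
```

If returns are autocorrelated, this scaling will under- or overstate annual volatility, so the annualised columns are best read as a convention-based summary.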

In [None]:
def preprocess_cat_excel(input_path: str, output_base: str) -> Tuple[str, Optional[str]]:
    try:
        df = pd.read_excel(input_path)
    except Exception:
        df = pd.read_excel(input_path, engine="openpyxl")
    original_cols = list(df.columns)
    date_col = _first_present(original_cols, ["date", "timestamp", "time", "datetime"])
    open_col = _first_present(original_cols, ["open", "px_open", "o"])
    high_col = _first_present(original_cols, ["high", "px_high", "h"])
    low_col = _first_present(original_cols, ["low", "px_low", "l"])
    close_col = _first_present(original_cols, ["close", "px_last", "price", "last"])
    adj_close_col = _first_present(original_cols, ["adj close", "adjusted close"])
    volume_col = _first_present(original_cols, ["volume", "vol", "qty", "turnover"])
    if date_col is None:
        maybe_date = original_cols[0]
        parsed = pd.to_datetime(df[maybe_date], errors="coerce")
        if parsed.notna().mean() > 0.7:
            date_col = maybe_date
    if date_col is None:
        raise ValueError("No date/time column detected.")
    df[date_col] = _coerce_datetime(df[date_col])
    df = df.dropna(subset=[date_col])
    processed = build_preprocessing_pipeline(df, date_col, open_col, high_col, low_col, close_col, adj_close_col, volume_col)
    adj_cols_available = [c for c in ["adj_open", "adj_high", "adj_low"] if c in processed.columns]
    out_cols = [date_col] + adj_cols_available + ["adj_close"]
    if volume_col is not None:
        out_cols.append(volume_col)
    out_cols += ["ret", "ret_w", "ret_vol_scaled", "ret_vol_scaled_w",
                 "vol_20", "vol_60", "rv_20_annual", "rv_60_annual",
                 "rsi14", "macd", "macd_signal", "macd_hist",
                 "bb_mid", "bb_upper", "bb_lower",
                 "rolling_skew", "rolling_kurt",
                 "regime", "ret_regime_scaled", "anomaly_flag"]
    tidy = processed[out_cols].copy()
    rename_map = {date_col: "date"}
    if volume_col is not None:
        rename_map[volume_col] = "volume"
    tidy = tidy.rename(columns=rename_map)
    tidy["date"] = pd.to_datetime(tidy["date"]).dt.tz_localize(None)
    csv_path = f"{output_base}.csv"
    tidy.to_csv(csv_path, index=False)
    parquet_path: Optional[str] = None
    try:
        import pyarrow  # type: ignore
        parquet_path = f"{output_base}.parquet"
        tidy.to_parquet(parquet_path, index=False)
    except Exception:
        pass
    return csv_path, parquet_path

**preprocess_cat_excel** – Executes the Nexus pipeline end-to-end: ingest Excel, harmonise schema, engineer features, and persist artefacts.

In [None]:
def main() -> None:
    parser = argparse.ArgumentParser(description="Preprocess equity time series (CAT US.xlsx style).")
    parser.add_argument("--input", required=True, help="Path to Excel file")
    parser.add_argument("--output_base", required=True, help="Output path prefix (no extension)")
    args = parser.parse_args()
    csv_path, pq_path = preprocess_cat_excel(args.input, args.output_base)
    print("Wrote:", csv_path)
    if pq_path:
        print("Wrote:", pq_path)
    else:
        print("Parquet not written (pyarrow missing or write failed).")

**main** – CLI entrypoint that wires command-line arguments into the preprocessing pipeline, reporting output artefacts.

In [None]:

if __name__ == "__main__":
    main()
