In [None]:
import sys
import os
sys.path.append('../src') 

# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv('../.env')  # Load .env file from parent directory

import warnings
import pandas as pd
import matplotlib

In [None]:
import os
from data.loader import load_stock_universe, load_etf_universe
import pandas as pd
# ✅ Verify API key
rapidapi_key = os.getenv('RAPIDAPI_KEY')
if rapidapi_key:
    print(f"✅ RAPIDAPI_KEY loaded (first 10 chars: {rapidapi_key[:10]}...)")
else:
    print("⚠️ RAPIDAPI_KEY not found in environment variables")
    print("   Make sure your .env contains: RAPIDAPI_KEY=your_key_here")

# ---- Fetch stocks (cached to *_universe.parquet) ----
stocks, sectors = load_stock_universe(update=False, include_sectors=True)


# ---- Fetch ETFs (cached to *_etf.parquet) ----
etfs = load_etf_universe(
    etf_symbols=None,             # or pass a list like ["SPY","QQQ",...]
    etf_csv_path=None,            # or a CSV path with a Symbol column
    update=False,                 # True to re-fetch
    rate_limit=1.0,
    interval="1d"
)

# ---- Quick summaries ----
def _summary(block, label):
    if not block:
        print(f"❌ No {label} loaded"); return
    n_syms = len(block.get("Close", pd.DataFrame()).columns) if "Close" in block else 0
    n_days = len(block.get("Close", pd.DataFrame()).index) if "Close" in block else 0
    print(f"✅ {label}: {n_syms} symbols × {n_days} days")

_summary(stocks, "stocks")
_summary(etfs, "ETFs")


In [None]:
data = {
    k: pd.concat([stocks.get(k, pd.DataFrame()),
                  etfs.get(k, pd.DataFrame())], axis=1).sort_index()
    for k in (set(stocks) | set(etfs))
}

In [None]:
# ✅ View what's loaded

for k, v in data.items():
    print(f"{k}: shape = {v.shape}")

In [None]:
import pandas as pd
import numpy as np
import nolds
import pandas_ta as ta
import warnings
from joblib import Parallel, delayed

# ===================== Params =====================
HURST_WINDOWS        = (128,)         # rolling window(s) for Hurst on returns
HURST_EMA_FOR        = 128            # which H track to smooth (must be in HURST_WINDOWS)
HURST_HALFLIFE       = 5              # halflife EMA for H smoothing; set None/0 to disable
HURST_EMA_SPAN       = None           # alternative to halflife; keep None if using halflife

BB_LENGTH            = 20
BB_STD               = 2.0

INCLUDE_TA           = False
MA_LIST              = [20, 50, 100, 200]

warnings.filterwarnings("ignore", category=FutureWarning, message=".*deprecated.*")

# ===================== Helpers =====================
def _rolling_hurst_vec(x: np.ndarray) -> float:
    try:
        return float(nolds.hurst_rs(np.asarray(x, dtype=float), fit="poly"))
    except Exception:
        return np.nan

def _ensure_bollinger(df: pd.DataFrame, src='adjclose', length=BB_LENGTH, std=BB_STD) -> pd.DataFrame:
    out = df.copy()
    try:
        bb = ta.bbands(out[src], length=length, std=std)
    except Exception:
        bb = None

    if bb is not None and not bb.empty:
        lo = bb.filter(like="BBL").iloc[:, 0]
        mid = bb.filter(like="BBM").iloc[:, 0]
        up = bb.filter(like="BBU").iloc[:, 0]
        out["bb_lower"] = lo
        out["bb_middle"] = mid
        out["bb_upper"] = up
    else:
        mid = out[src].rolling(length, min_periods=max(2, length//2)).mean()
        sd = out[src].rolling(length, min_periods=max(2, length//2)).std(ddof=0)
        out["bb_middle"] = mid
        out["bb_upper"]  = mid + std * sd
        out["bb_lower"]  = mid - std * sd

    rng = (out["bb_upper"] - out["bb_lower"]).replace(0, np.nan)
    out["bb_pct_b"] = (out[src] - out["bb_lower"]) / rng
    out["bb_width_pct"] = (out["bb_upper"] - out["bb_lower"]) / out["bb_middle"].replace(0, np.nan)
    return out

def compute_hurst_windows(series: pd.Series,
                          windows=HURST_WINDOWS,
                          ema_smooth_for=HURST_EMA_FOR,
                          ema_span=HURST_EMA_SPAN,
                          ema_halflife=HURST_HALFLIFE,
                          prefix="hurst") -> pd.DataFrame:
    out = pd.DataFrame(index=series.index)
    s = pd.to_numeric(series, errors="coerce")

    for w in windows:
        col = f"{prefix}_{w}"
        out[col] = s.rolling(window=w, min_periods=w).apply(_rolling_hurst_vec, raw=False)

    if ema_smooth_for is not None:
        base_col = f"{prefix}_{ema_smooth_for}"
        if base_col in out.columns:
            if ema_halflife is not None and ema_halflife > 0:
                out[f"{base_col}_emaHL{ema_halflife}"] = out[base_col].ewm(
                    halflife=ema_halflife, adjust=False, min_periods=1
                ).mean()
            elif ema_span is not None and ema_span > 1:
                out[f"{base_col}_ema{ema_span}"] = out[base_col].ewm(
                    span=ema_span, adjust=False, min_periods=1
                ).mean()
    return out

# ===================== WORKER =====================
def compute_indicators(symbol: str, series: pd.Series):
    df = pd.DataFrame(series).copy()
    df.columns = ['adjclose']
    df['adjclose'] = pd.to_numeric(df['adjclose'], errors='coerce')
    df['ret'] = np.log(df['adjclose']).diff()

    # Hurst
    hurst_block = compute_hurst_windows(
        df['ret'],
        windows=HURST_WINDOWS,
        ema_smooth_for=HURST_EMA_FOR,
        ema_span=HURST_EMA_SPAN,
        ema_halflife=HURST_HALFLIFE,
        prefix="hurst_ret"
    )
    out = df.join(hurst_block, how='left')

    # Bollinger
    out = _ensure_bollinger(out, src='adjclose', length=BB_LENGTH, std=BB_STD)

    if INCLUDE_TA:
        for ma in MA_LIST:
            out[f'ma_{ma}'] = ta.sma(out['adjclose'], length=ma)

    return symbol, out

# ===================== PARALLEL =====================
adj_df = data['AdjClose'].copy()

results = Parallel(n_jobs=-1, backend="multiprocessing")(
    delayed(compute_indicators)(symbol, adj_df[symbol]) for symbol in adj_df.columns
)

indicators_by_symbol = {sym: df for sym, df in results}

In [None]:
indicators_by_symbol['TSLA']['hurst_ret_128_emaHL5'].plot()

In [None]:
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

def _ensure_mas(df: pd.DataFrame, src='adjclose', periods=(10,20,30,50,75,100,150,200)):
    for p in periods:
        col = f'ma_{p}'
        if col not in df.columns:
            df[col] = df[src].rolling(p, min_periods=p//2).mean()
    return df

def add_trend_features(
    df: pd.DataFrame,
    src_col: str = 'adjclose',
    ma_periods=(10, 20, 30, 50, 75, 100, 150, 200),
    slope_window: int = 20,
    eps: float = 1e-5,
    persist_halflife: int = 5,
    slope_scale: float = 0.05,   # scale for % slope -> [-1,1] via tanh
    weights=(0.5, 0.3, 0.2)      # (sign, slope, alignment) weights
) -> pd.DataFrame:
    """
    Adds continuous trend features (no manual buckets):

      - trend_score_sign ∈ [-1,1]: mean sign of MA slopes (deadzone eps)
      - trend_score_slope ∈ [-1,1]: avg % slope across MAs (tanh-normalized)
      - trend_alignment  ∈ [-1,1]: signed stacking of adjacent MAs
      - trend_persist_ema: EMA( trend_score_sign ) with halflife
      - trend_score_granular ∈ [-1,1]: weighted blend of (sign, slope, alignment)

    Also keeps helper columns:
      - sign_ma_{p}
      - pct_slope_ma_{p}   (per-day % slope of MA p over slope_window)
    """
    df = _ensure_mas(df, src=src_col, periods=ma_periods)

    # ---- 1) Sign of MA slopes over a lookback ----
    sign_cols, pct_slope_cols = [], []
    for p in ma_periods:
        ma = df[f'ma_{p}']
        # per-bar slope in price units
        raw_slope = (ma - ma.shift(slope_window)) / float(slope_window)
        # convert to % slope (relative to prior MA level) to be scale-invariant
        pct_slope = raw_slope / ma.shift(slope_window)
        df[f'pct_slope_ma_{p}'] = pct_slope.astype('float32')
        pct_slope_cols.append(f'pct_slope_ma_{p}')

        # soft sign with deadzone
        sign = np.where(raw_slope > eps, 1, np.where(raw_slope < -eps, -1, 0)).astype('float32')
        df[f'sign_ma_{p}'] = sign
        sign_cols.append(f'sign_ma_{p}')

    # Mean sign over non-zero votes
    sign_mat = df[sign_cols].to_numpy(dtype='float32')
    nz = (sign_mat != 0).sum(axis=1)
    sums = sign_mat.sum(axis=1)
    score_sign = np.divide(
        sums,
        np.where(nz == 0, np.nan, nz),
        out=np.zeros_like(sums, dtype='float32'),
        where=nz != 0
    )
    df['trend_score_sign'] = score_sign.astype('float32')

    # ---- 2) Magnitude via % slope, squashed to [-1,1] ----
    # average % slope across MAs, then tanh to bound & damp outliers
    avg_pct_slope = df[pct_slope_cols].mean(axis=1)
    df['trend_score_slope'] = np.tanh(avg_pct_slope / slope_scale).astype('float32')

    # ---- 3) Stacking / alignment of adjacent MAs ----
    # For uptrend: ma_short > ma_long; for downtrend: reverse.
    up_pairs = 0
    down_pairs = 0
    total_pairs = 0
    # compute signed alignment per row: (#up_pairs - #down_pairs) / total_pairs
    # adjacent pairs only for stability
    for i in range(len(ma_periods) - 1):
        a = df[f'ma_{ma_periods[i]}']
        b = df[f'ma_{ma_periods[i+1]}']
        up = (a > b).astype('int8')
        down = (a < b).astype('int8')
        if i == 0:
            up_pairs = up
            down_pairs = down
        else:
            up_pairs = up_pairs + up
            down_pairs = down_pairs + down
        total_pairs += 1
    # signed alignment in [-1,1]
    with np.errstate(invalid='ignore', divide='ignore'):
        align = (up_pairs - down_pairs) / float(total_pairs if total_pairs else 1)
    df['trend_alignment'] = align.astype('float32')

    # ---- 4) Persistence: EMA of the sign score ----
    if persist_halflife and persist_halflife > 0:
        df['trend_persist_ema'] = (
            df['trend_score_sign']
            .ewm(halflife=persist_halflife, adjust=False, min_periods=1)
            .mean()
            .astype('float32')
        )
    else:
        df['trend_persist_ema'] = df['trend_score_sign'].astype('float32')

    # ---- 5) Weighted blended granular score ----
    w_sign, w_slope, w_align = weights
    wsum = float(w_sign + w_slope + w_align) or 1.0
    df['trend_score_granular'] = (
        (w_sign * df['trend_score_sign'] +
         w_slope * df['trend_score_slope'] +
         w_align * df['trend_alignment']) / wsum
    ).astype('float32')

    return df


def _add_trend_for_symbol(sym, df):
    try:
        out = add_trend_features(
            df.copy(),
            src_col='adjclose',
            ma_periods=(10, 20, 30, 50, 75, 100, 150, 200),
            slope_window=20,
            eps=1e-5,
            persist_halflife=5,   # ~1 trading week
            slope_scale=0.05,     # 5%/day slope ~ tanh(1)
            weights=(0.5, 0.3, 0.2)
        )
        return sym, out
    except Exception as e:
        print(f"[trend_features] {sym} failed: {e}")
        return sym, df

# Parallel pass
results = Parallel(n_jobs=-1, backend="loky")(
    delayed(_add_trend_for_symbol)(sym, indicators_by_symbol[sym])
    for sym in indicators_by_symbol.keys()
)

indicators_by_symbol = {sym: df for sym, df in results}

In [None]:
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

# ---------- helpers ----------
def _ensure_mas(df: pd.DataFrame, src='adjclose', periods=(20,50,100,200)) -> pd.DataFrame:
    out = df.copy()
    for p in periods:
        col = f'ma_{p}'
        if col not in out.columns:
            out[col] = out[src].rolling(p, min_periods=max(2, p//2)).mean()
    return out

def _zscore(s: pd.Series) -> pd.Series:
    m = s.mean(skipna=True); sd = s.std(skipna=True)
    if sd is None or sd == 0 or np.isnan(sd): 
        return s - m
    return (s - m) / sd

# ---------- main feature builder ----------
def add_min_pct_dist_ma(
    df: pd.DataFrame,
    ma_lengths=[20, 50, 100, 200],
    normalize=True,                      # per-symbol z-score columns
    add_phase_features=True              # pct_dist_ma_20, pct_dist_ma_50, relative_dist_20_50
) -> pd.DataFrame:
    """
    Adds % distance to each MA, min absolute % distance, and (optionally)
    20/50-specific phase features and per-column z-scores.
    """
    out = _ensure_mas(df, src='adjclose', periods=tuple(ma_lengths)).copy()

    # % distances to each MA
    pct_cols = []
    for ma in ma_lengths:
        col_ma = f'ma_{ma}'
        if col_ma not in out.columns:
            raise ValueError(f"Missing column: {col_ma}")
        denom = out[col_ma].replace(0, np.nan)
        col_pct = f'pct_dist_ma_{ma}'
        out[col_pct] = (out['adjclose'] - out[col_ma]) / denom
        pct_cols.append(col_pct)

    # min absolute % distance across the set of MAs
    out['min_pct_dist_ma'] = out[pct_cols].abs().min(axis=1)

    # phase features (short vs mid-term stretch)
    if add_phase_features:
        if 'pct_dist_ma_20' in out and 'pct_dist_ma_50' in out:
            out['relative_dist_20_50'] = out['pct_dist_ma_20'] - out['pct_dist_ma_50']

    # optional z-scored versions (per symbol)
    if normalize:
        for c in pct_cols + ['min_pct_dist_ma'] + (['relative_dist_20_50'] if 'relative_dist_20_50' in out else []):
            out[f'{c}_z'] = _zscore(out[c])

    return out

# ---------- parallel application across your dict ----------
def _add_min_pct_for_symbol(sym: str, df: pd.DataFrame, ma_lengths=(20,50,100,200),
                            normalize=True, add_phase_features=True):
    try:
        out = add_min_pct_dist_ma(df, ma_lengths=list(ma_lengths),
                                  normalize=normalize, add_phase_features=add_phase_features)
        return sym, out
    except Exception as e:
        print(f"[min_pct_dist_ma] {sym} failed: {e}")
        return sym, df

# Run after your indicators_by_symbol has 'adjclose' (and any existing columns you want to keep)
results = Parallel(n_jobs=-1, backend="loky")(
    delayed(_add_min_pct_for_symbol)(
        sym, indicators_by_symbol[sym],
        ma_lengths=(20,50,100,200),
        normalize=True,
        add_phase_features=True
    )
    for sym in indicators_by_symbol.keys()
)

# Rebuild the dict
indicators_by_symbol = {sym: df for sym, df in results}

In [None]:
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import os

RS_LOOKBACK = 60
RS_SLOPE_WIN = 20
SPY_SYMBOL = "SPY"  # change if you use another benchmark

# ---- Build sector benchmarks (ETF adjclose per sector) ----
def build_sector_benchmarks(indicators_by_symbol, sectors, sector_to_etf):
    if 'sectors' not in globals():
        raise NameError("`sectors` dict is not defined in this scope.")

    # collect ETF adjclose series for mapped sectors
    sector_benchmarks = {}
    for sec, etf_sym in sector_to_etf.items():
        df_etf = indicators_by_symbol.get(etf_sym)
        if df_etf is not None and 'adjclose' in df_etf.columns:
            sector_benchmarks[sec] = pd.to_numeric(df_etf['adjclose'], errors='coerce')
        # else: silently skip; RS worker will handle None

    # (optional) sanity: warn for sector names present in `sectors` but missing a benchmark
    missing = {sectors[sym] for sym in sectors if isinstance(sectors[sym], str)} - set(sector_benchmarks.keys())
    if missing:
        print(f"⚠️ No ETF benchmark for sectors: {sorted(missing)[:8]}{' …' if len(missing)>8 else ''}")

    return sector_benchmarks


sector_to_etf = {
    "technology services": "XLK",
    "electronic technology": "XLK",
    "finance": "XLF",
    "retail trade": "XLY",
    "consumer services": "XLY",
    "consumer durables": "XLY",
    "consumer non-durables": "XLP",
    "health technology": "XLV",
    "health services": "XLV",
    "producer manufacturing": "XLI",
    "industrial services": "XLI",
    "distribution services": "XLI",
    "transportation": "XLI",
    "energy minerals": "XLE",
    "non-energy minerals": "XLB",
    "process industries": "XLB",
    "communications": "XLC",
    "commercial services": "XLC",
    "utilities": "XLU",
    "miscellaneous": "SPY",
}

# Normalize lookup
sector_benchmarks = {
    sector: indicators_by_symbol[etf]['adjclose']
    for sector, etf in sector_to_etf.items()
    if etf in indicators_by_symbol
}


def _rs_worker(sym: str, df: pd.DataFrame, sector_bench: pd.Series, spy_bench: pd.Series):
    """
    Return ONLY the new RS columns for symbol `sym` to reduce IPC payload size.
    Adds:
      - rel_strength_sector, rel_strength_sector_norm, rel_strength_sector_slope20
      - rel_strength_spy,    rel_strength_spy_norm,    rel_strength_spy_slope20
    """
    try:
        if 'adjclose' not in df.columns:
            return sym, None

        idx   = df.index
        price = pd.to_numeric(df['adjclose'], errors='coerce')

        out = pd.DataFrame(index=idx)

        # ---- helper to compute RS set safely ----
        def _compute_rs(block_prefix: str, bench: pd.Series):
            if bench is None:
                return
            bench_s = pd.to_numeric(bench.reindex(idx), errors='coerce').replace(0, np.nan)
            if bench_s.notna().sum() < 10 or price.notna().sum() < 10:
                return
            rs = price / bench_s
            roll = rs.rolling(RS_LOOKBACK, min_periods=max(5, RS_LOOKBACK//3)).mean()
            rs_norm = (rs / roll) - 1.0
            rs_slope = (rs - rs.shift(RS_SLOPE_WIN)) / float(RS_SLOPE_WIN)
            out[f'{block_prefix}']            = rs.astype('float32')
            out[f'{block_prefix}_norm']       = rs_norm.astype('float32')
            out[f'{block_prefix}_slope20']    = rs_slope.astype('float32')

        # Sector RS
        _compute_rs('rel_strength_sector', sector_bench)

        # SPY RS (skip for SPY itself)
        if spy_bench is not None and sym != SPY_SYMBOL:
            _compute_rs('rel_strength_spy', spy_bench)

        return sym, (out if not out.empty else None)

    except Exception as e:
        print(f"[RS worker] {sym} failed: {e}")
        return sym, None


# ---- Build SPY benchmark series once ----
spy_series = None
if SPY_SYMBOL in indicators_by_symbol and 'adjclose' in indicators_by_symbol[SPY_SYMBOL].columns:
    spy_series = pd.to_numeric(indicators_by_symbol[SPY_SYMBOL]['adjclose'], errors='coerce')
else:
    print(f"⚠️ {SPY_SYMBOL} not found in indicators_by_symbol or missing 'adjclose' — SPY RS will be skipped.")

# ---- Parallel compute: pass both sector benchmark and SPY series ----
results = Parallel(
    n_jobs=max(1, os.cpu_count() - 1),
    backend="loky",         # use 'threading' if memory is tight
    batch_size=8,
    verbose=0
)(
    delayed(_rs_worker)(
        sym,
        indicators_by_symbol[sym],
        sector_benchmarks.get(sectors.get(sym,"").lower()) if 'sectors' in globals() else None,
        spy_series
    )
    for sym in indicators_by_symbol.keys()
)

# ---- Merge results safely (overwrite or create columns) ----
for sym, small_df in results:
    if small_df is not None and not small_df.empty:
        base = indicators_by_symbol[sym]
        # align index (in case); then assign columns in-place
        small_df = small_df.reindex(base.index)
        for col in small_df.columns:
            base[col] = small_df[col].astype('float32')

In [None]:
indicators_by_symbol['TSLA'].columns
# sectors['TSLA']

In [None]:
import numpy as np
import pandas as pd
import pandas_ta as ta
from joblib import Parallel, delayed

ATR_LENGTH = 14

def _add_atr_pct_for_symbol(sym: str, df: pd.DataFrame,
                            high_s: pd.Series, low_s: pd.Series,
                            length: int = ATR_LENGTH):
    try:
        out = df.copy()
        h = pd.to_numeric(high_s, errors='coerce')
        l = pd.to_numeric(low_s, errors='coerce')
        c = pd.to_numeric(out['adjclose'], errors='coerce')
        atr = ta.atr(h, l, c, length=length)
        out['atr_percent'] = (atr / c) * 100
        return sym, out
    except Exception as e:
        print(f"[ATR%] {sym} failed: {e}")
        return sym, df

# Wide OHLC inputs
high_df = data['High']
low_df  = data['Low']

# Parallel: same rebuild pattern you already use
results = Parallel(n_jobs=-1, backend="loky")(
    delayed(_add_atr_pct_for_symbol)(
        sym,
        indicators_by_symbol[sym],
        high_df[sym],
        low_df[sym],
        ATR_LENGTH
    )
    for sym in indicators_by_symbol.keys()
    if sym in high_df.columns and sym in low_df.columns
)

indicators_by_symbol = {sym: df for sym, df in results}

In [None]:
indicators_by_symbol['TSLA'].columns

In [None]:
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import os

VOL_WIN_SHORT = 20
VOL_WIN_LONG  = 50
OBV_USE_ADJCLOSE = True

def _basic_volume_features(df: pd.DataFrame) -> pd.DataFrame:
    out = pd.DataFrame(index=df.index)

    has_vol = 'volume' in df.columns
    if has_vol:
        vol = pd.to_numeric(df['volume'], errors='coerce')

        # rolling means
        out['vol_ma_20'] = vol.rolling(VOL_WIN_SHORT, min_periods=5).mean()
        out['vol_ma_50'] = vol.rolling(VOL_WIN_LONG,  min_periods=10).mean()

        # z-scores
        mu20 = vol.rolling(20, min_periods=5).mean()
        sd20 = vol.rolling(20, min_periods=5).std(ddof=0)
        out['vol_z_20'] = (vol - mu20) / sd20.replace(0, np.nan)

        mu60 = vol.rolling(60, min_periods=15).mean()
        sd60 = vol.rolling(60, min_periods=15).std(ddof=0)
        out['vol_z_60'] = (vol - mu60) / sd60.replace(0, np.nan)

        # relative volume
        out['rvol_20'] = vol / out['vol_ma_20'].replace(0, np.nan)
        out['rvol_50'] = vol / out['vol_ma_50'].replace(0, np.nan)

    # dollar volume & OBV need price; they’ll be skipped if price missing
    px_col = 'adjclose' if (OBV_USE_ADJCLOSE and 'adjclose' in df.columns) else ('close' if 'close' in df.columns else None)
    if px_col is not None and has_vol:
        px = pd.to_numeric(df[px_col], errors='coerce')
        vol = pd.to_numeric(df['volume'], errors='coerce')
        dvol = (px * vol)
        dvol_ma20 = dvol.rolling(20, min_periods=5).mean()
        out['dollar_vol_ma_20'] = dvol_ma20
        out['rdollar_vol_20']   = dvol / dvol_ma20.replace(0, np.nan)

        obv = (np.sign(px.diff()).fillna(0.0) * vol).fillna(0.0).cumsum()
        out['obv'] = obv
        obv_mu = obv.rolling(60, min_periods=20).mean()
        obv_sd = obv.rolling(60, min_periods=20).std(ddof=0)
        out['obv_z_60'] = (obv - obv_mu) / obv_sd.replace(0, np.nan)

    # cast light
    for c in out.columns:
        out[c] = pd.to_numeric(out[c], errors='coerce').astype('float32')
    return out

def _vol_worker(sym: str):
    df = indicators_by_symbol[sym]
    small = _basic_volume_features(df)
    # Always return the frame, even if all NaN columns (not empty)
    return sym, small

# Ensure OHLCV columns are present in indicators_by_symbol
# Columns we want to pull in from `data`
cols_to_add = ["Open", "High", "Low", "Close", "Volume"]

for sym, df in indicators_by_symbol.items():
    for col in cols_to_add:
        if col in data and sym in data[col]:
            # Get the Series for this column/symbol
            series = data[col][sym]

            # Align to main DataFrame index
            series = series.reindex(df.index)

            # Lowercase column name
            col_lower = col.lower()

            # Add or overwrite
            if col_lower not in df.columns:
                df[col_lower] = series
            else:
                df[col_lower].update(series)

    indicators_by_symbol[sym] = df
# indicators_by_symbol['TSLA']#['Open']

In [None]:

# run (threads) and write back explicitly
results = Parallel(
    n_jobs=min(8, os.cpu_count() or 4),
    backend="threading",
    prefer="threads",
    verbose=0
)(
    delayed(_vol_worker)(sym) for sym in indicators_by_symbol.keys()
)

added_report = []
for sym, small_df in results:
    base = indicators_by_symbol[sym]
    # ensure index alignment
    small_df = small_df.reindex(base.index)

    # add/overwrite columns
    before = set(base.columns)
    for col in small_df.columns:
        base[col] = small_df[col]
    indicators_by_symbol[sym] = base  # explicit write-back

    added = sorted(set(base.columns) - before)
    # keep a tiny sample report for sanity
    if added:
        added_report.append((sym, added[:4] + (['...'] if len(added) > 4 else [])))

# Optional: quick confirmation print for a few symbols
for sym, cols in added_report[:5]:
    print(f"[volume] {sym}: added {cols}")

In [None]:
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import os

RANGE_LOOKBACKS = (5, 10, 20)

def _range_features_one(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute range-based features for a single symbol DataFrame.
    Requires columns: 'open','high','low','close' (lowercase).
    Uses 'atr_percent' and 'rvol_20' if present for extras.
    """
    need = {'open','high','low','close'}
    if not need.issubset(df.columns):
        return pd.DataFrame(index=df.index)  # nothing to do

    out = pd.DataFrame(index=df.index)

    o = pd.to_numeric(df['open'], errors='coerce')
    h = pd.to_numeric(df['high'], errors='coerce')
    l = pd.to_numeric(df['low'],  errors='coerce')
    c = pd.to_numeric(df['close'],errors='coerce')

    # Basic ranges
    hl = h - l
    out['hl_range'] = hl
    out['hl_range_pct_close'] = hl / c.replace(0, np.nan)

    # True range (Wilder)
    prev_c = c.shift(1)
    tr = pd.concat([h - l, (h - prev_c).abs(), (l - prev_c).abs()], axis=1).max(axis=1)
    out['true_range'] = tr
    out['tr_pct_close'] = tr / c.replace(0, np.nan)

    # Rolling n-day constructs
    for n in RANGE_LOOKBACKS:
        n_high = h.rolling(n, min_periods=max(3, n//3)).max()
        n_low  = l.rolling(n, min_periods=max(3, n//3)).min()
        n_rng  = n_high - n_low
        n_rng_ma = hl.rolling(n, min_periods=max(3, n//3)).mean()
        n_rng_mu = hl.rolling(n, min_periods=max(3, n//3)).mean()
        n_rng_sd = hl.rolling(n, min_periods=max(3, n//3)).std(ddof=0)

        out[f'{n}d_high'] = n_high
        out[f'{n}d_low']  = n_low
        out[f'{n}d_range'] = n_rng
        out[f'{n}d_range_pct_close'] = n_rng / c.replace(0, np.nan)

        denom = (n_high - n_low).replace(0, np.nan)
        out[f'pos_in_{n}d_range'] = (c - n_low) / denom

        out[f'breakout_up_{n}d'] = (c - n_high) / n_high.replace(0, np.nan)
        out[f'breakout_dn_{n}d'] = (c - n_low)  / n_low.replace(0, np.nan)

        out[f'range_expansion_{n}d'] = hl / n_rng_ma.replace(0, np.nan)
        out[f'range_z_{n}d'] = (hl - n_rng_mu) / n_rng_sd.replace(0, np.nan)

    # Gaps
    out['gap_pct'] = (o - prev_c) / prev_c.replace(0, np.nan)

    if 'atr_percent' in df.columns:
        atrp = pd.to_numeric(df['atr_percent'], errors='coerce')  # already percent
        out['gap_atr_ratio'] = out['gap_pct'] / (atrp.replace(0, np.nan) / 100.0)

    # Interaction with RVOL if available
    if 'rvol_20' in df.columns:
        rvol20 = pd.to_numeric(df['rvol_20'], errors='coerce')
        out['range_x_rvol20'] = out['hl_range_pct_close'] * rvol20

    # Light dtypes
    for ccol in out.columns:
        out[ccol] = pd.to_numeric(out[ccol], errors='coerce').astype('float32')

    return out

def _range_worker(sym: str):
    base = indicators_by_symbol[sym]
    small = _range_features_one(base)
    return sym, small

# --- run (threads) and join into indicators_by_symbol ---
results = Parallel(
    n_jobs=min(8, os.cpu_count() or 4),
    backend="threading",
    prefer="threads",
    verbose=0
)(
    delayed(_range_worker)(sym) for sym in indicators_by_symbol.keys()
)

for sym, small_df in results:
    if small_df is not None and not small_df.empty:
        base = indicators_by_symbol[sym]
        small_df = small_df.reindex(base.index)
        for col in small_df.columns:
            base[col] = small_df[col]

In [None]:
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import os

def _forward_returns(df: pd.DataFrame, max_days: int = 20) -> pd.DataFrame:
    """
    Compute forward returns from 1 to max_days trading days out.
    Returns a DataFrame with columns: ret_1d ... ret_{max_days}d
    """
    out = pd.DataFrame(index=df.index)
    if "adjclose" not in df.columns:
        return out

    px = pd.to_numeric(df["adjclose"], errors="coerce")
    for n in range(1, max_days + 1):
        out[f"ret_{n}d"] = px.shift(-n) / px - 1

    # lighter dtype
    for c in out.columns:
        out[c] = out[c].astype("float32")

    return out


def _ret_worker(sym: str, max_days: int = 20):
    df = indicators_by_symbol[sym]
    small = _forward_returns(df, max_days=max_days)
    return sym, small


# --- run in parallel and join ---
results = Parallel(
    n_jobs=min(8, os.cpu_count() or 4),
    backend="threading",
    prefer="threads",
    verbose=0
)(
    delayed(_ret_worker)(sym, max_days=20) for sym in indicators_by_symbol.keys()
)

for sym, small_df in results:
    if small_df is not None and not small_df.empty:
        base = indicators_by_symbol[sym]
        small_df = small_df.reindex(base.index)
        for col in small_df.columns:
            base[col] = small_df[col]

In [None]:
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import os

def _volvol_features_one(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute rolling volume volatility features on a single symbol frame.
    Requires a 'volume' column (lowercase).
    """
    out = pd.DataFrame(index=df.index)
    if 'volume' not in df.columns:
        return out

    v = pd.to_numeric(df['volume'], errors='coerce')

    # Rolling std of volume (short & long)
    out['vol_rolling_20d'] = v.rolling(20, min_periods=10).std(ddof=0)
    out['vol_rolling_60d'] = v.rolling(60, min_periods=20).std(ddof=0)

    # Vol of vol: std of the 20d rolling std over another 20d window
    out['vol_of_vol_20d'] = out['vol_rolling_20d'].rolling(20, min_periods=10).std(ddof=0)

    # Light dtypes
    for c in out.columns:
        out[c] = pd.to_numeric(out[c], errors='coerce').astype('float32')

    return out

def _volvol_worker(sym: str):
    base = indicators_by_symbol[sym]
    small = _volvol_features_one(base)
    return sym, small

# Use threads to avoid pickling large frames; adjust n_jobs as you like
results = Parallel(
    n_jobs=min(8, os.cpu_count() or 4),
    backend="threading",
    prefer="threads",
    verbose=0
)(
    delayed(_volvol_worker)(sym) for sym in indicators_by_symbol.keys()
)

# Join back into the master dict
for sym, small_df in results:
    if small_df is not None and not small_df.empty:
        base = indicators_by_symbol[sym]
        small_df = small_df.reindex(base.index)
        for col in small_df.columns:
            base[col] = small_df[col]

In [None]:
import pandas as pd
import numpy as np

# ---- 1) Provide or load your S&P 500 list ----
# Replace this with your maintained list (or load from a CSV)
sp500_tickers = ["MMM", "AOS", "ABT", "ABBV", "ACN", "ADBE", "AMD", "AES", "AFL", "A", 
                 "APD", "ABNB", "AKAM", "ALB", "ARE", "ALGN", "ALLE", "LNT", "ALL", "GOOGL", 
                 "GOOG", "MO", "AMZN", "AMCR", "AEE", "AEP", "AXP", "AIG", "AMT", "AWK", 
                 "AMP", "AME", "AMGN", "APH", "ADI", "AON", "APA", "APO", "AAPL", "AMAT", 
                 "APTV", "ACGL", "ADM", "ANET", "AJG", "AIZ", "T", "ATO", "ADSK", "ADP", 
                 "AZO", "AVB", "AVY", "AXON", "BKR", "BALL", "BAC", "BAX", "BDX", "BRK-B", 
                 "BBY", "TECH", "BIIB", "BLK", "BX", "XYZ", "BK", "BA", "BKNG", "BSX", "BMY", 
                 "AVGO", "BR", "BRO", "BF-B", "BLDR", "BG", "BXP", "CHRW", "CDNS", "CZR",
                 "CPT", "CPB", "COF", "CAH", "KMX", "CCL", "CARR", "CAT", "CBOE", "CBRE", 
                 "CDW", "COR", "CNC", "CNP", "CF", "CRL", "SCHW", "CHTR", "CVX", "CMG", "CB", 
                 "CHD", "CI", "CINF", "CTAS", "CSCO", "C", "CFG", "CLX", "CME", "CMS", "KO", 
                 "CTSH", "COIN", "CL", "CMCSA", "CAG", "COP", "ED", "STZ", "CEG", "COO", "CPRT", 
                 "GLW", "CPAY", "CTVA", "CSGP", "COST", "CTRA", "CRWD", "CCI", "CSX", "CMI", 
                 "CVS", "DHR", "DRI", "DDOG", "DVA", "DAY", "DECK", "DE", "DELL", "DAL", "DVN",
                 "DXCM", "FANG", "DLR", "DG", "DLTR", "D", "DPZ", "DASH", "DOV", "DOW", "DHI", 
                 "DTE", "DUK", "DD", "EMN", "ETN", "EBAY", "ECL", "EIX", "EW", "EA", "ELV", "EMR", 
                 "ENPH", "ETR", "EOG", "EPAM", "EQT", "EFX", "EQIX", "EQR", "ERIE", "ESS", "EL",
                 "EG", "EVRG", "ES", "EXC", "EXE", "EXPE", "EXPD", "EXR", "XOM", "FFIV", "FDS", 
                 "FICO", "FAST", "FRT", "FDX", "FIS", "FITB", "FSLR", "FE", "FI", "F", "FTNT", 
                 "FTV", "FOXA", "FOX", "BEN", "FCX", "GRMN", "IT", "GE", "GEHC", "GEV", "GEN", 
                 "GNRC", "GD", "GIS", "GM", "GPC", "GILD", "GPN", "GL", "GDDY", "GS", "HAL",
                 "HIG", "HAS", "HCA", "DOC", "HSIC", "HSY", "HPE", "HLT", "HOLX", "HD", "HON",
                 "HRL", "HST", "HWM", "HPQ", "HUBB", "HUM", "HBAN", "HII", "IBM", "IEX", "IDXX", 
                 "ITW", "INCY", "IR", "PODD", "INTC", "ICE", "IFF", "IP", "IPG", "INTU", "ISRG",
                 "IVZ", "INVH", "IQV", "IRM", "JBHT", "JBL", "JKHY", "J", "JNJ", "JCI", "JPM", 
                 "K", "KVUE", "KDP", "KEY", "KEYS", "KMB", "KIM", "KMI", "KKR", "KLAC", "KHC", 
                 "KR", "LHX", "LH", "LRCX", "LW", "LVS", "LDOS", "LEN", "LII", "LLY", "LIN", "LYV", 
                 "LKQ", "LMT", "L", "LOW", "LULU", "LYB", "MTB", "MPC", "MKTX", "MAR", "MMC", "MLM", 
                 "MAS", "MA", "MTCH", "MKC", "MCD", "MCK", "MDT", "MRK", "META", "MET", "MTD", "MGM", 
                 "MCHP", "MU", "MSFT", "MAA", "MRNA", "MHK", "MOH", "TAP", "MDLZ", "MPWR", "MNST", 
                 "MCO", "MS", "MOS", "MSI", "MSCI", "NDAQ", "NTAP", "NFLX", "NEM", "NWSA", "NWS", 
                 "NEE", "NKE", "NI", "NDSN", "NSC", "NTRS", "NOC", "NCLH", "NRG", "NUE", "NVDA", 
                 "NVR", "NXPI", "ORLY", "OXY", "ODFL", "OMC", "ON", "OKE", "ORCL", "OTIS", "PCAR",
                 "PKG", "PLTR", "PANW", "PH", "PAYX", "PAYC", "PYPL", "PNR", "PEP", "PFE", "PCG",
                 "PM", "PSX", "PNW", "PNC", "POOL", "PPG", "PPL", "PFG", "PG", "PGR", "PLD", "PRU", 
                 "PEG", "PTC", "PSA", "PHM", "PWR", "QCOM", "DGX", "RL", "RJF", "RTX", "O", "REG",
                 "REGN", "RF", "RSG", "RMD", "RVTY", "ROK", "ROL", "ROP", "ROST", "RCL", "SPGI", 
                 "CRM", "SBAC", "SLB", "STX", "SRE", "NOW", "SHW", "SPG", "SWKS", "SJM", "SW", "SNA",
                 "SOLV", "SO", "LUV", "SWK", "SBUX", "STT", "STLD", "STE", "SYK", "SMCI", "SYF", "SNPS", 
                 "SYY", "TMUS", "TROW", "TTWO", "TPR", "TRGP", "TGT", "TEL", "TDY", "TER", "TSLA", 
                 "TXN", "TPL", "TXT", "TMO", "TJX", "TKO", "TTD", "TSCO", "TT", "TDG", "TRV", "TRMB", 
                 "TFC", "TYL", "TSN", "USB", "UBER", "UDR", "ULTA", "UNP", "UAL", "UPS", "URI", "UNH", 
                 "UHS", "VLO", "VTR", "VLTO", "VRSN", "VRSK", "VZ", "VRTX", "VTRS", "VICI", "V", "VST", 
                 "VMC", "WRB", "GWW", "WAB", "WBA", "WMT", "DIS", "WBD", "WM", "WAT", "WEC", "WFC", "WELL", 
                 "WST", "WDC", "WY", "WSM", "WMB", "WTW", "WDAY", "WYNN", "XEL", "XYL", "YUM", "ZBRA", "ZBH", "ZTS"]
sp500_tickers = [t for t in sp500_tickers if t in indicators_by_symbol]

# --- 2) Build per-symbol "above MAxx" indicators ---
above_ma20 = {}
above_ma50 = {}
above_ma200 = {}

for sym in sp500_tickers:
    df = indicators_by_symbol[sym]

    # Choose a price column
    price_col = 'close' if 'close' in df.columns else ('adjclose' if 'adjclose' in df.columns else None)
    if price_col is None:
        continue

    px = pd.to_numeric(df[price_col], errors='coerce')

    # MA20
    if 'ma_20' in df.columns:
        ma20 = pd.to_numeric(df['ma_20'], errors='coerce')
    else:
        ma20 = px.rolling(20, min_periods=10).mean()
    above_ma20[sym] = (px > ma20).astype('float32')

    # MA50
    if 'ma_50' in df.columns:
        ma50 = pd.to_numeric(df['ma_50'], errors='coerce')
    else:
        ma50 = px.rolling(50, min_periods=25).mean()
    above_ma50[sym] = (px > ma50).astype('float32')

    # MA200
    if 'ma_200' in df.columns:
        ma200 = pd.to_numeric(df['ma_200'], errors='coerce')
    else:
        ma200 = px.rolling(200, min_periods=100).mean()
    above_ma200[sym] = (px > ma200).astype('float32')

# --- 3) Bail gracefully if no data ---
if not above_ma50:
    raise ValueError("No S&P 500 members with usable price/MA data found in indicators_by_symbol.")

# --- 4) Combine to compute % above per date ---
panel_20 = pd.DataFrame(above_ma20)
panel_50 = pd.DataFrame(above_ma50)
panel_200 = pd.DataFrame(above_ma200)

pct_above_20 = panel_20.mean(axis=1, skipna=True) * 100.0
pct_above_50 = panel_50.mean(axis=1, skipna=True) * 100.0
pct_above_200 = panel_200.mean(axis=1, skipna=True) * 100.0

pct_above_20.name = "pct_sp500_above_ma20"
pct_above_50.name = "pct_sp500_above_ma50"
pct_above_200.name = "pct_sp500_above_ma200"

# --- 5) Attach breadth series to every symbol’s DataFrame ---
for sym, df in indicators_by_symbol.items():
    df['pct_sp500_above_ma20'] = pd.to_numeric(pct_above_20.reindex(df.index), errors='coerce').astype('float32')
    df['pct_sp500_above_ma50'] = pd.to_numeric(pct_above_50.reindex(df.index), errors='coerce').astype('float32')
    df['pct_sp500_above_ma200'] = pd.to_numeric(pct_above_200.reindex(df.index), errors='coerce').astype('float32')

In [None]:
sp500_universe = [t for t in sp500_tickers if t in indicators_by_symbol]

# Keep only tickers we actually have data for

# --- 1) Build a daily change sign panel for the S&P 500 ---
chg_sign = {}
for sym in sp500_universe:
    df = indicators_by_symbol[sym]
    # choose a price column
    price_col = 'close' if 'close' in df.columns else ('adjclose' if 'adjclose' in df.columns else None)
    if price_col is None:
        continue
    px = pd.to_numeric(df[price_col], errors='coerce')
    chg = px.diff()
    # sign: +1 advance, -1 decline, 0 unchanged/NaN
    chg_sign[sym] = np.sign(chg).replace({np.nan: 0}).astype('int8')

if not chg_sign:
    raise ValueError("Could not compute change signs for any S&P 500 members.")

panel = pd.DataFrame(chg_sign)  # index: dates, columns: symbols

# --- 2) Daily breadth stats ---
adv = (panel > 0).sum(axis=1).astype('float32')
dec = (panel < 0).sum(axis=1).astype('float32')
tot = (panel != 0).sum(axis=1).astype('float32')  # excludes unchanged

ad_net  = (adv - dec).astype('float32')
ad_line = ad_net.fillna(0).cumsum().astype('float32')
pct_adv = (adv / tot.replace(0, np.nan) * 100.0).astype('float32')

adv.name = "sp500_adv"
dec.name = "sp500_dec"
ad_net.name = "sp500_ad_net"
ad_line.name = "sp500_ad_line"
pct_adv.name = "sp500_pct_advancing"

# --- 3) Attach to EVERY symbol’s DataFrame ---
for sym, df in indicators_by_symbol.items():
    idx = df.index
    df['sp500_adv']           = adv.reindex(idx).astype('float32')
    df['sp500_dec']           = dec.reindex(idx).astype('float32')
    df['sp500_ad_net']        = ad_net.reindex(idx).astype('float32')
    df['sp500_ad_line']       = ad_line.reindex(idx).astype('float32')
    df['sp500_pct_advancing'] = pct_adv.reindex(idx).astype('float32')

In [None]:
for col in indicators_by_symbol['TSLA'].columns:
    print(col)

In [None]:
import matplotlib.pyplot as plt

fig, ax1 = plt.subplots(figsize=(12, 6))

# First series: hurst (left y-axis)
color1 = 'tab:blue'
ax1.set_xlabel('Date')
ax1.set_ylabel('Hurst (emaHL5)', color=color1)
df['hurst_ret_128_emaHL5'].plot(ax=ax1, color=color1, label='Hurst (emaHL5)')
ax1.tick_params(axis='y', labelcolor=color1)

# Second series: trend_score (right y-axis)
ax2 = ax1.twinx()
color2 = 'tab:orange'
ax2.set_ylabel('Trend Score', color=color2)
df['trend_persist_ema'].plot(ax=ax2, color=color2, label='Trend Score')
ax2.tick_params(axis='y', labelcolor=color2)

# Optional: align legends
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines + lines2, labels + labels2, loc='upper left')

plt.title('Hurst vs Trend Score')
plt.show()

In [None]:
fig, ax1 = plt.subplots(figsize=(12, 6))
ax1.set_xlabel('Date')
ax1.set_ylabel('ATR%', color='tab:green')
df['atr_percent'].plot(ax=ax1, color='tab:green', label='ATR%')
ax1.tick_params(axis='y', labelcolor='tab:green')

ax2 = ax1.twinx()
ax2.set_ylabel('Trend Score', color='tab:orange')
df['trend_persist_ema'].plot(ax=ax2, color='tab:orange', label='Trend Score')
ax2.tick_params(axis='y', labelcolor='tab:orange')

lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines + lines2, labels + labels2, loc='upper left')

plt.title('ATR% vs Trend Score')
plt.show()

In [None]:
import matplotlib.pyplot as plt

def plot_atr_trend_quadrants(df, symbol):
    # Pick the two features
    x = df['trend_score']
    y = df['atr_percent']

    # Midlines (could use median, mean, or 0 for trend)
    trend_mid = 0
    atr_mid = y.median()

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.scatter(x, y, alpha=0.5, edgecolor='k', s=40)

    # Draw quadrant lines
    ax.axvline(trend_mid, color='black', linestyle='--', alpha=0.7)
    ax.axhline(atr_mid, color='black', linestyle='--', alpha=0.7)

    # Annotate quadrants
    ax.text(0.75, 0.95, 'High Trend\nHigh ATR%', transform=ax.transAxes,
            ha='center', va='top', fontsize=12, color='darkgreen')
    ax.text(0.25, 0.95, 'Low Trend\nHigh ATR%', transform=ax.transAxes,
            ha='center', va='top', fontsize=12, color='red')
    ax.text(0.75, 0.05, 'High Trend\nLow ATR%', transform=ax.transAxes,
            ha='center', va='bottom', fontsize=12, color='green')
    ax.text(0.25, 0.05, 'Low Trend\nLow ATR%', transform=ax.transAxes,
            ha='center', va='bottom', fontsize=12, color='gray')

    ax.set_xlabel('Trend Score')
    ax.set_ylabel('ATR %')
    ax.set_title(f'ATR% vs Trend Score Quadrants – {symbol}')
    plt.show()

# Example usage:
plot_atr_trend_quadrants(indicators_by_symbol['AAPL'], 'AAPL')

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def plot_feature_heatmap(df, x_col, y_col, price_col='adjclose', horizon=5, bins=6,
                         use_quantile_bins=True, cmap='RdYlGn', title=None,
                         mark_current=True):
    """
    Heatmap of mean forward return by (x,y) feature bins, with support counts.
    If mark_current=True, overlay a larger, semi-transparent marker at the most recent (x,y) point.
    """
    d = df[[x_col, y_col, price_col]].dropna().copy()
    d['fwd_ret'] = d[price_col].shift(-horizon) / d[price_col] - 1.0
    d = d.dropna(subset=['fwd_ret'])

    # Bin features
    if use_quantile_bins:
        x_bins = pd.qcut(d[x_col], q=bins, duplicates='drop')
        y_bins = pd.qcut(d[y_col], q=bins, duplicates='drop')
    else:
        x_bins = pd.cut(d[x_col], bins=bins)
        y_bins = pd.cut(d[y_col], bins=bins)

    # Aggregations
    pivot_mean = pd.pivot_table(d, index=y_bins, columns=x_bins, values='fwd_ret', aggfunc='mean')
    pivot_cnt  = pd.pivot_table(d, index=y_bins, columns=x_bins, values='fwd_ret', aggfunc='count')

    fig, ax = plt.subplots(figsize=(8, 7))
    im = ax.imshow(pivot_mean.values, cmap=cmap, origin='lower',
                   vmin=np.nanpercentile(pivot_mean.values, 5),
                   vmax=np.nanpercentile(pivot_mean.values, 95))

    # Axis ticks/labels
    ax.set_xticks(range(pivot_mean.shape[1]))
    ax.set_yticks(range(pivot_mean.shape[0]))
    ax.set_xticklabels([str(c) for c in pivot_mean.columns], rotation=45, ha='right')
    ax.set_yticklabels([str(i) for i in pivot_mean.index])

    # Annotate with counts (support)
    for i in range(pivot_mean.shape[0]):
        for j in range(pivot_mean.shape[1]):
            val = pivot_mean.values[i, j]
            cnt = pivot_cnt.values[i, j]
            if np.isfinite(val) and cnt > 0:
                ax.text(j, i, f"{val*100:.1f}%\n(n={int(cnt)})",
                        ha='center', va='center', fontsize=8, color='black')

    # ---- Mark the most recent (x,y) point ----
    if mark_current:
        latest_idx = df[[x_col, y_col]].dropna().index.max()
        if pd.notna(latest_idx):
            cur_x = df.loc[latest_idx, x_col]
            cur_y = df.loc[latest_idx, y_col]

            # Map to bin indices
            x_cats = x_bins.cat.categories
            y_cats = y_bins.cat.categories
            xi = x_cats.get_indexer([cur_x])[0]
            yi = y_cats.get_indexer([cur_y])[0]

            if xi < 0:
                xi = np.clip(np.searchsorted(x_cats.left, cur_x) - 1, 0, len(x_cats)-1)
            if yi < 0:
                yi = np.clip(np.searchsorted(y_cats.left, cur_y) - 1, 0, len(y_cats)-1)

            # Bigger, transparent marker
            ax.scatter(xi, yi, s=300, color='cyan', alpha=0.5,
                       edgecolor='black', linewidths=1.2, zorder=5)
            ax.annotate(str(latest_idx.date()), (xi, yi), xytext=(10, 10),
                        textcoords='offset points', fontsize=9, color='black',
                        bbox=dict(boxstyle='round,pad=0.2', fc='white', alpha=0.6))

    ax.set_xlabel(x_col)
    ax.set_ylabel(y_col)
    ax.set_title(title or f"{y_col} vs {x_col} — mean {horizon}D forward return")
    cbar = plt.colorbar(im, ax=ax)
    cbar.set_label('Mean forward return')

    plt.tight_layout()
    plt.show()

# Example: trend vs hurst map for one symbol
df = indicators_by_symbol['TSLA']  # or any symbol
plot_feature_heatmap(df, x_col='rel_strength_sector', y_col='pct_dist_ma_200_z',
                     price_col='adjclose', horizon=5, bins=6, use_quantile_bins=True,
                     title='AAPL: 10D mean return by Sector Strenght × Hurst (smoothed)')

In [None]:
# e.g., did we hit +2% within horizon?
d['hit_up_2pct'] = (d[price_col].shift(-horizon).rolling(horizon).max() / d[price_col] - 1 >= 0.02).astype(int)
pivot_mean = pd.pivot_table(d, index=y_bins, columns=x_bins, values='hit_up_2pct', aggfunc='mean')

In [None]:
import pandas as pd

df["ret_10d"] = df["adjclose"].shift(-10) / df["adjclose"] - 1
target_col = "ret_10d"  # replace with your actual 10-day return column name

# Compute Pearson correlations
corr_with_target = df.corr(method="pearson")[target_col].drop(target_col)

# Sort descending by absolute correlation strength
top2 = corr_with_target.abs().sort_values(ascending=False).head(20)

print("Top 5 features correlated with 10-day return:")
print(top2)

# Optional: actual signed correlations
print("\nSigned correlations:")
print(corr_with_target[top2.index])

In [None]:
df.columsn

In [None]:
# Load the features parquet file
import pandas as pd

features_df = pd.read_parquet('../artifacts/features_long.parquet')
print(f"✅ Loaded features: {features_df.shape}")
print(f"Columns: {list(features_df.columns)}")
print(f"Date range: {features_df['date'].min()} to {features_df['date'].max()}")
print(f"Unique symbols: {features_df['symbol'].nunique()}")
print(f"Sample data:")
print(features_df.tail())