# 02 — Signal Research: Microstructure Features from Tick Data

**Goal:** Build features from raw tick trades that capture *what happened inside each candle* and evaluate their predictiveness for forward returns at 5m–1h horizons.

**Feature Categories:**
1. **Aggression** — taker buy/sell pressure, large trade detection
2. **Flow patterns** — trade arrival rate, clustering, acceleration
3. **Price impact** — how much price moves per unit of volume
4. **Volume profile** — distribution of volume within the candle
5. **Cross-exchange divergence** — when one venue sees unusual activity

**Evaluation:** All signals tested against forward returns at 5m, 15m, 1h with a **minimum edge threshold of 15-20 bps** (to clear VIP0 fees + slippage).

**Data:** BTCUSDT tick trades + OHLCV, 92 days, 6 sources

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from scipy import stats

# Notebook-wide matplotlib defaults: wide figures with a faint grid.
plt.rcParams.update({
    'figure.figsize': (14, 5),
    'figure.dpi': 100,
    'axes.grid': True,
    'grid.alpha': 0.3,
})

# Root of the parquet store read by the loaders, and the instrument under study.
PARQUET_DIR = Path('../parquet')
SYMBOL = 'BTCUSDT'

# Futures venues analysed, with a display label and a plot colour for each.
FUTURES_SOURCES = ['binance_futures', 'bybit_futures', 'okx_futures']
SOURCE_LABELS = dict(zip(FUTURES_SOURCES, ['Binance', 'Bybit', 'OKX']))
SOURCE_COLORS = dict(zip(FUTURES_SOURCES, ['#F0B90B', '#FF6B00', '#00C4B4']))

# ---------------------------------------------------------------------------
# Loaders
# ---------------------------------------------------------------------------

def load_trades_day(symbol, source, date):
    """Read one day's trade parquet; return an empty DataFrame when the file is absent."""
    day_file = PARQUET_DIR / symbol / 'trades' / source / f'{date}.parquet'
    return pd.read_parquet(day_file) if day_file.exists() else pd.DataFrame()

def load_trades_range(symbol, source, start_date, end_date):
    """Concatenate daily trade files from start_date to end_date; days without data are skipped."""
    daily_frames = (
        load_trades_day(symbol, source, day.strftime('%Y-%m-%d'))
        for day in pd.date_range(start_date, end_date)
    )
    non_empty = [frame for frame in daily_frames if not frame.empty]
    if non_empty:
        return pd.concat(non_empty, ignore_index=True)
    return pd.DataFrame()

def load_ohlcv(symbol, interval, source):
    """Read every OHLCV parquet for a source and return the rows ordered by timestamp_us.

    An empty DataFrame is returned when the directory is missing or holds no parquet files.
    """
    source_dir = PARQUET_DIR / symbol / 'ohlcv' / interval / source
    parquet_files = sorted(source_dir.glob('*.parquet')) if source_dir.exists() else []
    if not parquet_files:
        return pd.DataFrame()
    combined = pd.concat([pd.read_parquet(path) for path in parquet_files], ignore_index=True)
    return combined.sort_values('timestamp_us').reset_index(drop=True)

# Confirm the setup cell ran and show which symbol the notebook targets.
print(f'Ready. Symbol: {SYMBOL}')

## 1. Build Microstructure Features from Tick Data

For each 5-minute window, we compute features from raw trades that describe the *character* of activity inside the candle. We process day-by-day to keep memory bounded.

In [None]:
def compute_microstructure_features(trades: pd.DataFrame, interval_us: int = 300_000_000) -> pd.DataFrame:
    """
    Compute microstructure features from raw tick trades, aggregated into fixed intervals.

    Each trade is assigned to the interval ``(timestamp_us // interval_us) * interval_us``.
    Intervals holding fewer than two trades are skipped, so the output may have gaps.

    Args:
        trades: DataFrame with columns [timestamp_us, price, quantity, quote_quantity, side],
            where side is 1 for a buy and -1 for a sell. Rows do not need to be pre-sorted;
            they are ordered by timestamp_us before aggregation.
        interval_us: aggregation interval in microseconds (default 5 min = 300_000_000)

    Returns:
        DataFrame with one row per populated interval. The interval start is stored in the
        ``timestamp_us`` column (the index is a plain RangeIndex); the remaining columns are
        the aggression, flow, price-impact, volume-profile, candle-shape and raw OHLCV
        values built below. An empty DataFrame is returned when ``trades`` is empty or no
        interval has at least two trades.
    """
    if trades.empty:
        return pd.DataFrame()

    # Open/close prices, bar duration and inter-trade gaps all read the first and last
    # rows of a bucket, so rows must be chronological. The stable sort keeps the input
    # order of trades that share a timestamp.
    if not trades['timestamp_us'].is_monotonic_increasing:
        trades = trades.sort_values('timestamp_us', kind='stable')

    # Interval start for every trade; grouping on the array avoids copying the tick frame.
    bucket = (trades['timestamp_us'].values // interval_us) * interval_us

    features = []

    for bkt, grp in trades.groupby(bucket):
        p = grp['price'].values
        q = grp['quantity'].values
        qq = grp['quote_quantity'].values
        s = grp['side'].values
        t = grp['timestamp_us'].values
        n = len(grp)

        if n < 2:
            continue

        buy_mask = s == 1
        sell_mask = s == -1
        buy_vol = q[buy_mask].sum()
        sell_vol = q[sell_mask].sum()
        total_vol = q.sum()
        buy_quote = qq[buy_mask].sum()
        sell_quote = qq[sell_mask].sum()

        # --- 1. AGGRESSION FEATURES ---
        # Volume imbalance (normalized)
        vol_imbalance = (buy_vol - sell_vol) / max(total_vol, 1e-10)

        # Quote-weighted imbalance ($ flow)
        dollar_imbalance = (buy_quote - sell_quote) / max(buy_quote + sell_quote, 1e-10)

        # Large trade detection (trades at or above the bar's 90th percentile size)
        q90 = np.percentile(q, 90)
        large_mask = q >= q90
        large_buy_vol = q[large_mask & buy_mask].sum()
        large_sell_vol = q[large_mask & sell_mask].sum()
        large_imbalance = (large_buy_vol - large_sell_vol) / max(large_buy_vol + large_sell_vol, 1e-10)

        # Aggression ratio: what fraction of volume is from large trades
        large_vol_pct = q[large_mask].sum() / max(total_vol, 1e-10)

        # --- 2. FLOW PATTERN FEATURES ---
        trade_count = n
        buy_count = buy_mask.sum()
        sell_count = sell_mask.sum()
        count_imbalance = (buy_count - sell_count) / max(n, 1)

        # Trade arrival rate (trades per second); the floor avoids dividing by zero
        # when every trade in the bar carries the same timestamp.
        duration_s = max((t[-1] - t[0]) / 1e6, 0.001)
        arrival_rate = n / duration_s

        if n > 2:
            # Coefficient of variation of inter-trade times: higher means the gaps
            # between trades are more uneven (arrivals come in clumps).
            iti = np.diff(t).astype(np.float64)
            iti_cv = iti.std() / max(iti.mean(), 1)
            # Burstiness: share of trades in the busiest of 5 equal time slices
            # spanning the first to the last trade of the bar.
            sub_buckets = np.linspace(t[0], t[-1], 6)
            sub_counts = np.histogram(t, bins=sub_buckets)[0]
            burstiness = sub_counts.max() / max(n, 1)
        else:
            iti_cv = 0
            burstiness = 1.0

        # Acceleration: trade count in 2nd half vs 1st half of the traded time span
        mid_t = (t[0] + t[-1]) / 2
        first_half = (t < mid_t).sum()
        second_half = n - first_half
        trade_acceleration = (second_half - first_half) / max(n, 1)

        # --- 3. PRICE IMPACT FEATURES ---
        vwap = qq.sum() / max(total_vol, 1e-10)

        # Price range, normalized by VWAP
        price_range = (p.max() - p.min()) / max(vwap, 1e-10)

        # Close vs VWAP (where did price end relative to average)
        close_vs_vwap = (p[-1] - vwap) / max(vwap, 1e-10)

        # Kyle's lambda proxy: correlation between each trade's signed volume and the
        # price change it arrives with. A correlation is undefined when either series
        # is constant (e.g. price never moves inside the bar), so fall back to 0
        # instead of letting np.corrcoef emit NaN.
        kyle_lambda = 0.0
        if n > 10:
            signed_flow = (q * s)[1:]
            price_changes = np.diff(p)
            if signed_flow.std() > 0 and price_changes.std() > 0:
                kyle_lambda = np.corrcoef(signed_flow, price_changes)[0, 1]

        # Amihud illiquidity: |return| / volume
        ret = (p[-1] - p[0]) / max(p[0], 1e-10)
        amihud = abs(ret) / max(total_vol, 1e-10)

        # --- 4. VOLUME PROFILE FEATURES ---
        # Volume at high vs low: where in the price range did most volume trade?
        price_mid = (p.max() + p.min()) / 2
        vol_above_mid = q[p >= price_mid].sum()
        vol_below_mid = q[p < price_mid].sum()
        vol_profile_skew = (vol_above_mid - vol_below_mid) / max(total_vol, 1e-10)

        # Volume-weighted price std (dispersion); np.average rejects all-zero weights
        if total_vol > 0:
            vol_weighted_std = np.sqrt(np.average((p - vwap) ** 2, weights=q))
        else:
            vol_weighted_std = 0

        # --- 5. OHLCV-DERIVED ---
        open_price = p[0]
        close_price = p[-1]
        high_price = p.max()
        low_price = p.min()

        # Upper/lower wick as a fraction of the bar's full high-low range
        full_range = high_price - low_price
        if full_range > 0:
            upper_wick = (high_price - max(open_price, close_price)) / full_range
            lower_wick = (min(open_price, close_price) - low_price) / full_range
        else:
            upper_wick = 0
            lower_wick = 0

        features.append({
            'timestamp_us': bkt,
            # Aggression
            'vol_imbalance': vol_imbalance,
            'dollar_imbalance': dollar_imbalance,
            'large_imbalance': large_imbalance,
            'large_vol_pct': large_vol_pct,
            # Flow
            'trade_count': trade_count,
            'count_imbalance': count_imbalance,
            'arrival_rate': arrival_rate,
            'iti_cv': iti_cv,
            'burstiness': burstiness,
            'trade_acceleration': trade_acceleration,
            # Price impact
            'vwap': vwap,
            'price_range': price_range,
            'close_vs_vwap': close_vs_vwap,
            'kyle_lambda': kyle_lambda,
            'amihud': amihud,
            # Volume profile
            'vol_profile_skew': vol_profile_skew,
            'vol_weighted_std': vol_weighted_std,
            # Candle shape
            'upper_wick': upper_wick,
            'lower_wick': lower_wick,
            # Raw
            'open': open_price,
            'close': close_price,
            'high': high_price,
            'low': low_price,
            'volume': total_vol,
            'buy_volume': buy_vol,
            'sell_volume': sell_vol,
            'quote_volume': buy_quote + sell_quote,
        })

    return pd.DataFrame(features)

# Banner only: the number of feature columns is not known until the builder runs on data,
# so no count is reported here.
print('Feature builder ready.')

In [None]:
%%time
# Binance Futures feature build: 5-minute bars, one daily trade file in memory at a time.

source = 'binance_futures'
INTERVAL_US = 5 * 60 * 1_000_000  # 5 min

trades_dir = PARQUET_DIR / SYMBOL / 'trades' / source
dates = sorted(f.stem for f in trades_dir.glob('*.parquet'))
print(f'Processing {len(dates)} days for {source}...')

all_features = []
for i, date in enumerate(dates):
    trades = load_trades_day(SYMBOL, source, date)
    if not trades.empty:
        feat = compute_microstructure_features(trades, INTERVAL_US)
        all_features.append(feat)
        # Progress line on every tenth day and on the final day.
        is_last_day = i == len(dates) - 1
        if is_last_day or (i + 1) % 10 == 0:
            print(f'  [{i+1}/{len(dates)}] {date}  bars={len(feat)}')
        del trades  # release the day's ticks before the next file is read

# Stitch the daily bar tables together in time order.
features_bn = (
    pd.concat(all_features, ignore_index=True)
    .sort_values('timestamp_us')
    .reset_index(drop=True)
)
features_bn['datetime'] = pd.to_datetime(features_bn['timestamp_us'], unit='us', utc=True)
features_bn['returns'] = features_bn['close'].pct_change()

print(f'\nTotal: {len(features_bn):,} bars, {len(features_bn.columns)} columns')
print(f'Date range: {features_bn["datetime"].min()} → {features_bn["datetime"].max()}')
features_bn.head(3)

In [None]:
%%time
# Repeat the feature build for Bybit and OKX, collecting every venue in one dict.
features_all = {'binance_futures': features_bn}

for source in ('bybit_futures', 'okx_futures'):
    trades_dir = PARQUET_DIR / SYMBOL / 'trades' / source
    dates = sorted(f.stem for f in trades_dir.glob('*.parquet'))
    print(f'Processing {len(dates)} days for {source}...')

    all_feat = []
    for i, date in enumerate(dates):
        trades = load_trades_day(SYMBOL, source, date)
        if not trades.empty:
            feat = compute_microstructure_features(trades, INTERVAL_US)
            all_feat.append(feat)
            # Progress line on every thirtieth day and on the final day.
            if i == len(dates) - 1 or (i + 1) % 30 == 0:
                print(f'  [{i+1}/{len(dates)}] {date}')
            del trades  # release the day's ticks before the next file is read

    df = (
        pd.concat(all_feat, ignore_index=True)
        .sort_values('timestamp_us')
        .reset_index(drop=True)
    )
    df['datetime'] = pd.to_datetime(df['timestamp_us'], unit='us', utc=True)
    df['returns'] = df['close'].pct_change()
    features_all[source] = df
    print(f'  → {len(df):,} bars\n')

print('All sources processed.')

## 2. Feature Overview & Distributions

Quick sanity check on feature distributions before signal evaluation.

In [None]:
df = features_bn  # Binance is the primary venue for the analysis

# Features evaluated as signals, grouped by the category they were built under.
feature_cols = [
    # aggression
    'vol_imbalance', 'dollar_imbalance', 'large_imbalance', 'large_vol_pct',
    # flow
    'count_imbalance', 'arrival_rate', 'iti_cv', 'burstiness', 'trade_acceleration',
    # price impact
    'price_range', 'close_vs_vwap', 'kyle_lambda', 'amihud',
    # volume profile and candle shape
    'vol_profile_skew', 'vol_weighted_std', 'upper_wick', 'lower_wick',
]

# Summary-statistics table, one line per feature.
rule = '─' * 90
print(f'Binance Futures — Feature Summary ({len(df):,} bars)')
print(rule)
print(f'{"Feature":25s} {"Mean":>10s} {"Std":>10s} {"Min":>10s} {"P5":>10s} {"P95":>10s} {"Max":>10s}')
print(rule)
for col in feature_cols:
    s = df[col]
    p5 = s.quantile(0.05)
    p95 = s.quantile(0.95)
    print(f'{col:25s} {s.mean():>10.4f} {s.std():>10.4f} {s.min():>10.4f} '
          f'{p5:>10.4f} {p95:>10.4f} {s.max():>10.4f}')

# Histograms of nine key features on a 3x3 grid.
plot_features = ['vol_imbalance', 'dollar_imbalance', 'large_imbalance',
                 'trade_acceleration', 'kyle_lambda', 'close_vs_vwap',
                 'burstiness', 'arrival_rate', 'price_range']

fig, axes = plt.subplots(3, 3, figsize=(16, 12))
fig.suptitle(f'{SYMBOL} — Microstructure Feature Distributions (Binance Futures, 5m)', fontsize=14, fontweight='bold')

for ax, feat in zip(axes.flat, plot_features):
    vals = df[feat].dropna()
    # Pull the extreme 1% tails in to the 1st/99th percentile so they do not stretch the axis.
    lo = vals.quantile(0.01)
    hi = vals.quantile(0.99)
    clipped = vals.clip(lo, hi)
    ax.hist(clipped, bins=100, alpha=0.7, color='steelblue', density=True)
    ax.axvline(0, color='red', linestyle='--', alpha=0.3)
    ax.set_title(feat)

plt.tight_layout()
plt.show()

## 3. Signal Evaluation: Feature → Forward Return Predictiveness

For each feature, compute:
- **IC (Information Coefficient)**: rank correlation with forward returns at 5m, 15m, 1h
- **Decile spread**: mean return of top decile minus bottom decile
- **IC stability**: rolling IC to check if signal is consistent over time

In [None]:
def add_forward_returns(df, horizons=(1, 3, 12), price_col='close'):
    """Return a copy of df with forward-return columns added.

    For each horizon ``h`` a column ``fwd_{h}bar`` is added holding
    ``price[t + h] / price[t] - 1`` taken over rows, i.e. the return over the next
    ``h`` rows. The last ``h`` rows of each column are NaN. With the defaults the
    columns are fwd_1bar, fwd_3bar and fwd_12bar (5m, 15m and 1h on 5-minute bars).

    Args:
        df: DataFrame ordered in time that contains ``price_col``. It is not modified.
        horizons: iterable of positive row counts to look ahead.
        price_col: name of the price column the returns are computed from.

    Returns:
        A new DataFrame with the extra forward-return columns.
    """
    out = df.copy()
    prices = out[price_col]
    for bars in horizons:
        # pct_change(bars) is the trailing return; shifting it back by the same
        # number of rows re-labels it as the return that follows each row.
        out[f'fwd_{bars}bar'] = prices.pct_change(bars).shift(-bars)
    return out

def compute_ic_table(df, feature_cols, fwd_cols=('fwd_1bar', 'fwd_3bar', 'fwd_12bar'), min_obs=100):
    """Compute rank IC (Spearman) for each feature vs each forward return horizon.

    Args:
        df: DataFrame containing the feature and forward-return columns.
        feature_cols: feature column names to score.
        fwd_cols: forward-return column names to score against.
        min_obs: minimum number of rows with both values present; below this the
            IC and its p-value are reported as NaN.

    Returns:
        DataFrame indexed by ``feature``. For every entry of ``fwd_cols`` there is a
        column with the Spearman correlation and a ``{fwd}_pval`` column with its
        p-value. Both columns are always present, also when the sample was too small.
    """
    rows = []
    for feat in feature_cols:
        row = {'feature': feat}
        for fwd in fwd_cols:
            pair = df[[feat, fwd]].dropna()
            if len(pair) < min_obs:
                ic, pval = np.nan, np.nan
            else:
                ic, pval = stats.spearmanr(pair[feat], pair[fwd])
            row[fwd] = ic
            row[f'{fwd}_pval'] = pval
        rows.append(row)

    # Naming the columns up front keeps the table shape stable even when
    # feature_cols is empty (set_index would otherwise fail on a column-less frame).
    columns = ['feature']
    for fwd in fwd_cols:
        columns.extend([fwd, f'{fwd}_pval'])
    return pd.DataFrame(rows, columns=columns).set_index('feature')

def compute_decile_spread(df, feature_cols, fwd_col='fwd_3bar'):
    """Compute the top-minus-bottom decile forward return for each feature, in bps.

    Rows are bucketed by feature value with ``pd.qcut(q=10, duplicates='drop')``, so
    a feature with many tied values can yield fewer than ten buckets; "D1" and "D10"
    then refer to the lowest and highest bucket that exists.

    Args:
        df: DataFrame containing the feature columns and ``fwd_col``. It is not modified.
        feature_cols: feature column names to evaluate.
        fwd_col: forward-return column whose bucket means are compared.

    Returns:
        DataFrame indexed by ``feature`` with columns D10_minus_D1_bps, D1_bps and
        D10_bps. Features with fewer than 100 complete rows, or with no populated
        bucket, are left out; the table is empty (but has its columns) if none qualify.
    """
    rows = []
    for feat in feature_cols:
        # Work on an explicit copy so adding the bucket column never touches df.
        pair = df[[feat, fwd_col]].dropna().copy()
        if len(pair) < 100:
            continue
        pair['decile'] = pd.qcut(pair[feat], q=10, labels=False, duplicates='drop')
        bucket_ret = pair.groupby('decile')[fwd_col].mean()
        if bucket_ret.empty:
            continue
        low, high = bucket_ret.iloc[0], bucket_ret.iloc[-1]
        rows.append({
            'feature': feat,
            'D10_minus_D1_bps': (high - low) * 10000,  # bps
            'D1_bps': low * 10000,
            'D10_bps': high * 10000,
        })
    columns = ['feature', 'D10_minus_D1_bps', 'D1_bps', 'D10_bps']
    return pd.DataFrame(rows, columns=columns).set_index('feature')

# Attach forward returns to the Binance bars and score every feature against them.
df = add_forward_returns(features_bn)
ic_table = compute_ic_table(df, feature_cols)

horizon_cols = ['fwd_1bar', 'fwd_3bar', 'fwd_12bar']
rule = '─' * 80
print(f'{SYMBOL} — Information Coefficient (Rank IC) vs Forward Returns')
print(rule)
print(f'{"Feature":25s} {"IC 5m":>10s} {"IC 15m":>10s} {"IC 1h":>10s}  {"Significant?":>12s}')
print(rule)
for feat in feature_cols:
    ic5, ic15, ic1h = (ic_table.loc[feat, col] for col in horizon_cols)
    # '***' needs |IC| > 0.02 and '**' needs |IC| > 0.01, both with p < 0.01.
    # Horizons are scanned shortest first and the scan stops at the first '***'.
    sig = ''
    for fwd in horizon_cols:
        pval_col = f'{fwd}_pval'
        if pval_col not in ic_table.columns:
            continue
        abs_ic = abs(ic_table.loc[feat, fwd])
        low_pval = ic_table.loc[feat, pval_col] < 0.01
        if low_pval and abs_ic > 0.02:
            sig = '***'
            break
        if low_pval and abs_ic > 0.01:
            sig = '**'
    print(f'{feat:25s} {ic5:>10.4f} {ic15:>10.4f} {ic1h:>10.4f}  {sig:>12s}')

# Eight features with the largest absolute 15m IC, printed with their signed IC.
print(f'\n{"=" * 60}')
print('Top signals by |IC| at 15m horizon:')
abs_ic_15m = ic_table['fwd_3bar'].abs()
top = abs_ic_15m.sort_values(ascending=False).head(8)
for feat, ic in top.items():
    print(f'  {feat:25s}  IC={ic_table.loc[feat, "fwd_3bar"]:+.4f}')

In [None]:
# Top-minus-bottom decile spread of the 15m forward return, per feature.
spread_table = compute_decile_spread(df, feature_cols, 'fwd_3bar')

rule = '─' * 70
print(f'{SYMBOL} — Decile Spread (15m forward return, bps)')
print(rule)
print(f'{"Feature":25s} {"D1 (low)":>10s} {"D10 (high)":>10s} {"Spread":>10s}  {"Tradeable?":>10s}')
print(rule)
for feat, spread_row in spread_table.iterrows():
    d1 = spread_row['D1_bps']
    d10 = spread_row['D10_bps']
    spread = spread_row['D10_minus_D1_bps']
    # A spread beyond 15 bps in either direction is flagged as tradeable.
    if abs(spread) > 15:
        tradeable = '✓'
    else:
        tradeable = ''
    print(f'{feat:25s} {d1:>10.2f} {d10:>10.2f} {spread:>10.2f}  {tradeable:>10s}')

# Bar charts of mean forward return per decile for the six widest spreads.
abs_spread = spread_table['D10_minus_D1_bps'].abs()
top_feats = abs_spread.sort_values(ascending=False).head(6).index.tolist()

fig, axes = plt.subplots(2, 3, figsize=(18, 9))
fig.suptitle(f'{SYMBOL} — 15m Forward Return by Feature Decile (Binance Futures)', fontsize=14, fontweight='bold')

for ax, feat in zip(axes.flat, top_feats):
    clean = df[[feat, 'fwd_3bar']].dropna().copy()
    clean['decile'] = pd.qcut(clean[feat], q=10, labels=False, duplicates='drop')
    decile_ret = clean.groupby('decile')['fwd_3bar'].mean() * 10000
    # Red bars for negative mean returns, green otherwise.
    colors = ['#d32f2f' if v < 0 else '#388e3c' for v in decile_ret.values]
    ax.bar(decile_ret.index, decile_ret.values, color=colors, alpha=0.7)
    ax.axhline(0, color='black', linewidth=0.5)
    ax.set_xlabel('Decile (0=low, 9=high)')
    ax.set_ylabel('Mean 15m Fwd Return (bps)')
    ax.set_title(f'{feat}\nspread={spread_table.loc[feat, "D10_minus_D1_bps"]:.1f} bps')

plt.tight_layout()
plt.show()

## 4. IC Stability Over Time

A signal is only useful if it's consistent. Compute the IC separately for each calendar week (non-overlapping windows) to see whether the top features keep their predictiveness across the 3-month period.

In [None]:
# Week-by-week IC for the six features with the widest decile spread.
abs_spread = spread_table['D10_minus_D1_bps'].abs()
top_feats_for_stability = abs_spread.sort_values(ascending=False).head(6).index.tolist()

df['date'] = df['datetime'].dt.date
weekly_groups = df.groupby(pd.Grouper(key='datetime', freq='W'))

fig, axes = plt.subplots(2, 3, figsize=(18, 8))
fig.suptitle(f'{SYMBOL} — Rolling Weekly IC (feature vs 15m fwd return)', fontsize=14, fontweight='bold')

for ax, feat in zip(axes.flat, top_feats_for_stability):
    weekly_dates = []
    weekly_ics = []
    for week_start, grp in weekly_groups:
        clean = grp[[feat, 'fwd_3bar']].dropna()
        # Weeks with fewer than 50 usable bars are left out of the chart.
        if len(clean) >= 50:
            ic, _ = stats.spearmanr(clean[feat], clean['fwd_3bar'])
            weekly_dates.append(week_start)
            weekly_ics.append(ic)

    if not weekly_ics:
        continue

    mean_ic = np.mean(weekly_ics)
    # Consistency: share of weeks whose IC has the same sign as the mean IC.
    hit_rate = np.mean([1 if ic * mean_ic > 0 else 0 for ic in weekly_ics])

    colors = ['#388e3c' if v > 0 else '#d32f2f' for v in weekly_ics]
    ax.bar(weekly_dates, weekly_ics, width=5, color=colors, alpha=0.7)
    ax.axhline(0, color='black', linewidth=0.5)
    ax.set_title(f'{feat}\nmean IC={mean_ic:+.4f}, consistency={hit_rate:.0%}')
    ax.set_ylabel('Weekly IC')
    ax.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 5. Cross-Exchange Feature Comparison

Do the same features have predictive power across all 3 exchanges? If so, the signal is more robust.

In [None]:
# Compare the 15m IC of every feature across all 3 exchanges
print(f'{SYMBOL} — Cross-Exchange IC Comparison (15m forward return)')
print(f'{"─" * 75}')
print(f'{"Feature":25s} {"Binance":>12s} {"Bybit":>12s} {"OKX":>12s} {"Consistent?":>12s}')
print(f'{"─" * 75}')

# cross_ic maps source name -> Series of 15m IC indexed by feature.
cross_ic = {}
for source in FUTURES_SOURCES:
    df_src = add_forward_returns(features_all[source])
    ic_src = compute_ic_table(df_src, feature_cols, ['fwd_3bar'])
    cross_ic[source] = ic_src['fwd_3bar']

consistent_feats = []
for feat in feature_cols:
    ics = [cross_ic[src].get(feat, np.nan) for src in FUTURES_SOURCES]
    # Consistent only if every exchange has an IC, all share one sign, and all |IC| > 0.005.
    # Requiring all values to be present matters: all() over an empty or partial set
    # would otherwise report agreement for features with missing ICs.
    has_all = not any(np.isnan(ic) for ic in ics)
    same_sign = all(ic > 0 for ic in ics) or all(ic < 0 for ic in ics)
    all_meaningful = all(abs(ic) > 0.005 for ic in ics)
    is_consistent = has_all and same_sign and all_meaningful
    if is_consistent:
        consistent_feats.append((feat, ics))
    consistent = '✓' if is_consistent else ''
    print(f'{feat:25s} {ics[0]:>12.4f} {ics[1]:>12.4f} {ics[2]:>12.4f} {consistent:>12s}')

# List the features that passed the consistency check above
print(f'\n{"=" * 60}')
print('Features with consistent signal across all 3 exchanges:')
if consistent_feats:
    for feat, ics in consistent_feats:
        print(f'  {feat:25s}  mean IC={np.mean(ics):+.4f}')
else:
    print('  (none)')

## 6. Cross-Exchange Volume Divergence

When one exchange sees unusual volume relative to others, it may signal informed flow. Build features that compare activity across venues.

In [None]:
# Build one row per 5m bar that exists on all three venues, then derive features
# that compare the venues with one another.
shared_cols = ['vol_imbalance', 'dollar_imbalance', 'large_imbalance', 'arrival_rate',
               'trade_acceleration', 'volume', 'buy_volume', 'sell_volume', 'close', 'kyle_lambda']
select_cols = ['timestamp_us'] + shared_cols

bn = features_all['binance_futures'][select_cols].copy()
bb = features_all['bybit_futures'][select_cols].copy()
okx = features_all['okx_futures'][select_cols].copy()

# Inner joins on the bar timestamp; every non-key column ends up tagged _bn, _bb or _okx.
okx_tagged = okx.rename(columns={name: f'{name}_okx' for name in shared_cols})
cross = (
    bn.merge(bb, on='timestamp_us', suffixes=('_bn', '_bb'))
    .merge(okx_tagged, on='timestamp_us')
)

cross['datetime'] = pd.to_datetime(cross['timestamp_us'], unit='us', utc=True)

# --- Cross-exchange divergence features ---
venue_tags = ('bn', 'bb', 'okx')

# 1. Volume share: each venue's fraction of the three-venue volume in the bar.
total_vol = cross['volume_bn'] + cross['volume_bb'] + cross['volume_okx']
for exch in venue_tags:
    cross[f'vol_share_{exch}'] = cross[f'volume_{exch}'] / total_vol

# Z-score of each share against its trailing 288 rows (288 bars = 1 day of 5m bars).
for exch in venue_tags:
    share = cross[f'vol_share_{exch}']
    trailing_day = share.rolling(288)
    cross[f'vol_share_zscore_{exch}'] = (share - trailing_day.mean()) / trailing_day.std()

# 2. Imbalance consensus (plain average) and divergence (mean absolute deviation from it).
imb_bn = cross['vol_imbalance_bn']
imb_bb = cross['vol_imbalance_bb']
imb_okx = cross['vol_imbalance_okx']
cross['imbalance_consensus'] = (imb_bn + imb_bb + imb_okx) / 3
consensus = cross['imbalance_consensus']
cross['imbalance_divergence'] = (
    (imb_bn - consensus).abs() + (imb_bb - consensus).abs() + (imb_okx - consensus).abs()
) / 3

# 3. Average large-trade imbalance across venues.
cross['large_imb_consensus'] = (
    cross['large_imbalance_bn'] + cross['large_imbalance_bb'] + cross['large_imbalance_okx']
) / 3

# 4. Binance imbalance minus the average of the other two venues.
cross['bn_imb_lead'] = imb_bn - (imb_bb + imb_okx) / 2

# 5. Binance's share of the summed arrival rates.
total_rate = cross['arrival_rate_bn'] + cross['arrival_rate_bb'] + cross['arrival_rate_okx']
cross['rate_share_bn'] = cross['arrival_rate_bn'] / total_rate

# Forward returns over the next 1, 3 and 12 matched rows, using the Binance close.
for bars in (1, 3, 12):
    cross[f'fwd_{bars}bar'] = cross['close_bn'].pct_change(bars).shift(-bars)

print(f'Cross-exchange dataset: {len(cross):,} matched 5m bars')
print(f'New features: vol_share_zscore, imbalance_consensus, imbalance_divergence, bn_imb_lead, etc.')

# Score the cross-exchange features with the same IC and decile-spread helpers.
cross_features = [
    'vol_share_zscore_bn', 'vol_share_zscore_bb', 'vol_share_zscore_okx',
    'imbalance_consensus', 'imbalance_divergence',
    'large_imb_consensus', 'bn_imb_lead', 'rate_share_bn',
]

ic_cross = compute_ic_table(cross, cross_features, ['fwd_1bar', 'fwd_3bar', 'fwd_12bar'])
spread_cross = compute_decile_spread(cross, cross_features, 'fwd_3bar')

rule = '─' * 70
print(f'\n{SYMBOL} — Cross-Exchange Feature IC (15m forward)')
print(rule)
print(f'{"Feature":30s} {"IC 5m":>10s} {"IC 15m":>10s} {"IC 1h":>10s} {"Spread bps":>12s}')
print(rule)
for feat in cross_features:
    ic5 = ic_cross.loc[feat, 'fwd_1bar']
    ic15 = ic_cross.loc[feat, 'fwd_3bar']
    ic1h = ic_cross.loc[feat, 'fwd_12bar']
    # Features dropped by the spread helper show NaN in the spread column.
    if feat in spread_cross.index:
        sp = spread_cross.loc[feat, 'D10_minus_D1_bps']
    else:
        sp = np.nan
    print(f'{feat:30s} {ic5:>10.4f} {ic15:>10.4f} {ic1h:>10.4f} {sp:>12.1f}')

## 7. Volatility Regime Features

Volatility clustering was the strongest signal in profiling. Build regime features and test if they improve signal quality when combined with microstructure features.

In [None]:
# Volatility-regime features on the Binance 5m bars.
df = features_bn.copy()

# Realized volatility: rolling std of bar-to-bar returns over three window lengths.
df['returns'] = df['close'].pct_change()
bar_returns = df['returns']
df['rvol_12'] = bar_returns.rolling(12).std()    # 1h realized vol
df['rvol_60'] = bar_returns.rolling(60).std()    # 5h realized vol
df['rvol_288'] = bar_returns.rolling(288).std()  # 1d realized vol

short_vol = df['rvol_12']
long_vol = df['rvol_288']

# Short-term vs longer-term vol (expansion/compression); denominators are floored at 1e-10.
df['vol_ratio_12_60'] = short_vol / df['rvol_60'].clip(lower=1e-10)
df['vol_ratio_12_288'] = short_vol / long_vol.clip(lower=1e-10)

# Gap between 1h and 1d vol, scaled by the rolling std of the 1d vol.
long_vol_dispersion = long_vol.rolling(288).std().clip(lower=1e-10)
df['vol_zscore'] = (short_vol - long_vol) / long_vol_dispersion

# 30m change in 1h vol.
df['vol_accel'] = short_vol.pct_change(6)

# Bar price range relative to 1h vol.
df['range_vs_vol'] = df['price_range'] / short_vol.clip(lower=1e-10)

# Directional features scaled by the 1h/1d vol ratio.
vol_expansion = df['vol_ratio_12_288']
df['imb_x_vol'] = df['vol_imbalance'] * vol_expansion
df['large_imb_x_vol'] = df['large_imbalance'] * vol_expansion
df['kyle_x_vol'] = df['kyle_lambda'] * vol_expansion

# Forward returns over the next 1, 3 and 12 bars.
for bars in (1, 3, 12):
    df[f'fwd_{bars}bar'] = df['close'].pct_change(bars).shift(-bars)

vol_features = [
    'rvol_12', 'rvol_60', 'vol_ratio_12_60', 'vol_ratio_12_288',
    'vol_zscore', 'vol_accel', 'range_vs_vol',
    'imb_x_vol', 'large_imb_x_vol', 'kyle_x_vol',
]

# Score on rows where every column of df is populated.
complete_rows = df.dropna()
ic_vol = compute_ic_table(complete_rows, vol_features, ['fwd_1bar', 'fwd_3bar', 'fwd_12bar'])
spread_vol = compute_decile_spread(complete_rows, vol_features, 'fwd_3bar')

rule = '─' * 80
print(f'{SYMBOL} — Volatility & Interaction Feature IC')
print(rule)
print(f'{"Feature":25s} {"IC 5m":>10s} {"IC 15m":>10s} {"IC 1h":>10s} {"Spread bps":>12s}')
print(rule)
for feat in vol_features:
    ic5 = ic_vol.loc[feat, 'fwd_1bar']
    ic15 = ic_vol.loc[feat, 'fwd_3bar']
    ic1h = ic_vol.loc[feat, 'fwd_12bar']
    if feat in spread_vol.index:
        sp = spread_vol.loc[feat, 'D10_minus_D1_bps']
    else:
        sp = np.nan
    # Tick marks spreads beyond 15 bps in either direction.
    marker = ' ✓' if abs(sp) > 15 else ''
    print(f'{feat:25s} {ic5:>10.4f} {ic15:>10.4f} {ic1h:>10.4f} {sp:>12.1f}{marker}')

# Decile charts for the three interaction features.
interaction_feats = ['imb_x_vol', 'large_imb_x_vol', 'kyle_x_vol']
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
fig.suptitle(f'{SYMBOL} — Vol-Interaction Features: 15m Fwd Return by Decile', fontsize=14, fontweight='bold')

for ax, feat in zip(axes.flat, interaction_feats):
    clean = df[[feat, 'fwd_3bar']].dropna().copy()
    clean['decile'] = pd.qcut(clean[feat], q=10, labels=False, duplicates='drop')
    decile_ret = clean.groupby('decile')['fwd_3bar'].mean() * 10000
    colors = ['#d32f2f' if v < 0 else '#388e3c' for v in decile_ret.values]
    ax.bar(decile_ret.index, decile_ret.values, color=colors, alpha=0.7)
    ax.axhline(0, color='black', linewidth=0.5)
    if feat in spread_vol.index:
        sp = spread_vol.loc[feat, 'D10_minus_D1_bps']
    else:
        sp = 0
    ax.set_title(f'{feat}\nspread={sp:.1f} bps')
    ax.set_xlabel('Decile')
    ax.set_ylabel('Mean 15m Fwd Return (bps)')

plt.tight_layout()
plt.show()

## 8. Conditional Analysis: Signals in Different Vol Regimes

Do microstructure features work better in high-vol or low-vol environments?

In [None]:
# Split into vol regimes and compare feature IC
df_clean = df.dropna(subset=['vol_ratio_12_288', 'fwd_3bar']).copy()

# Define regimes: low vol (bottom 30%), normal (middle 40%), high vol (top 30%).
# NOTE: the cut points are quantiles of the whole sample, so regime labels are in-sample.
df_clean['vol_regime'] = pd.qcut(df_clean['vol_ratio_12_288'], q=[0, 0.3, 0.7, 1.0],
                                  labels=['low_vol', 'normal', 'high_vol'])

key_features = ['vol_imbalance', 'dollar_imbalance', 'large_imbalance',
                'kyle_lambda', 'close_vs_vwap', 'trade_acceleration',
                'count_imbalance', 'burstiness']

print(f'{SYMBOL} — IC by Volatility Regime (15m forward return)')
print(f'{"─" * 80}')
print(f'{"Feature":25s} {"Low Vol":>12s} {"Normal":>12s} {"High Vol":>12s} {"Best Regime":>12s}')
print(f'{"─" * 80}')

# regime_ics maps feature -> {regime: IC}; a regime with under 100 usable rows gets NaN.
regime_ics = {}
for feat in key_features:
    row = {}
    for regime in ['low_vol', 'normal', 'high_vol']:
        subset = df_clean[df_clean['vol_regime'] == regime]
        clean = subset[[feat, 'fwd_3bar']].dropna()
        if len(clean) < 100:
            row[regime] = np.nan
            continue
        ic, _ = stats.spearmanr(clean[feat], clean['fwd_3bar'])
        row[regime] = ic
    regime_ics[feat] = row

    # Pick the regime with the largest |IC|. NaN entries are removed first: NaN is
    # truthy and compares False against everything, so leaving it in lets max()
    # return an arbitrary regime.
    scored = {name: abs(val) for name, val in row.items() if not np.isnan(val)}
    best = max(scored, key=scored.get) if scored else 'n/a'
    print(f'{feat:25s} {row["low_vol"]:>12.4f} {row["normal"]:>12.4f} '
          f'{row["high_vol"]:>12.4f} {best:>12s}')

# Plot: decile returns in high vol vs low vol for each key feature
fig, axes = plt.subplots(2, 4, figsize=(20, 9))
fig.suptitle(f'{SYMBOL} — Feature Decile Returns: High Vol vs Low Vol (15m fwd)', fontsize=14, fontweight='bold')

for i, feat in enumerate(key_features):
    ax = axes[i // 4, i % 4]
    drew_line = False
    for regime, color, ls in [('high_vol', '#d32f2f', '-'), ('low_vol', '#1565c0', '--')]:
        subset = df_clean[df_clean['vol_regime'] == regime].copy()
        clean = subset[[feat, 'fwd_3bar']].dropna().copy()
        if len(clean) < 100:
            continue
        clean['decile'] = pd.qcut(clean[feat], q=10, labels=False, duplicates='drop')
        decile_ret = clean.groupby('decile')['fwd_3bar'].mean() * 10000
        ax.plot(decile_ret.index, decile_ret.values, marker='o', markersize=4,
               color=color, linestyle=ls, label=regime, alpha=0.8)
        drew_line = True
    ax.axhline(0, color='black', linewidth=0.5)
    ax.set_title(feat, fontsize=10)
    ax.set_xlabel('Decile')
    ax.set_ylabel('bps')
    # Only add a legend when at least one regime line was drawn on this axis.
    if drew_line:
        ax.legend(fontsize=7)

plt.tight_layout()
plt.show()

## 9. Summary & Feature Ranking

Consolidate all findings: rank features by IC, decile spread, consistency, and regime robustness.

In [None]:
# Final feature ranking: combine IC, spread, consistency, and cross-exchange agreement

all_features_ranked = feature_cols + ['imb_x_vol', 'large_imb_x_vol', 'kyle_x_vol']

# Recompute everything on the full df with vol features
df_full = df.dropna(subset=['fwd_3bar']).copy()

print(f'{SYMBOL} — FINAL FEATURE RANKING')
print(f'{"=" * 100}')
print(f'{"Feature":25s} {"IC 5m":>8s} {"IC 15m":>8s} {"IC 1h":>8s} '
      f'{"Spread":>8s} {"Consistent":>10s} {"Verdict":>10s}')
print(f'{"─" * 100}')

ic_final = compute_ic_table(df_full, all_features_ranked, ['fwd_1bar', 'fwd_3bar', 'fwd_12bar'])
spread_final = compute_decile_spread(df_full, all_features_ranked, 'fwd_3bar')

# Each entry is (feature, signed 15m IC, spread in bps, verdict).
verdicts = []
for feat in all_features_ranked:
    ic5 = ic_final.loc[feat, 'fwd_1bar'] if feat in ic_final.index else 0
    ic15 = ic_final.loc[feat, 'fwd_3bar'] if feat in ic_final.index else 0
    ic1h = ic_final.loc[feat, 'fwd_12bar'] if feat in ic_final.index else 0
    sp = spread_final.loc[feat, 'D10_minus_D1_bps'] if feat in spread_final.index else 0

    # Check cross-exchange consistency (for base features only)
    consistent = '—'
    if feat in feature_cols:
        ics_cross = []
        for src in FUTURES_SOURCES:
            if feat in cross_ic.get(src, pd.Series(dtype=float)).index:
                ics_cross.append(cross_ic[src][feat])
        if len(ics_cross) == 3:
            same_sign = all(x > 0 for x in ics_cross) or all(x < 0 for x in ics_cross)
            consistent = '✓' if same_sign and all(abs(x) > 0.005 for x in ics_cross) else '✗'

    # Verdict: stars require both a spread and a 15m |IC| above the tier's thresholds
    if abs(sp) > 15 and abs(ic15) > 0.02:
        verdict = '★★★'
    elif abs(sp) > 10 and abs(ic15) > 0.015:
        verdict = '★★'
    elif abs(sp) > 5 and abs(ic15) > 0.01:
        verdict = '★'
    else:
        verdict = ''

    # Keep the signed IC so the summary below can show the direction of the signal.
    verdicts.append((feat, ic15, sp, verdict))
    print(f'{feat:25s} {ic5:>8.4f} {ic15:>8.4f} {ic1h:>8.4f} {sp:>7.1f}bp {consistent:>10s} {verdict:>10s}')

# Sort by |IC 15m|; a NaN IC ranks as 0 so it cannot scramble the ordering.
print(f'\n{"=" * 60}')
print('TOP FEATURES (sorted by |IC| at 15m):')
print(f'{"─" * 60}')
ranked = sorted(verdicts, key=lambda v: 0.0 if np.isnan(v[1]) else abs(v[1]), reverse=True)
for feat, ic, sp, verdict in ranked[:10]:
    print(f'  {verdict:4s} {feat:25s}  IC={ic:+.4f}  spread={sp:+.1f} bps')

print(f'\n→ Features marked ★★★ have both strong IC and tradeable spread (>15 bps)')
print(f'→ Next step: combine top features into a composite signal and backtest')

# 02 — Signal Research: Microstructure Features from Tick Data

**Goal:** Build features from raw tick trades that capture *what happened inside each candle* and evaluate their predictiveness for forward returns at 5m–1h horizons.

**Feature Categories:**
1. **Aggression** — taker buy/sell pressure, large trade detection
2. **Flow patterns** — trade arrival rate, clustering, acceleration
3. **Price impact** — how much price moves per unit of volume
4. **Volume profile** — distribution of volume within the candle
5. **Cross-exchange divergence** — when one venue sees unusual activity

**Evaluation:** All signals tested against forward returns at 5m, 15m, 1h with a **minimum edge threshold of 15-20 bps** (to clear VIP0 fees + slippage).

**Data:** BTCUSDT tick trades + OHLCV, 92 days, 6 sources