# Notebook 04: Signal Predictive Power Analysis

**Purpose**: Determine which signals actually predict price movement

**Critical Question**: Which of our 14 signals have genuine predictive power?

---

## Methodology

For each signal, we compute multiple independent metrics:

| Metric | What It Measures | Interpretation |
|--------|------------------|----------------|
| **Pearson Correlation** | Linear relationship strength | r > 0: signal predicts Up |
| **Spearman Correlation** | Rank-based relationship (robust to outliers) | Similar to Pearson but non-parametric |
| **AUC (Up)** | Signal's ability to discriminate Up vs Not-Up | 0.5 = random, 1.0 = perfect |
| **AUC (Down)** | Signal's ability to discriminate Down vs Not-Down | 0.5 = random, 1.0 = perfect |
| **Mutual Information** | Non-linear information content | Higher = more predictive |
| **Binned P(Up)** | Non-linear relationship shape | Reveals threshold effects |

## Sign Convention

All signals follow: **Positive = Bullish** (predicts Up)
- `true_ofi > 0` → more buy pressure → expect Up
- `trade_asymmetry > 0` → more trades executed at the ask (buyer-initiated) → expect Up
- `depth_asymmetry > 0` → more bid depth → expect Up (but empirically negative!)

---

## Key Finding from Label Analysis

Labels have **97% autocorrelation** due to overlapping horizons. This means:
- Point-in-time correlations are expected to be weak (|r| ≈ 0.05)
- The real value is in sequence patterns
- We're looking for signals that capture the *onset* of trends


In [None]:
# Standard imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from scipy import stats
from scipy.stats import spearmanr, pearsonr
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.feature_selection import mutual_info_classif
import warnings
import json
import sys

# Add src to path
sys.path.insert(0, str(Path.cwd().parent / 'src'))

from lobtrainer.constants import (
    FEATURE_COUNT, FeatureIndex,
    LABEL_DOWN, LABEL_STABLE, LABEL_UP, LABEL_NAMES
)

# Plot styling shared by every figure in this notebook
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
    'figure.figsize': (14, 6),
    'font.size': 10,
    'axes.titlesize': 12,
})

# Silence RuntimeWarning messages so cell output stays readable
warnings.filterwarnings(action='ignore', category=RuntimeWarning)

# Exported dataset location: two directories above the working directory
DATA_ROOT = Path.cwd().parent.parent.joinpath('data', 'exports', 'nvda_98feat')

print("Environment ready", f"Data root: {DATA_ROOT}", sep="\n")


In [None]:
# Load training data
def load_split(split_name: str) -> dict:
    """
    Load and concatenate every per-day export file of one dataset split.

    Looks in ``DATA_ROOT / split_name`` for files named
    ``<date>_features.npy`` and pairs each with ``<date>_labels.npy`` from
    the same directory. Days are processed in sorted filename order.

    Args:
        split_name: Sub-directory of DATA_ROOT to read (e.g. 'train').

    Returns dict with:
        - features: every day's feature array stacked row-wise
          (documented by the original author as (N_samples, 98))
        - labels: every day's label array concatenated
          (documented by the original author as (N_labels,))
        - n_days: number of trading days (feature files) loaded

    Raises:
        FileNotFoundError: if the split directory holds no
            ``*_features.npy`` file, or a day has no matching labels file.
    """
    split_dir = DATA_ROOT / split_name
    feature_files = sorted(split_dir.glob('*_features.npy'))

    # Fail early with a readable message instead of letting np.vstack raise
    # "need at least one array to concatenate" on an empty list.
    if not feature_files:
        raise FileNotFoundError(
            f"No '*_features.npy' files found in {split_dir}"
        )

    suffix = '_features'
    features_list = []
    labels_list = []

    for feat_file in feature_files:
        # Strip only the trailing suffix; str.replace would also remove an
        # occurrence of '_features' earlier in the file name.
        date = feat_file.stem[:-len(suffix)]
        label_file = feat_file.with_name(f"{date}_labels.npy")
        if not label_file.is_file():
            raise FileNotFoundError(
                f"Missing labels file for {feat_file.name}: expected {label_file}"
            )

        features_list.append(np.load(feat_file))
        labels_list.append(np.load(label_file))

    return {
        'features': np.vstack(features_list),
        'labels': np.concatenate(labels_list),
        'n_days': len(features_list),
    }

# Load the training split once and report its dimensions
train_data = load_split('train')
print("Training data loaded:")
for caption, value in (
    ('Features', train_data['features'].shape),
    ('Labels', train_data['labels'].shape),
    ('Days', train_data['n_days']),
):
    print(f"  {caption}: {value}")


## 1. Feature-Label Alignment

**Critical**: Features are at sample-level, labels are at sequence-level.

- **Window**: 100 samples per sequence
- **Stride**: 10 samples between sequences
- **Ratio**: ~10 feature rows (samples) per label, since consecutive sequences start 10 samples apart

For signal analysis, we align features at sequence boundaries (the feature vector at the END of each sequence window).


In [None]:
# Configuration from export
WINDOW_SIZE = 100
STRIDE = 10

features = train_data['features']
labels = train_data['labels']

# Align features with labels
# Each label corresponds to the END of a sequence window
# Label[i] is computed from features in range [i*stride, i*stride + window]
# The most informative feature is at the END: i*stride + window - 1
#
# NOTE(review): this formula treats `features` as one continuous stream.
# load_split concatenates per-day files, so if sequences are windowed per day
# the row offset would drift after the first day - TODO confirm against the
# exporter before trusting multi-day alignment.

n_labels = len(labels)
last_row = features.shape[0] - 1

# Row index of the final sample of every sequence window, computed in one
# vectorized step instead of a Python loop over all labels.
window_end_idx = np.arange(n_labels) * STRIDE + WINDOW_SIZE - 1

# Indices past the end of the feature matrix are clamped to the last row
# (same as the original min()), but that is now reported instead of hidden
# because it means features and labels do not line up as assumed.
n_clamped = int(np.count_nonzero(window_end_idx > last_row))
if n_clamped:
    warnings.warn(
        f"{n_clamped} of {n_labels} labels point past the last feature row "
        f"({last_row}); their features were clamped to the final row."
    )
window_end_idx = np.minimum(window_end_idx, last_row)

# Pre-allocating with FEATURE_COUNT keeps the float64 dtype and still raises
# if the feature matrix does not have the expected number of columns.
aligned_features = np.zeros((n_labels, FEATURE_COUNT))
aligned_features[:] = features[window_end_idx]

print(f"Aligned features shape: {aligned_features.shape}")
print(f"Labels shape: {labels.shape}")
print(f"Verification: {aligned_features.shape[0] == labels.shape[0]}")


In [None]:
# Define signal indices and metadata
# These are the 14 trading signals we implemented (indices 84-97).
# Keys are column indices into the feature matrix. Each entry records:
#   name          - short identifier used in tables, plots and output files
#   description   - one-line human-readable meaning
#   type          - 'continuous', 'binary', 'categorical', 'count' or 'constant'
#   expected_sign - hypothesised direction of the signal/label relationship:
#                   '+' or '-', '?' when unknown, 'N/A' for non-directional fields
#   formula       - informal definition of how the value is computed

SIGNAL_INFO = {
    84: {
        'name': 'true_ofi',
        'description': 'Cont et al. Order Flow Imbalance',
        'type': 'continuous',
        'expected_sign': '+',  # Positive OFI → expect Up
        'formula': 'Σ(bid_volume_changes) - Σ(ask_volume_changes)',
    },
    85: {
        'name': 'depth_norm_ofi',
        'description': 'OFI normalized by average depth',
        'type': 'continuous',
        'expected_sign': '+',
        'formula': 'true_ofi / avg_depth',
    },
    86: {
        'name': 'executed_pressure',
        'description': 'Net executed trade imbalance',
        'type': 'continuous',
        'expected_sign': '+',  # More ask trades (buys) → expect Up
        'formula': 'trades_at_ask - trades_at_bid',
    },
    87: {
        'name': 'signed_mp_delta_bps',
        'description': 'Microprice deviation from mid in bps',
        'type': 'continuous',
        'expected_sign': '+',  # Microprice above mid → expect Up
        'formula': '(microprice - mid) / mid * 10000',
    },
    88: {
        'name': 'trade_asymmetry',
        'description': 'Trade count imbalance ratio',
        'type': 'continuous',
        'expected_sign': '+',
        'formula': '(trades_ask - trades_bid) / total_trades',
    },
    89: {
        'name': 'cancel_asymmetry',
        'description': 'Cancel imbalance ratio',
        'type': 'continuous',
        'expected_sign': '+',  # More ask cancels → sellers leaving → expect Up
        'formula': '(cancels_ask - cancels_bid) / total_cancels',
    },
    90: {
        'name': 'fragility_score',
        'description': 'Book concentration / ln(depth)',
        'type': 'continuous',
        'expected_sign': '?',  # Could go either way
        'formula': 'hhi_concentration / ln(total_depth)',
    },
    91: {
        'name': 'depth_asymmetry',
        'description': 'Depth imbalance ratio',
        'type': 'continuous',
        # More bid depth → expect Up (support); the notebook introduction
        # notes that the observed relationship is empirically negative.
        'expected_sign': '+',  # More bid depth → expect Up (support)
        'formula': '(bid_depth - ask_depth) / total_depth',
    },
    92: {
        'name': 'book_valid',
        'description': 'Book validity flag',
        'type': 'binary',
        'expected_sign': 'N/A',
        'formula': '1 if book not crossed/empty else 0',
    },
    93: {
        'name': 'time_regime',
        'description': 'Market session encoding',
        'type': 'categorical',
        'expected_sign': 'N/A',
        'formula': '{0:Open, 1:Early, 2:Midday, 3:Close, 4:Closed}',
    },
    94: {
        'name': 'mbo_ready',
        'description': 'MBO warmup complete flag',
        'type': 'binary',
        'expected_sign': 'N/A',
        'formula': '1 if warmup complete else 0',
    },
    95: {
        'name': 'dt_seconds',
        'description': 'Time since last sample',
        'type': 'continuous',
        'expected_sign': '?',
        'formula': 'Elapsed seconds since last sample',
    },
    96: {
        'name': 'invalidity_delta',
        'description': 'Count of feed problems',
        'type': 'count',
        'expected_sign': '-',  # More problems → less reliable
        'formula': 'Crossed/locked events since last sample',
    },
    97: {
        'name': 'schema_version',
        'description': 'Schema version constant',
        'type': 'constant',
        'expected_sign': 'N/A',
        'formula': 'Always 2.0',
    },
}

# Main trading signals: the eight directional signals at columns 84-91
CORE_SIGNALS = list(range(84, 92))
# Continuous signals for predictive analysis: the core signals plus
# dt_seconds (95); categorical/binary/count/constant columns are excluded
CONTINUOUS_SIGNALS = CORE_SIGNALS + [95]

for caption, count in (
    ("Total signals", len(SIGNAL_INFO)),
    ("Continuous signals for analysis", len(CONTINUOUS_SIGNALS)),
    ("Core trading signals", len(CORE_SIGNALS)),
):
    print(f"{caption}: {count}")


## 2. Comprehensive Predictive Metrics

For each signal, compute:
1. **Pearson correlation** - linear relationship
2. **Spearman correlation** - rank-based (robust to outliers)
3. **AUC for Up** - discriminative power for Up class
4. **AUC for Down** - discriminative power for Down class
5. **Mutual Information** - non-linear information content
6. **Sign consistency** - does correlation match expected sign?


In [None]:
def compute_signal_metrics(signal: np.ndarray, labels: np.ndarray, expected_sign: str) -> dict:
    """
    Compute comprehensive predictive metrics for a single signal.

    Rows where either the signal or the label is non-finite are dropped
    before any metric is computed.

    Args:
        signal: (N,) array of signal values
        labels: (N,) array of labels {-1, 0, 1}
        expected_sign: '+', '-', or '?' for expected correlation direction

    Returns:
        dict with:
            - n_samples: number of rows left after filtering
            - pearson_r / pearson_p: Pearson correlation and its p-value
            - spearman_r / spearman_p: Spearman correlation and its p-value
            - auc_up: ROC AUC of the signal for Up vs Not-Up
            - auc_down: ROC AUC of the NEGATED signal for Down vs Not-Down
            - mutual_info / mi_bits: mutual information in nats / in bits
            - sign_consistent: True/False when the sign of pearson_r can be
              compared with expected_sign; None when expected_sign is not
              '+' or '-' or when pearson_r is not finite
            - mean_up / mean_stable / mean_down: mean signal value per label
              class (NaN when the class has no samples)

    NOTE(review): the notebook introduction reports strongly autocorrelated
    labels, so the p-values presumably overstate significance - treat them
    as descriptive only.
    """
    # Remove any NaN/Inf values
    valid_mask = np.isfinite(signal) & np.isfinite(labels)
    signal = signal[valid_mask]
    labels_clean = labels[valid_mask]

    n = len(signal)

    # 1. Pearson correlation (linear relationship)
    pearson_r, pearson_p = pearsonr(signal, labels_clean)

    # 2. Spearman correlation (rank-based)
    spearman_r, spearman_p = spearmanr(signal, labels_clean)

    def _one_vs_rest_auc(target_label, scores):
        """AUC of `scores` for target_label vs the rest; 0.5 if only one class is present."""
        y = (labels_clean == target_label).astype(int)
        n_pos = y.sum()
        if 0 < n_pos < len(y):
            return roc_auc_score(y, scores)
        return 0.5

    # 3. AUC for Up vs Not-Up
    auc_up = _one_vs_rest_auc(LABEL_UP, signal)

    # 4. AUC for Down vs Not-Down: a bullish-positive signal should be LOW
    #    before a Down move, so the negated signal is scored.
    auc_down = _one_vs_rest_auc(LABEL_DOWN, -signal)

    # 5. Mutual Information
    # Shift labels from {-1, 0, 1} to {0, 1, 2} for sklearn
    labels_shifted = labels_clean + 1
    mi = mutual_info_classif(
        signal.reshape(-1, 1),
        labels_shifted,
        discrete_features=False,
        random_state=42
    )[0]

    # sklearn reports nats; divide by ln(2) to convert to bits
    mi_bits = mi / np.log(2)

    # 6. Sign consistency check
    # A NaN correlation (e.g. constant signal) carries no sign information.
    # Comparing NaN with 0 is always False, which previously marked such a
    # signal as "contrarian"; it is now reported as undetermined (None).
    if expected_sign not in ('+', '-') or not np.isfinite(pearson_r):
        sign_consistent = None
    elif expected_sign == '+':
        sign_consistent = bool(pearson_r > 0)
    else:
        sign_consistent = bool(pearson_r < 0)

    def _class_mean(target_label):
        """Mean signal value within one label class, NaN if the class is empty."""
        members = signal[labels_clean == target_label]
        return members.mean() if members.size else np.nan

    # 7. Conditional means (mean signal value for each label class)
    mean_up = _class_mean(LABEL_UP)
    mean_stable = _class_mean(LABEL_STABLE)
    mean_down = _class_mean(LABEL_DOWN)

    return {
        'n_samples': n,
        'pearson_r': pearson_r,
        'pearson_p': pearson_p,
        'spearman_r': spearman_r,
        'spearman_p': spearman_p,
        'auc_up': auc_up,
        'auc_down': auc_down,
        'mutual_info': mi,
        'mi_bits': mi_bits,
        'sign_consistent': sign_consistent,
        'mean_up': mean_up,
        'mean_stable': mean_stable,
        'mean_down': mean_down,
    }

# Build one metrics record per core signal, then collect them in a DataFrame
results = [
    {
        'index': idx,
        'name': SIGNAL_INFO[idx]['name'],
        'expected_sign': SIGNAL_INFO[idx]['expected_sign'],
        **compute_signal_metrics(
            aligned_features[:, idx],
            labels,
            SIGNAL_INFO[idx]['expected_sign'],
        ),
    }
    for idx in CORE_SIGNALS
]

df_metrics = pd.DataFrame(results)
print("Signal Predictive Metrics Computed")


In [None]:
# Display comprehensive results table
print("=" * 100)
print("SIGNAL PREDICTIVE POWER ANALYSIS")
print("=" * 100)

# Format for display (work on a copy so df_metrics keeps its numeric values)
display_cols = ['name', 'pearson_r', 'spearman_r', 'auc_up', 'auc_down', 'mi_bits', 'sign_consistent']
df_display = df_metrics[display_cols].copy()

# Round for readability: correlations keep an explicit sign, the rest do not
for col in ('pearson_r', 'spearman_r'):
    df_display[col] = df_display[col].map("{:+.4f}".format)
for col in ('auc_up', 'auc_down', 'mi_bits'):
    df_display[col] = df_display[col].map("{:.4f}".format)

# `==` (not `is`) is deliberate: the column may hold numpy booleans, and
# None/NaN entries must fall through to '?'.
df_display['sign_consistent'] = df_display['sign_consistent'].apply(
    lambda x: '✓' if x == True else '✗' if x == False else '?'
)

print("\nMetrics Summary:")
print(df_display.to_string(index=False))

# Rank by predictive power (absolute Pearson correlation)
df_ranked = df_metrics.sort_values('pearson_r', key=abs, ascending=False)
print("\n\nRanking by |Pearson r|:")
# enumerate gives the rank directly; the previous list(index).index(i) lookup
# was quadratic and the unused `sign` local has been dropped.
for rank, (_, row) in enumerate(df_ranked.iterrows(), start=1):
    print(f"  #{rank}: {row['name']:25s} r={row['pearson_r']:+.4f} AUC_up={row['auc_up']:.4f} AUC_down={row['auc_down']:.4f}")


## 3. Visualization: Signal-Label Relationships

Visualize the relationship between each signal and labels using multiple views.


# Ensure the figures directory exists (plt.savefig below writes PNGs into ../docs/figures)
import os
os.makedirs('../docs/figures', exist_ok=True)


In [None]:
# Per-class histograms of every core signal, one panel per signal
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

colors = {'Down': '#e74c3c', 'Stable': '#95a5a6', 'Up': '#27ae60'}
label_classes = ((LABEL_DOWN, 'Down'), (LABEL_STABLE, 'Stable'), (LABEL_UP, 'Up'))

for panel, feature_idx in enumerate(CORE_SIGNALS):
    ax = axes[panel]
    values = aligned_features[:, feature_idx]

    # Overlay one density-normalised histogram per label class
    for class_value, class_name in label_classes:
        ax.hist(
            values[labels == class_value],
            bins=50,
            density=True,
            alpha=0.5,
            color=colors[class_name],
            label=class_name,
        )

    # Zero reference line
    ax.axvline(0, color='black', linestyle='--', alpha=0.3)
    ax.set(
        xlabel='Signal Value (Z-scored)',
        ylabel='Density',
        title=SIGNAL_INFO[feature_idx]['name'],
    )
    ax.legend(fontsize=8)

plt.tight_layout()
plt.suptitle('Signal Distributions by Label Class', y=1.02, fontsize=14)
plt.savefig('../docs/figures/signal_conditional_distributions.png', dpi=150, bbox_inches='tight')
plt.show()


In [None]:
# Visualize conditional means (box plots)
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

# (label value, tick label, box colour) in left-to-right plotting order
class_order = [
    (LABEL_DOWN, 'Down', '#e74c3c'),
    (LABEL_STABLE, 'Stable', '#95a5a6'),
    (LABEL_UP, 'Up', '#27ae60'),
]

for i, idx in enumerate(CORE_SIGNALS):
    ax = axes[i]
    signal = aligned_features[:, idx]
    name = SIGNAL_INFO[idx]['name']

    # One sample array per label class
    data = [signal[labels == lbl] for lbl, _, _ in class_order]

    # The `labels=` keyword of boxplot is deprecated since Matplotlib 3.9
    # (renamed `tick_labels`). Setting the tick labels afterwards works on
    # both old and new Matplotlib versions.
    bp = ax.boxplot(data, patch_artist=True)
    ax.set_xticklabels([tick for _, tick, _ in class_order])

    # Color the boxes
    for patch, (_, _, color) in zip(bp['boxes'], class_order):
        patch.set_facecolor(color)
        patch.set_alpha(0.5)

    ax.axhline(0, color='black', linestyle='--', alpha=0.3)
    ax.set_ylabel('Signal Value')
    ax.set_title(f'{name}')

    # Add mean markers (boxplot places the boxes at x = 1, 2, 3)
    means = [d.mean() for d in data]
    ax.scatter(range(1, len(data) + 1), means, color='red', marker='D', s=50, zorder=5, label='Mean')
    # The scatter was labelled 'Mean' but no legend was ever drawn
    ax.legend(fontsize=8)

plt.tight_layout()
plt.suptitle('Signal Values by Label Class (Box Plots)', y=1.02, fontsize=14)
plt.savefig('../docs/figures/signal_boxplots.png', dpi=150, bbox_inches='tight')
plt.show()


## 4. Non-Linear Analysis: Binned Probabilities

Linear correlation misses non-linear relationships. We bin each signal into deciles and compute P(Up) and P(Down) per bin to reveal:
- Threshold effects
- Non-monotonic relationships
- Tail behavior


In [None]:
def compute_binned_probabilities(signal: np.ndarray, labels: np.ndarray, n_bins: int = 10) -> pd.DataFrame:
    """
    Bin signal into quantiles and compute label probabilities per bin.

    Rows with a non-finite signal or label are dropped first (the same
    filter compute_signal_metrics applies). Quantile bins are used so each
    bin holds roughly the same number of samples; if quantile binning
    fails, equal-width bins are used instead.

    Args:
        signal: (N,) array of signal values
        labels: (N,) array of labels aligned with `signal`
        n_bins: requested number of bins (fewer may be returned when
            duplicate quantile edges are dropped or a bin is empty)

    Returns DataFrame with:
        - bin: bin number (0 = lowest signal values)
        - signal_mean: mean signal value in bin
        - signal_min, signal_max: range of signal in bin
        - p_up: P(label = Up | bin)
        - p_down: P(label = Down | bin)
        - p_stable: P(label = Stable | bin)
        - n_samples: number of samples in bin

        The DataFrame has these columns but no rows when no valid sample
        remains after filtering.
    """
    columns = [
        'bin', 'signal_mean', 'signal_min', 'signal_max',
        'p_up', 'p_down', 'p_stable', 'n_samples',
    ]

    # Drop rows that cannot be binned or classified
    valid_mask = np.isfinite(signal) & np.isfinite(labels)
    signal = signal[valid_mask]
    labels_clean = labels[valid_mask]

    # Nothing to bin: return an empty, correctly-shaped frame instead of
    # failing later on bins.max() of an empty array.
    if signal.size == 0:
        return pd.DataFrame(columns=columns)

    # Create bins using quantiles (to ensure equal samples per bin)
    try:
        bins = pd.qcut(signal, q=n_bins, labels=False, duplicates='drop')
    except ValueError:
        # Fall back to equal-width bins if too many duplicates
        bins = pd.cut(signal, bins=n_bins, labels=False)

    results = []
    for b in range(int(bins.max()) + 1):
        mask = bins == b
        # Equal-width bins can be empty; skip them rather than emit NaN rows
        if mask.sum() == 0:
            continue

        bin_labels = labels_clean[mask]
        bin_signal = signal[mask]

        results.append({
            'bin': b,
            'signal_mean': bin_signal.mean(),
            'signal_min': bin_signal.min(),
            'signal_max': bin_signal.max(),
            'p_up': (bin_labels == LABEL_UP).mean(),
            'p_down': (bin_labels == LABEL_DOWN).mean(),
            'p_stable': (bin_labels == LABEL_STABLE).mean(),
            'n_samples': len(bin_labels),
        })

    return pd.DataFrame(results, columns=columns)

# Binned label probabilities for every core signal, keyed by signal name
binned_results = {
    SIGNAL_INFO[idx]['name']: compute_binned_probabilities(aligned_features[:, idx], labels)
    for idx in CORE_SIGNALS
}

print("Binned probability analysis complete")


In [None]:
# Plot P(Up) and P(Down) per signal bin against the unconditional baselines
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

# Baseline probabilities (unconditional)
baseline_up = (labels == LABEL_UP).mean()
baseline_down = (labels == LABEL_DOWN).mean()

# (DataFrame column, line format, legend label) for the two curves
curve_specs = (
    ('p_up', 'g-o', 'P(Up)'),
    ('p_down', 'r-o', 'P(Down)'),
)
# (level, colour, legend label) for the two horizontal reference lines
baseline_specs = (
    (baseline_up, 'green', f'Baseline Up ({baseline_up:.1%})'),
    (baseline_down, 'red', f'Baseline Down ({baseline_down:.1%})'),
)

for panel, feature_idx in enumerate(CORE_SIGNALS):
    ax = axes[panel]
    signal_name = SIGNAL_INFO[feature_idx]['name']
    binned = binned_results[signal_name]

    # Conditional probabilities, plotted at each bin's mean signal value
    for column, line_fmt, legend_label in curve_specs:
        ax.plot(binned['signal_mean'], binned[column], line_fmt,
                label=legend_label, linewidth=2, markersize=6)

    # Reference lines: unconditional probabilities and signal = 0
    for level, line_color, legend_label in baseline_specs:
        ax.axhline(level, color=line_color, linestyle='--', alpha=0.5, label=legend_label)
    ax.axvline(0, color='gray', linestyle=':', alpha=0.5)

    ax.set(xlabel='Signal Value (binned)', ylabel='Probability', title=signal_name)
    ax.set_ylim(0, 0.6)
    ax.legend(fontsize=7, loc='upper right')

plt.tight_layout()
plt.suptitle('Label Probabilities by Signal Decile', y=1.02, fontsize=14)
plt.savefig('../docs/figures/signal_binned_probabilities.png', dpi=150, bbox_inches='tight')
plt.show()

print(f"\nBaseline probabilities: P(Up)={baseline_up:.3f}, P(Down)={baseline_down:.3f}, P(Stable)={1-baseline_up-baseline_down:.3f}")


## 5. ROC Curves

Visualize discriminative power using ROC curves for Up vs Not-Up classification.


In [None]:
# ROC curves for Up vs Not-Up
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

y_up = (labels == LABEL_UP).astype(int)

for i, idx in enumerate(CORE_SIGNALS):
    ax = axes[i]
    name = SIGNAL_INFO[idx]['name']
    signal = aligned_features[:, idx]

    # Drop non-finite signal values (the same filter compute_signal_metrics
    # applies) because roc_curve rejects NaN/Inf scores.
    finite = np.isfinite(signal)
    y_true = y_up[finite]
    scores = signal[finite]

    # ROC/AUC are only defined when both classes are present
    n_pos = y_true.sum()
    if 0 < n_pos < len(y_true):
        # Compute ROC curve (the thresholds are not needed for the plot)
        fpr, tpr, _ = roc_curve(y_true, scores)
        auc_value = roc_auc_score(y_true, scores)
        ax.plot(fpr, tpr, color='blue', linewidth=2, label=f'AUC = {auc_value:.3f}')
        ax.plot([0, 1], [0, 1], 'k--', linewidth=1, label='Random (0.5)')
        ax.fill_between(fpr, tpr, alpha=0.2)
    else:
        # Degenerate panel: show only the chance diagonal and say why
        ax.plot([0, 1], [0, 1], 'k--', linewidth=1, label='Random (0.5)')
        ax.text(0.5, 0.5, 'ROC undefined\n(single class)',
                ha='center', va='center', transform=ax.transAxes)

    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title(f'{name}')
    ax.legend(loc='lower right', fontsize=9)
    ax.set_xlim([0, 1])
    ax.set_ylim([0, 1])

plt.tight_layout()
plt.suptitle('ROC Curves: Signal as Predictor for Up', y=1.02, fontsize=14)
plt.savefig('../docs/figures/signal_roc_curves.png', dpi=150, bbox_inches='tight')
plt.show()


## 6. Depth Asymmetry Investigation

The label analysis revealed that `depth_asymmetry` has a **negative** correlation with labels, which is counterintuitive:
- **Expected**: More bid depth (positive depth_asymmetry) → more support → expect Up
- **Observed**: More bid depth → expect **Down**

Let's investigate this "contrarian" signal.


In [None]:
# Examine why depth_asymmetry (feature column 91) moves against the labels
depth_asym = aligned_features[:, 91]

banner = "=" * 70
print(banner)
print("DEPTH ASYMMETRY INVESTIGATION")
print(banner)

# Mean of the signal within each label class
print("\nConditional means:")
for prefix, class_value in (
    ("  E[depth_asymmetry | Down]:   ", LABEL_DOWN),
    ("  E[depth_asymmetry | Stable]: ", LABEL_STABLE),
    ("  E[depth_asymmetry | Up]:     ", LABEL_UP),
):
    print(f"{prefix}{depth_asym[labels == class_value].mean():+.4f}")

# Label frequencies inside the two extreme quintiles of the signal
q20, q80 = np.percentile(depth_asym, [20, 80])

low_depth_asym = depth_asym < q20  # More ask depth (negative)
high_depth_asym = depth_asym > q80  # More bid depth (positive)

print("\nExtreme quintile analysis:")
for header, tail_mask in (
    (f"  Bottom 20% (more ask depth, depth_asym < {q20:.2f}):", low_depth_asym),
    (f"  Top 20% (more bid depth, depth_asym > {q80:.2f}):", high_depth_asym),
):
    tail_labels = labels[tail_mask]
    print(header)
    print(f"    P(Up) = {(tail_labels == LABEL_UP).mean():.3f}")
    print(f"    P(Down) = {(tail_labels == LABEL_DOWN).mean():.3f}")

# Static interpretation text
print("\n" + banner)
print("INTERPRETATION")
print(banner)
print("""
The negative correlation is real and suggests:

1. INFORMED TRADING HYPOTHESIS:
   - Informed sellers may HIDE their orders in the bid to avoid moving the price
   - More bid depth could indicate large sellers waiting to distribute
   - Retail/noise traders see "support" but informed traders know better

2. MEAN REVERSION HYPOTHESIS:
   - Large depth imbalances tend to correct
   - Extreme bid depth may signal overbought conditions

3. This makes depth_asymmetry valuable as a CONTRARIAN signal:
   - High depth_asymmetry (lots of bids) → expect DOWN
   - Low depth_asymmetry (lots of asks) → expect UP
""")


## 7. Signal Correlation Matrix

Check for redundancy between signals. Highly correlated signals provide similar information.


In [None]:
# Pairwise Pearson correlations between the core signals
signal_names = [SIGNAL_INFO[idx]['name'] for idx in CORE_SIGNALS]
signal_matrix = aligned_features[:, CORE_SIGNALS]

# Columns are the variables, hence rowvar=False
corr_matrix = np.corrcoef(signal_matrix, rowvar=False)

# Heatmap of the lower triangle (the strict upper triangle is masked out)
fig, ax = plt.subplots(figsize=(10, 8))
upper_triangle = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)
sns.heatmap(
    corr_matrix,
    mask=upper_triangle,
    annot=True,
    fmt='.2f',
    cmap='RdBu_r',
    center=0,
    vmin=-1,
    vmax=1,
    xticklabels=signal_names,
    yticklabels=signal_names,
    ax=ax,
)
ax.set_title('Signal Correlation Matrix')
plt.tight_layout()
plt.savefig('../docs/figures/signal_correlation_matrix.png', dpi=150, bbox_inches='tight')
plt.show()

# List every distinct pair (row < column) whose |r| exceeds 0.5
print("\nHighly Correlated Signal Pairs (|r| > 0.5):")
pair_rows, pair_cols = np.triu_indices(len(CORE_SIGNALS), k=1)
for first, second in zip(pair_rows, pair_cols):
    pair_r = corr_matrix[first, second]
    if abs(pair_r) > 0.5:
        print(f"  {signal_names[first]} ↔ {signal_names[second]}: r = {pair_r:+.3f}")


## 8. Summary & Recommendations

Synthesize all findings into actionable recommendations.


In [None]:
# Final summary: ranking table, key findings and static recommendations
rule = "=" * 80
print(rule)
print("SIGNAL PREDICTIVE POWER: SUMMARY")
print(rule)

# Strongest absolute linear correlation first
df_sorted = df_metrics.sort_values('pearson_r', key=abs, ascending=False)

print("\n1. SIGNAL RANKING (by |Pearson r|):\n")
print("   Rank | Signal                    | r       | AUC_up | AUC_down | MI bits")
print("   " + "-" * 75)
rank = 0
for _, row in df_sorted.iterrows():
    rank += 1
    print(f"   #{rank:2d}  | {row['name']:25s} | {row['pearson_r']:+.4f} | {row['auc_up']:.4f} | {row['auc_down']:.4f}  | {row['mi_bits']:.4f}")

print("\n2. KEY FINDINGS:\n")

# Top-ranked signal
best = df_sorted.iloc[0]
print(f"   • BEST PREDICTOR: {best['name']} (r = {best['pearson_r']:+.4f})")

# Signals whose observed sign contradicts the expected one
contrarian = df_metrics[df_metrics['sign_consistent'] == False]
if not contrarian.empty:
    print("   • CONTRARIAN SIGNALS (opposite of expected sign):")
    for _, row in contrarian.iterrows():
        print(f"     - {row['name']}: expected {row['expected_sign']}, got r = {row['pearson_r']:+.4f}")

# Signal pairs that carry largely the same information
print("   • REDUNDANT PAIRS (|r| > 0.5):")
n_core = len(CORE_SIGNALS)
for first, second in [(a, b) for a in range(n_core) for b in range(a + 1, n_core)]:
    pair_r = corr_matrix[first, second]
    if abs(pair_r) > 0.5:
        print(f"     - {signal_names[first]} ↔ {signal_names[second]}: r = {pair_r:+.3f}")

# Static recommendation text, one print per entry
for line in (
    "\n3. FEATURE SELECTION RECOMMENDATIONS:\n",
    "   For model training, consider these signal groups:\n",
    "   GROUP A - HIGH PRIORITY (direct predictors):",
    "     • true_ofi: Best linear predictor",
    "     • trade_asymmetry: Strong, independent from OFI",
    "\n   GROUP B - CONTRARIAN (inverse relationship):",
    "     • depth_asymmetry: Use with NEGATIVE sign or as separate feature",
    "\n   GROUP C - MODERATE VALUE:",
    "     • depth_norm_ofi: Redundant with true_ofi (r > 0.5)",
    "     • executed_pressure: Moderate predictive power",
    "\n   GROUP D - LOW PRIORITY:",
    "     • signed_mp_delta_bps: Low predictive power",
    "     • fragility_score: Unclear relationship",
    "\n4. MODELING RECOMMENDATIONS:\n",
    "   ✅ Use true_ofi as primary feature",
    "   ✅ Include depth_asymmetry as contrarian signal",
    "   ✅ Consider feature engineering: OFI × depth_asymmetry interaction",
    "   ⚠️ depth_norm_ofi may be redundant with true_ofi",
    "   ⚠️ Correlations are weak (~5%) - need sequence models to capture temporal patterns",
):
    print(line)

print("\n" + rule)
print("✅ SIGNAL PREDICTIVE POWER ANALYSIS COMPLETE")
print(rule)


In [None]:
# Persist the metrics table, per-signal binned tables and a JSON summary
import os
os.makedirs('../docs/figures', exist_ok=True)

# Full metrics table
df_metrics.to_csv('../docs/signal_predictive_metrics.csv', index=False)

# One CSV of binned probabilities per signal
for signal_name, binned_df in binned_results.items():
    binned_df.to_csv(f'../docs/signal_binned_{signal_name}.csv', index=False)


def _ranking_entry(rank, row):
    """Convert one ranked metrics row into a JSON-serialisable dict."""
    flag = row['sign_consistent']
    return {
        'rank': rank,
        'name': row['name'],
        'pearson_r': float(row['pearson_r']),
        'spearman_r': float(row['spearman_r']),
        'auc_up': float(row['auc_up']),
        'auc_down': float(row['auc_down']),
        'mi_bits': float(row['mi_bits']),
        'sign_consistent': None if flag is None else bool(flag),
    }


# Signal pairs (first index < second index) with |r| above 0.5
redundant_pairs = []
n_core = len(CORE_SIGNALS)
for first in range(n_core):
    for second in range(first + 1, n_core):
        pair_r = corr_matrix[first, second]
        if abs(pair_r) > 0.5:
            redundant_pairs.append({
                'signal_1': signal_names[first],
                'signal_2': signal_names[second],
                'correlation': float(pair_r),
            })

# Comprehensive results for the JSON file
results_summary = {
    'signal_rankings': [
        _ranking_entry(position + 1, row)
        for position, (_, row) in enumerate(df_sorted.iterrows())
    ],
    'contrarian_signals': df_metrics.loc[df_metrics['sign_consistent'] == False, 'name'].tolist(),
    'redundant_pairs': redundant_pairs,
    'recommendations': {
        'primary_features': ['true_ofi', 'trade_asymmetry'],
        'contrarian_features': ['depth_asymmetry'],
        'redundant_features': ['depth_norm_ofi'],
        'low_priority': ['signed_mp_delta_bps', 'fragility_score'],
    },
}

with open('../docs/signal_predictive_power_results.json', 'w') as out_file:
    json.dump(results_summary, out_file, indent=2)

for line in (
    "Results saved to docs/ directory:",
    "  - signal_predictive_metrics.csv",
    "  - signal_predictive_power_results.json",
    "  - signal_binned_*.csv (one per signal)",
):
    print(line)
