# Experiment 1: LOCO vs CMI on Synthetic Data

This notebook compares the LOCO (Leave-One-Covariate-Out) and CMI (Conditional Mutual Information) 
methods for conditional independence testing.

## Objectives
1. Compare Type I error rates (null-true scenario, i.e. Y is independent of X given W)
2. Compare statistical power (null-false scenarios with varying effect sizes)
3. Compare runtime performance
4. Evaluate p-value calibration

In [None]:
import os
import time
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats
from tqdm import tqdm
warnings.filterwarnings('ignore')

import sys
sys.path.insert(0, '..')

from causal_grounding import create_ci_engine, CITestEngine
from causal_grounding.ci_tests_loco import LOCOCIEngine

print("Imports successful!")

## 1. Data Generation Functions

In [None]:
def generate_ci_test_data(
    n_samples: int,
    n_covariates: int,
    effect_size: float,
    seed: int = None
) -> pd.DataFrame:
    """
    Simulate one dataset for testing the hypothesis Y _||_ X | W.

    The covariates W1..Wk are drawn from {0, 1, 2} with equal probability,
    X is Bernoulli(0.5), and Y is Bernoulli with a logistic success
    probability. The logit of Y starts at -0.5, gains 0.3 for every
    covariate equal to 1 and 0.2 for every covariate equal to 2, and
    finally gains ``effect_size * X``.

    Args:
        n_samples: Number of rows to simulate.
        n_covariates: Number of conditioning covariates (k).
        effect_size: Logit-scale coefficient of X in the model for Y
            (0 = null true, >0 = null false).
        seed: When not None, NumPy's *global* random generator is reseeded
            with this value before any draw is made.

    Returns:
        DataFrame with columns X, Y, W1, W2, ..., Wk (in that order).
    """
    if seed is not None:
        np.random.seed(seed)

    # Draw order is part of the contract for a given seed:
    # all covariates first, then X, then Y.
    covariates = {
        f'W{i+1}': np.random.choice([0, 1, 2], size=n_samples)
        for i in range(n_covariates)
    }
    X = np.random.binomial(1, 0.5, n_samples)

    # Accumulate the covariate contributions one column at a time
    logit = -0.5
    for levels in covariates.values():
        logit = logit + 0.3 * (levels == 1) + 0.2 * (levels == 2)

    # X contributes nothing when effect_size is 0 (null true)
    logit = logit + effect_size * X

    prob_Y = 1 / (1 + np.exp(-logit))
    Y = np.random.binomial(1, prob_Y)

    return pd.DataFrame({'X': X, 'Y': Y, **covariates})

# Smoke-test the generator on one null-true dataset (effect_size = 0)
df_test = generate_ci_test_data(
    n_samples=500,
    n_covariates=3,
    effect_size=0,
    seed=42,
)
print(f"Generated data shape: {df_test.shape}")
print(f"Columns: {list(df_test.columns)}")
print(f"Y mean: {df_test['Y'].mean():.3f}")

## 2. Experiment Configuration

In [None]:
# Experiment parameters
# These constants define the experimental grid: the sample sizes, covariate
# counts and effect sizes to cross, and how many datasets to draw per cell.
N_REPLICATIONS = 50  # Number of datasets per condition
SAMPLE_SIZES = [200, 500, 1000]
N_COVARIATES_LIST = [2, 5]
EFFECT_SIZES = [0.0, 0.5, 1.0, 1.5]  # 0 = null true
ALPHA = 0.05  # Significance level
# NOTE(review): ALPHA is not passed to either engine below, so the
# 'reject_independence' decisions rely on whatever threshold the engines
# use internally -- confirm that it matches ALPHA.

# Create engines
# Both engines come from the same factory; the first argument selects the
# method and the remaining keyword arguments are forwarded as given.
cmi_engine = create_ci_engine('cmi', n_permutations=200, random_seed=42)
loco_engine = create_ci_engine('loco', function_class='gbm', 
                                n_estimators=50, max_depth=2, random_state=42)

# Echo the grid so the run log records the configuration that was used
print(f"Sample sizes: {SAMPLE_SIZES}")
print(f"Effect sizes: {EFFECT_SIZES}")
print(f"N replications: {N_REPLICATIONS}")

## 3. Run Comparison Experiment

In [None]:
def run_single_test(engine, df, conditioning_cols):
    """
    Run one conditional-independence test of 'X' vs. 'Y' and time it.

    Args:
        engine: Object exposing
            ``test_conditional_independence(df, x, y, conditioning_cols)``
            whose result can be indexed by 'p_value',
            'reject_independence' and 'cmi'.
        df: Data handed to the engine unchanged.
        conditioning_cols: Names of the conditioning columns.

    Returns:
        Dict with 'p_value', 'reject' (the engine's 'reject_independence'),
        'cmi', and 'runtime' (elapsed seconds spent inside the engine call).
    """
    # perf_counter is monotonic and high-resolution, so the measured duration
    # cannot be distorted by system clock adjustments the way time.time() can.
    start = time.perf_counter()
    result = engine.test_conditional_independence(df, 'X', 'Y', conditioning_cols)
    elapsed = time.perf_counter() - start
    return {
        'p_value': result['p_value'],
        'reject': result['reject_independence'],
        'cmi': result['cmi'],
        'runtime': elapsed
    }

# Run experiments: every (sample size, covariate count, effect size) condition
# is replicated N_REPLICATIONS times, and both engines see the same dataset.
results = []

for n_samples in SAMPLE_SIZES:
    for n_cov in N_COVARIATES_LIST:
        for effect in EFFECT_SIZES:
            print(f"n={n_samples}, k={n_cov}, effect={effect}")

            for rep in tqdm(range(N_REPLICATIONS), leave=False):
                # Generate data
                df = generate_ci_test_data(n_samples, n_cov, effect, seed=rep*1000+n_samples)
                conditioning_cols = [f'W{i+1}' for i in range(n_cov)]

                # Fields shared by both result rows of this replication
                condition = {
                    'n_samples': n_samples,
                    'n_covariates': n_cov,
                    'effect_size': effect,
                    'replication': rep,
                }

                # Run CMI test; a failure is reported and its row is skipped
                try:
                    cmi_result = run_single_test(cmi_engine, df, conditioning_cols)
                except Exception as e:
                    print(f"CMI error: {e}")
                else:
                    results.append({'method': 'CMI', **condition, **cmi_result})

                # Run LOCO test; a failure is reported and its row is skipped
                try:
                    loco_result = run_single_test(loco_engine, df, conditioning_cols)
                except Exception as e:
                    print(f"LOCO error: {e}")
                else:
                    results.append({'method': 'LOCO', **condition, **loco_result})

# One row per successful (method, condition, replication) test
results_df = pd.DataFrame(results)
print(f"\nTotal results: {len(results_df)} rows")

## 4. Analysis: Type I Error (Null True)

In [None]:
# Keep only the runs where the null hypothesis holds (effect_size = 0)
null_true_df = results_df.loc[results_df['effect_size'].eq(0)]

# Under the null, the mean of the rejection flags is the empirical Type I error
type1_error = (
    null_true_df
    .groupby(['method', 'n_samples', 'n_covariates'])['reject']
    .mean()
    .reset_index()
    .rename(columns={'reject': 'type1_error'})
)

# Wide layout: one row per (n_samples, n_covariates), one column per method
type1_wide = type1_error.pivot(
    index=['n_samples', 'n_covariates'],
    columns='method',
    values='type1_error',
)
print("Type I Error Rates (should be ~0.05):")
print(type1_wide)

In [None]:
# Plot Type I error: one panel per covariate count, one line per method
os.makedirs('../results', exist_ok=True)  # savefig does not create directories

# Size the grid from the configuration instead of assuming two covariate
# counts; squeeze=False keeps `axes` 2-D even when there is a single panel.
n_panels = len(N_COVARIATES_LIST)
fig, axes = plt.subplots(1, n_panels, figsize=(6 * n_panels, 5), squeeze=False)

for idx, n_cov in enumerate(N_COVARIATES_LIST):
    ax = axes[0, idx]
    subset = type1_error[type1_error['n_covariates'] == n_cov]

    for method in ['CMI', 'LOCO']:
        method_data = subset[subset['method'] == method]
        ax.plot(method_data['n_samples'], method_data['type1_error'],
                marker='o', label=method, linewidth=2)

    # Draw the reference line at ALPHA so it always agrees with its label
    ax.axhline(y=ALPHA, color='red', linestyle='--', label=f'Nominal ({ALPHA})')
    ax.set_xlabel('Sample Size')
    ax.set_ylabel('Type I Error Rate')
    ax.set_title(f'Type I Error (k={n_cov} covariates)')
    ax.legend()
    ax.set_ylim(0, 0.20)

plt.tight_layout()
plt.savefig('../results/loco_vs_cmi_type1_error.png', dpi=150, bbox_inches='tight')
plt.show()

## 5. Analysis: Statistical Power (Null False)

In [None]:
# Rejection rate per condition; for effect_size > 0 this is the power
# (the effect_size = 0 rows are the Type I error and anchor the curves)
power_df = (
    results_df
    .groupby(['method', 'n_samples', 'n_covariates', 'effect_size'])['reject']
    .mean()
    .reset_index()
    .rename(columns={'reject': 'power'})
)

print("Power by effect size (n=500, k=2):")
is_n500_k2 = power_df['n_samples'].eq(500) & power_df['n_covariates'].eq(2)
subset = power_df[is_n500_k2]
print(subset.pivot(index='effect_size', columns='method', values='power'))

In [None]:
# Plot power curves: rows = covariate counts, columns = sample sizes
os.makedirs('../results', exist_ok=True)  # savefig does not create directories

n_rows = len(N_COVARIATES_LIST)
n_cols = len(SAMPLE_SIZES)
# squeeze=False guarantees a 2-D `axes` array, so axes[i, j] is valid even
# when there is only one covariate count and/or only one sample size.
fig, axes = plt.subplots(n_rows, n_cols,
                         figsize=(4*n_cols, 4*n_rows), squeeze=False)

for i, n_cov in enumerate(N_COVARIATES_LIST):
    for j, n_samples in enumerate(SAMPLE_SIZES):
        ax = axes[i, j]
        subset = power_df[(power_df['n_samples'] == n_samples) &
                          (power_df['n_covariates'] == n_cov)]

        for method in ['CMI', 'LOCO']:
            method_data = subset[subset['method'] == method]
            ax.plot(method_data['effect_size'], method_data['power'],
                    marker='o', label=method, linewidth=2)

        # Reference lines: the nominal level (taken from ALPHA rather than a
        # hard-coded 0.05) and the conventional 80% power target
        ax.axhline(y=ALPHA, color='red', linestyle='--', alpha=0.5)
        ax.axhline(y=0.80, color='green', linestyle='--', alpha=0.5, label='80% power')
        ax.set_xlabel('Effect Size')
        ax.set_ylabel('Power')
        ax.set_title(f'n={n_samples}, k={n_cov}')
        ax.legend()
        ax.set_ylim(0, 1.05)

plt.tight_layout()
plt.savefig('../results/loco_vs_cmi_power_curves.png', dpi=150, bbox_inches='tight')
plt.show()

## 6. Analysis: Runtime Comparison

In [None]:
# Mean and standard deviation of the per-test runtime for every
# (method, sample size, covariate count) combination
runtime_df = (
    results_df
    .groupby(['method', 'n_samples', 'n_covariates'])['runtime']
    .agg(['mean', 'std'])
    .reset_index()
    .rename(columns={'mean': 'mean_runtime', 'std': 'std_runtime'})
)

# Wide layout: one row per (n_samples, n_covariates), one column per method
runtime_wide = runtime_df.pivot(
    index=['n_samples', 'n_covariates'],
    columns='method',
    values='mean_runtime',
)
print("Mean Runtime (seconds):")
print(runtime_wide)

In [None]:
# Plot runtime comparison: grouped bars (CMI vs LOCO) per sample size,
# one panel per covariate count
os.makedirs('../results', exist_ok=True)  # savefig does not create directories

# Size the grid from the configuration instead of assuming two covariate
# counts; squeeze=False keeps `axes` 2-D even when there is a single panel.
n_panels = len(N_COVARIATES_LIST)
fig, axes = plt.subplots(1, n_panels, figsize=(6 * n_panels, 5), squeeze=False)

# Bar positions are the same in every panel
x = np.arange(len(SAMPLE_SIZES))
width = 0.35

for idx, n_cov in enumerate(N_COVARIATES_LIST):
    ax = axes[0, idx]
    subset = runtime_df[runtime_df['n_covariates'] == n_cov]

    # Align each method's rows with SAMPLE_SIZES so that every bar sits on
    # the tick that carries its label, whatever the order of SAMPLE_SIZES.
    # A sample size with no rows (e.g. every run failed) becomes NaN rather
    # than shifting the remaining bars or causing a shape mismatch.
    cmi_data = subset[subset['method'] == 'CMI'].set_index('n_samples').reindex(SAMPLE_SIZES)
    loco_data = subset[subset['method'] == 'LOCO'].set_index('n_samples').reindex(SAMPLE_SIZES)

    ax.bar(x - width/2, cmi_data['mean_runtime'], width, label='CMI',
           yerr=cmi_data['std_runtime'], capsize=5)
    ax.bar(x + width/2, loco_data['mean_runtime'], width, label='LOCO',
           yerr=loco_data['std_runtime'], capsize=5)

    ax.set_xlabel('Sample Size')
    ax.set_ylabel('Runtime (seconds)')
    ax.set_title(f'Runtime Comparison (k={n_cov} covariates)')
    ax.set_xticks(x)
    ax.set_xticklabels(SAMPLE_SIZES)
    ax.legend()

plt.tight_layout()
plt.savefig('../results/loco_vs_cmi_runtime.png', dpi=150, bbox_inches='tight')
plt.show()

## 7. Analysis: P-value Calibration (Under Null)

In [None]:
# P-value distribution under null (should be uniform on [0, 1])
os.makedirs('../results', exist_ok=True)  # savefig does not create directories

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

for idx, method in enumerate(['CMI', 'LOCO']):
    ax = axes[idx]
    method_null = null_true_df[null_true_df['method'] == method]

    # Pin the bins to [0, 1]. With the default data-driven range the density
    # is normalised over [min(p), max(p)], so the bar heights would not be
    # comparable with the Uniform(0, 1) reference line drawn at 1.0.
    ax.hist(method_null['p_value'], bins=20, range=(0, 1), density=True,
            alpha=0.7, edgecolor='black')
    ax.axhline(y=1.0, color='red', linestyle='--', label='Uniform')
    ax.set_xlabel('P-value')
    ax.set_ylabel('Density')
    ax.set_title(f'{method} P-value Distribution (Under Null)')
    ax.legend()
    ax.set_xlim(0, 1)

plt.tight_layout()
plt.savefig('../results/loco_vs_cmi_pvalue_calibration.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# One-sample Kolmogorov-Smirnov test of the null-scenario p-values against
# the 'uniform' reference distribution, run separately for each method
print("KS Test for P-value Uniformity (under null):")
for method in ['CMI', 'LOCO']:
    is_method = null_true_df['method'] == method
    pvals = null_true_df.loc[is_method, 'p_value']
    ks_stat, ks_pval = stats.kstest(pvals, 'uniform')
    print(f"  {method}: KS stat = {ks_stat:.4f}, p-value = {ks_pval:.4f}")

## 8. Summary Statistics

In [None]:
# Summary table: per (method, effect size), the mean rejection flag, mean
# runtime and mean p-value, rounded to four decimals
summary = (
    results_df
    .groupby(['method', 'effect_size'])
    .agg({'reject': 'mean', 'runtime': 'mean', 'p_value': 'mean'})
    .round(4)
    .rename(columns={
        'reject': 'Rejection Rate',
        'runtime': 'Mean Runtime (s)',
        'p_value': 'Mean P-value',
    })
)

print("Summary by Method and Effect Size:")
print(summary)

In [None]:
# Save results
# to_csv does not create missing directories, so make sure the target exists
# first; otherwise the write fails only after the whole experiment has run.
os.makedirs('../results', exist_ok=True)
results_df.to_csv('../results/loco_vs_cmi_synthetic_results.csv', index=False)
print("Results saved to results/loco_vs_cmi_synthetic_results.csv")

## 9. Conclusions

Based on the experiments:

1. **Type I Error**: [Fill in based on results]
2. **Power**: [Fill in based on results]
3. **Runtime**: [Fill in based on results]
4. **P-value Calibration**: [Fill in based on results]

**Recommendations**:
- Use CMI when: [conditions]
- Use LOCO when: [conditions]