# Experiment 20: Counterfactual Capability

Demonstrates:
1. Individual-level counterfactuals: P(Y_x | X=x', Y=y)
2. Average Treatment Effect (ATE)
3. Transportability across populations

In [None]:
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

SEED = 42
np.random.seed(SEED)
print("Setup complete.")

In [None]:
class CounterfactualSynthesizer:
    """MISATA with individual counterfactual capability."""
    
    def __init__(self, target_col, task='classification', random_state=42):
        self.target_col = target_col
        self.task = task
        self.random_state = random_state
        
    def fit(self, df):
        self.columns = list(df.columns)
        self.n_samples = len(df)
        self.original_data = df.copy().reset_index(drop=True)
        
        # Store marginals
        self.marginals = {}
        for col in self.columns:
            values = df[col].values
            self.marginals[col] = {'sorted': np.sort(values), 'min': values.min(), 'max': values.max()}
        
        # Compute and store noise terms (for counterfactuals)
        self.uniform_data = pd.DataFrame()
        for col in self.columns:
            self.uniform_data[col] = stats.rankdata(df[col]) / (len(df) + 1)
        
        self.noise_terms = self.uniform_data.apply(
            lambda x: stats.norm.ppf(np.clip(x, 0.001, 0.999))
        )
        
        # Copula
        corr_matrix = self.noise_terms.corr().values
        corr_matrix = np.nan_to_num(corr_matrix, nan=0.0)
        np.fill_diagonal(corr_matrix, 1.0)
        eigvals, eigvecs = np.linalg.eigh(corr_matrix)
        eigvals = np.maximum(eigvals, 1e-6)
        corr_matrix = eigvecs @ np.diag(eigvals) @ eigvecs.T
        self.cholesky = np.linalg.cholesky(corr_matrix)
        
        # Target model
        feature_cols = [c for c in self.columns if c != self.target_col]
        self.target_model = GradientBoostingClassifier(n_estimators=100, max_depth=5, random_state=self.random_state)
        self.target_model.fit(df[feature_cols], df[self.target_col])
        self.feature_cols = feature_cols
        self.target_rate = df[self.target_col].mean()
        
        return self
    
    def counterfactual(self, individual_idx, intervention):
        """Compute individual counterfactual."""
        noise = self.noise_terms.iloc[individual_idx].values
        
        cf_values = {}
        for i, col in enumerate(self.columns):
            if col in intervention:
                cf_values[col] = intervention[col]
            elif col == self.target_col:
                continue
            else:
                u = stats.norm.cdf(noise[i])
                sorted_vals = self.marginals[col]['sorted']
                positions = np.linspace(0, 1, len(sorted_vals))
                cf_values[col] = np.interp(u, positions, sorted_vals)
        
        # Predict counterfactual outcome
        X_cf = pd.DataFrame([{c: cf_values[c] for c in self.feature_cols}])
        prob = self.target_model.predict_proba(X_cf)[0, 1]
        cf_values[self.target_col] = 1 if prob >= 0.5 else 0
        
        return pd.Series(cf_values)
    
    def average_treatment_effect(self, treatment_var, treatment_value, control_value):
        """Compute ATE via counterfactuals."""
        treatment_outcomes = []
        control_outcomes = []
        
        for idx in range(min(500, self.n_samples)):  # Sample for speed
            cf_treat = self.counterfactual(idx, {treatment_var: treatment_value})
            cf_control = self.counterfactual(idx, {treatment_var: control_value})
            
            treatment_outcomes.append(cf_treat[self.target_col])
            control_outcomes.append(cf_control[self.target_col])
        
        ate = np.mean(treatment_outcomes) - np.mean(control_outcomes)
        se = np.sqrt(np.var(treatment_outcomes)/len(treatment_outcomes) + np.var(control_outcomes)/len(control_outcomes))
        
        return ate, se

print("CounterfactualSynthesizer defined.")

In [None]:
# Load Adult Census
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
columns = ['age', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital_status',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_per_week', 'native_country', 'income']
df_raw = pd.read_csv(url, names=columns, na_values=' ?', skipinitialspace=True)
df_raw = df_raw.dropna().reset_index(drop=True).sample(3000, random_state=SEED)
df_raw['income'] = (df_raw['income'] == '>50K').astype(int)

for col in ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country']:
    df_raw[col] = LabelEncoder().fit_transform(df_raw[col].astype(str))

print(f"Data: {len(df_raw)} samples")

In [None]:
# Fit
synth = CounterfactualSynthesizer(target_col='income')
synth.fit(df_raw)
print("Model fitted.")

## Test 1: Individual Counterfactuals

In [None]:
# Pick an individual with low income
low_income_idx = df_raw[df_raw['income'] == 0].index[0]
original = df_raw.iloc[low_income_idx]

print("Original individual (low income):")
print(f"  Age: {original['age']}, Education: {original['education_num']}, Income: {original['income']}")

# Counterfactual: what if education was higher?
cf = synth.counterfactual(low_income_idx, {'education_num': 16})  # PhD level

print("\nCounterfactual (education_num=16):")
print(f"  Education: 16, Income: {cf['income']}")

if cf['income'] == 1:
    print("\n✓ With higher education, this individual WOULD have high income")
else:
    print("\n✗ Even with higher education, this individual would NOT have high income")

In [None]:
# Batch counterfactuals
print("\nBatch counterfactuals for 10 individuals:")
print("-" * 60)

results = []
for idx in range(10):
    original = df_raw.iloc[idx]
    cf_low = synth.counterfactual(idx, {'education_num': 8})
    cf_high = synth.counterfactual(idx, {'education_num': 16})
    
    results.append({
        'idx': idx,
        'original_edu': original['education_num'],
        'original_income': original['income'],
        'cf_edu_8': cf_low['income'],
        'cf_edu_16': cf_high['income'],
        'effect': cf_high['income'] - cf_low['income']
    })

cf_df = pd.DataFrame(results)
print(cf_df.to_string(index=False))

## Test 2: Average Treatment Effect

In [None]:
# ATE of education on income
print("Computing Average Treatment Effect...")

ate_edu, se_edu = synth.average_treatment_effect('education_num', treatment_value=16, control_value=8)
ate_hours, se_hours = synth.average_treatment_effect('hours_per_week', treatment_value=60, control_value=20)

print("\n" + "="*60)
print("AVERAGE TREATMENT EFFECTS")
print("="*60)
print(f"\nEducation (16 vs 8 years):")
print(f"  ATE = {ate_edu:.3f} ± {se_edu*1.96:.3f}")
print(f"  Interpretation: Higher education increases income probability by {ate_edu*100:.1f}%")

print(f"\nHours per week (60 vs 20):")
print(f"  ATE = {ate_hours:.3f} ± {se_hours*1.96:.3f}")
print(f"  Interpretation: More hours increases income probability by {ate_hours*100:.1f}%")

## Test 3: Transportability

In [None]:
# Train on population A (age < 40), test on population B (age >= 40)
pop_a = df_raw[df_raw['age'] < 40].reset_index(drop=True)
pop_b = df_raw[df_raw['age'] >= 40].reset_index(drop=True)

print(f"Population A (young): {len(pop_a)} samples")
print(f"Population B (older): {len(pop_b)} samples")

# Fit on A
synth_a = CounterfactualSynthesizer(target_col='income')
synth_a.fit(pop_a)

# Compute ATE on A
ate_a, se_a = synth_a.average_treatment_effect('education_num', 16, 8)

# Fit on B
synth_b = CounterfactualSynthesizer(target_col='income')
synth_b.fit(pop_b)

# Compute ATE on B
ate_b, se_b = synth_b.average_treatment_effect('education_num', 16, 8)

print("\n" + "="*60)
print("TRANSPORTABILITY ANALYSIS")
print("="*60)
print(f"\nATE of education on income:")
print(f"  Population A (young): {ate_a:.3f} ± {se_a*1.96:.3f}")
print(f"  Population B (older): {ate_b:.3f} ± {se_b*1.96:.3f}")

if abs(ate_a - ate_b) < 0.1:
    print("\n✓ Causal effect is TRANSPORTABLE across age groups")
else:
    print(f"\n⚠ Effect differs by {abs(ate_a - ate_b):.3f} between populations")

In [None]:
# Save results
results_summary = {
    'metric': ['ATE_education', 'ATE_hours', 'ATE_pop_A', 'ATE_pop_B'],
    'value': [ate_edu, ate_hours, ate_a, ate_b],
    'se': [se_edu, se_hours, se_a, se_b]
}
pd.DataFrame(results_summary).to_csv('counterfactual_results.csv', index=False)

print("\n" + "="*60)
print("EXPERIMENT 20 COMPLETE")
print("="*60)
print("\nCapabilities Demonstrated:")
print("  1. ✓ Individual counterfactuals")
print("  2. ✓ Average Treatment Effect")
print("  3. ✓ Transportability analysis")
print("\nFile saved: counterfactual_results.csv")