### Complex Confounding on X-ray data - Asymptotic Normality

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import seaborn as sns
from scipy.stats import norm
from scipy.special import expit

# ATE estimation
import statsmodels.api as sm
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPRegressor
from doubleml import DoubleMLData, DoubleMLPLR
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from causalml.inference.meta import BaseSRegressor

# Custom modules
from feature_extraction.pretrained_models_xrv import load_torchxrayvision_model, extract_features_from_folder
from utils.project import set_root
from utils.io import save_results, load_results
from helpers.ae import fit_autoencoder

In [None]:
# Set working directory, dataset directory and directory for saving results
set_root()
dataset_dir = "data/xray/raw/all_unique"
results_dir = "results/asym_normality/xray/complex"

# Define the model name and path for saving results
model_name = "densenet121-res224-all"  # Pretrained model name
save_dir = f"data/xray/representations/{model_name}"

# Define file paths
features_path = os.path.join(save_dir, "latent_features.npy")
labels_path = os.path.join(save_dir, "labels.npy")

In [None]:
# Feature extraction and saving (Only if the features and labels do not already exist)
if not os.path.exists(features_path) or not os.path.exists(labels_path):
    print(f"Extracting features using model '{model_name}'...")
    
    # Extract features and save them
    model = load_torchxrayvision_model(model_name)
    all_features, labels = extract_features_from_folder(
        dataset_dir,
        model,
        device='cpu',
        batch_size=32,
        save_path=save_dir
    )
    
    print(f"Features extracted and saved to: {save_dir}")
else:
    print(f"Features already exist in {save_dir}. Skipping extraction.")

# Load extracted features
all_features = np.load(features_path)
labels = np.load(labels_path)

### Autoencoder Training on Pre-trained Representations (Used for Complex Confounding Simuation)

In [None]:
# Train an autoencoder on the pre-trained representations (latent_features of the AE will be used to simulate confounding)
latent_dim = 5 # Dimensionality of the latent space
encoder, latent_features, scaler = fit_autoencoder(all_features, latent_dim=latent_dim, epochs=50, batch_size=32)

# Obtain scaled imdb pre-trained representations 
all_features_scaled = scaler.transform(all_features)

### Complex Confounding Simulation and ATE Estimation

In [None]:
# 1. Define simulation parameters
beta_true = 2.0     # True effect of A on Y
np.random.seed(42)  # For reproducibility
gammas_Y = np.random.uniform(-1, 0, latent_dim)   # Effect latent feature confounder on Y
gamma_A = np.random.normal(0, 1, latent_dim)  # Effect latent feature confounder on A

# 2. Specify general parameters for simulation
n_samples = latent_features.shape[0]
d_features = all_features_scaled.shape[1]
n_runs = 200  # Number of simulation runs
ci_alpha_level = 0.05  # Alpha level for 1-alpha confidence intervals
z_score = norm.ppf(1 - ci_alpha_level / 2) # Z-score for 1-alpha confidence intervals

# 3. Initialize Storage for Estimates and Confidence Intervals
methods = ['Naive', 'S-Learner (NN)', 'DML (NN)']  

estimates_dict_complex = {method: [] for method in methods}
cis_dict_complex = {method: {'se': [], 'lower': [], 'upper': []} for method in methods}

# 4. Simulation Loop
for run in range(n_runs):
    print(f"\n--- Simulation Run {run + 1} ---")
    # Set a unique seed for each run for variability
    seed = 42 + run
    np.random.seed(seed)
    
    # a. Compute Treatment Assignment Probability
    pA = expit(latent_features @ gamma_A)

    # b. Sample Treatment Assignment
    A = np.random.binomial(1, pA)

    # c. Generate Outcome Y
    noise = np.random.normal(loc=0, scale=1, size=n_samples)
    Y = beta_true * A + latent_features @ gammas_Y + noise
    
    # d. Package into DataFrame
    df = pd.DataFrame({
        'Y': Y,
        'A': A
    })
    
    # 4.1. Naive OLS (Unadjusted) using statsmodels
    X_naive = sm.add_constant(df['A']) 
    model_naive = sm.OLS(df['Y'], X_naive).fit()
    beta_naive = model_naive.params['A']
    se_naive = model_naive.bse['A']
    ci_lower_naive = beta_naive - z_score * se_naive
    ci_upper_naive = beta_naive + z_score * se_naive
    estimates_dict_complex['Naive'].append(beta_naive)
    cis_dict_complex['Naive']['se'].append(se_naive)
    cis_dict_complex['Naive']['lower'].append(ci_lower_naive)
    cis_dict_complex['Naive']['upper'].append(ci_upper_naive)
    print(f"Naive OLS: β = {beta_naive:.3f}, SE = {se_naive:.3f}")
    
    ## Different Other ATE Estimators

    # 4.2. S-Learner (NN)
    outcome_model_nn = Pipeline([
        ('scaler', StandardScaler()),
        ('mlp', MLPRegressor(
            hidden_layer_sizes=(100, 50),
            activation='relu',
            solver='adam',
            max_iter=500,
            random_state=seed
        ))
    ])

    try:
        s_learner_nn = BaseSRegressor(outcome_model_nn) 
        s_ate_nn, s_ci_lower_nn, s_ci_upper_nn = s_learner_nn.estimate_ate(all_features_scaled, A, Y, return_ci=True)
        estimates_dict_complex['S-Learner (NN)'].append(s_ate_nn[0])
        se_slearner_nn = (s_ci_upper_nn[0] - s_ate_nn[0]) / z_score
        cis_dict_complex['S-Learner (NN)']['se'].append(se_slearner_nn)
        cis_dict_complex['S-Learner (NN)']['lower'].append(s_ci_lower_nn[0])
        cis_dict_complex['S-Learner (NN)']['upper'].append(s_ci_upper_nn[0])
        print(f"S-Learner (NN): β = {s_ate_nn[0]:.3f}")
    except Exception as e:
        print(f"Run {run+1}: S-Learner (NN) failed with error: {e}")
        estimates_dict_complex['S-Learner (NN)'].append(np.nan)
        cis_dict_complex['S-Learner (NN)']['lower'].append(np.nan)
        cis_dict_complex['S-Learner (NN)']['upper'].append(np.nan)


    # 4.3. DoubleML with Neural Network Nuisance
    # Convert all_features to DataFrame
    X_dml_df = pd.DataFrame(
        all_features_scaled,  # Use scaled features
        columns=[f"feat_{i}" for i in range(d_features)]
    )
    
    # Add outcome and treatment to DoubleMLData via column names
    X_dml_df['Y'] = df['Y']
    X_dml_df['A'] = df['A']

    # Create DoubleMLData
    data_dml = DoubleMLData(X_dml_df, "Y", "A")

    # 4.3.1. DoubleML with Neural Network Nuisance Estimators
    try:
        # Define nuisance models with neural networks
        ml_g_nn = Pipeline([
            ('scaler', StandardScaler()),
            ('mlp', MLPRegressor(
                hidden_layer_sizes=(100, 50),
                activation='relu',
                solver='adam',
                max_iter=500,
                random_state=seed
            ))
        ])
        
        ml_m_log = LogisticRegression()
    
        # Instantiate and fit DoubleMLPLR
        dml_plr_nn = DoubleMLPLR(data_dml, ml_g_nn, ml_m_log, n_folds=2)
        dml_plr_nn.fit()
        beta_dml_nn = dml_plr_nn.coef[0]
        se_dml_nn = dml_plr_nn.se[0]
        estimates_dict_complex['DML (NN)'].append(beta_dml_nn)
        # 95% Confidence Interval
        ci_lower_dml_nn = beta_dml_nn - z_score * se_dml_nn
        ci_upper_dml_nn = beta_dml_nn + z_score * se_dml_nn
        cis_dict_complex['DML (NN)']['se'].append(se_dml_nn)
        cis_dict_complex['DML (NN)']['lower'].append(ci_lower_dml_nn)
        cis_dict_complex['DML (NN)']['upper'].append(ci_upper_dml_nn)
        print(f"DML (NN): β = {beta_dml_nn:.3f}, SE = {se_dml_nn:.3f}")
    except Exception as e:
        print(f"Run {run+1}: DML (NN) failed with error: {e}")
        estimates_dict_complex['DML (NN)'].append(np.nan)
        cis_dict_complex['DML (NN)']['se'].append(np.nan)
        cis_dict_complex['DML (NN)']['lower'].append(np.nan)
        cis_dict_complex['DML (NN)']['upper'].append(np.nan)

In [None]:
# 5. Create a directory for the experiment and save the results
experiment_name = "exp_results"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
experiment_dir = os.path.join(results_dir, model_name, experiment_name, timestamp)
save_results(experiment_dir, estimates_dict_complex, cis_dict_complex)

### Plotting Results

In [4]:
# Load the results from the previous experiment
estimates_dict_complex, cis_dict_complex = load_results(experiment_dir)

In [7]:
# Standardize the results
dml_est_complex_normalized = (np.array(estimates_dict_complex['DML (NN)']) - beta_true) / np.array(cis_dict_complex['DML (NN)']['se'])
slearner_est_complex_normalized = (np.array(estimates_dict_complex['S-Learner (NN)']) - beta_true) / np.array(cis_dict_complex['S-Learner (NN)']['se'])
naive_est_complex_normalized = (np.array(estimates_dict_complex['Naive']) - beta_true) / np.array(cis_dict_complex['Naive']['se'])

In [None]:
# 6. Plotting
sns.set(style="whitegrid")
face_colors = sns.color_palette('pastel')
edge_colors = sns.color_palette('dark')

fig_orth_nosplit, ax = plt.subplots(constrained_layout=True)

n_bins = 30
sns.histplot(naive_est_complex_normalized,
                color=face_colors[1], edgecolor = edge_colors[1],
                stat='density', bins=8, label='Naive')
sns.histplot(slearner_est_complex_normalized,
                color=face_colors[0], edgecolor = edge_colors[0],
                stat='density', bins=n_bins, label='S-Learner')
sns.histplot(dml_est_complex_normalized,
                color=face_colors[2], edgecolor = edge_colors[2],
                stat='density', bins=n_bins, label='DML')
x_val_norm = np.arange(-4, 4, 0.001)
y_val_norm = norm.pdf(x_val_norm)
ax.plot(x_val_norm, y_val_norm, color='k', label='$\\mathcal{N}(0, 1)$', linewidth=1.5)
ax.legend(loc='upper right', bbox_to_anchor=(1.2, 1.0))
ax.set_xlim([-40, 20])
ax.set_xlabel('$(\widehat{ATE} - ATE)/\hat{\sigma}$')
plot_path = os.path.join(experiment_dir, 'ate_asym_norm_complex_conf_pneu.pdf')
plt.savefig(plot_path, dpi=300, bbox_inches='tight', facecolor='white')
plt.show()