In [None]:
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import utilities as ut
import modularised_utils as mut
import scipy.stats as stats

# Notebook-wide plotting style and random seed.
sns.set_theme(style="whitegrid")
seed = 42
# Seeds NumPy's legacy global RNG. The contamination helpers defined later call
# np.random.seed themselves, so they re-seed this same global state on every call.
np.random.seed(seed)

In [52]:
# Experiment name; it selects the data directory read by the loading cell below
# and is passed to ut.load_all_data.
experiment = 'slc'
path = f"data/{experiment}"

In [53]:
# Load the dictionaries containing the results for each optimization method.
# Each dictionary is keyed by fold (e.g. 'fold_0', 'fold_1' in the recorded output);
# the evaluation loops below read 'T_matrix' and 'test_indices' from every run entry.
# NOTE: joblib.load unpickles arbitrary Python objects - only load files you trust.
diroca_results = joblib.load(f"{path}/diroca_cv_results.pkl")
gradca_results = joblib.load(f"{path}/gradca_cv_results.pkl")
baryca_results = joblib.load(f"{path}/baryca_cv_results.pkl")

# Also load the original data dictionary (samples, graphs, intervention sets, omega map)
all_data = ut.load_all_data(experiment)

# Quick sanity check: show which folds are available for each method
print("Successfully loaded results for all optimization methods.")
print(f"  - DIROCA results keys: {list(diroca_results.keys())}")
print(f"  - GradCA results keys: {list(gradca_results.keys())}")
print(f"  - BARYCA results keys: {list(baryca_results.keys())}")

Data loaded for 'slc'.
Successfully loaded results for all optimization methods.
  - DIROCA results keys: ['fold_0', 'fold_1']
  - GradCA results keys: ['fold_0', 'fold_1']
  - BARYCA results keys: ['fold_0', 'fold_1']


In [54]:
def calculate_abstraction_error(T_matrix, Dll_test, Dhl_test):
    """
    Score an abstraction matrix on held-out samples (the "0-shift" error).

    Both sample sets are summarised by an empirical Gaussian (mean and
    covariance). The low-level Gaussian is pushed through ``T_matrix`` and
    compared with the high-level Gaussian via ``mut.compute_wasserstein``.

    Args:
        T_matrix (np.ndarray): The learned abstraction matrix.
        Dll_test (np.ndarray): Low-level test samples, one row per sample.
        Dhl_test (np.ndarray): High-level test samples, one row per sample.

    Returns:
        float: Square root of the value returned by ``mut.compute_wasserstein``
        (treated here as a squared Wasserstein-2 distance, per the original
        author's note), or ``np.nan`` if that computation raises.
    """
    def _fit_gaussian(samples):
        # Empirical mean / covariance with variables along the columns.
        return np.mean(samples, axis=0), np.cov(samples, rowvar=False)

    ll_mean, ll_cov = _fit_gaussian(Dll_test)
    hl_mean, hl_cov = _fit_gaussian(Dhl_test)

    # Push the low-level Gaussian through the linear map T: mean -> T mu, cov -> T Sigma T^T.
    pushed_mean = ll_mean @ T_matrix.T
    pushed_cov = T_matrix @ ll_cov @ T_matrix.T

    try:
        squared_distance = mut.compute_wasserstein(pushed_mean, pushed_cov, hl_mean, hl_cov)
        return np.sqrt(squared_distance)
    except Exception as e:
        # Best effort: report the failure and let the caller skip this intervention.
        print(f"  - Warning: Could not compute Wasserstein distance. Error: {e}. Returning NaN.")
        return np.nan

print("✓ Helper function 'calculate_abstraction_error' is defined.")

✓ Helper function 'calculate_abstraction_error' is defined.


In [55]:
# Unpack the necessary data collections from the loaded data dictionary.
# Low-level samples; the loops below index this by intervention and then by test indices.
Dll_samples = all_data['LLmodel']['data']
# High-level samples; indexed by omega[iota] and then by test indices.
Dhl_samples = all_data['HLmodel']['data']
# Low-level interventions that every evaluation loop iterates over.
I_ll_relevant = all_data['LLmodel']['intervention_set']
# Maps each low-level intervention to the key of its high-level counterpart.
omega = all_data['abstraction_data']['omega']

In [56]:
# --- Main Evaluation Loop (Averaged Over All Interventions) ---
# For every method / fold / run: evaluate the learned T on the clean held-out
# samples of each intervention and average the resulting errors.

# 1. Create a list to store a record for each evaluation run
evaluation_records = []

# 2. Group all your results dictionaries together for easy iteration
#    (the rho-shift and Huber-shift cells further down iterate over this same dict)
results_to_evaluate = {
    "DIROCA": diroca_results,
    "GradCA": gradca_results,
    "BARYCA": baryca_results
    # You can add other baselines like Abs-LiNGAM here
}



# 4. Loop through each method, fold, and hyperparameter run
for method_name, results_dict in results_to_evaluate.items():
    print(f"-- Evaluating method: {method_name} --")
    for fold_key, fold_results in results_dict.items():
        for run_key, run_data in fold_results.items():
            
            # a. Get the learned T matrix and test indices for this run
            T_learned = run_data['T_matrix']
            test_indices = run_data['test_indices']
            
            # b. Loop over all interventions to get an average error
            errors_per_intervention = []
            for iota in I_ll_relevant:
                # Get the correct slice of test data for this intervention;
                # omega[iota] selects the matching high-level intervention.
                Dll_test_iota = Dll_samples[iota][test_indices]
                Dhl_test_iota = Dhl_samples[omega[iota]][test_indices]
                
                # Calculate the abstraction error for this specific intervention.
                # NaN results (failed distance computations) are dropped from the average.
                error = calculate_abstraction_error(T_learned, Dll_test_iota, Dhl_test_iota)
                if not np.isnan(error):
                    errors_per_intervention.append(error)
            
            # c. Calculate the final error as the average over all interventions
            #    (NaN when every intervention failed)
            average_error = np.mean(errors_per_intervention) if errors_per_intervention else np.nan
            
            # d. Store the result in a structured record.
            #    fold keys look like 'fold_<k>'; the integer suffix becomes the fold id.
            record = {
                'method': method_name,
                'fold': int(fold_key.split('_')[1]),
                'run_id': run_key,
                'avg_error': average_error # Store the new average error
            }
            evaluation_records.append(record)

# 5. Convert the list of records into a pandas DataFrame for easy analysis
results_df = pd.DataFrame(evaluation_records)

print("\n\n--- Evaluation Complete ---")
print("Displaying the average '0-shift' error (across all interventions) for every run:")
display(results_df)

# 6. Calculate and display the final summary statistics
#    (mean and sample standard deviation over folds, best method first)
print("\n--- Final Summary (Mean Error ± Std Dev across all folds) ---")
summary_stats = results_df.groupby(['method', 'run_id'])['avg_error'].agg(['mean', 'std']).sort_values('mean')
display(summary_stats)

-- Evaluating method: DIROCA --
-- Evaluating method: GradCA --
-- Evaluating method: BARYCA --


--- Evaluation Complete ---
Displaying the average '0-shift' error (across all interventions) for every run:


Unnamed: 0,method,fold,run_id,avg_error
0,DIROCA,0,eps_delta_8,1.280884
1,DIROCA,1,eps_delta_8,1.28638
2,GradCA,0,gradca_run,1.2653
3,GradCA,1,gradca_run,1.273351
4,BARYCA,0,baryca_run,2.64145
5,BARYCA,1,baryca_run,2.625743



--- Final Summary (Mean Error ± Std Dev across all folds) ---


Unnamed: 0_level_0,Unnamed: 1_level_0,mean,std
method,run_id,Unnamed: 2_level_1,Unnamed: 3_level_1
GradCA,gradca_run,1.269326,0.005692
DIROCA,eps_delta_8,1.283632,0.003887
BARYCA,baryca_run,2.633597,0.011107


## rho-shift

In [None]:

def apply_shift(clean_data, shift_config, all_var_names, model_level, seed=42):
    """
    Contaminate a block of test samples with random noise described by ``shift_config``.

    Args:
        clean_data (np.ndarray): 2-D array of shape (n_samples, n_dims).
        shift_config (dict): Contamination settings. Keys read here:
            - 'type': 'additive' (data + noise) or 'multiplicative' (data * noise).
            - 'distribution': 'gaussian' (default), 'student-t' or 'exponential'.
            - 'll_params' / 'hl_params': per-level noise parameters
              ('mu'/'sigma' for gaussian, 'df'/'loc'/'shape' for student-t,
              'scale' for exponential) plus an optional 'apply_to_vars' list of
              variable names. A 1-D 'sigma'/'shape' is read as a matrix diagonal.
        all_var_names (list): Column names of ``clean_data``; used to resolve 'apply_to_vars'.
        model_level (str): 'L' selects 'll_params'; any other value selects 'hl_params'.
        seed (int): Passed to ``np.random.seed``. Note that this re-seeds NumPy's
            global RNG, so calls with the same seed and shapes draw identical noise.

    Returns:
        np.ndarray: The contaminated data. If a name in 'apply_to_vars' cannot be
        resolved, a warning is printed and ``clean_data`` itself is returned.

    Raises:
        ValueError: If 'type' or 'distribution' is not one of the supported values.
    """
    np.random.seed(seed)
    shift_type = shift_config.get('type')
    dist_type = shift_config.get('distribution', 'gaussian')

    # Fail fast on configuration errors instead of silently producing zero noise.
    if shift_type not in ('additive', 'multiplicative'):
        raise ValueError(f"Unknown shift type: {shift_type}")
    if dist_type not in ('gaussian', 'student-t', 'exponential'):
        raise ValueError(f"Unknown distribution: {dist_type}")

    n_samples, n_dims = clean_data.shape

    # Select the correct parameter dictionary for the current model level
    level_key = 'll_params' if model_level == 'L' else 'hl_params'
    params = shift_config.get(level_key, {})

    # --- 1. Generate the full noise matrix based on the specified distribution ---
    if dist_type == 'gaussian':
        mu = np.array(params.get('mu', np.zeros(n_dims)))
        sigma_def = np.array(params.get('sigma', np.eye(n_dims)))
        sigma = np.diag(sigma_def) if sigma_def.ndim == 1 else sigma_def
        noise_matrix = np.random.multivariate_normal(mean=mu, cov=sigma, size=n_samples)
    elif dist_type == 'student-t':
        df = params.get('df', 3)
        loc = np.array(params.get('loc', np.zeros(n_dims)))
        shape_def = np.array(params.get('shape', np.eye(n_dims)))
        shape = np.diag(shape_def) if shape_def.ndim == 1 else shape_def
        noise_matrix = stats.multivariate_t.rvs(loc=loc, shape=shape, df=df, size=n_samples)
    else:  # 'exponential'
        scale = params.get('scale', 1.0)
        noise_matrix = np.random.exponential(scale=scale, size=(n_samples, n_dims))

    # multivariate_t.rvs squeezes singleton axes (one variable or one sample);
    # restore the 2-D layout so the noise lines up with clean_data column by column.
    noise_matrix = np.asarray(noise_matrix, dtype=float)
    if noise_matrix.size == n_samples * n_dims:
        noise_matrix = noise_matrix.reshape(n_samples, n_dims)

    # --- 2. Apply noise selectively if specified ---
    vars_to_affect = params.get('apply_to_vars')

    if vars_to_affect is None:
        # If not specified, apply noise to all variables
        final_noise = noise_matrix
    else:
        # Unselected columns get the neutral element of the operation
        # (0 for addition, 1 for multiplication) so they pass through unchanged.
        # A float buffer is used so noise is never truncated for integer data.
        neutral = 1.0 if shift_type == 'multiplicative' else 0.0
        final_noise = np.full((n_samples, n_dims), neutral, dtype=float)
        try:
            indices_to_affect = [all_var_names.index(var) for var in vars_to_affect]
            final_noise[:, indices_to_affect] = noise_matrix[:, indices_to_affect]
        except ValueError as e:
            print(f"Warning: A variable in 'apply_to_vars' not found. Error: {e}")
            return clean_data # Return clean data if there's a config error

    # --- 3. Return the contaminated data ---
    if shift_type == 'additive':
        return clean_data + final_noise
    return clean_data * final_noise

print("✓ Final generalized helper function 'apply_shift' is defined.")

✓ Final generalized helper function 'apply_shift' is defined.


## Configuration Guide for "Rho-Shift" Evaluation

The entire "rho-shift" evaluation is controlled by a single Python dictionary named `shift_config`. By changing the keys and values in this dictionary, you can test a wide variety of data contamination scenarios.

### Top-Level Keys

These keys define the main type of contamination to apply.

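* `type: str`: Determines the primary operation.
    * **Options:** `'additive'`, `'multiplicative'`. (`'translation'` and `'scaling'` are not implemented by the `apply_shift` helper in this notebook; passing them raises `ValueError: Unknown shift type`.)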

* `distribution: str` (Used only for `additive` and `multiplicative` types): Determines the type of random noise to generate.
    * **Options:** `'gaussian'`, `'student-t'`, `'exponential'`.

In [70]:
# 1. Noise configuration for the rho-shift test: additive multivariate Student-t
#    noise with separate parameters for the low-level and high-level data.
shift_config = {
    'type': 'additive',
    'distribution': 'student-t',  # Can be 'gaussian', 'student-t', or 'exponential'
    
    'll_params': {
        'df': 3,
        'loc': [0, 0, 0],
        'shape': [0.5, 2.0, 1.0] # Define diagonal elements of the scale matrix
    },
    'hl_params': {
        'df': 5,
        'loc': [0, 0],
        'shape': [[0.8, -0.3], [-0.3, 0.8]] # Define a full scale matrix
    }
}

# 2. Prepare necessary variables
print(f"Running 'rho-shift' evaluation with shift type: '{shift_config['type']}'")
rho_shift_records = []
# Variable names are taken from the graph nodes; apply_shift uses them to resolve
# 'apply_to_vars'. The Huber-shift cell further down reuses these two lists.
# NOTE(review): assumes node order matches the column order of the sample arrays - confirm.
ll_var_names = list(all_data['LLmodel']['graph'].nodes())
hl_var_names = list(all_data['HLmodel']['graph'].nodes())

# 3. Main evaluation loop (same structure as the 0-shift loop, but on noisy test data)
for method_name, results_dict in results_to_evaluate.items():
    print(f"-- Evaluating method's robustness: {method_name} --")
    for fold_key, fold_results in results_dict.items():
        for run_key, run_data in fold_results.items():
            
            T_learned = run_data['T_matrix']
            test_indices = run_data['test_indices']
            
            errors_per_intervention = []
            for iota in I_ll_relevant:
                Dll_test_clean = Dll_samples[iota][test_indices]
                Dhl_test_clean = Dhl_samples[omega[iota]][test_indices]
                
                # Apply the shift, now providing the list of variable names.
                # No seed is passed, so apply_shift re-seeds NumPy with its default
                # (42) on every call: each call draws from the same seeded stream.
                Dll_test_noisy = apply_shift(Dll_test_clean, shift_config, ll_var_names, model_level='L')
                Dhl_test_noisy = apply_shift(Dhl_test_clean, shift_config, hl_var_names, model_level='H')
                
                # Error on the contaminated data; NaN results are skipped.
                error = calculate_abstraction_error(T_learned, Dll_test_noisy, Dhl_test_noisy)
                if not np.isnan(error):
                    errors_per_intervention.append(error)
            
            average_error = np.mean(errors_per_intervention) if errors_per_intervention else np.nan
            
            record = {
                'method': method_name,
                'fold': int(fold_key.split('_')[1]),
                'run_id': run_key,
                'avg_error_noisy': average_error
            }
            rho_shift_records.append(record)

# 4. Display Final Results (mean and sample std over folds, best method first)
rho_shift_df = pd.DataFrame(rho_shift_records)
print("\n\n--- 'Rho-Shift' Evaluation Complete ---")
summary = rho_shift_df.groupby(['method', 'run_id'])['avg_error_noisy'].agg(['mean', 'std']).sort_values('mean')
display(summary)

Running 'rho-shift' evaluation with shift type: 'additive'
-- Evaluating method's robustness: DIROCA --
-- Evaluating method's robustness: GradCA --
-- Evaluating method's robustness: BARYCA --


--- 'Rho-Shift' Evaluation Complete ---


Unnamed: 0_level_0,Unnamed: 1_level_0,mean,std
method,run_id,Unnamed: 2_level_1,Unnamed: 3_level_1
GradCA,gradca_run,1.964032,0.00539
DIROCA,eps_delta_8,2.062791,0.009017
BARYCA,baryca_run,4.353293,0.024379


# Huber contamination logic

In [None]:

def apply_shift(clean_data, shift_config, all_var_names, model_level, seed=42):
    """
    Contaminate a block of test samples with random noise described by ``shift_config``.

    This cell re-defines the helper of the same name from the rho-shift section;
    the definition executed last is the one the Huber cells below will call.

    Args:
        clean_data (np.ndarray): 2-D array of shape (n_samples, n_dims).
        shift_config (dict): Contamination settings. Keys read here:
            - 'type': 'additive' (data + noise) or 'multiplicative' (data * noise).
            - 'distribution': 'gaussian' (default), 'student-t' or 'exponential'.
            - 'll_params' / 'hl_params': per-level noise parameters
              ('mu'/'sigma' for gaussian, 'df'/'loc'/'shape' for student-t,
              'scale' for exponential) plus an optional 'apply_to_vars' list of
              variable names. A 1-D 'sigma'/'shape' is read as a matrix diagonal.
        all_var_names (list): Column names of ``clean_data``; used to resolve 'apply_to_vars'.
        model_level (str): 'L' selects 'll_params'; any other value selects 'hl_params'.
        seed (int): Passed to ``np.random.seed``. Note that this re-seeds NumPy's
            global RNG, so calls with the same seed and shapes draw identical noise.

    Returns:
        np.ndarray: The contaminated data. If a name in 'apply_to_vars' cannot be
        resolved, a warning is printed and ``clean_data`` itself is returned.

    Raises:
        ValueError: If 'type' or 'distribution' is not one of the supported values.
    """
    np.random.seed(seed)
    shift_type = shift_config.get('type')
    dist_type = shift_config.get('distribution', 'gaussian')

    # Fail fast on configuration errors instead of silently producing zero noise.
    if shift_type not in ('additive', 'multiplicative'):
        raise ValueError(f"Unknown shift type: {shift_type}")
    if dist_type not in ('gaussian', 'student-t', 'exponential'):
        raise ValueError(f"Unknown distribution: {dist_type}")

    n_samples, n_dims = clean_data.shape

    # Select the correct parameter dictionary for the current model level
    level_key = 'll_params' if model_level == 'L' else 'hl_params'
    params = shift_config.get(level_key, {})

    # --- 1. Generate the full noise matrix based on the specified distribution ---
    if dist_type == 'gaussian':
        mu = np.array(params.get('mu', np.zeros(n_dims)))
        sigma_def = np.array(params.get('sigma', np.eye(n_dims)))
        sigma = np.diag(sigma_def) if sigma_def.ndim == 1 else sigma_def
        noise_matrix = np.random.multivariate_normal(mean=mu, cov=sigma, size=n_samples)
    elif dist_type == 'student-t':
        df = params.get('df', 3)
        loc = np.array(params.get('loc', np.zeros(n_dims)))
        shape_def = np.array(params.get('shape', np.eye(n_dims)))
        shape = np.diag(shape_def) if shape_def.ndim == 1 else shape_def
        noise_matrix = stats.multivariate_t.rvs(loc=loc, shape=shape, df=df, size=n_samples)
    else:  # 'exponential'
        scale = params.get('scale', 1.0)
        noise_matrix = np.random.exponential(scale=scale, size=(n_samples, n_dims))

    # multivariate_t.rvs squeezes singleton axes (one variable or one sample);
    # restore the 2-D layout so the noise lines up with clean_data column by column.
    noise_matrix = np.asarray(noise_matrix, dtype=float)
    if noise_matrix.size == n_samples * n_dims:
        noise_matrix = noise_matrix.reshape(n_samples, n_dims)

    # --- 2. Apply noise selectively if specified ---
    vars_to_affect = params.get('apply_to_vars')

    if vars_to_affect is None:
        # If not specified, apply noise to all variables
        final_noise = noise_matrix
    else:
        # Unselected columns get the neutral element of the operation
        # (0 for addition, 1 for multiplication) so they pass through unchanged.
        # A float buffer is used so noise is never truncated for integer data.
        neutral = 1.0 if shift_type == 'multiplicative' else 0.0
        final_noise = np.full((n_samples, n_dims), neutral, dtype=float)
        try:
            indices_to_affect = [all_var_names.index(var) for var in vars_to_affect]
            final_noise[:, indices_to_affect] = noise_matrix[:, indices_to_affect]
        except ValueError as e:
            print(f"Warning: A variable in 'apply_to_vars' not found. Error: {e}")
            return clean_data # Return clean data if there's a config error

    # --- 3. Return the contaminated data ---
    if shift_type == 'additive':
        return clean_data + final_noise
    return clean_data * final_noise

print("✓ Final generalized helper function 'apply_shift' is defined.")

In [None]:
def apply_huber_contamination(clean_data, alpha, shift_config, all_var_names, model_level, seed=42):
    """
    Contaminates a dataset using the Huber model. A fraction 'alpha' of the
    samples are replaced with noisy versions.

    Args:
        clean_data (np.ndarray): The original, clean test data samples.
        alpha (float): The fraction of data to contaminate (between 0 and 1).
        shift_config (dict): Configuration defining the noise for the outliers.
        all_var_names (list): List of all variable names for this data.
        model_level (str): 'L' for low-level or 'H' for high-level.
        seed (int): Seed for NumPy's global RNG. It is forwarded to
            ``apply_shift`` so that it controls both the noise values and
            which rows get replaced.

    Returns:
        np.ndarray: The new, contaminated test data (never the input object itself).
        ``int(alpha * n_samples)`` rows are replaced, i.e. the count is rounded down.

    Raises:
        ValueError: If ``alpha`` is outside [0, 1].
    """
    np.random.seed(seed)
    if not (0 <= alpha <= 1):
        raise ValueError("Alpha must be between 0 and 1.")

    if alpha == 0:
        # Return a copy so every branch hands back an array the caller may modify.
        return clean_data.copy()

    # Create the fully noisy version of the data using our existing function.
    # The seed must be forwarded: apply_shift re-seeds the global RNG itself,
    # so without it the `seed` argument of this function would have no effect.
    noisy_data = apply_shift(clean_data, shift_config, all_var_names, model_level, seed=seed)

    if alpha == 1:
        return noisy_data

    n_samples = clean_data.shape[0]
    n_to_contaminate = int(alpha * n_samples)

    # Randomly select which rows to replace (drawn from the global RNG state
    # left behind by apply_shift).
    indices_to_replace = np.random.choice(n_samples, n_to_contaminate, replace=False)

    # Start with a copy of the clean data, promoted to a dtype that can hold the
    # noisy rows (avoids truncation when the clean data is integer-typed).
    contaminated_data = np.array(clean_data, dtype=np.result_type(clean_data, noisy_data))

    # Replace the selected rows with their noisy versions
    contaminated_data[indices_to_replace] = noisy_data[indices_to_replace]

    return contaminated_data

print("✓ Helper function 'apply_huber_contamination' is defined.")

✓ Helper function 'apply_huber_contamination' is defined.


In [71]:
# --- "Huber-Shift" Evaluation Loop ---
# Same structure as the rho-shift loop, but only a fraction `alpha` of the test
# rows is replaced by noisy versions. Relies on ll_var_names / hl_var_names and
# results_to_evaluate defined in earlier cells.

# ======================================================================
# 1. CONFIGURE YOUR HUBER TEST HERE
# ======================================================================

# Set the contamination level (e.g., 0.1 means 10% of data will be outliers).
# NOTE: with alpha = 1.0, apply_huber_contamination returns the fully shifted data,
# so this run reproduces the rho-shift numbers for the same shift_config.
alpha = 1.0

# Define the type of noise to use for the outliers
shift_config = {
    'type': 'additive',
    'distribution': 'student-t',  # Can be 'gaussian', 'student-t', or 'exponential'
    
    'll_params': {
        'df': 3,
        'loc': [0, 0, 0],
        'shape': [0.5, 2.0, 1.0] # Define diagonal elements of the scale matrix
    },
    'hl_params': {
        'df': 5,
        'loc': [0, 0],
        'shape': [[0.8, -0.3], [-0.3, 0.8]] # Define a full scale matrix
    }
}


# ======================================================================
# 2. MAIN EVALUATION LOOP
# ======================================================================

print(f"Running 'Huber-Shift' evaluation with alpha = {alpha}")
huber_shift_records = []

for method_name, results_dict in results_to_evaluate.items():
    print(f"-- Evaluating method's robustness: {method_name} --")
    for fold_key, fold_results in results_dict.items():
        for run_key, run_data in fold_results.items():
            
            T_learned = run_data['T_matrix']
            test_indices = run_data['test_indices']
            
            errors_per_intervention = []
            for iota in I_ll_relevant:
                # a. Get the clean test data
                Dll_test_clean = Dll_samples[iota][test_indices]
                Dhl_test_clean = Dhl_samples[omega[iota]][test_indices]
                
                # b. Apply Huber contamination to create the test set
                #    (no seed is passed, so the helper's default seed of 42 is used for both levels)
                Dll_test_contaminated = apply_huber_contamination(Dll_test_clean, alpha, shift_config, ll_var_names, model_level='L')
                Dhl_test_contaminated = apply_huber_contamination(Dhl_test_clean, alpha, shift_config, hl_var_names, model_level='H')
                
                # c. Calculate error on the CONTAMINATED data; NaN results are skipped
                error = calculate_abstraction_error(T_learned, Dll_test_contaminated, Dhl_test_contaminated)
                if not np.isnan(error):
                    errors_per_intervention.append(error)
            
            average_error = np.mean(errors_per_intervention) if errors_per_intervention else np.nan
            
            record = {
                'method': method_name,
                'fold': int(fold_key.split('_')[1]),
                'run_id': run_key,
                'avg_error_huber': average_error
            }
            huber_shift_records.append(record)

# 3. Display Final Results (mean and sample std over folds, best method first)
huber_shift_df = pd.DataFrame(huber_shift_records)
print("\n\n--- 'Huber-Shift' Evaluation Complete ---")
summary = huber_shift_df.groupby(['method', 'run_id'])['avg_error_huber'].agg(['mean', 'std']).sort_values('mean')
display(summary)

Running 'Huber-Shift' evaluation with alpha = 1.0
-- Evaluating method's robustness: DIROCA --
-- Evaluating method's robustness: GradCA --
-- Evaluating method's robustness: BARYCA --


--- 'Huber-Shift' Evaluation Complete ---


Unnamed: 0_level_0,Unnamed: 1_level_0,mean,std
method,run_id,Unnamed: 2_level_1,Unnamed: 3_level_1
GradCA,gradca_run,1.964032,0.00539
DIROCA,eps_delta_8,2.062791,0.009017
BARYCA,baryca_run,4.353293,0.024379


In [17]:
import numpy as np
import torch
import joblib
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns
from scipy.stats import wilcoxon

# Local modules
import modularised_utils as mut
import opt_utils as oput 
import evaluation_utils as evut
import Linear_Additive_Noise_Models as lanm
import params
import random

from math_utils import compute_wasserstein

In [18]:
# Experiment name for the remaining cells; this rebinds `experiment`, which the
# first part of the notebook set to 'slc'.
experiment = 'synth1'

In [19]:
# Trained results keyed by method name; later cells read each entry's 'T_matrix'
# (and 'optimization_params' for the rho-shift setup).
# NOTE: joblib.load unpickles arbitrary objects - only load files you trust.
T_results = joblib.load(f"data/{experiment}/diroca_train_results.pkl")

In [20]:
# True: estimate the structural coefficients from the observational test data;
# False: load stored coefficients instead.
coeff_estimation = True

# Observational test samples for the low-level (LL) and high-level (HL) models.
Dll_obs = joblib.load(f"data/{experiment}/Dll_obs_test.pkl")
Dhl_obs = joblib.load(f"data/{experiment}/Dhl_obs_test.pkl")

# Per-intervention model objects; later cells read their `.F` matrix.
LLmodels = joblib.load(f"data/{experiment}/LLmodels.pkl")
HLmodels = joblib.load(f"data/{experiment}/HLmodels.pkl")

num_llsamples, num_hlsamples  = Dll_obs.shape[0], Dhl_obs.shape[0]

# Graph and intervention set for each level.
Gll, Ill = mut.load_model(experiment, 'LL')
Ghl, Ihl = mut.load_model(experiment, 'HL')

n_varsll, n_varshl = len(Gll.nodes()), len(Ghl.nodes())

# Intervention map LL -> HL. Rebinds `omega` from the first part of the notebook.
omega    = mut.load_omega_map(experiment)

if coeff_estimation == True:
    ll_coeffs = mut.get_coefficients(Dll_obs, Gll)
    hl_coeffs = mut.get_coefficients(Dhl_obs, Ghl) 
else:
    ll_coeffs = mut.load_coeffs(experiment, 'LL')
    hl_coeffs = mut.load_coeffs(experiment, 'HL')

# NOTE(review): presumably returns the abducted exogenous noise samples together
# with their estimated mean and covariance - confirm in modularised_utils.
U_ll_hat, mu_U_ll_hat, Sigma_U_ll_hat = mut.lan_abduction(Dll_obs, Gll, ll_coeffs)
U_hl_hat, mu_U_hl_hat, Sigma_U_hl_hat = mut.lan_abduction(Dhl_obs, Ghl, hl_coeffs)

# Generated data keyed by intervention (the next cell treats the key None as observational).
data = evut.generate_data(LLmodels, HLmodels, omega, num_llsamples, num_hlsamples, mu_U_ll_hat, Sigma_U_ll_hat, mu_U_hl_hat, Sigma_U_hl_hat)

In [21]:
# Switches selecting which part of `data` is used as test data.
test_observ        = True 
test_interv        = True
# NOTE(review): num_iter and metric are not read by any cell visible in this part of the notebook.
num_iter           = 100
metric             = 'wass'

# Both flags: use everything; observational only: the entry keyed by None;
# interventional only: every entry except the None key.
# NOTE: if both flags are False no branch runs and `test_data` is left undefined.
if test_observ and test_interv:
    test_data = data

elif test_observ:
    test_data = {None: data[None]}

elif test_interv:
    test_data = {k: v for k, v in data.items() if k is not None}

## 0-shift

In [None]:
# T has to be learnt with coeff estimation enabled as well (see opt_ell.ipynb).
# For every method, the error of one intervention is the square root of
# compute_wasserstein between the abstracted low-level Gaussian (T @ F_ll applied
# to the estimated exogenous mean/covariance) and the high-level Gaussian.
if coeff_estimation == True:
    
    results_single = {method: {'errors': [], 'mean': 0, 'ci': 0} for method in T_results.keys()}

    for name, res in T_results.items():
        T = res['T_matrix']
        errors = []  # Store errors for each intervention
        # NOTE(review): scale_factor and wass_total are not read anywhere in the
        # visible notebook; kept so the notebook's global state is unchanged.
        scale_factor = 1/np.sqrt(len(Ill))
        wass_total = 0
        for iota in Ill:
            L_i = LLmodels[iota].F
            V_i = T @ L_i
            H_i = HLmodels[omega[iota]].F

            muV    = V_i @ mu_U_ll_hat
            sigmaV = V_i @ Sigma_U_ll_hat @ V_i.T
            muH    = H_i @ mu_U_hl_hat
            sigmaH = H_i @ Sigma_U_hl_hat @ H_i.T


            # Compute Wasserstein metric
            wass_dist = np.sqrt(mut.compute_wasserstein(muV, sigmaV, muH, sigmaH))
            errors.append(wass_dist)
            wass_total += wass_dist

        # Mean and spread over interventions ('ci' holds the standard deviation)
        mean_error = np.mean(errors)
        std_error = np.std(errors)
        ci = std_error

        # Store all statistics
        results_single[name] = {
            'errors': errors,
            'mean': mean_error,
            'ci': ci
        }

    # Best (lowest mean error) method first
    results_single = dict(sorted(results_single.items(), key=lambda x: x[1]['mean']))
    # NOTE(review): this replaces the estimated coefficients with the stored ones
    # for the cells that follow - confirm this is intended.
    ll_coeffs = mut.load_coeffs(experiment, 'LL')
    hl_coeffs = mut.load_coeffs(experiment, 'HL')
else:
    print('No coeff estimation')
    # Nothing was evaluated; keep the table below printable instead of hitting a NameError.
    results_single = {}
    
# Print results
print("\n" + "="*100)
print(f"{'Method':<15} {'Error (mean ± std)':<35}")
print("="*100)

# The loop variable must not be called `stats`: that name is the scipy.stats
# module alias used by apply_shift, and rebinding it here would break that helper.
for method, single_stats in results_single.items():
    print(f"{method:<15} {single_stats['mean']:>8.4f} ± {single_stats['ci']:<8.4f}")

## ρ-shift

In [24]:
# Configuration for the rho-shift experiments.
# NOTE(review): rad_values, sample_forms, center, coverage_type, hat_dict and
# worst_dict are not read by the cells visible in this part of the notebook.
rad_values = np.arange(0.05, 100.05, 10).tolist()  
sample_forms = ['boundary', 'sample']

center   = 'worst'
coverage_type = 'uniform'

# Estimated exogenous mean/covariance per level ('L' = low level, 'H' = high level).
hat_dict = {'L': [mu_U_ll_hat, Sigma_U_ll_hat], 'H': [mu_U_hl_hat, Sigma_U_hl_hat]}

# Distribution parameters stored with the 'T_8' run; they are used as the centre
# of the shifted Gaussian families in the sweep below.
# NOTE(review): 'T_8' is hard-coded and must exist as a key in T_results.
worst = 'T_8'
mu_worst_L    = T_results[worst]['optimization_params']['L']['mu_U']
Sigma_worst_L = T_results[worst]['optimization_params']['L']['Sigma_U']
mu_worst_H    = T_results[worst]['optimization_params']['H']['mu_U']
Sigma_worst_H = T_results[worst]['optimization_params']['H']['Sigma_U']

worst_dict = {'L': [mu_worst_L, Sigma_worst_L], 'H': [mu_worst_H, Sigma_worst_H]}

In [None]:
# Sweep the covariance-shift radius r_sigma and, for each value, measure every
# method's average error over interventions under the shifted distributions.

# Define the r_sigma values to sweep over
sigma_values = np.linspace(0, 1, 100)
methods_to_track = list(T_results.keys())

# Storage for plotting and for mean/CI across all sigmas
# (error_evolution and mean_across_sigmas receive the same values below)
error_evolution = {method: [] for method in methods_to_track}
mean_across_sigmas = {method: [] for method in methods_to_track}
ci_across_sigmas = {method: [] for method in methods_to_track}

for r_sigma in sigma_values:
    #print(f"Testing with r_sigma = {r_sigma}")
    # Generate shifted Gaussian families for this sigma.
    # NOTE(review): the third positional argument (1) is presumably the family size,
    # and seed=None presumably makes the draw non-reproducible - confirm in modularised_utils.
    shift_family_L = mut.generate_shifted_gaussian_family(
        mu_worst_L, Sigma_worst_L, 1, r_mu=0, r_sigma=r_sigma, coverage='rand', seed=None)
    shift_family_H = mut.generate_shifted_gaussian_family(
        mu_worst_H, Sigma_worst_H, 1, r_mu=0, r_sigma=r_sigma, coverage='rand', seed=None)

    # Initialize results for this sigma
    results = {method: [] for method in methods_to_track}

    # Pair the i-th low-level shift with the i-th high-level shift
    for shift_L, shift_H in zip(shift_family_L, shift_family_H):
        noise_muL, noise_SigmaL = shift_L
        noise_muH, noise_SigmaH = shift_H
        # Convert to NumPy when the object exposes .numpy() (e.g. a tensor)
        noise_muL = noise_muL.numpy() if hasattr(noise_muL, 'numpy') else noise_muL
        noise_muH = noise_muH.numpy() if hasattr(noise_muH, 'numpy') else noise_muH
        noise_SigmaL = noise_SigmaL.numpy() if hasattr(noise_SigmaL, 'numpy') else noise_SigmaL
        noise_SigmaH = noise_SigmaH.numpy() if hasattr(noise_SigmaH, 'numpy') else noise_SigmaH

        for name in methods_to_track:
            res = T_results[name]
            T = res['T_matrix']
            wass_total = 0
            for iota in Ill:
                L_i = LLmodels[iota].F
                V_i = T @ L_i
                H_i = HLmodels[omega[iota]].F
                muV = V_i @ noise_muL
                sigmaV = V_i @ noise_SigmaL @ V_i.T
                muH = H_i @ noise_muH
                sigmaH = H_i @ noise_SigmaH @ H_i.T
                # This cell uses compute_wasserstein imported from math_utils, whereas the
                # 0-shift cell calls mut.compute_wasserstein.
                wass_dist = np.sqrt(compute_wasserstein(muV, sigmaV, muH, sigmaH))
                wass_total += wass_dist
            # Average error over interventions for this shifted distribution
            results[name].append(wass_total / len(Ill))

    # Store mean and CI for each method for this sigma
    for method in methods_to_track:
        mean = np.mean(results[method])
        std = np.std(results[method])
        # NOTE(review): the divisor 10 is a hard-coded constant with no stated derivation.
        ci = std/10
        error_evolution[method].append(mean)
        mean_across_sigmas[method].append(mean)
        ci_across_sigmas[method].append(ci)

# Print mean and CI across all sigmas for each method
# (the "±" value is the standard deviation of the per-sigma means)
print(f"\n{'Method':<15} {'Mean across sigmas ± Std':<35}")
print("="*50)
method_stats = []
for method in methods_to_track:
    mean_over_sigmas = np.mean(mean_across_sigmas[method])
    std_over_sigmas = np.std(mean_across_sigmas[method])
    ci_over_sigmas = std_over_sigmas
    method_stats.append((method, mean_over_sigmas, ci_over_sigmas))
# Sort by mean, descending (worst to best)
method_stats.sort(key=lambda x: x[1], reverse=True)
for method, mean, ci in method_stats:
    print(f"{method:<15} {mean:8.4f} ± {ci:<8.4f}")
print("="*50)

## F-contamination

In [27]:
def contaminate_structural_matrix(M, contamination_fraction, contamination_type, num_segments=10, seed=None):
    """
    Contaminates a linear transformation matrix M to break its strict linearity.

    Args:
        M (np.ndarray): Original linear transformation matrix (n x m). Not modified.
        contamination_fraction (float): Magnitude of contamination (e.g., between 0.05 and 1.0).
        contamination_type (str): Type of contamination to apply:
            - 'multiplicative': entries on and above the main diagonal are scaled by
              independent factors drawn uniformly from [1 - fraction, 1 + fraction];
              entries below the diagonal are left unchanged.
            - 'nonlinear': returns ``M + fraction * sin(M)`` (deterministic).
            - 'piecewise': each row is cut into contiguous segments at random
              column positions and every segment is scaled by its own factor
              drawn uniformly from [1 - fraction, 1 + fraction].
        num_segments (int): Number of segments for piecewise contamination (default: 10).
            Values below 2 leave the rows unchanged.
        seed (int, optional): Random seed for reproducibility. With ``None`` every
            call draws fresh, non-reproducible noise.

    Returns:
        np.ndarray: The contaminated matrix (floating point).

    Raises:
        ValueError: If ``contamination_type`` is not one of the three options.
    """
    rng = np.random.default_rng(seed)
    n, m = M.shape

    if contamination_type == "multiplicative":
        # Element-wise multiplicative noise restricted to the upper triangle
        # (including the diagonal); the factor is exactly 1 below the diagonal.
        noise = rng.uniform(low=1.0 - contamination_fraction, high=1.0 + contamination_fraction, size=M.shape)
        mask = np.triu(np.ones_like(M))
        return M * (1 - mask + mask * noise)

    if contamination_type == "nonlinear":
        # Sine-based perturbation of every entry.
        return M + contamination_fraction * np.sin(M)

    if contamination_type == "piecewise":
        def piecewise_contaminate_row(row, cont_frac, segments, rng):
            n_elem = len(row)
            if segments < 2:
                return row  # nothing to do
            if n_elem < 2:
                # A row with fewer than two entries has no interior position to
                # cut at (rng.integers(1, 1) would raise); treat it as one segment.
                interior = np.empty(0, dtype=int)
            else:
                # Random interior breakpoints; duplicates simply yield empty segments.
                interior = np.sort(rng.integers(low=1, high=n_elem, size=segments - 1))
            breakpoints = np.concatenate(([0], interior, [n_elem]))
            contaminated_row = np.empty_like(row)
            # For each segment, assign a random multiplicative factor.
            for start, end in zip(breakpoints[:-1], breakpoints[1:]):
                factor = 1.0 + rng.uniform(low=-cont_frac, high=cont_frac)
                contaminated_row[start:end] = row[start:end] * factor
            return contaminated_row

        # Work on a floating-point copy so the scaled values are not truncated
        # when M has an integer dtype; floating inputs keep their dtype.
        float_dtype = M.dtype if np.issubdtype(M.dtype, np.inexact) else np.float64
        M_cont = M.astype(float_dtype)
        for i in range(n):
            M_cont[i, :] = piecewise_contaminate_row(M_cont[i, :], contamination_fraction, num_segments, rng)
        return M_cont

    raise ValueError("Unknown contamination type. Choose among 'multiplicative', 'nonlinear', or 'piecewise'.")


In [None]:
# Sweep the contamination level of the structural matrices and report each
# method's average error over interventions.

# Define contamination levels to test
contamination_levels = np.linspace(0.0, 1.0, 100)  
for cont_type in ['piecewise']:
    print(f"Contamination type: {cont_type}")
    # Store results for plotting
    plot_results = {method: {'means': [], 'stds': []} for method in T_results.keys()}


    # Run experiment for each contamination level
    for cont_frac in tqdm(contamination_levels):
        abstraction_error = {name: [] for name in T_results.keys()}
    
        # Single repetition per level, so the per-level std computed below is always 0.
        for _ in range(1):
            # The exogenous distributions are the unshifted estimates; only the
            # structural matrices are perturbed in this experiment.
            noise_muL, noise_SigmaL = mu_U_ll_hat, Sigma_U_ll_hat
            noise_muH, noise_SigmaH = mu_U_hl_hat, Sigma_U_hl_hat
            
            noise_muL    = noise_muL.numpy() if torch.is_tensor(noise_muL) else noise_muL
            noise_muH    = noise_muH.numpy() if torch.is_tensor(noise_muH) else noise_muH
            noise_SigmaL = noise_SigmaL.numpy() if torch.is_tensor(noise_SigmaL) else noise_SigmaL
            noise_SigmaH = noise_SigmaH.numpy() if torch.is_tensor(noise_SigmaH) else noise_SigmaH

            for name, res in T_results.items():
                T = res['T_matrix']
                total = 0
                for iota in Ill:
                    # NOTE(review): no seed is passed, so each call draws fresh random
                    # contamination. The draw therefore differs between methods and
                    # between runs of this cell - confirm this is intended for the comparison.
                    L_i = LLmodels[iota].F
                    L_i = contaminate_structural_matrix(L_i, contamination_fraction=cont_frac, contamination_type=cont_type)
                    V_i = T @ L_i
                    H_i = HLmodels[omega[iota]].F
                    H_i = contaminate_structural_matrix(H_i, contamination_fraction=cont_frac, contamination_type=cont_type)
                    
                    muV    = V_i @ noise_muL
                    sigmaV = V_i @ noise_SigmaL @ V_i.T
                    muH    = H_i @ noise_muH
                    sigmaH = H_i @ noise_SigmaH @ H_i.T

                    dist = np.sqrt(compute_wasserstein(muV, sigmaV, muH, sigmaH))

                    total += dist


                # Average error over interventions for this repetition
                iter_avg = total / len(Ill)
                abstraction_error[name].append(iter_avg)


        # Store results for this contamination level
        for method in T_results.keys():
            mean_e = np.mean(abstraction_error[method])
            std_e = np.std(abstraction_error[method])
            plot_results[method]['means'].append(mean_e)
            plot_results[method]['stds'].append(std_e)

    # Compute averages across all contamination levels for each method
    method_averages = {}

    for method in T_results.keys():
        # Get all means across contamination levels
        all_means = plot_results[method]['means']
        # Compute the mean and std across all contamination levels
        overall_mean = np.mean(all_means)
        overall_std = np.std(all_means)
        method_averages[method] = (overall_mean, overall_std)

    # Sort methods by average (worst to best)
    sorted_methods = sorted(method_averages.items(), key=lambda x: x[1][0], reverse=True)

    # Print results
    print("\n" + "="*100)
    print("AVERAGE WASSERSTEIN DISTANCE ACROSS ALL CONTAMINATION LEVELS (0.0 to 1.0)")
    print("="*100)
    print(f"{'Method':<15} {'Mean ± std':<35}")
    print("-"*100)

    for method, (mean, std) in sorted_methods:
        ci = std
        print(f"{method:<15} {mean:>8.4f} ± {ci:<8.4f}")

    print("="*100)

## ω-contamination

In [30]:
def contaminate_omega_map(original_omega, num_misalignments, rng=None):
    """
    Randomly corrupt a subset of entries in the ω map to simulate mapping misspecification.

    Up to ``num_misalignments`` keys are drawn without replacement and each one is
    re-pointed to a randomly chosen target that differs from its original target.
    The input mapping is never modified; a shallow copy is returned.

    Args:
        original_omega (dict): Original intervention mapping.
            For example: {None: None, iota1: H_i1, iota2: H_i1, iota3: H_i2, ...}
        num_misalignments (int): Desired number of misaligned mappings. Values
            larger than the number of eligible keys are capped; negative values
            are treated as 0.
        rng (random.Random, optional): Source of randomness exposing ``sample``
            and ``choice``. Defaults to the global ``random`` module, so existing
            callers behave as before. Pass ``random.Random(seed)`` for a
            reproducible corruption.

    Returns:
        dict: A new ω mapping with up to num_misalignments entries altered.
            A selected key is left unchanged when no alternative target exists
            (e.g. every eligible key maps to the same target).
    """
    source = random if rng is None else rng

    # Every non-None key is eligible. Note that a non-None key whose value is
    # None is still eligible and may be re-pointed to a real target.
    eligible_keys = [k for k in original_omega if k is not None]

    # Distinct non-None targets in first-seen order. dict.fromkeys is used
    # instead of set() because set iteration order depends on hashing, which
    # would make the choice below irreproducible even with a seeded RNG.
    candidate_targets = list(dict.fromkeys(
        original_omega[k] for k in eligible_keys if original_omega[k] is not None
    ))

    # Start with a copy of the original mapping.
    contaminated_omega = original_omega.copy()

    # Clamp to [0, number of eligible keys]; sample() rejects out-of-range k.
    num_to_corrupt = max(0, min(num_misalignments, len(eligible_keys)))

    for key in source.sample(eligible_keys, k=num_to_corrupt):
        original_target = original_omega[key]
        # Only corrupt if there's an alternative available.
        alternatives = [t for t in candidate_targets if t != original_target]
        if alternatives:
            contaminated_omega[key] = source.choice(alternatives)

    return contaminated_omega

In [None]:
# ω-contamination sweep: for each number of corrupted ω entries, measure every
# method's abstraction error averaged over all low-level interventions.

# Define contamination levels to test
misalignment_levels = range(0, len(Ill))

# Number of random ω corruptions drawn per misalignment level. With a single
# draw the per-level std stored below is always 0; raise this to average over
# several random corruptions.
n_omega_trials = 1

# Store results for plotting
omega_plot_results = {method: {'means': [], 'stds': []} for method in T_results.keys()}

# The noise parameters do not depend on the contamination level or the trial,
# so they are converted (torch tensor -> numpy array) once, outside the loops.
noise_muL, noise_SigmaL = mu_U_ll_hat, Sigma_U_ll_hat
noise_muH, noise_SigmaH = mu_U_hl_hat, Sigma_U_hl_hat

noise_muL    = noise_muL.numpy() if torch.is_tensor(noise_muL) else noise_muL
noise_muH    = noise_muH.numpy() if torch.is_tensor(noise_muH) else noise_muH
noise_SigmaL = noise_SigmaL.numpy() if torch.is_tensor(noise_SigmaL) else noise_SigmaL
noise_SigmaH = noise_SigmaH.numpy() if torch.is_tensor(noise_SigmaH) else noise_SigmaH

# Run experiment for each contamination level
for num_mis in tqdm(misalignment_levels):
    abstraction_error = {name: [] for name in T_results.keys()}

    for _ in range(n_omega_trials):
        # NOTE(review): contaminate_omega_map draws from the stdlib `random`
        # module; confirm it is seeded somewhere if this sweep must be reproducible.
        omega_cont = contaminate_omega_map(omega, num_mis)

        for name, res in T_results.items():
            T = res['T_matrix']
            total = 0
            for iota in Ill:
                # Low-level structural matrix mapped through T, compared with
                # the high-level model selected by the contaminated ω.
                L_i = LLmodels[iota].F
                V_i = T @ L_i
                H_i = HLmodels[omega_cont[iota]].F

                # Mean / covariance obtained by pushing the noise parameters
                # through each linear map.
                muV    = V_i @ noise_muL
                sigmaV = V_i @ noise_SigmaL @ V_i.T
                muH    = H_i @ noise_muH
                sigmaH = H_i @ noise_SigmaH @ H_i.T

                # presumably compute_wasserstein returns a squared distance,
                # hence the square root; verify against its definition
                dist = np.sqrt(compute_wasserstein(muV, sigmaV, muH, sigmaH))
                total += dist

            # Average over interventions for this method and trial
            iter_avg = total / len(Ill)
            abstraction_error[name].append(iter_avg)

    # Store results for this contamination level
    for method in T_results.keys():
        mean_e = np.mean(abstraction_error[method])
        std_e = np.std(abstraction_error[method])
        omega_plot_results[method]['means'].append(mean_e)
        omega_plot_results[method]['stds'].append(std_e)

# Compute averages for each method
method_averages = []
for method in T_results.keys():
    # Get all means across misalignment levels
    all_means = omega_plot_results[method]['means']
    # Compute overall mean and std
    overall_mean = np.mean(all_means)
    overall_std = np.std(all_means)
    method_averages.append((method, overall_mean, overall_std))

# Sort methods by mean (worst to best)
method_averages.sort(key=lambda x: x[1], reverse=True)

# Print sorted averages
for method, mean, std in method_averages:
    # `ci` is the std across misalignment levels, not a confidence interval
    ci = std
    print(f"{method:<15} {mean:>8.4f} ± {ci:<8.4f}")

print("="*100)