In [None]:
import json
import pandas as pd
from statsmodels.stats.anova import AnovaRM

def parse_results_to_dataframe(file_path):
    """
    Load the JSON results file and flatten it into a long-format DataFrame.

    The JSON is read as a list of repetitions. Each repetition is iterated as
    a list of per-modality dictionaries that carry a 'modality' name and,
    optionally, one entry per task ('tumor_type', 'stage'). Each task entry
    maps a data type name to a dictionary of metrics.

    Args:
        file_path (str): Location of the JSON results file.

    Returns:
        pandas.DataFrame or None: One row per (repetition, modality, task,
        data_type) holding 'balanced_accuracy' and 'f1_macro' (None where the
        metric is absent). None is returned, after printing a message, when
        the file does not exist or is not valid JSON.
    """
    try:
        with open(file_path, 'r') as handle:
            runs = json.load(handle)
    except FileNotFoundError:
        print(f"Error: The file was not found at {file_path}")
        print("Please make sure the file path is correct.")
        return None
    except json.JSONDecodeError:
        print(f"Error: The file at {file_path} is not a valid JSON file.")
        return None

    task_names = ['tumor_type', 'stage']
    rows = []

    # The position of a run in the outer list is used as its repetition id.
    for rep_number, modality_entries in enumerate(runs):
        for entry in modality_entries:
            modality = entry['modality']

            for task in task_names:
                # A modality may not report every task; skip the missing ones.
                if task not in entry:
                    continue

                # One row per data type ('real', 'synthetic_from_coherent', ...).
                rows.extend(
                    {
                        'repetition': rep_number,
                        'modality': modality,
                        'task': task,
                        'data_type': data_type,
                        'balanced_accuracy': scores.get('balanced_accuracy'),
                        'f1_macro': scores.get('f1_macro'),
                    }
                    for data_type, scores in entry[task].items()
                )

    return pd.DataFrame(rows)

def perform_anova(df, task, metric):
    """
    Fit a two-way repeated measures ANOVA for one task and print the summary.

    The rows of ``df`` belonging to ``task`` are analysed with 'repetition' as
    the subject identifier and 'modality' and 'data_type' as within-subject
    factors. Any exception raised while building or fitting the model is
    caught and reported on stdout instead of being propagated.

    Args:
        df (pandas.DataFrame): Long-format results with a 'task' column.
        task (str): The task to filter by ('tumor_type' or 'stage').
        metric (str): Name of the dependent-variable column
            ('balanced_accuracy' or 'f1_macro').
    """
    separator = "="*80
    print(separator)
    print(f"Performing Repeated Measures ANOVA for:")
    print(f"  Task   : {task}")
    print(f"  Metric : {metric}")
    print(separator)

    # Work on a copy of the rows that belong to the requested task only.
    rows_for_task = df.loc[df['task'] == task].copy()
    if rows_for_task.empty:
        print(f"\nWarning: No data found for task '{task}'. Skipping ANOVA.\n")
        return

    try:
        # Each repetition plays the role of a subject measured under every
        # (modality, data_type) combination.
        model = AnovaRM(data=rows_for_task,
                        depvar=metric,
                        subject='repetition',
                        within=['modality', 'data_type'])
        fitted = model.fit()
        print(fitted.summary())
        print("\n")
    except Exception as err:
        print(f"\nAn error occurred during ANOVA calculation for {task} - {metric}: {err}\n")


# --- Main execution ---
# Entry point of this cell: load the per-run evaluation results and run one
# repeated measures ANOVA for every (task, metric) combination.
if __name__ == "__main__":
    # Define the path to your JSON file
    # IMPORTANT: Update this path to point to your actual file location.
    file_path = '../../results/downstream/task_01_train_on_real/evaluation_results_10_runs.json'

    # 1. Parse the data
    results_df = parse_results_to_dataframe(file_path)

    # parse_results_to_dataframe returns None when the file could not be read,
    # in which case there is nothing to analyse.
    if results_df is not None:
        # 2. Define the tasks and metrics for which to run the ANOVA
        tasks_to_analyze = ['tumor_type', 'stage']
        metrics_to_analyze = ['balanced_accuracy', 'f1_macro']

        # 3. Run the ANOVAs (one per task/metric pair; results are printed)
        for task in tasks_to_analyze:
            for metric in metrics_to_analyze:
                perform_anova(results_df, task, metric)

In [None]:
import json
import pandas as pd
from scipy import stats
from itertools import combinations

def parse_results_to_dataframe(file_path):
    """
    Parses the JSON results file and converts it into a long-format pandas DataFrame.

    The JSON is read as a list of repetitions; each repetition is iterated as
    a list of per-modality dictionaries with a 'modality' name and, optionally,
    one entry per task ('tumor_type', 'stage') mapping data types to metrics.

    Args:
        file_path (str): The path to the JSON results file.

    Returns:
        pandas.DataFrame or None: A DataFrame with columns repetition,
        modality, task, data_type, balanced_accuracy and f1_macro, or None
        (after printing a message) when the file is missing or is not valid
        JSON.
    """
    try:
        with open(file_path, 'r') as f:
            all_runs_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: The file was not found at {file_path}")
        return None
    except json.JSONDecodeError:
        # A malformed file is reported the same way as a missing one, so the
        # caller's "is not None" check covers both failure modes.
        print(f"Error: The file at {file_path} is not a valid JSON file.")
        return None

    records = []
    # The position of a run in the outer list is used as its repetition id.
    for rep_idx, run_data in enumerate(all_runs_data):
        for modality_results in run_data:
            modality_name = modality_results['modality']
            for task_name in ['tumor_type', 'stage']:
                # A modality may not report every task; skip the missing ones.
                if task_name not in modality_results:
                    continue
                for data_type_name, metrics in modality_results[task_name].items():
                    records.append({
                        'repetition': rep_idx, 'modality': modality_name,
                        'task': task_name, 'data_type': data_type_name,
                        'balanced_accuracy': metrics.get('balanced_accuracy'),
                        'f1_macro': metrics.get('f1_macro'),
                    })
    return pd.DataFrame(records)

# --- Main execution ---
# Entry point of this cell: for every task, metric and modality, compare the
# data types pairwise with paired t-tests across repetitions.
if __name__ == "__main__":
    file_path = '../../results/downstream/task_01_train_on_real/evaluation_results_10_runs.json'
    results_df = parse_results_to_dataframe(file_path)

    # None means the file could not be loaded; an empty DataFrame has no
    # 'modality' column, so there is nothing to compare in either case.
    if results_df is not None and not results_df.empty:
        # Define parameters for the analysis
        tasks_to_analyze = ['tumor_type', 'stage']
        metrics_to_analyze = ['balanced_accuracy', 'f1_macro']
        modalities = results_df['modality'].unique()
        data_types = ['real', 'synthetic_from_coherent', 'synthetic_from_multi']

        # Define the number of comparisons for Bonferroni correction
        # NOTE(review): the correction covers the 3 pairwise comparisons made
        # within one modality; confirm that this is the intended test family,
        # since the loops below repeat them for every modality/task/metric.
        n_comparisons = 3 # (real vs coherent), (real vs multi), (coherent vs multi)
        alpha = 0.05
        corrected_alpha = alpha / n_comparisons
        print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f}\n")

        # Loop through each task and metric
        for task in tasks_to_analyze:
            for metric in metrics_to_analyze:
                print("="*80)
                print(f"ANALYSIS: Task = {task.upper()}, Metric = {metric.upper()}")
                print("="*80)

                # Perform comparisons within each modality
                for modality in modalities:
                    print(f"\n--- Modality: {modality} ---")

                    # Sort by repetition so that position i of every series
                    # below refers to the same repetition (paired samples).
                    modality_df = results_df[
                        (results_df['task'] == task) &
                        (results_df['modality'] == modality)
                    ].sort_values('repetition', kind='mergesort')

                    # Create a dictionary to hold the series of results for each data type
                    results_by_dtype = {
                        dtype: modality_df[modality_df['data_type'] == dtype][metric]
                        for dtype in data_types
                    }

                    # Perform pairwise paired t-tests
                    for dt1, dt2 in combinations(data_types, 2):
                        # Get the two series to compare
                        series1 = results_by_dtype[dt1]
                        series2 = results_by_dtype[dt2]

                        # A modality may lack this task or one of the data
                        # types; there is nothing to test in that case.
                        if series1.empty or series2.empty:
                            continue

                        # A paired test needs one value per repetition on
                        # both sides; ttest_rel raises on unequal lengths.
                        if len(series1) != len(series2):
                            print(f"{dt1:>25} vs. {dt2:<25}: skipped "
                                  f"(unequal number of repetitions: {len(series1)} vs. {len(series2)})")
                            continue

                        # Perform the paired t-test
                        t_stat, p_value = stats.ttest_rel(series1, series2)

                        # Check for significance against the corrected alpha
                        is_significant = "Significant" if p_value < corrected_alpha else "Not Significant"

                        print(f"{dt1:>25} vs. {dt2:<25}: p-value = {p_value:.6f} ({is_significant})")

In [None]:
import pandas as pd
from statsmodels.stats.anova import AnovaRM
from scipy import stats
from itertools import combinations

def run_anova_for_imputation(df, metric):
    """
    Fit a two-way repeated measures ANOVA on ``metric`` and print its summary.

    'run' is used as the subject identifier and 'test_condition' and
    'test_type' as the within-subject factors. Any exception raised while
    building or fitting the model is caught and reported on stdout.

    Args:
        df (pandas.DataFrame): Long-format results table.
        metric (str): Name of the dependent-variable column.
    """
    separator = "="*80
    print(separator)
    print(f"ANOVA Results for Metric: {metric.upper()}")
    print(separator)

    try:
        # 'run' identifies the repetition that is measured under every
        # (test_condition, test_type) combination.
        model = AnovaRM(data=df,
                        depvar=metric,
                        subject='run',
                        within=['test_condition', 'test_type'])
        fitted = model.fit()
        print(fitted.summary())
        print("\n")
    except Exception as err:
        print(f"\nAn error occurred during ANOVA calculation for {metric}: {err}\n")

def run_posthoc_ttests_for_imputation(df, metric):
    """
    Performs and prints post-hoc paired t-tests with Bonferroni correction.

    Within every 'test_condition' found in ``df`` the three test types
    ('ablation', 'imputed_coherent', 'imputed_multi') are compared pairwise.
    Samples are paired by 'run': the rows of each condition are sorted by
    'run' before the metric values are extracted, so the result does not
    depend on the row order of the input table.

    Args:
        df (pandas.DataFrame): Long-format results with the columns 'run',
            'test_condition', 'test_type' and ``metric``.
        metric (str): Name of the column holding the values to compare.

    Returns:
        None. All results are printed to stdout.
    """
    print("="*80)
    print(f"Post-Hoc Paired t-test Results for Metric: {metric.upper()}")
    print("="*80)

    conditions = df['test_condition'].unique()
    test_types = ['ablation', 'imputed_coherent', 'imputed_multi']

    # Define the number of comparisons for Bonferroni correction
    n_conditions = len(conditions)
    if n_conditions == 0:
        # Without conditions the correction below would divide by zero.
        print("No test conditions found. Skipping post-hoc t-tests.\n")
        return
    n_pairs_per_condition = 3
    n_comparisons = n_conditions * n_pairs_per_condition

    alpha = 0.05
    corrected_alpha = alpha / n_comparisons
    print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f} for significance.\n")

    # Perform comparisons within each test_condition
    for condition in conditions:
        print(f"\n--- Condition: {condition} ---")

        # Sort by run (stable sort) so that position i of every series below
        # refers to the same run, which is what a paired test requires.
        condition_df = df[df['test_condition'] == condition].sort_values('run', kind='mergesort')

        # Create a dictionary to hold the series of results for each test type
        results_by_ttype = {
            ttype: condition_df[condition_df['test_type'] == ttype][metric]
            for ttype in test_types
        }

        # Perform pairwise paired t-tests
        for tt1, tt2 in combinations(test_types, 2):
            series1 = results_by_ttype[tt1]
            series2 = results_by_ttype[tt2]

            # Check if either series is empty (can happen if a combination doesn't exist)
            if series1.empty or series2.empty:
                continue

            # ttest_rel raises on unequal lengths; report and move on instead.
            if len(series1) != len(series2):
                print(f"{tt1:>18} vs. {tt2:<18}: skipped "
                      f"(unequal number of runs: {len(series1)} vs. {len(series2)})")
                continue

            # Perform the paired t-test on the run-aligned values
            _, p_value = stats.ttest_rel(series1.to_numpy(), series2.to_numpy())

            # Check for significance against the corrected alpha
            is_significant = "Significant" if p_value < corrected_alpha else "Not Significant"

            print(f"{tt1:>18} vs. {tt2:<18}: p-value = {p_value:.6f} ({is_significant})")
    print("\n")


# --- Main execution ---
# Entry point of this cell: repeated measures ANOVA plus post-hoc t-tests on
# the imputation results, excluding the baseline conditions.
if __name__ == "__main__":
    # 1. Load the dataset from the CSV file
    file_path = '../../results/downstream/task_05_imputing_test_set/results_10_runs.csv'
    try:
        full_df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: The file was not found at {file_path}")
        # Stop with a non-zero status. SystemExit is raised directly because
        # the exit() helper is installed by the site module for interactive
        # use and reports success (status 0) when called without arguments.
        raise SystemExit(1) from None

    # 2. Filter the data as requested
    conditions_to_exclude = ['cancer_label_only', 'full_data']
    filtered_df = full_df[~full_df['test_condition'].isin(conditions_to_exclude)].copy()

    print(f"Successfully loaded and filtered data. Kept {len(filtered_df)} rows for analysis.\n")

    # Define the metrics to analyze
    metrics_to_analyze = ['balanced_accuracy', 'macro_f1_score']

    for metric in metrics_to_analyze:
        # 3. Run the Repeated Measures ANOVA
        run_anova_for_imputation(filtered_df, metric)

        # 4. Run the Post-Hoc t-tests to explore 'test_type' differences
        run_posthoc_ttests_for_imputation(filtered_df, metric)

In [None]:
import pandas as pd
from scipy import stats

def run_imputation_vs_full_data_ttests(df, metric):
    """
    Compares imputed data against the full_data baseline using paired t-tests.
    If a result is significant, it also reports which group performed better
    (the group with the higher mean of ``metric``).

    The baseline is every row whose 'test_condition' is 'full_data'. For each
    other condition that has rows with an imputed test type
    ('imputed_coherent', 'imputed_multi'), the imputed values are paired with
    the baseline by sorting both on 'run'. Significance uses a
    Bonferroni-corrected alpha over all condition/imputation-type pairs.

    Args:
        df (pandas.DataFrame): Long-format results with the columns 'run',
            'test_condition', 'test_type' and ``metric``.
        metric (str): Name of the column holding the values to compare.

    Returns:
        None. All results are printed to stdout.
    """
    print("=" * 80)
    print(f"ANALYSIS: Imputation vs. Full Data for Metric: {metric.upper()}")
    print("=" * 80)

    # Step 1: Isolate the baseline 'full_data' results
    full_data_results = df[df['test_condition'] == 'full_data'].sort_values('run')[metric]
    if full_data_results.empty:
        print("Error: Could not find 'full_data' in the dataset. Aborting.")
        return
    mean_full_data = full_data_results.mean()

    # Step 2: Get the data for imputed conditions
    imputation_types_to_test = ['imputed_coherent', 'imputed_multi']
    imputed_df = df[df['test_type'].isin(imputation_types_to_test)]
    conditions_to_test = imputed_df['test_condition'].unique()

    # Step 3: Set up the Bonferroni correction
    n_conditions = len(conditions_to_test)
    if n_conditions == 0:
        # Without imputed rows the correction below would divide by zero.
        print("Error: Could not find any imputed results in the dataset. Aborting.")
        return
    n_pairs_per_condition = len(imputation_types_to_test)
    n_comparisons = n_conditions * n_pairs_per_condition
    alpha = 0.05
    corrected_alpha = alpha / n_comparisons
    print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f} for significance.\n")

    # Step 4: Loop through each condition and perform the comparisons
    for condition in conditions_to_test:
        print(f"\n--- Comparing Condition '{condition}' against Full Data ---")
        condition_df = imputed_df[imputed_df['test_condition'] == condition]

        for imp_type in imputation_types_to_test:
            imputed_results = condition_df[condition_df['test_type'] == imp_type].sort_values('run')[metric]

            if imputed_results.empty:
                continue

            # ttest_rel raises on unequal lengths; report and move on instead.
            if len(imputed_results) != len(full_data_results):
                print(f"{imp_type:>18} vs. {'full_data':<18}: skipped "
                      f"(unequal number of runs: {len(imputed_results)} vs. {len(full_data_results)})")
                continue

            # Perform the paired t-test on the run-aligned values
            _, p_value = stats.ttest_rel(full_data_results.to_numpy(), imputed_results.to_numpy())

            if p_value < corrected_alpha:
                # Determine which group had the higher mean score
                if imputed_results.mean() > mean_full_data:
                    winner_string = f"{imp_type} is better"
                else:
                    winner_string = "full_data is better"
                significance_details = f"Significant ({winner_string})"
            else:
                significance_details = "Not Significant"

            print(f"{imp_type:>18} vs. {'full_data':<18}: p-value = {p_value:.6f} ({significance_details})")

    print("\n")


# --- Main execution ---
# Entry point of this cell: compare every imputed condition against the
# 'full_data' baseline for each metric.
if __name__ == "__main__":
    # Load the dataset from the CSV file
    file_path = '../../results/downstream/task_05_imputing_test_set/results_10_runs.csv'
    try:
        full_df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: The file was not found at {file_path}")
        # Stop with a non-zero status. SystemExit is raised directly because
        # the exit() helper is installed by the site module for interactive
        # use and reports success (status 0) when called without arguments.
        raise SystemExit(1) from None

    # Define the metrics to analyze
    metrics_to_analyze = ['balanced_accuracy', 'macro_f1_score']

    for metric in metrics_to_analyze:
        run_imputation_vs_full_data_ttests(full_df, metric)

In [None]:
import pandas as pd
from statsmodels.stats.anova import AnovaRM
from scipy import stats
from itertools import combinations

def run_anova_for_imputation(df, metric):
    """
    Fit a two-way repeated measures ANOVA on ``metric`` and print its summary.

    'run' is used as the subject identifier and 'test_condition' and
    'test_type' as the within-subject factors. Any exception raised while
    building or fitting the model is caught and reported on stdout.

    Args:
        df (pandas.DataFrame): Long-format results table.
        metric (str): Name of the dependent-variable column.
    """
    separator = "="*80
    print(separator)
    print(f"ANOVA Results for Metric: {metric.upper()}")
    print(separator)

    try:
        # 'run' identifies the repetition that is measured under every
        # (test_condition, test_type) combination.
        model = AnovaRM(data=df,
                        depvar=metric,
                        subject='run',
                        within=['test_condition', 'test_type'])
        fitted = model.fit()
        print(fitted.summary())
        print("\n")
    except Exception as err:
        print(f"\nAn error occurred during ANOVA calculation for {metric}: {err}\n")

def run_posthoc_ttests_for_imputation(df, metric):
    """
    Performs and prints post-hoc paired t-tests with Bonferroni correction.

    Within every 'test_condition' found in ``df`` the three test types
    ('ablation', 'imputed_coherent', 'imputed_multi') are compared pairwise.
    Samples are paired by 'run': the rows of each condition are sorted by
    'run' before the metric values are extracted, so the result does not
    depend on the row order of the input table.

    Args:
        df (pandas.DataFrame): Long-format results with the columns 'run',
            'test_condition', 'test_type' and ``metric``.
        metric (str): Name of the column holding the values to compare.

    Returns:
        None. All results are printed to stdout.
    """
    print("="*80)
    print(f"Post-Hoc Paired t-test Results for Metric: {metric.upper()}")
    print("="*80)

    conditions = df['test_condition'].unique()
    test_types = ['ablation', 'imputed_coherent', 'imputed_multi']

    # Define the number of comparisons for Bonferroni correction
    n_conditions = len(conditions)
    if n_conditions == 0:
        # Without conditions the correction below would divide by zero.
        print("No test conditions found. Skipping post-hoc t-tests.\n")
        return
    n_pairs_per_condition = 3
    n_comparisons = n_conditions * n_pairs_per_condition
    alpha = 0.05
    corrected_alpha = alpha / n_comparisons
    print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f} for significance.\n")

    # Perform comparisons within each test_condition
    for condition in conditions:
        print(f"\n--- Condition: {condition} ---")

        # Sort by run (stable sort) so that position i of every series below
        # refers to the same run, which is what a paired test requires.
        condition_df = df[df['test_condition'] == condition].sort_values('run', kind='mergesort')

        # Create a dictionary to hold the series of results for each test type
        results_by_ttype = {
            ttype: condition_df[condition_df['test_type'] == ttype][metric]
            for ttype in test_types
        }

        # Perform pairwise paired t-tests
        for tt1, tt2 in combinations(test_types, 2):
            series1 = results_by_ttype[tt1]
            series2 = results_by_ttype[tt2]

            # Check if either series is empty (can happen if a combination doesn't exist)
            if series1.empty or series2.empty:
                continue

            # ttest_rel raises on unequal lengths; report and move on instead.
            if len(series1) != len(series2):
                print(f"{tt1:>18} vs. {tt2:<18}: skipped "
                      f"(unequal number of runs: {len(series1)} vs. {len(series2)})")
                continue

            # Perform the paired t-test on the run-aligned values
            _, p_value = stats.ttest_rel(series1.to_numpy(), series2.to_numpy())

            # Check for significance against the corrected alpha
            is_significant = "Significant" if p_value < corrected_alpha else "Not Significant"

            print(f"{tt1:>18} vs. {tt2:<18}: p-value = {p_value:.6f} ({is_significant})")
    print("\n")


# --- Main execution ---
# Entry point of this cell: repeated measures ANOVA plus post-hoc t-tests on
# the survival imputation results, excluding the baseline conditions.
if __name__ == "__main__":
    # 1. Load the dataset from the CSV file
    file_path = '../../results/downstream/task_06_imputing_test_set_surv/all_imputations_results_long_rf.csv'
    try:
        full_df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: The file was not found at {file_path}")
        # Stop with a non-zero status. SystemExit is raised directly because
        # the exit() helper is installed by the site module for interactive
        # use and reports success (status 0) when called without arguments.
        raise SystemExit(1) from None

    # 2. Filter the data as requested
    conditions_to_exclude = ['cancer_label_only', 'full_data']
    filtered_df = full_df[~full_df['test_condition'].isin(conditions_to_exclude)].copy()

    print(f"Successfully loaded and filtered data. Kept {len(filtered_df)} rows for analysis.\n")

    # Define the metrics to analyze
    metrics_to_analyze = ['c_index']

    for metric in metrics_to_analyze:
        # 3. Run the Repeated Measures ANOVA
        run_anova_for_imputation(filtered_df, metric)

        # 4. Run the Post-Hoc t-tests to explore 'test_type' differences
        run_posthoc_ttests_for_imputation(filtered_df, metric)

In [None]:
import pandas as pd
from scipy import stats

def run_imputation_vs_full_data_ttests(df, metric):
    """
    Compares imputed data against the full_data baseline using paired t-tests.
    If a result is significant, it also reports which group performed better
    (the group with the higher mean of ``metric``).

    The baseline is every row whose 'test_condition' is 'full_data'. For each
    other condition that has rows with an imputed test type
    ('imputed_coherent', 'imputed_multi'), the imputed values are paired with
    the baseline by sorting both on 'run'. Significance uses a
    Bonferroni-corrected alpha over all condition/imputation-type pairs.

    Args:
        df (pandas.DataFrame): Long-format results with the columns 'run',
            'test_condition', 'test_type' and ``metric``.
        metric (str): Name of the column holding the values to compare.

    Returns:
        None. All results are printed to stdout.
    """
    print("=" * 80)
    print(f"ANALYSIS: Imputation vs. Full Data for Metric: {metric.upper()}")
    print("=" * 80)

    # Step 1: Isolate the baseline 'full_data' results
    full_data_results = df[df['test_condition'] == 'full_data'].sort_values('run')[metric]
    if full_data_results.empty:
        print("Error: Could not find 'full_data' in the dataset. Aborting.")
        return
    mean_full_data = full_data_results.mean()

    # Step 2: Get the data for imputed conditions
    imputation_types_to_test = ['imputed_coherent', 'imputed_multi']
    imputed_df = df[df['test_type'].isin(imputation_types_to_test)]
    conditions_to_test = imputed_df['test_condition'].unique()

    # Step 3: Set up the Bonferroni correction
    n_conditions = len(conditions_to_test)
    if n_conditions == 0:
        # Without imputed rows the correction below would divide by zero.
        print("Error: Could not find any imputed results in the dataset. Aborting.")
        return
    n_pairs_per_condition = len(imputation_types_to_test)
    n_comparisons = n_conditions * n_pairs_per_condition
    alpha = 0.05
    corrected_alpha = alpha / n_comparisons
    print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f} for significance.\n")

    # Step 4: Loop through each condition and perform the comparisons
    for condition in conditions_to_test:
        print(f"\n--- Comparing Condition '{condition}' against Full Data ---")
        condition_df = imputed_df[imputed_df['test_condition'] == condition]

        for imp_type in imputation_types_to_test:
            imputed_results = condition_df[condition_df['test_type'] == imp_type].sort_values('run')[metric]

            if imputed_results.empty:
                continue

            # ttest_rel raises on unequal lengths; report and move on instead.
            if len(imputed_results) != len(full_data_results):
                print(f"{imp_type:>18} vs. {'full_data':<18}: skipped "
                      f"(unequal number of runs: {len(imputed_results)} vs. {len(full_data_results)})")
                continue

            # Perform the paired t-test on the run-aligned values
            _, p_value = stats.ttest_rel(full_data_results.to_numpy(), imputed_results.to_numpy())

            if p_value < corrected_alpha:
                # Determine which group had the higher mean score
                if imputed_results.mean() > mean_full_data:
                    winner_string = f"{imp_type} is better"
                else:
                    winner_string = "full_data is better"
                significance_details = f"Significant ({winner_string})"
            else:
                significance_details = "Not Significant"

            print(f"{imp_type:>18} vs. {'full_data':<18}: p-value = {p_value:.6f} ({significance_details})")

    print("\n")


# --- Main execution ---
# Entry point of this cell: compare every imputed condition against the
# 'full_data' baseline for each metric.
if __name__ == "__main__":
    # Load the dataset from the CSV file
    file_path = '../../results/downstream/task_06_imputing_test_set_surv/all_imputations_results_long_rf.csv'
    try:
        full_df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: The file was not found at {file_path}")
        # Stop with a non-zero status. SystemExit is raised directly because
        # the exit() helper is installed by the site module for interactive
        # use and reports success (status 0) when called without arguments.
        raise SystemExit(1) from None

    # Define the metrics to analyze
    metrics_to_analyze = ['c_index']

    for metric in metrics_to_analyze:
        run_imputation_vs_full_data_ttests(full_df, metric)

In [None]:
import json
import numpy as np
from scipy import stats
from itertools import combinations
from sklearn.metrics import auc

def run_auc_ttest_analysis(raw_data_path, ablation_steps):
    """
    Compare methods by the area under their per-run F1-score curves.

    The JSON file at ``raw_data_path`` is read as a mapping from modality key
    to a mapping from method name to a list of runs, each run being a sequence
    of F1 scores. For every run the area under the curve over
    ``ablation_steps`` is computed, and the 'random', 'coherent' and 'multi'
    methods are compared pairwise with paired t-tests against a
    Bonferroni-corrected alpha. Results are printed; nothing is returned.

    Args:
        raw_data_path (str): Path to the raw results JSON file.
        ablation_steps: x-coordinates shared by all F1-score curves.
    """
    rule = "=" * 80
    print(rule)
    print("Statistical Analysis: Counterfactual Inference vs. Random Ablation")
    print("Metric: Area Under the F1-Score Curve (AUC)")
    print(rule)

    try:
        with open(raw_data_path, 'r') as handle:
            results_by_modality = json.load(handle)
    except FileNotFoundError:
        print(f"Error: The results file was not found at '{raw_data_path}'")
        print("Please make sure you have run the experiment script first.")
        return

    # Bonferroni correction for the 3 pairwise comparisons:
    # random vs coherent, random vs multi, coherent vs multi.
    alpha = 0.05
    n_comparisons = 3
    corrected_alpha = alpha / n_comparisons
    print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f} for significance.\n")

    method_pairs = list(combinations(['random', 'coherent', 'multi'], 2))

    # One analysis per ablated modality key found in the file.
    for modality_key, runs_by_method in results_by_modality.items():
        print(f"\n--- Analysis for Ablated Modality: {modality_key.upper()} ---")

        # Step 1: one AUC value per run, for every method present in the data.
        auc_by_method = {
            method: [auc(ablation_steps, f1_curve) for f1_curve in curves]
            for method, curves in runs_by_method.items()
        }

        # Step 2: pairwise paired t-tests on the per-run AUC values.
        for first, second in method_pairs:
            first_scores = auc_by_method[first]
            second_scores = auc_by_method[second]

            _, p_value = stats.ttest_rel(first_scores, second_scores)

            if p_value < corrected_alpha:
                # The method with the higher mean AUC is reported as better.
                better = first if np.mean(first_scores) > np.mean(second_scores) else second
                verdict = f"Significant ({better} is better)"
            else:
                verdict = "Not Significant"

            print(f"{first:>10} vs. {second:<10}: p-value = {p_value:.6f} ({verdict})")

# --- Main execution ---
# Entry point of this cell: run the AUC-based paired t-test analysis on the
# raw counterfactual results.
if __name__ == '__main__':
    # Define the path to your NEW raw data file
    RAW_DATA_PATH = "../../results/downstream/task_07_counterfactual/rna_wsi_results_RAW.json"
    
    # The ablation steps used in the experiment, needed for AUC calculation.
    # np.arange excludes the stop value, hence 1.05 rather than 1.0.
    # NOTE(review): presumably meant as 0.00, 0.05, ..., 1.00; with a float
    # step the element count depends on rounding - confirm it matches the
    # number of F1 scores stored per run.
    ABLATION_STEPS = np.arange(0, 1.05, 0.05)

    run_auc_ttest_analysis(RAW_DATA_PATH, ABLATION_STEPS)


In [None]:
import json
import numpy as np

# =============================================================================
# CONFIGURATION
# =============================================================================
# The path to the JSON file created by the recalculation script.
# It is loaded by verify_and_print_summary(), which reads it as a mapping of
# target modality -> source label -> list of scores.
RAW_SCORES_FILE_PATH = "../../results/downstream/task_00_rsquared/recalculated_r2_scores_RAW.json"


# =============================================================================
# MAIN VERIFICATION SCRIPT
# =============================================================================
def verify_and_print_summary(file_path: str):
    """
    Print a mean / standard-deviation table for every list of raw scores.

    The JSON file is read as a mapping of target modality -> source label ->
    list of scores. Rows are printed in sorted order of target and then source
    label; an empty score list is shown as 'N/A'. If the file does not exist,
    a message is printed and the function returns without a table.

    Args:
        file_path: Path to the raw scores JSON file.
    """
    rule = "="*80
    print("\n" + rule)
    print("Verification: Calculating Summary Statistics from Raw Scores")
    print(rule)

    try:
        with open(file_path, 'r') as handle:
            raw_scores = json.load(handle)
    except FileNotFoundError:
        print(f"Verification failed: The file '{file_path}' was not found.")
        print("Please run the 'r2_recalculation_script' first.")
        return

    # Table header
    print(f"{'target':<10} | {'source_label':<15} | {'r2_mean':<25} | {'r2_std':<25}")
    print("-" * 80)

    # Sorted iteration keeps the row order stable between runs.
    for target_modality in sorted(raw_scores.keys()):
        per_source = raw_scores[target_modality]
        for source_label in sorted(per_source.keys()):
            values = per_source[source_label]
            row_prefix = f"{target_modality:<10} | {source_label:<15} | "

            if not values:
                print(row_prefix + f"{'N/A':<25} | {'N/A':<25}")
                continue

            # ddof=0 gives the population standard deviation.
            mean_text = f"{np.mean(values):<25.10f}"
            std_text = f"{np.std(values, ddof=0):<25.10f}"
            print(row_prefix + mean_text + " | " + std_text)
    print(rule + "\n")


# Entry point of this cell: print the summary table for the configured file.
if __name__ == '__main__':
    verify_and_print_summary(RAW_SCORES_FILE_PATH)

In [None]:
import json
import numpy as np
import pandas as pd
from scipy import stats
from itertools import combinations
from statsmodels.stats.anova import AnovaRM

# =============================================================================
# CONFIGURATION
# =============================================================================
# JSON file with the raw scores; loaded by prepare_dataframe_for_analysis().
RAW_SCORES_FILE_PATH = "../../results/downstream/task_00_rsquared/recalculated_r2_scores_RAW.json"
# Number of repetitions read from every score list (indices 0..N_REPETITIONS-1).
N_REPETITIONS = 10

# =============================================================================
# 1. DATA PREPARATION
# =============================================================================
def prepare_dataframe_for_analysis(file_path: str) -> pd.DataFrame:
    """
    Build the long-format table used by the ANOVA and the post-hoc t-tests.

    The JSON file is read as a mapping of target modality -> source label ->
    list of scores. For every target three source types are emitted:

    * 'Single': the element-wise mean over the single-modality sources
      ('cna', 'rnaseq', 'rppa', 'wsi') other than the target itself,
    * 'Coherent' and 'Multi': copied from the sources with those labels.

    The first N_REPETITIONS entries of each score list are used, one row per
    run index.

    Args:
        file_path: Path to the raw scores JSON file.

    Returns:
        A DataFrame with the columns 'run', 'target_modality', 'source_type'
        and 'r2_score'. An empty DataFrame is returned (after printing a
        message) when the file does not exist.
    """
    try:
        with open(file_path, 'r') as handle:
            raw_scores = json.load(handle)
    except FileNotFoundError:
        print(f"Analysis failed: The file '{file_path}' was not found.")
        return pd.DataFrame()

    # Labels that denote a single-modality source.
    single_labels = ['cna', 'rnaseq', 'rppa', 'wsi']
    rows = []

    for target_modality, sources in raw_scores.items():
        # --- 'Single': average the single-modality sources, target excluded ---
        per_source_scores = [
            scores
            for label, scores in sources.items()
            if label in single_labels and label != target_modality
        ]
        if per_source_scores:
            # axis=0 averages across sources, leaving one value per run.
            averaged = np.mean(per_source_scores, axis=0)
            rows.extend(
                {
                    'run': run_idx,
                    'target_modality': target_modality,
                    'source_type': 'Single',
                    'r2_score': averaged[run_idx],
                }
                for run_idx in range(N_REPETITIONS)
            )

        # --- 'Coherent' and 'Multi': copied over unchanged ---
        for label, scores in sources.items():
            if label not in ['Coherent', 'Multi']:
                continue
            rows.extend(
                {
                    'run': run_idx,
                    'target_modality': target_modality,
                    'source_type': label,
                    'r2_score': scores[run_idx],
                }
                for run_idx in range(N_REPETITIONS)
            )

    return pd.DataFrame(rows)


# =============================================================================
# 2. STATISTICAL ANALYSIS FUNCTIONS
# =============================================================================
def run_anova(df: pd.DataFrame):
    """
    Fit a two-way repeated measures ANOVA on 'r2_score' and print its summary.

    'run' is the subject identifier; 'target_modality' and 'source_type' are
    the within-subject factors. An empty DataFrame is reported and skipped,
    and any exception raised while building or fitting the model is caught
    and printed.
    """
    rule = "="*80
    print("\n" + rule)
    print("Two-Way Repeated Measures ANOVA Results")
    print(rule)

    if df.empty:
        print("DataFrame is empty, cannot run ANOVA.")
        return

    try:
        model = AnovaRM(data=df,
                        depvar='r2_score',
                        subject='run',
                        within=['target_modality', 'source_type'])
        fitted = model.fit()
        print(fitted.summary())
    except Exception as err:
        print(f"An error occurred during ANOVA calculation: {err}")

def run_posthoc_ttests(df: pd.DataFrame):
    """
    Print pairwise paired t-tests between source types per target modality.

    For every value of 'target_modality' the 'r2_score' values of the source
    types 'Single', 'Coherent' and 'Multi' are compared pairwise, in the row
    order of ``df``. A comparison is significant when its p-value is below the
    Bonferroni-corrected alpha (0.05 / 3); the source type with the higher
    mean is then reported as better. Pairs with a missing source type are
    skipped. An empty DataFrame is reported and nothing else is done.
    """
    rule = "="*80
    print("\n" + rule)
    print("Post-Hoc Paired t-tests for Source Type")
    print(rule)

    if df.empty:
        print("DataFrame is empty, cannot run t-tests.")
        return

    # Bonferroni correction for 3 comparisons
    alpha = 0.05
    n_comparisons = 3
    corrected_alpha = alpha / n_comparisons
    print(f"Using Bonferroni-corrected alpha = {corrected_alpha:.4f} for significance.\n")

    source_types = ['Single', 'Coherent', 'Multi']

    for target in df['target_modality'].unique():
        print(f"\n--- Analysis for Target Modality: {target.upper()} ---")
        rows_for_target = df[df['target_modality'] == target]

        # Score arrays for each source type, in row order.
        scores_by_source = {}
        for stype in source_types:
            is_stype = rows_for_target['source_type'] == stype
            scores_by_source[stype] = rows_for_target.loc[is_stype, 'r2_score'].values

        for first, second in combinations(source_types, 2):
            first_scores = scores_by_source[first]
            second_scores = scores_by_source[second]

            # Nothing to compare when a source type is absent for this target.
            if not len(first_scores) or not len(second_scores):
                continue

            _, p_value = stats.ttest_rel(first_scores, second_scores)

            if p_value < corrected_alpha:
                better = first if np.mean(first_scores) > np.mean(second_scores) else second
                verdict = f"Significant ({better} is better)"
            else:
                verdict = "Not Significant"

            print(f"{first:>10} vs. {second:<10}: p-value = {p_value:.6f} ({verdict})")


# =============================================================================
# 3. MAIN EXECUTION
# =============================================================================
# Entry point of this cell: build the analysis table, then run the ANOVA and
# the post-hoc t-tests on it.
if __name__ == '__main__':
    # Step 1: Prepare the data
    analysis_df = prepare_dataframe_for_analysis(RAW_SCORES_FILE_PATH)
    
    # prepare_dataframe_for_analysis returns an empty DataFrame when the file
    # is missing (or yields no rows), so emptiness doubles as the error check.
    if not analysis_df.empty:
        # Step 2: Run the ANOVA
        run_anova(analysis_df)
        
        # Step 3: Run the post-hoc t-tests
        run_posthoc_ttests(analysis_df)
    else:
        print("\nAnalysis could not be performed due to data loading issues.")