In [1]:
import os
import pickle
import argparse
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, roc_auc_score

def compute_average_report_across_runs(reports):
    """Aggregate several classification reports into one "mean±std" table.

    Args:
        reports: Non-empty iterable of dicts with the keys 'HC', 'PD',
            'macro avg' and 'weighted avg' (each mapping to a dict with
            'precision', 'recall', 'f1-score' and 'support') plus a numeric
            'accuracy' entry. This is the layout the sibling functions get
            from ``classification_report(..., output_dict=True)``.

    Returns:
        pandas.DataFrame with rows 'HC', 'PD', 'accuracy', 'macro avg' and
        'weighted avg' and columns 'precision', 'recall', 'f1-score' and
        'support'. Every cell is a string ``'<mean>±<std>'`` rounded to four
        decimals, except the 'accuracy' row, whose precision/recall cells
        are blank and whose support cell copies the 'macro avg' support.

    Raises:
        ValueError: If ``reports`` is empty.
    """
    reports = list(reports)
    if not reports:
        # Without this guard the aggregation would silently produce NaNs.
        raise ValueError('reports must contain at least one classification report')

    overall_report = {
        'HC': {'precision': [], 'recall': [], 'f1-score': [], 'support': []},
        'PD': {'precision': [], 'recall': [], 'f1-score': [], 'support': []},
        'accuracy': [],
        'macro avg': {'precision': [], 'recall': [], 'f1-score': [], 'support': []},
        'weighted avg': {'precision': [], 'recall': [], 'f1-score': [], 'support': []},
    }

    def _mean_std(values):
        """Format a list of numbers as 'mean±std', both rounded to 4 decimals."""
        values = np.array(values)
        return f'{round(values.mean(), 4)}±{round(values.std(), 4)}'

    # Collect every value of every run under its (section, metric) slot.
    for report in reports:
        for key, value in report.items():
            if key == 'accuracy':
                overall_report[key].append(value)
            else:
                for key2, score in value.items():
                    overall_report[key][key2].append(score)

    # Collapse each list to its summary string. Iterating the accumulator
    # itself (not a leftover loop variable) keeps this step self-contained.
    # Only existing keys are reassigned, so mutating while iterating is safe.
    for key, collected in overall_report.items():
        if key == 'accuracy':
            overall_report[key] = _mean_std(collected)
        else:
            for key2 in collected:
                collected[key2] = _mean_std(collected[key2])

    # -- just for a more clean output
    # The scalar accuracy string is broadcast over all four columns; blank the
    # cells that carry no meaning and show the total support instead.
    overall_report = pd.DataFrame.from_dict(overall_report).T
    overall_report.loc['accuracy', ['precision', 'recall']] = ''
    overall_report.loc['accuracy', 'support'] = overall_report.loc['macro avg', 'support']

    return overall_report

def get_reports(exps_dir):
    """Build one validation and one test classification report per run.

    For every run directory under ``exps_dir`` the predictions and labels of
    all its folds are pooled, and a single report is computed on the pooled
    data for each split.

    Expected layout (read from the paths built below):
        ``<exps_dir>/<run>/<fold>/model_output/validation_classification.pkl``
        ``<exps_dir>/<run>/<fold>/model_output/test_classification.pkl``
    Each pickle must provide 'preds' and 'labels' entries.
    NOTE(review): these are presumably list-like and aligned element-wise.
    Confirm against the code that writes the pickles.

    Args:
        exps_dir: Directory containing one sub-directory per run.

    Returns:
        Tuple ``(val_reports, test_reports)``: two lists of report dicts
        (``classification_report(..., output_dict=True)``), one entry per run,
        ordered by sorted run directory name.

    Raises:
        OSError: If a directory or an expected pickle file cannot be read.
    """
    val_reports = []
    test_reports = []

    # os.listdir returns entries in arbitrary order; sort so the order of the
    # returned reports is reproducible, as get_report_perfold already does.
    for run_dir in sorted(os.listdir(exps_dir)):
        run_dir_path = os.path.join(exps_dir, run_dir)

        # Pooled predictions/labels of the current run, keyed by the file
        # name prefix of each split.
        pooled = {
            'validation': {'preds': [], 'labels': []},
            'test': {'preds': [], 'labels': []},
        }

        for fold_dir in sorted(os.listdir(run_dir_path)):
            model_output_dir = os.path.join(run_dir_path, fold_dir, 'model_output')

            for split, bucket in pooled.items():
                pkl_path = os.path.join(model_output_dir, f'{split}_classification.pkl')
                # NOTE: pickle.load executes arbitrary code; only point this
                # at experiment directories you trust.
                with open(pkl_path, 'rb') as f:
                    model_output = pickle.load(f)

                bucket['preds'] += model_output['preds']
                bucket['labels'] += model_output['labels']

        # -- computing reports
        val_reports.append(
            classification_report(
                pooled['validation']['labels'],
                pooled['validation']['preds'],
                target_names=['HC', 'PD'],
                output_dict=True,
            )
        )

        test_reports.append(
            classification_report(
                pooled['test']['labels'],
                pooled['test']['preds'],
                target_names=['HC', 'PD'],
                output_dict=True,
            )
        )

    return val_reports, test_reports


def get_report_perfold(exps_dir):
    """Collect a classification report and AUC for every fold of every run.

    Walks ``<exps_dir>/<run>/<fold>/model_output`` (runs and folds in sorted
    order) and evaluates ``validation_classification.pkl`` and
    ``test_classification.pkl`` wherever they exist; missing files are
    skipped. Each pickle must provide 'preds', 'labels' and 'probs', where
    every item of 'probs' is indexable and its element 1 is used as the
    score passed to ``roc_auc_score``.

    Args:
        exps_dir: Directory containing one sub-directory per run.

    Returns:
        Tuple ``(val_fold_reports, test_fold_reports)``. Each is a dict
        mapping the run directory name to a list of per-fold dicts with the
        keys 'fold', 'report', 'preds', 'labels', 'probs' and 'auc'.
    """

    def _fold_entry(pkl_path, fold_name):
        """Evaluate one pickle file; return None when the file is absent."""
        if not os.path.exists(pkl_path):
            return None

        with open(pkl_path, 'rb') as handle:
            model_output = pickle.load(handle)

        # Keep only element 1 of every probability item.
        positive_probs = [pair[1] for pair in model_output['probs']]
        auc = roc_auc_score(model_output['labels'], positive_probs)
        report = classification_report(
            model_output['labels'],
            model_output['preds'],
            target_names=['HC', 'PD'],
            output_dict=True,
        )

        return {
            'fold': fold_name,
            'report': report,
            'preds': model_output['preds'],
            'labels': model_output['labels'],
            'probs': positive_probs,
            'auc': auc,
        }

    val_fold_reports = {}
    test_fold_reports = {}

    for run_dir in sorted(os.listdir(exps_dir)):
        run_path = os.path.join(exps_dir, run_dir)
        fold_names = sorted(os.listdir(run_path))

        val_entries = []
        test_entries = []
        val_fold_reports[run_dir] = val_entries
        test_fold_reports[run_dir] = test_entries

        for fold_name in fold_names:
            output_dir = os.path.join(run_path, fold_name, 'model_output')

            # -- validation set
            val_entry = _fold_entry(
                os.path.join(output_dir, 'validation_classification.pkl'),
                fold_name,
            )
            if val_entry is not None:
                val_entries.append(val_entry)

            # -- test set
            test_entry = _fold_entry(
                os.path.join(output_dir, 'test_classification.pkl'),
                fold_name,
            )
            if test_entry is not None:
                test_entries.append(test_entry)

    return val_fold_reports, test_fold_reports

def compute_fold_stats(fold_reports):
    """Summarise per-fold metrics of each run as mean and standard deviation.

    Args:
        fold_reports: Dict mapping a run name to a list of per-fold dicts.
            Each per-fold dict needs an 'auc' value and a 'report' dict that
            holds 'accuracy', ``['PD']`` with 'f1-score', 'precision' and
            'recall', and ``['HC']`` with 'recall'.

    Returns:
        Dict mapping each run name to
        ``{metric: {'mean': ..., 'std': ...}}`` for the metrics 'accuracy',
        'f1', 'precision', 'recall', 'sensitivity', 'specificity' and 'auc'.
        F1, precision and recall come from the 'PD' entry; sensitivity is the
        'PD' recall and specificity is the 'HC' recall.
    """
    # Metric name -> how to read that metric from one per-fold entry.
    # Insertion order defines the key order of the result.
    extractors = {
        'accuracy': lambda entry: entry['report']['accuracy'],
        'f1': lambda entry: entry['report']['PD']['f1-score'],
        'precision': lambda entry: entry['report']['PD']['precision'],
        'recall': lambda entry: entry['report']['PD']['recall'],
        'sensitivity': lambda entry: entry['report']['PD']['recall'],
        'specificity': lambda entry: entry['report']['HC']['recall'],
        'auc': lambda entry: entry['auc'],
    }

    fold_stats = {}
    for run_dir, fold_results in fold_reports.items():
        # Gather every metric's values first, then reduce them.
        collected = {
            name: [pick(entry) for entry in fold_results]
            for name, pick in extractors.items()
        }
        fold_stats[run_dir] = {
            name: {'mean': np.mean(values), 'std': np.std(values)}
            for name, values in collected.items()
        }

    return fold_stats



In [None]:
import scipy.stats as stats
from collections import defaultdict

def compare_two_exps_with_wilcoxon(exps_dir1, exps_dir2):
    """
    Compare the test results of two experiment directories with the
    Wilcoxon signed-rank test.

    For each experiment the per-fold test metrics are averaged over all runs
    that share the same fold name. Folds present in both experiments form the
    paired samples handed to ``scipy.stats.wilcoxon``.

    Args:
        exps_dir1: First experiment directory path
        exps_dir2: Second experiment directory path

    Returns:
        Dictionary mapping each tested metric ('accuracy', 'f1', 'precision',
        'sensitivity', 'specificity', 'auc') to a dict with 'p_value',
        'significant' (p < 0.05), 'means1', 'means2' and 'difference'
        (means1 - means2). Metrics with fewer than five paired folds are
        left out of the dictionary.
    """

    def _group_by_fold(fold_reports):
        """Return {fold name: {metric name: values across runs}}."""
        grouped = defaultdict(lambda: defaultdict(list))
        for fold_results in fold_reports.values():
            for entry in fold_results:
                bucket = grouped[entry['fold']]
                report = entry['report']
                bucket['accuracy'].append(report['accuracy'])
                bucket['f1'].append(report['PD']['f1-score'])
                bucket['precision'].append(report['PD']['precision'])
                bucket['recall'].append(report['PD']['recall'])
                bucket['sensitivity'].append(report['PD']['recall'])
                bucket['specificity'].append(report['HC']['recall'])
                bucket['auc'].append(entry['auc'])
        return grouped

    # Only the test-set reports are compared; validation reports are ignored.
    test_fold_reports1 = get_report_perfold(exps_dir1)[1]
    test_fold_reports2 = get_report_perfold(exps_dir2)[1]

    metrics_by_fold1 = _group_by_fold(test_fold_reports1)
    metrics_by_fold2 = _group_by_fold(test_fold_reports2)

    metrics_to_test = ['accuracy', 'f1', 'precision', 'sensitivity', 'specificity', 'auc']

    # Pair up the experiments on folds that exist in both of them.
    common_folds = set(metrics_by_fold1.keys()) & set(metrics_by_fold2.keys())

    fold_means1 = defaultdict(list)
    fold_means2 = defaultdict(list)
    for fold in common_folds:
        for metric in metrics_to_test:
            values1 = metrics_by_fold1[fold][metric]
            values2 = metrics_by_fold2[fold][metric]
            if not (values1 and values2):
                continue
            fold_means1[metric].append(np.mean(values1))
            fold_means2[metric].append(np.mean(values2))

    # Run Wilcoxon signed-rank test for each metric
    results = {}
    for metric in metrics_to_test:
        sample1 = fold_means1[metric]
        sample2 = fold_means2[metric]
        if len(sample1) < 5:  # Need at least 5 pairs for reliable results
            continue

        _, p_value = stats.wilcoxon(sample1, sample2)
        mean1 = np.mean(sample1)
        mean2 = np.mean(sample2)
        results[metric] = {
            'p_value': p_value,
            'significant': p_value < 0.05,
            'means1': mean1,
            'means2': mean2,
            'difference': mean1 - mean2,
        }

    return results

# Example usage: compare the test results of two experiment directories.
exps_dir1 = "/home/yzhong/gits/interpretable-pd/exps/gita_splits2/M3/combined_set/"
exps_dir2 = "/home/yzhong/gits/interpretable-pd/exps/gita_splits2/M4/combined_set/"


results = compare_two_exps_with_wilcoxon(exps_dir1, exps_dir2)

# Print results in a nice format
# The label shown for each experiment is the second-to-last path component.
print(f"Statistical comparison between: \n- {exps_dir1.strip('/').split('/')[-2]} and \n- {exps_dir2.strip('/').split('/')[-2]}\n")
print("Wilcoxon Signed-Rank Test Results:")
for metric, result in results.items():
    significance = "* significant *" if result['significant'] else "not significant"
    # Report a tie as "=" instead of misleadingly printing "<".
    if result['difference'] > 0:
        direction = ">"
    elif result['difference'] < 0:
        direction = "<"
    else:
        direction = "="
    print(f"{metric.upper()}: {result['means1']:.4f} {direction} {result['means2']:.4f}, p={result['p_value']:.4f} ({significance})")

Statistical comparison between: 
- Mallsent and 
- Mallsent

Wilcoxon Signed-Rank Test Results:
ACCURACY: 0.7943 > 0.7601, p=0.0039 (* significant *)
F1: 0.7924 > 0.7588, p=0.0039 (* significant *)
PRECISION: 0.8009 > 0.7678, p=0.0098 (* significant *)
SENSITIVITY: 0.7882 > 0.7562, p=0.0039 (* significant *)
SPECIFICITY: 0.8005 > 0.7641, p=0.0195 (* significant *)
AUC: 0.8445 > 0.8226, p=0.0195 (* significant *)
