In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.stats import norm
from collections import defaultdict

# Sklearn imports
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, 
                             roc_auc_score, roc_curve, brier_score_loss)
from sklearn.calibration import calibration_curve
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.ensemble import (AdaBoostClassifier, BaggingClassifier, 
                              GradientBoostingClassifier, ExtraTreesClassifier)
from sklearn.utils import resample

# Boosting libraries
import xgboost as xgb
import catboost as cb
import lightgbm as lgb

import warnings
warnings.filterwarnings('ignore')

# ================= 配置区域 (Configuration) =================
# 1. 设置 Times New Roman 字体
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False

# 2. 设置 PDF 导出为矢量格式 (Type 42)
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42

# 3. 设置文件路径
input_path = "D:/卒中预后第一篇论文/2025.11.25-new work/3.模型比较/"
output_path = "D:/卒中预后第一篇论文/2025.11.25-new work/3.模型比较/"

# 确保输出目录存在
os.makedirs(output_path, exist_ok=True)
# ===========================================================

# 定义分类器列表
classifiers = [
    ('XGBoost', xgb.XGBClassifier(
        learning_rate=0.01, max_delta_step=1, sampling_method='uniform',
        random_state=30, eval_metric='logloss', use_label_encoder=False
    )),
    ('Catboost', cb.CatBoostClassifier(
        random_state=30, verbose=0, depth=4, learning_rate=0.01,
        l2_leaf_reg=100, subsample=0.7, random_strength=5,
        early_stopping_rounds=20, allow_writing_files=False
    )),
    ('LightGBM', lgb.LGBMClassifier(
        random_state=30, num_leaves=32, min_child_samples=100,
        learning_rate=0.02, reg_alpha=1, reg_lambda=5, feature_fraction=0.7,
        bagging_fraction=0.8, extra_trees=True, n_estimators=200, verbose=-1
    )),
    ('Logistic Regression', LogisticRegression(random_state=30, max_iter=1000)),
    ('Decision Tree', DecisionTreeClassifier(random_state=30)),
    ('K Neighbors', KNeighborsClassifier()),
    ('Naive Bayes', GaussianNB()),
    ('MLP Classifier', MLPClassifier(random_state=30, max_iter=1000)),
    ('Gaussian Process', GaussianProcessClassifier(random_state=30)),
    ('QDA', QuadraticDiscriminantAnalysis()),
    ('AdaBoost', AdaBoostClassifier(random_state=30)),
    ('Bagging', BaggingClassifier(random_state=30)),
    ('Gradient Boosting', GradientBoostingClassifier(random_state=30)),
    ('Extra Trees', ExtraTreesClassifier(
        random_state=30, n_estimators=300, max_depth=8, min_samples_split=10,
        min_samples_leaf=4, max_features="sqrt", bootstrap=True, ccp_alpha=0.03, n_jobs=-1
    ))
]

# ================= 核心计算与绘图函数 =================

def setup_plot_for_editable_pdf(figsize=(10, 8)):
    """初始化绘图，确保图层正确"""
    fig, ax = plt.subplots(figsize=figsize)
    ax.set_axisbelow(True)
    return fig, ax

# --- 1. DCA (决策曲线) 相关函数 ---
def calculate_net_benefit(y_true, y_prob, thresholds):
    """计算净收益 Net Benefit"""
    net_benefits = []
    y_true = np.array(y_true)
    y_prob = np.array(y_prob)
    n = len(y_true)
    
    for thresh in thresholds:
        # 预测为阳性 (Risky)
        y_pred = y_prob >= thresh
        
        tp = np.sum((y_true == 1) & (y_pred == 1))
        fp = np.sum((y_true == 0) & (y_pred == 1))
        
        # Net Benefit 公式: (TP/N) - (FP/N) * (pt / (1-pt))
        if thresh == 1.0:
            nb = 0 
        else:
            weight = thresh / (1 - thresh)
            nb = (tp / n) - (fp / n) * weight
        net_benefits.append(nb)
    return np.array(net_benefits)

def plot_decision_curves(dca_data, prevalence, filename, cohort_name="Internal"):
    """绘制决策曲线 (DCA)"""
    fig, ax = setup_plot_for_editable_pdf((10, 8))
    
    # 阈值范围 1% 到 99%
    thresholds = np.linspace(0.01, 0.99, 99)
    
    # 1. Plot Treat All (All Positive)
    # NB_all = Prevalence - (1-Prevalence)* (pt/(1-pt))
    treat_all = prevalence - (1 - prevalence) * thresholds / (1 - thresholds)
    ax.plot(thresholds, treat_all, linestyle=':', color='gray', label='Treat All', linewidth=1.5)
    
    # 2. Plot Treat None (All Negative, NB=0)
    ax.plot(thresholds, np.zeros_like(thresholds), linestyle='-', color='black', label='Treat None', linewidth=1.5)
    
    # 3. Plot Models
    colors = plt.cm.tab20(np.linspace(0, 1, len(dca_data)))
    for (model_name, data), color in zip(dca_data.items(), colors):
        y_true_local = data['y_true']
        y_prob_local = data['y_prob']
        
        nb = calculate_net_benefit(y_true_local, y_prob_local, thresholds)
        ax.plot(thresholds, nb, color=color, label=model_name, linewidth=2)
        
    # 动态设置 Y 轴上限 (Prevalence + buffer)
    y_max = max(prevalence, 0.1) + 0.15 
    ax.set_ylim([-0.05, y_max]) 
    ax.set_xlim([0, 1.0])
    
    ax.set_xlabel('Threshold Probability', fontsize=14)
    ax.set_ylabel('Net Benefit', fontsize=14)
    ax.set_title(f'Decision Curve Analysis - {cohort_name}', fontsize=16, fontweight='bold', pad=15)
    
    ax.legend(loc='upper right', fontsize=10, frameon=True)
    ax.grid(True, linestyle=':', alpha=0.6)
    
    save_path = os.path.join(output_path, f"{filename}_{cohort_name.lower()}.pdf")
    plt.savefig(save_path, format='pdf', bbox_inches='tight')
    plt.close()

# --- 2. ROC 曲线相关函数 ---
def plot_combined_roc_curves_with_ci(roc_data, title_suffix, filename):
    fig, ax = setup_plot_for_editable_pdf((10, 8))
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.6, label='Chance Level (AUC = 0.500)')
    colors = plt.cm.tab20(np.linspace(0, 1, len(roc_data)))
    
    for (model_name, (fpr, tpr, auc_score, ci_lower, ci_upper)), color in zip(roc_data.items(), colors):
        if not np.isnan(auc_score):
            label = f'{model_name} (AUC = {auc_score:.3f} [{ci_lower:.3f}-{ci_upper:.3f}])'
            ax.plot(fpr, tpr, label=label, linewidth=2, color=color)
    
    ax.set_xlim([-0.02, 1.02])
    ax.set_ylim([-0.02, 1.05])
    ax.set_xlabel('1 - Specificity (False Positive Rate)', fontsize=14)
    ax.set_ylabel('Sensitivity (True Positive Rate)', fontsize=14)
    ax.set_title(f'ROC Curves - {title_suffix}', fontsize=16, fontweight='bold', pad=15)
    ax.legend(loc='lower right', fontsize=10, frameon=True)
    ax.grid(True, linestyle=':', alpha=0.6)
    
    save_path = os.path.join(output_path, f"{filename}.pdf")
    plt.savefig(save_path, format='pdf', bbox_inches='tight')
    plt.close()

# --- 3. 校准曲线相关函数 ---
def plot_enhanced_calibration_curves(calibration_data, title_suffix, filename, cohort_name="Internal"):
    fig, ax = setup_plot_for_editable_pdf((10, 8))
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.6, linewidth=1.5, label='Perfectly Calibrated')
    colors = plt.cm.tab20(np.linspace(0, 1, len(calibration_data)))
    
    for (model_name, data), color in zip(calibration_data.items(), colors):
        prob_true, prob_pred = data['calibration_curve']
        brier = data.get('brier_score', np.nan)
        label_str = f'{model_name} (Brier: {brier:.3f})'
        ax.plot(prob_pred, prob_true, 's-', color=color, label=label_str, markersize=5, linewidth=1.5)
    
    ax.set_xlim([-0.02, 1.02])
    ax.set_ylim([-0.02, 1.05])
    ax.set_xlabel('Predicted Probability', fontsize=14)
    ax.set_ylabel('Observed Fraction', fontsize=14)
    ax.set_title(f'Calibration Curves - {cohort_name} Cohort', fontsize=16, fontweight='bold', pad=15)
    ax.legend(loc='best', fontsize=10, frameon=True)
    ax.grid(True, linestyle=':', alpha=0.6)
    
    save_path = os.path.join(output_path, f"{filename}_{cohort_name.lower()}.pdf")
    plt.savefig(save_path, format='pdf', bbox_inches='tight')
    plt.close()

# ================= 统计与指标计算函数 =================
def calculate_calibration_metrics(y_true, y_prob, n_bins=10):
    brier_score = brier_score_loss(y_true, y_prob)
    prob_true, prob_pred = calibration_curve(y_true, y_prob, n_bins=n_bins, strategy='quantile')
    try:
        epsilon = 1e-10
        y_prob_clipped = np.clip(y_prob, epsilon, 1-epsilon)
        logit_pred = np.log(y_prob_clipped / (1 - y_prob_clipped)).reshape(-1, 1)
        cal_model = LogisticRegression(fit_intercept=True, solver='liblinear')
        cal_model.fit(logit_pred, y_true)
        calibration_slope = cal_model.coef_[0][0]
        calibration_intercept = cal_model.intercept_[0]
    except:
        calibration_slope, calibration_intercept = np.nan, np.nan
    
    return {'brier_score': brier_score, 'calibration_slope': calibration_slope,
            'calibration_intercept': calibration_intercept, 'calibration_curve': (prob_true, prob_pred)}

def calculate_comprehensive_metrics(y_true, y_pred, y_prob=None, n_bootstrap=1000, alpha=0.05):
    """
    计算所有指标及其 95% 置信区间 (Bootstrap法)
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    if y_prob is not None: y_prob = np.array(y_prob)
    
    # 辅助函数：计算单次迭代的指标
    def compute_single_pass(y_t, y_p, y_pr):
        m = {}
        m['accuracy'] = accuracy_score(y_t, y_p)
        m['precision'] = precision_score(y_t, y_p, average='macro', zero_division=0)
        m['recall'] = recall_score(y_t, y_p, average='macro', zero_division=0)
        m['f1_macro'] = f1_score(y_t, y_p, average='macro', zero_division=0)
        if y_pr is not None and len(np.unique(y_t)) == 2:
            try:
                m['auc'] = roc_auc_score(y_t, y_pr)
                m['brier_score'] = brier_score_loss(y_t, y_pr)
            except: m['auc'], m['brier_score'] = np.nan, np.nan
        else: m['auc'], m['brier_score'] = np.nan, np.nan
        return m

    # 1. 计算点估计值
    point_metrics = compute_single_pass(y_true, y_pred, y_prob)
    # 校准斜率/截距通常不进行 Bootstrap
    if y_prob is not None and len(np.unique(y_true)) == 2:
        point_metrics.update(calculate_calibration_metrics(y_true, y_prob))
    
    # 2. Bootstrap 计算置信区间
    boot_scores = defaultdict(list)
    for i in range(n_bootstrap):
        indices = resample(range(len(y_true)), replace=True, random_state=i)
        # 确保 Bootstrap 样本中包含两个类别
        if len(np.unique(y_true[indices])) < 2: continue
        
        try:
            m = compute_single_pass(y_true[indices], y_pred[indices], y_prob[indices] if y_prob is not None else None)
            for k, v in m.items():
                if not np.isnan(v): boot_scores[k].append(v)
        except: continue
            
    final_metrics = point_metrics.copy()
    # 计算 95% CI
    for metric in ['accuracy', 'precision', 'recall', 'f1_macro', 'auc', 'brier_score']:
        scores = boot_scores.get(metric, [])
        if scores:
            final_metrics[f'{metric}_ci_lower'] = np.percentile(scores, 100 * alpha / 2)
            final_metrics[f'{metric}_ci_upper'] = np.percentile(scores, 100 * (1 - alpha / 2))
        else:
            final_metrics[f'{metric}_ci_lower'], final_metrics[f'{metric}_ci_upper'] = np.nan, np.nan
            
    return final_metrics

def delong_test(y_true, y_prob1, y_prob2):
    y_true = np.array(y_true).reshape(-1)
    y_prob1 = np.array(y_prob1).reshape(-1)
    y_prob2 = np.array(y_prob2).reshape(-1)
    auc1 = roc_auc_score(y_true, y_prob1)
    auc2 = roc_auc_score(y_true, y_prob2)

    def compute_covariance(y_true, p1, p2):
        idx_pos = y_true == 1
        idx_neg = y_true == 0
        V10_1, V01_1 = np.zeros(idx_pos.sum()), np.zeros(idx_neg.sum())
        V10_2, V01_2 = np.zeros(idx_pos.sum()), np.zeros(idx_neg.sum())
        p1_pos, p1_neg = p1[idx_pos], p1[idx_neg]
        p2_pos, p2_neg = p2[idx_pos], p2[idx_neg]
        
        for i in range(len(p1_pos)): V10_1[i] = np.mean(p1_pos[i] > p1_neg) + 0.5 * np.mean(p1_pos[i] == p1_neg)
        for j in range(len(p1_neg)): V01_1[j] = np.mean(p1_pos > p1_neg[j]) + 0.5 * np.mean(p1_pos == p1_neg[j])
        for i in range(len(p2_pos)): V10_2[i] = np.mean(p2_pos[i] > p2_neg) + 0.5 * np.mean(p2_pos[i] == p2_neg)
        for j in range(len(p2_neg)): V01_2[j] = np.mean(p2_pos > p2_neg[j]) + 0.5 * np.mean(p2_pos == p2_neg[j])

        var1 = (np.var(V10_1, ddof=1)/idx_pos.sum() + np.var(V01_1, ddof=1)/idx_neg.sum())
        var2 = (np.var(V10_2, ddof=1)/idx_pos.sum() + np.var(V01_2, ddof=1)/idx_neg.sum())
        cov_12 = (np.cov(V10_1, V10_2)[0, 1]/idx_pos.sum() + np.cov(V01_1, V01_2)[0, 1]/idx_neg.sum())
        return var1, var2, cov_12

    var1, var2, cov_12 = compute_covariance(y_true, y_prob1, y_prob2)
    z = (auc1 - auc2) / np.sqrt(var1 + var2 - 2 * cov_12 + 1e-8)
    return z, 2 * (1 - norm.cdf(abs(z)))

def load_external_cohorts():
    external_data = {}
    external_files = {
        'External_Cohort': os.path.join(input_path, "外部验证集-量子计算.xlsx"),
    }
    for cohort_name, file_path in external_files.items():
        if os.path.exists(file_path):
            try:
                data = pd.read_excel(file_path)
                external_data[cohort_name] = data
                print(f"Loaded external cohort: {cohort_name}")
            except Exception as e: print(f"Failed to load {cohort_name}: {e}")
        else: print(f"External file not found: {file_path}")
    return external_data

def save_comprehensive_results_to_excel(internal_results, external_results, delong_results, filename):
    filepath = os.path.join(output_path, filename)
    
    def format_metrics(metrics_dict):
        """Helper to format row data with CIs"""
        row = {}
        for k in ['auc', 'accuracy', 'precision', 'recall', 'f1_macro', 'brier_score']:
            val = metrics_dict.get(k)
            low = metrics_dict.get(f'{k}_ci_lower')
            high = metrics_dict.get(f'{k}_ci_upper')
            
            row[f'{k.upper()}'] = val
            row[f'{k.upper()}_95CI_Low'] = low
            row[f'{k.upper()}_95CI_High'] = high
            
            # 创建一个组合字符串列，方便直接复制到论文
            if val is not None and low is not None:
                row[f'{k.upper()}_Str'] = f"{val:.3f} ({low:.3f}-{high:.3f})"
                
        row['Slope'] = metrics_dict.get('calibration_slope')
        row['Intercept'] = metrics_dict.get('calibration_intercept')
        return row

    try:
        with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
            # 保存内部验证结果
            int_summary = []
            for m_name, res in internal_results.items():
                row = {'Model': m_name, 'Cohort': 'Internal'}
                row.update(format_metrics(res['test_metrics']))
                int_summary.append(row)
            if int_summary: 
                pd.DataFrame(int_summary).to_excel(writer, sheet_name='Internal_Results', index=False)
            
            # 保存外部验证结果
            ext_summary = []
            for c_name, c_res in external_results.items():
                for m_name, res in c_res.items():
                    row = {'Model': m_name, 'Cohort': c_name}
                    row.update(format_metrics(res['metrics']))
                    ext_summary.append(row)
            if ext_summary: 
                pd.DataFrame(ext_summary).to_excel(writer, sheet_name='External_Results', index=False)
            
            # 保存 DeLong 测试结果
            if delong_results:
                d_data = [{'Comparison': k, 'Z': v['z_statistic'], 'P_Value': v['p_value'], 'Sig': v['significant']} 
                          for k, v in delong_results.items()]
                pd.DataFrame(d_data).to_excel(writer, sheet_name='DeLong_Tests', index=False)
                
        print(f"Results saved to: {filepath}")
    except Exception as e: print(f"Error saving Excel: {e}")

# ---------------------------- 主程序 (Main) ----------------------------
if __name__ == "__main__":
    print("Starting Full Analysis (Internal & External Validation with ROC/DCA/CI)...")
    
    # 1. 加载内部数据
    internal_file = os.path.join(input_path, "训练集-量子计算.xlsx")
    if not os.path.exists(internal_file):
        print(f"Error: File not found {internal_file}")
        exit(1)
    
    data = pd.read_excel(internal_file)
    if 'Prognosis' not in data.columns:
        print("Error: 'Prognosis' column not found in internal data.")
        exit(1)

    X = data.drop(columns=['Prognosis'])
    y = data['Prognosis']
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=30, stratify=y)
    
    # 结果容器
    comprehensive_results = {}
    trained_models = {}
    
    # 内部绘图数据容器
    internal_roc_data = {}
    internal_cal_data = {}
    internal_dca_data = {} 
    prob_dict = {}
    
    # 2. 训练与内部验证
    print("\n--- Training Models & Internal Validation ---")
    for name, model in classifiers:
        try:
            print(f"Training: {name}")
            model.fit(X_train, y_train)
            trained_models[name] = model
            
            # 预测
            y_pred = model.predict(X_test)
            y_prob = None
            if hasattr(model, "predict_proba"):
                y_prob = model.predict_proba(X_test)[:, 1]
                prob_dict[name] = y_prob
            
            # 计算全指标 (含CI)
            metrics = calculate_comprehensive_metrics(y_test, y_pred, y_prob, n_bootstrap=1000)
            comprehensive_results[name] = {'test_metrics': metrics, 'model': model}
            
            if y_prob is not None:
                # 收集内部 ROC 数据
                fpr, tpr, _ = roc_curve(y_test, y_prob)
                internal_roc_data[name] = (fpr, tpr, metrics.get('auc'), 
                                           metrics.get('auc_ci_lower'), metrics.get('auc_ci_upper'))
                
                # 收集内部校准数据
                if 'calibration_curve' in metrics:
                    internal_cal_data[name] = metrics
                else:
                    internal_cal_data[name] = calculate_calibration_metrics(y_test, y_prob)
                
                # 收集内部 DCA 数据
                internal_dca_data[name] = {'y_true': y_test, 'y_prob': y_prob}
                
        except Exception as e:
            print(f"Failed {name}: {e}")

    # 3. 绘制内部图表 (ROC, Calibration, DCA)
    if internal_roc_data:
        plot_combined_roc_curves_with_ci(internal_roc_data, "Internal Cohort", "ROC_Internal")
    if internal_cal_data:
        plot_enhanced_calibration_curves(internal_cal_data, "", "Calibration_Internal", "Internal")
    if internal_dca_data:
        # 计算内部患病率 (用于 DCA 的 Treat All 线)
        prevalence_internal = np.mean(y_test)
        plot_decision_curves(internal_dca_data, prevalence_internal, "DCA_Curve", "Internal")

    # 4. 外部验证 (External Validation)
    print("\n--- External Validation ---")
    external_data_dict = load_external_cohorts()
    external_results = {}
    
    if external_data_dict:
        for cohort_name, df_ext in external_data_dict.items():
            if 'Prognosis' not in df_ext.columns: 
                print(f"Skipping {cohort_name}: Missing 'Prognosis' column")
                continue
            
            print(f"Processing {cohort_name}...")
            X_ext = df_ext.drop(columns=['Prognosis'])
            y_ext = df_ext['Prognosis']
            
            cohort_results = {}
            
            # 外部绘图数据容器
            ext_roc_data = {}
            ext_cal_data = {}
            ext_dca_data = {}
            
            for name, model in trained_models.items():
                try:
                    y_pred_ext = model.predict(X_ext)
                    y_prob_ext = None
                    if hasattr(model, "predict_proba"):
                        y_prob_ext = model.predict_proba(X_ext)[:, 1]
                    
                    # 计算外部全指标 (含CI)
                    metrics = calculate_comprehensive_metrics(y_ext, y_pred_ext, y_prob_ext, n_bootstrap=1000)
                    cohort_results[name] = {'metrics': metrics}
                    
                    if y_prob_ext is not None:
                        # 收集外部 ROC 数据
                        fpr, tpr, _ = roc_curve(y_ext, y_prob_ext)
                        ext_roc_data[name] = (fpr, tpr, metrics.get('auc'),
                                              metrics.get('auc_ci_lower'), metrics.get('auc_ci_upper'))
                        
                        # 收集外部校准数据
                        if 'calibration_curve' in metrics:
                            ext_cal_data[name] = metrics
                        else:
                            ext_cal_data[name] = calculate_calibration_metrics(y_ext, y_prob_ext)
                            
                        # 收集外部 DCA 数据
                        ext_dca_data[name] = {'y_true': y_ext, 'y_prob': y_prob_ext}
                        
                except Exception as e:
                    print(f"Error evaluating {name} on {cohort_name}: {e}")
            
            external_results[cohort_name] = cohort_results
            
            # 绘制外部图表 (ROC, Calibration, DCA)
            if ext_roc_data:
                plot_combined_roc_curves_with_ci(ext_roc_data, f"{cohort_name}", f"ROC_{cohort_name}")
            if ext_cal_data:
                plot_enhanced_calibration_curves(ext_cal_data, "", f"Calibration_{cohort_name}", cohort_name)
            if ext_dca_data:
                prevalence_ext = np.mean(y_ext)
                plot_decision_curves(ext_dca_data, prevalence_ext, "DCA_Curve", cohort_name)

    # 5. DeLong 检验与 Excel 保存
    delong_results = perform_delong_tests(y_test, prob_dict)
    save_comprehensive_results_to_excel(comprehensive_results, external_results, 
                                      delong_results, "Model_Evaluation_Full.xlsx")
    
    print("\nAnalysis Complete.")
    print(f"Results saved to: {output_path}")
    print("Generated files include: Excel metrics, Internal/External ROC plots, DCA plots, and Calibration plots.")

In [None]:
# ==========================================================================
# 将此代码块添加到原代码的最后，用于保存预测概率，供后续 DeLong 脚本独立使用
# ==========================================================================

def save_predictions_for_delong(y_true, prob_dict, filename):
    """将真实标签和各模型的预测概率保存为Excel"""
    df = pd.DataFrame({'True_Label': y_true})
    # 将字典中的概率合并进来
    for model_name, probs in prob_dict.items():
        if probs is not None:
            df[model_name] = probs
    
    save_file = os.path.join(output_path, filename)
    df.to_excel(save_file, index=False)
    print(f"Predictions saved for DeLong test: {save_file}")

# 1. 保存内部验证集预测结果
if prob_dict:
    save_predictions_for_delong(y_test, prob_dict, "Predictions_Internal.xlsx")

# 2. 保存外部验证集预测结果
if external_results:
    for cohort_name, data in load_external_cohorts().items():
        if 'Prognosis' not in data.columns: continue
        
        X_ext = data.drop(columns=['Prognosis'])
        y_ext = data['Prognosis']
        
        ext_prob_dict = {}
        for name, model in trained_models.items():
            if hasattr(model, "predict_proba"):
                ext_prob_dict[name] = model.predict_proba(X_ext)[:, 1]
        
        save_predictions_for_delong(y_ext, ext_prob_dict, f"Predictions_{cohort_name}.xlsx")

In [None]:
import os
import pandas as pd
import numpy as np
from scipy.stats import norm
from sklearn.metrics import roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns

# ================= 配置区域 (Configuration) =================
# 请确保此路径是正确的，并且文件是通过第一步代码生成的
target_file_path = "D:/卒中预后第一篇论文/2025.11.25-new work/3.模型比较/Predictions_External_Cohort.xlsx"
output_path = os.path.dirname(target_file_path)

# 绘图设置
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['pdf.fonttype'] = 42
# ===========================================================

def delong_test_stat(y_true, y_prob1, y_prob2):
    """计算 DeLong Z 统计量和 P 值"""
    y_true = np.array(y_true).reshape(-1)
    y_prob1 = np.array(y_prob1).reshape(-1)
    y_prob2 = np.array(y_prob2).reshape(-1)

    auc1 = roc_auc_score(y_true, y_prob1)
    auc2 = roc_auc_score(y_true, y_prob2)

    def compute_covariance(y_true, p1, p2):
        idx_pos = y_true == 1
        idx_neg = y_true == 0
        p1_pos, p1_neg = p1[idx_pos], p1[idx_neg]
        p2_pos, p2_neg = p2[idx_pos], p2[idx_neg]
        n_pos, n_neg = len(p1_pos), len(p1_neg)

        def get_structural_components(p_pos, p_neg):
            V10 = np.zeros(n_pos)
            V01 = np.zeros(n_neg)
            for i in range(n_pos):
                V10[i] = np.mean(p_pos[i] > p_neg) + 0.5 * np.mean(p_pos[i] == p_neg)
            for j in range(n_neg):
                V01[j] = np.mean(p_pos > p_neg[j]) + 0.5 * np.mean(p_pos == p_neg[j])
            return V10, V01

        V10_1, V01_1 = get_structural_components(p1_pos, p1_neg)
        V10_2, V01_2 = get_structural_components(p2_pos, p2_neg)

        var1 = (np.var(V10_1, ddof=1) / n_pos + np.var(V01_1, ddof=1) / n_neg)
        var2 = (np.var(V10_2, ddof=1) / n_pos + np.var(V01_2, ddof=1) / n_neg)
        cov_12 = (np.cov(V10_1, V10_2)[0, 1] / n_pos + np.cov(V01_1, V01_2)[0, 1] / n_neg)
        return var1, var2, cov_12

    var1, var2, cov_12 = compute_covariance(y_true, y_prob1, y_prob2)
    
    sigma = np.sqrt(var1 + var2 - 2 * cov_12 + 1e-8)
    z = (auc1 - auc2) / sigma
    p_value = 2 * (1 - norm.cdf(abs(z)))
    
    return z, p_value, auc1, auc2

def plot_directional_heatmap(df_probs, title, filename):
    """绘制带有方向性的热图"""
    y_true = df_probs['True_Label'].values
    models = [c for c in df_probs.columns if c != 'True_Label']
    
    # 按 AUC 从高到低排序模型
    auc_scores = {m: roc_auc_score(y_true, df_probs[m]) for m in models}
    models = sorted(models, key=lambda x: auc_scores[x], reverse=True)
    
    n_models = len(models)
    z_matrix = pd.DataFrame(np.nan, index=models, columns=models)
    text_matrix = pd.DataFrame("", index=models, columns=models)
    
    print(f"Computing pairwise tests for {n_models} models...")
    
    for i, m1 in enumerate(models):
        for j, m2 in enumerate(models):
            if i == j: continue
            
            z, p, _, _ = delong_test_stat(y_true, df_probs[m1], df_probs[m2])
            z_matrix.loc[m1, m2] = z
            
            # 设置文本：P值，显著则加星号
            p_str = "<0.001" if p < 0.001 else f"{p:.3f}"
            if p < 0.05: p_str += "*"
            text_matrix.loc[m1, m2] = p_str

    plt.figure(figsize=(14, 12))
    
    # 设置颜色范围
    max_z = np.nanmax(np.abs(z_matrix.values))
    limit = max(3, max_z)
    
    # 修复了之前的报错：确保 cbar_kws 中的字符串在同一行，且不过长
    heatmap_label = "Z-Score (Red: Row > Col, Blue: Row < Col)"
    
    ax = sns.heatmap(z_matrix, 
                     annot=text_matrix, 
                     fmt="", 
                     cmap='RdBu_r', 
                     center=0,
                     vmin=-limit, vmax=limit,
                     square=True, 
                     linewidths=.5, 
                     cbar_kws={"label": heatmap_label})
    
    plt.title(f'Pairwise DeLong Test Comparison\n({title})', fontsize=16, fontweight='bold', pad=20)
    plt.xlabel("Comparison Model (Column)", fontsize=12)
    plt.ylabel("Reference Model (Row)", fontsize=12)
    plt.xticks(rotation=45, ha='right', fontsize=10)
    plt.yticks(rotation=0, fontsize=10)
    
    # 添加底部注解
    plt.figtext(0.5, 0.01, "* P < 0.05 | Red: Row Better | Blue: Row Worse", 
                ha="center", fontsize=10, bbox={"facecolor":"white", "alpha":0.8, "pad":5})
    
    save_file = os.path.join(output_path, filename)
    plt.tight_layout()
    plt.savefig(save_file, format='pdf', bbox_inches='tight')
    plt.close()
    print(f"Heatmap saved to: {save_file}")

def main():
    if not os.path.exists(target_file_path):
        print(f"Error: File not found {target_file_path}")
        return

    print(f"Loading predictions from: {target_file_path}")
    df = pd.read_excel(target_file_path)
    base_name = os.path.basename(target_file_path).replace('.xlsx', '')
    plot_directional_heatmap(df, base_name, f"{base_name}_DeLong_Heatmap.pdf")

if __name__ == "__main__":
    main()

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

# Sklearn & Models
from sklearn.metrics import (roc_curve, auc, roc_auc_score, f1_score, accuracy_score, 
                             precision_score, recall_score, confusion_matrix, brier_score_loss)
from sklearn.model_selection import train_test_split
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier, ExtraTreesClassifier
from sklearn.calibration import calibration_curve

# Boosting Libraries
import xgboost as xgb
import catboost as cb
import lightgbm as lgb

# 忽略警告
warnings.filterwarnings('ignore')

# 设置绘图风格
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']

# ======================== 1. 基础计算与指标函数 ========================

def bootstrap_ci(y_true, y_prob, metric_func, n_bootstraps=1000, alpha=0.05, **kwargs):
    """Bootstrap计算95% CI"""
    rng = np.random.RandomState(42)
    y_true = np.array(y_true)
    y_prob = np.array(y_prob)
    scores = []
    
    for i in range(n_bootstraps):
        indices = rng.randint(0, len(y_prob), len(y_prob))
        if len(np.unique(y_true[indices])) < 2: continue
        try:
            scores.append(metric_func(y_true[indices], y_prob[indices], **kwargs))
        except: continue
            
    if not scores: return np.nan, np.nan
    scores = np.sort(scores)
    return np.percentile(scores, alpha/2*100), np.percentile(scores, (1-alpha/2)*100)

def calculate_metrics(y_true, y_prob, threshold):
    """计算指定阈值下的单次指标 (不含CI，用于循环中快速计算权重)"""
    y_pred = (y_prob >= threshold).astype(int)
    
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    
    acc = accuracy_score(y_true, y_pred)
    sen = recall_score(y_true, y_pred, zero_division=0)
    spe = tn / (tn + fp + 1e-10)
    ppv = precision_score(y_true, y_pred, zero_division=0)
    npv = tn / (tn + fn + 1e-10)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    
    # AUC 和 Brier 与阈值无关，但在表中展示作为参考
    auc_val = roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) > 1 else np.nan
    
    return {
        'AUC': auc_val, 'F1': f1, 'ACC': acc, 
        'SEN': sen, 'SPE': spe, 'PPV': ppv, 'NPV': npv
    }

def get_full_metrics_with_ci(y_true, y_prob, threshold):
    """计算完整指标含CI (用于最终报告)"""
    base = calculate_metrics(y_true, y_prob, threshold)
    
    # 补充 Brier
    base['Brier'] = brier_score_loss(y_true, y_prob)
    
    # 计算 AUC CI
    low, high = bootstrap_ci(y_true, y_prob, roc_auc_score, n_bootstraps=500)
    base['AUC_95CI'] = f"{base['AUC']:.3f} ({low:.3f}-{high:.3f})"
    
    # 计算 Brier CI
    b_low, b_high = bootstrap_ci(y_true, y_prob, brier_score_loss, n_bootstraps=500)
    base['Brier_95CI'] = f"{base['Brier']:.3f} ({b_low:.3f}-{b_high:.3f})"
    
    return base

# ======================== 2. CRITIC 权重计算 ========================

def critic_weighting(metrics_df):
    """
    基于当前指标数据框计算 CRITIC 权重
    metrics_df: 行是模型，列是指标 (AUC, F1, ACC, SEN, SPE)
    """
    df = metrics_df.copy()
    # 选取用于权重的指标 (正向指标)
    indicators = ['AUC', 'F1', 'ACC', 'SEN', 'SPE']
    valid_inds = [c for c in indicators if c in df.columns]
    
    # 获取数据矩阵
    Z = df[valid_inds].values
    
    # 极差归一化
    ptp = Z.max(axis=0) - Z.min(axis=0)
    ptp[ptp == 0] = 1e-8 # 防止除零
    Z_norm = (Z - Z.min(axis=0)) / ptp
    
    # 标准差 (对比强度)
    std_dev = np.std(Z_norm, axis=0)
    
    # 相关系数 (冲突性)
    with np.errstate(invalid='ignore'):
        corr_matrix = np.corrcoef(Z_norm, rowvar=False)
    if np.isnan(corr_matrix).any(): 
        # 如果某列全是0(如阈值极端时)，相关系数无法计算，设冲突系数为1(无冲突信息)
        conflict = np.ones(len(valid_inds))
    else:
        conflict = np.sum(1 - np.abs(corr_matrix), axis=1)
    
    # 信息量
    info = std_dev * conflict
    
    # 指标权重
    w_j = info / (np.sum(info) + 1e-10)
    
    # 模型综合得分
    model_scores = np.sum(Z_norm * w_j, axis=1)
    
    # 归一化为模型权重
    final_model_weights = model_scores / (np.sum(model_scores) + 1e-10)
    
    return dict(zip(df['Classifier'], final_model_weights))

def get_ensemble_probs(probs_dict, weights):
    """根据权重组合概率"""
    ensemble_probs = np.zeros_like(list(probs_dict.values())[0])
    total_weight = 0
    for name, w in weights.items():
        if name in probs_dict:
            ensemble_probs += w * probs_dict[name]
            total_weight += w
    return ensemble_probs / (total_weight + 1e-10)

# ======================== 3. 绘图函数 ========================

def plot_roc_curves(y_true, probs_dict, save_path, title_suffix=""):
    plt.figure(figsize=(10, 8))
    colors = plt.cm.tab10(np.linspace(0, 1, len(probs_dict)))
    
    for i, (name, probs) in enumerate(probs_dict.items()):
        fpr, tpr, _ = roc_curve(y_true, probs)
        auc_val = roc_auc_score(y_true, probs)
        plt.plot(fpr, tpr, color=colors[i], lw=2, label=f'{name} (AUC={auc_val:.3f})')
        
    plt.plot([0, 1], [0, 1], 'k--', lw=1)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curves - {title_suffix}')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()

def plot_calibration(y_true, probs_dict, save_path, title_suffix=""):
    plt.figure(figsize=(8, 8))
    plt.plot([0, 1], [0, 1], "k:", label="Perfectly calibrated")
    colors = plt.cm.tab10(np.linspace(0, 1, len(probs_dict)))
    
    for i, (name, probs) in enumerate(probs_dict.items()):
        frac, mean_val = calibration_curve(y_true, probs, n_bins=10)
        brier = brier_score_loss(y_true, probs)
        plt.plot(mean_val, frac, "s-", color=colors[i], label=f"{name} (Brier: {brier:.3f})")
    
    plt.ylabel("Fraction of positives")
    plt.xlabel("Mean predicted probability")
    plt.legend(loc="lower right")
    plt.title(f'Calibration - {title_suffix}')
    plt.grid(True, alpha=0.3)
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()

# ======================== 4. 主逻辑流程 ========================

def train_base_models(X_train, y_train, classifiers):
    models = {}
    for name, clf in classifiers:
        print(f"  Training {name}...")
        model = clone(clf)
        model.fit(X_train, y_train)
        models[name] = model
    return models

def run_dynamic_threshold_analysis(models, X_int, y_int, X_ext, y_ext, save_dir):
    """
    核心函数：遍历阈值 -> 动态计算权重 -> 动态构建COPE -> 评估内外部
    """
    # 1. 预先计算所有基础模型的概率 (避免循环中重复预测)
    base_probs_int = {}
    base_probs_ext = {}
    
    for name, model in models.items():
        if hasattr(model, "predict_proba"):
            base_probs_int[name] = model.predict_proba(X_int)[:, 1]
            base_probs_ext[name] = model.predict_proba(X_ext)[:, 1]
        else:
            base_probs_int[name] = model.predict(X_int)
            base_probs_ext[name] = model.predict(X_ext)
            
    # 2. 遍历阈值
    thresholds = np.arange(0.05, 0.96, 0.05)
    dynamic_results = []
    best_f1 = -1
    best_threshold = 0.5
    best_weights = None
    
    print("  Running dynamic threshold loop...")
    for thresh in thresholds:
        thresh = round(thresh, 2)
        
        # A. 计算内部集上基础模型的指标 (用于权重)
        model_metrics_list = []
        for name in models.keys():
            m = calculate_metrics(y_int, base_probs_int[name], thresh)
            m['Classifier'] = name
            model_metrics_list.append(m)
        
        metrics_df = pd.DataFrame(model_metrics_list)
        
        # B. 动态计算 CRITIC 权重
        current_weights = critic_weighting(metrics_df)
        
        # C. 构建动态 COPE
        cope_prob_int = get_ensemble_probs(base_probs_int, current_weights)
        cope_prob_ext = get_ensemble_probs(base_probs_ext, current_weights)
        
        # D. 评估 COPE (内部 & 外部)
        cope_res_int = calculate_metrics(y_int, cope_prob_int, thresh)
        cope_res_ext = calculate_metrics(y_ext, cope_prob_ext, thresh)
        
        # 记录结果
        row = {'Threshold': thresh}
        # 记录权重
        for k, v in current_weights.items():
            row[f'Weight_{k}'] = v
        # 记录内部指标
        for k, v in cope_res_int.items():
            row[f'Int_{k}'] = v
        # 记录外部指标
        for k, v in cope_res_ext.items():
            row[f'Ext_{k}'] = v
            
        dynamic_results.append(row)
        
        # 记录最佳阈值 (以内部 F1 为准)
        if cope_res_int['F1'] > best_f1:
            best_f1 = cope_res_int['F1']
            best_threshold = thresh
            best_weights = current_weights
            
    # 3. 保存动态分析大表
    analysis_df = pd.DataFrame(dynamic_results)
    analysis_df.to_excel(os.path.join(save_dir, 'Dynamic_Threshold_Analysis.xlsx'), index=False)
    
    # 4. 基于最佳阈值生成最终详细报告和图表
    print(f"  Best Threshold found: {best_threshold} (Internal F1: {best_f1:.3f})")
    
    # 构建最佳 COPE 概率
    final_cope_prob_int = get_ensemble_probs(base_probs_int, best_weights)
    final_cope_prob_ext = get_ensemble_probs(base_probs_ext, best_weights)
    
    # 准备绘图数据 (加入 COPE)
    plot_probs_int = base_probs_int.copy()
    plot_probs_int['COPE'] = final_cope_prob_int
    
    plot_probs_ext = base_probs_ext.copy()
    plot_probs_ext['COPE'] = final_cope_prob_ext
    
    # 生成图表
    plot_roc_curves(y_int, plot_probs_int, os.path.join(save_dir, 'Internal_ROC.png'), "Internal")
    plot_roc_curves(y_ext, plot_probs_ext, os.path.join(save_dir, 'External_ROC.png'), "External")
    
    plot_calibration(y_int, plot_probs_int, os.path.join(save_dir, 'Internal_Calibration.png'), "Internal")
    plot_calibration(y_ext, plot_probs_ext, os.path.join(save_dir, 'External_Calibration.png'), "External")
    
    # 生成详细指标表 (含CI)
    final_metrics = []
    # 内部
    for name, p in plot_probs_int.items():
        m = get_full_metrics_with_ci(y_int, p, best_threshold)
        m['Classifier'] = name
        m['Dataset'] = 'Internal'
        final_metrics.append(m)
    # 外部
    for name, p in plot_probs_ext.items():
        m = get_full_metrics_with_ci(y_ext, p, best_threshold)
        m['Classifier'] = name
        m['Dataset'] = 'External'
        final_metrics.append(m)
        
    pd.DataFrame(final_metrics).to_excel(os.path.join(save_dir, 'Final_Optimal_Metrics.xlsx'), index=False)
    
    # 绘制权重变化图
    plt.figure(figsize=(12, 6))
    weight_cols = [c for c in analysis_df.columns if c.startswith('Weight_')]
    for col in weight_cols:
        plt.plot(analysis_df['Threshold'], analysis_df[col], label=col.replace('Weight_', ''))
    plt.xlabel('Threshold')
    plt.ylabel('CRITIC Weight')
    plt.title('Dynamic CRITIC Weights vs Threshold')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, 'Weights_Dynamics.png'), dpi=300)
    plt.close()

# ======================== 5. 执行入口 ========================

classifiers = [
    ('Extra Trees', ExtraTreesClassifier(n_estimators=100, random_state=42)),
    ('Logistic Regression', LogisticRegression(max_iter=1000, random_state=42)),
    ('LightGBM', lgb.LGBMClassifier(random_state=42, verbose=-1)),
    ('CatBoost', cb.CatBoostClassifier(random_state=42, verbose=0, allow_writing_files=False)),
    ('AdaBoost', AdaBoostClassifier(random_state=42))
]

if __name__ == "__main__":
    work_dir = r"D:\卒中预后第一篇论文\2025.11.25-new work\4.COPE"
    os.makedirs(work_dir, exist_ok=True)
    
    # 文件配置
    train_file = "训练集-量子计算.xlsx"
    ext_file = "外部验证集-量子计算.xlsx"
    
    train_path = os.path.join(work_dir, train_file)
    ext_path = os.path.join(work_dir, ext_file)
    
    if not os.path.exists(train_path) or not os.path.exists(ext_path):
        print("Error: Input files missing.")
        exit()
        
    # 1. 加载数据
    print("Loading data...")
    d_train = pd.read_excel(train_path)
    d_ext = pd.read_excel(ext_path)
    
    X_int = d_train.drop(columns=['Prognosis'])
    y_int = d_train['Prognosis']
    
    X_ext = d_ext.drop(columns=['Prognosis'])
    y_ext = d_ext['Prognosis']
    
    # 内部再次划分 Train/Test (用于训练基础模型 vs 计算权重)
    # 这里的 X_test_int 将作为 "Internal Validation" 用来计算权重
    X_train_base, X_test_int, y_train_base, y_test_int = train_test_split(
        X_int, y_int, test_size=0.3, stratify=y_int, random_state=42
    )
    
    # 2. 训练基础模型
    print("Training base models...")
    trained_models = train_base_models(X_train_base, y_train_base, classifiers)
    
    # 3. 运行动态阈值分析
    # 使用 X_test_int 计算权重，然后应用到 X_ext
    print("Starting dynamic threshold analysis...")
    run_dynamic_threshold_analysis(trained_models, X_test_int, y_test_int, X_ext, y_ext, work_dir)
    
    print(f"\nAnalysis complete. Results saved to {work_dir}")

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

# ================= 配置区域 =================
work_dir = r"D:\卒中预后第一篇论文\2025.11.25-new work\4.COPE"
data_file = os.path.join(work_dir, "Dynamic_Threshold_Analysis.xlsx")
output_dir = os.path.join(work_dir, "Threshold_Dynamics_Plots")

# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)

# 设置绘图风格
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['font.size'] = 12

# ================= 绘图函数 =================

def plot_metric_dynamics(df, metrics, prefix, title_suffix):
    """绘制指标随阈值变化的曲线"""
    plt.figure(figsize=(10, 6))
    
    # 定义线条样式和颜色
    styles = ['-', '--', '-.', ':', '-']
    colors = sns.color_palette("tab10", len(metrics))
    
    for i, metric in enumerate(metrics):
        col_name = f"{prefix}_{metric}"
        if col_name in df.columns:
            plt.plot(df['Threshold'], df[col_name], 
                     label=metric, color=colors[i], linestyle=styles[i%len(styles)], linewidth=2)
    
    plt.xlabel('Threshold Probability', fontsize=14)
    plt.ylabel('Score', fontsize=14)
    plt.title(f'COPE Performance vs Threshold ({title_suffix})', fontsize=16, fontweight='bold')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=10)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.ylim([0, 1.05])
    
    save_path = os.path.join(output_dir, f"{prefix}_Metrics_Dynamics.png")
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    # 保存PDF版本
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()
    print(f"Saved: {save_path}")

def plot_weight_dynamics_stacked(df):
    """绘制权重随阈值变化的堆叠面积图"""
    weight_cols = [c for c in df.columns if c.startswith('Weight_')]
    labels = [c.replace('Weight_', '') for c in weight_cols]
    
    plt.figure(figsize=(12, 7))
    
    # 绘制堆叠图
    plt.stackplot(df['Threshold'], df[weight_cols].T, labels=labels, alpha=0.85, cmap='viridis')
    
    plt.xlabel('Threshold Probability', fontsize=14)
    plt.ylabel('CRITIC Weight Contribution', fontsize=14)
    plt.title('Dynamic Model Weights Allocation vs Threshold', fontsize=16, fontweight='bold')
    plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=11)
    plt.xlim([df['Threshold'].min(), df['Threshold'].max()])
    plt.ylim([0, 1.0])
    plt.grid(True, axis='x', linestyle='--', alpha=0.4)
    
    save_path = os.path.join(output_dir, "Weights_Dynamics_Stacked.png")
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()
    print(f"Saved: {save_path}")

def plot_tradeoff_curves(df, prefix, title_suffix):
    """专门绘制敏感度-特异度权衡图"""
    if f'{prefix}_SEN' not in df.columns or f'{prefix}_SPE' not in df.columns:
        return

    plt.figure(figsize=(8, 6))
    
    plt.plot(df['Threshold'], df[f'{prefix}_SEN'], label='Sensitivity', color='blue', lw=2)
    plt.plot(df['Threshold'], df[f'{prefix}_SPE'], label='Specificity', color='green', lw=2)
    
    # 寻找交叉点（平衡点）
    idx = np.argmin(np.abs(df[f'{prefix}_SEN'] - df[f'{prefix}_SPE']))
    cross_thresh = df.iloc[idx]['Threshold']
    cross_val = df.iloc[idx][f'{prefix}_SEN']
    
    plt.scatter(cross_thresh, cross_val, color='red', zorder=5)
    plt.annotate(f'Balance: {cross_thresh:.2f}', (cross_thresh, cross_val), 
                 xytext=(10, 10), textcoords='offset points', fontsize=10)

    plt.xlabel('Threshold', fontsize=14)
    plt.ylabel('Score', fontsize=14)
    plt.title(f'Sensitivity vs Specificity Trade-off ({title_suffix})', fontsize=16)
    plt.legend()
    plt.grid(True, alpha=0.5)
    
    save_path = os.path.join(output_dir, f"{prefix}_Tradeoff.png")
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()
    print(f"Saved: {save_path}")

# ================= 主程序 =================

if __name__ == "__main__":
    if not os.path.exists(data_file):
        print(f"Error: File not found at {data_file}")
        print("Please run the previous analysis code first to generate 'Dynamic_Threshold_Analysis.xlsx'.")
        exit()
        
    print(f"Loading data from: {data_file}")
    df = pd.read_excel(data_file)
    
    # 1. 绘制内部验证集指标动态
    metrics_to_plot = ['AUC', 'F1', 'ACC', 'SEN', 'SPE', 'PPV', 'NPV']
    # 筛选存在的列
    metrics_int = [m for m in metrics_to_plot if f'Int_{m}' in df.columns]
    plot_metric_dynamics(df, metrics_int, 'Int', 'Internal Validation')
    
    # 2. 绘制外部验证集指标动态
    metrics_ext = [m for m in metrics_to_plot if f'Ext_{m}' in df.columns]
    plot_metric_dynamics(df, metrics_ext, 'Ext', 'External Validation')
    
    # 3. 绘制权重堆叠图
    plot_weight_dynamics_stacked(df)
    
    # 4. 绘制敏感度-特异度权衡图
    plot_tradeoff_curves(df, 'Int', 'Internal')
    plot_tradeoff_curves(df, 'Ext', 'External')
    
    print("\nAll plots generated successfully in 'Threshold_Dynamics_Plots' folder.")

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

# Sklearn & Models
from sklearn.metrics import (roc_curve, auc, roc_auc_score, f1_score, accuracy_score, 
                             precision_score, recall_score, confusion_matrix, brier_score_loss)
from sklearn.model_selection import train_test_split
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier, ExtraTreesClassifier
from sklearn.calibration import calibration_curve

# Boosting Libraries
import xgboost as xgb
import catboost as cb
import lightgbm as lgb

# 忽略警告
warnings.filterwarnings('ignore')

# 设置绘图风格
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['pdf.fonttype'] = 42
# ======================== 1. 基础计算与指标函数 ========================

def bootstrap_ci(y_true, y_prob, metric_func, n_bootstraps=1000, alpha=0.05, **kwargs):
    """Bootstrap计算95% CI"""
    rng = np.random.RandomState(42)
    y_true = np.array(y_true)
    y_prob = np.array(y_prob)
    scores = []
    
    for i in range(n_bootstraps):
        indices = rng.randint(0, len(y_prob), len(y_prob))
        if len(np.unique(y_true[indices])) < 2: continue
        try:
            scores.append(metric_func(y_true[indices], y_prob[indices], **kwargs))
        except: continue
            
    if not scores: return np.nan, np.nan
    scores = np.sort(scores)
    return np.percentile(scores, alpha/2*100), np.percentile(scores, (1-alpha/2)*100)

def calculate_metrics(y_true, y_prob, threshold):
    """计算指定阈值下的单次指标 (不含CI，用于循环中快速计算权重)"""
    y_pred = (y_prob >= threshold).astype(int)
    
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    
    acc = accuracy_score(y_true, y_pred)
    sen = recall_score(y_true, y_pred, zero_division=0)
    spe = tn / (tn + fp + 1e-10)
    ppv = precision_score(y_true, y_pred, zero_division=0)
    npv = tn / (tn + fn + 1e-10)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    
    # AUC 和 Brier 与阈值无关，但在表中展示作为参考
    auc_val = roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) > 1 else np.nan
    
    return {
        'AUC': auc_val, 'F1': f1, 'ACC': acc, 
        'SEN': sen, 'SPE': spe, 'PPV': ppv, 'NPV': npv
    }

def get_full_metrics_with_ci(y_true, y_prob, threshold):
    """计算完整指标含CI (用于最终报告)"""
    base = calculate_metrics(y_true, y_prob, threshold)
    
    # 补充 Brier
    base['Brier'] = brier_score_loss(y_true, y_prob)
    
    # 计算 AUC CI
    low, high = bootstrap_ci(y_true, y_prob, roc_auc_score, n_bootstraps=500)
    base['AUC_95CI'] = f"{base['AUC']:.3f} ({low:.3f}-{high:.3f})"
    
    # 计算 Brier CI
    b_low, b_high = bootstrap_ci(y_true, y_prob, brier_score_loss, n_bootstraps=500)
    base['Brier_95CI'] = f"{base['Brier']:.3f} ({b_low:.3f}-{b_high:.3f})"
    
    return base

# ======================== 2. CRITIC 权重计算 ========================

def critic_weighting(metrics_df):
    """
    基于当前指标数据框计算 CRITIC 权重
    metrics_df: 行是模型，列是指标 (AUC, F1, ACC, SEN, SPE)
    """
    df = metrics_df.copy()
    # 选取用于权重的指标 (正向指标)
    indicators = ['AUC', 'F1', 'ACC', 'SEN', 'SPE']
    valid_inds = [c for c in indicators if c in df.columns]
    
    # 获取数据矩阵
    Z = df[valid_inds].values
    
    # 极差归一化
    ptp = Z.max(axis=0) - Z.min(axis=0)
    ptp[ptp == 0] = 1e-8 # 防止除零
    Z_norm = (Z - Z.min(axis=0)) / ptp
    
    # 标准差 (对比强度)
    std_dev = np.std(Z_norm, axis=0)
    
    # 相关系数 (冲突性)
    with np.errstate(invalid='ignore'):
        corr_matrix = np.corrcoef(Z_norm, rowvar=False)
    if np.isnan(corr_matrix).any(): 
        # 如果某列全是0(如阈值极端时)，相关系数无法计算，设冲突系数为1(无冲突信息)
        conflict = np.ones(len(valid_inds))
    else:
        conflict = np.sum(1 - np.abs(corr_matrix), axis=1)
    
    # 信息量
    info = std_dev * conflict
    
    # 指标权重
    w_j = info / (np.sum(info) + 1e-10)
    
    # 模型综合得分
    model_scores = np.sum(Z_norm * w_j, axis=1)
    
    # 归一化为模型权重
    final_model_weights = model_scores / (np.sum(model_scores) + 1e-10)
    
    return dict(zip(df['Classifier'], final_model_weights))

def get_ensemble_probs(probs_dict, weights):
    """根据权重组合概率"""
    ensemble_probs = np.zeros_like(list(probs_dict.values())[0])
    total_weight = 0
    for name, w in weights.items():
        if name in probs_dict:
            ensemble_probs += w * probs_dict[name]
            total_weight += w
    return ensemble_probs / (total_weight + 1e-10)

# ======================== 3. 绘图函数 ========================

def plot_roc_curves(y_true, probs_dict, save_path, title_suffix=""):
    plt.figure(figsize=(10, 8))
    colors = plt.cm.tab10(np.linspace(0, 1, len(probs_dict)))
    
    for i, (name, probs) in enumerate(probs_dict.items()):
        fpr, tpr, _ = roc_curve(y_true, probs)
        auc_val = roc_auc_score(y_true, probs)
        plt.plot(fpr, tpr, color=colors[i], lw=2, label=f'{name} (AUC={auc_val:.3f})')
        
    plt.plot([0, 1], [0, 1], 'k--', lw=1)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curves - {title_suffix}')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()

def plot_calibration(y_true, probs_dict, save_path, title_suffix=""):
    plt.figure(figsize=(8, 8))
    plt.plot([0, 1], [0, 1], "k:", label="Perfectly calibrated")
    colors = plt.cm.tab10(np.linspace(0, 1, len(probs_dict)))
    
    for i, (name, probs) in enumerate(probs_dict.items()):
        frac, mean_val = calibration_curve(y_true, probs, n_bins=10)
        brier = brier_score_loss(y_true, probs)
        plt.plot(mean_val, frac, "s-", color=colors[i], label=f"{name} (Brier: {brier:.3f})")
    
    plt.ylabel("Fraction of positives")
    plt.xlabel("Mean predicted probability")
    plt.legend(loc="lower right")
    plt.title(f'Calibration - {title_suffix}')
    plt.grid(True, alpha=0.3)
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()

# ======================== 4. 主逻辑流程 ========================

def train_base_models(X_train, y_train, classifiers):
    models = {}
    for name, clf in classifiers:
        print(f"  Training {name}...")
        model = clone(clf)
        model.fit(X_train, y_train)
        models[name] = model
    return models

def run_dynamic_threshold_analysis(models, X_int, y_int, X_ext, y_ext, save_dir):
    """
    核心函数：遍历阈值 -> 动态计算权重 -> 动态构建COPE -> 评估内外部
    """
    # 1. 预先计算所有基础模型的概率 (避免循环中重复预测)
    base_probs_int = {}
    base_probs_ext = {}
    
    for name, model in models.items():
        if hasattr(model, "predict_proba"):
            base_probs_int[name] = model.predict_proba(X_int)[:, 1]
            base_probs_ext[name] = model.predict_proba(X_ext)[:, 1]
        else:
            base_probs_int[name] = model.predict(X_int)
            base_probs_ext[name] = model.predict(X_ext)
            
    # 2. 遍历阈值
    thresholds = np.arange(0.05, 0.96, 0.05)
    dynamic_results = []
    best_f1 = -1
    best_threshold = 0.5
    best_weights = None
    
    print("  Running dynamic threshold loop...")
    for thresh in thresholds:
        thresh = round(thresh, 2)
        
        # A. 计算内部集上基础模型的指标 (用于权重)
        model_metrics_list = []
        for name in models.keys():
            m = calculate_metrics(y_int, base_probs_int[name], thresh)
            m['Classifier'] = name
            model_metrics_list.append(m)
        
        metrics_df = pd.DataFrame(model_metrics_list)
        
        # B. 动态计算 CRITIC 权重
        current_weights = critic_weighting(metrics_df)
        
        # C. 构建动态 COPE
        cope_prob_int = get_ensemble_probs(base_probs_int, current_weights)
        cope_prob_ext = get_ensemble_probs(base_probs_ext, current_weights)
        
        # D. 评估 COPE (内部 & 外部)
        cope_res_int = calculate_metrics(y_int, cope_prob_int, thresh)
        cope_res_ext = calculate_metrics(y_ext, cope_prob_ext, thresh)
        
        # 记录结果
        row = {'Threshold': thresh}
        # 记录权重
        for k, v in current_weights.items():
            row[f'Weight_{k}'] = v
        # 记录内部指标
        for k, v in cope_res_int.items():
            row[f'Int_{k}'] = v
        # 记录外部指标
        for k, v in cope_res_ext.items():
            row[f'Ext_{k}'] = v
            
        dynamic_results.append(row)
        
        # 记录最佳阈值 (以内部 F1 为准)
        if cope_res_int['F1'] > best_f1:
            best_f1 = cope_res_int['F1']
            best_threshold = thresh
            best_weights = current_weights
            
    # 3. 保存动态分析大表
    analysis_df = pd.DataFrame(dynamic_results)
    analysis_df.to_excel(os.path.join(save_dir, 'Dynamic_Threshold_Analysis.xlsx'), index=False)
    
    # 4. 基于最佳阈值生成最终详细报告和图表
    print(f"  Best Threshold found: {best_threshold} (Internal F1: {best_f1:.3f})")
    
    # 构建最佳 COPE 概率
    final_cope_prob_int = get_ensemble_probs(base_probs_int, best_weights)
    final_cope_prob_ext = get_ensemble_probs(base_probs_ext, best_weights)
    
    # 准备绘图数据 (加入 COPE)
    plot_probs_int = base_probs_int.copy()
    plot_probs_int['COPE'] = final_cope_prob_int
    
    plot_probs_ext = base_probs_ext.copy()
    plot_probs_ext['COPE'] = final_cope_prob_ext
    
    # 生成图表
    plot_roc_curves(y_int, plot_probs_int, os.path.join(save_dir, 'Internal_ROC.png'), "Internal")
    plot_roc_curves(y_ext, plot_probs_ext, os.path.join(save_dir, 'External_ROC.png'), "External")
    
    plot_calibration(y_int, plot_probs_int, os.path.join(save_dir, 'Internal_Calibration.png'), "Internal")
    plot_calibration(y_ext, plot_probs_ext, os.path.join(save_dir, 'External_Calibration.png'), "External")
    
    # 生成详细指标表 (含CI)
    final_metrics = []
    # 内部
    for name, p in plot_probs_int.items():
        m = get_full_metrics_with_ci(y_int, p, best_threshold)
        m['Classifier'] = name
        m['Dataset'] = 'Internal'
        final_metrics.append(m)
    # 外部
    for name, p in plot_probs_ext.items():
        m = get_full_metrics_with_ci(y_ext, p, best_threshold)
        m['Classifier'] = name
        m['Dataset'] = 'External'
        final_metrics.append(m)
        
    pd.DataFrame(final_metrics).to_excel(os.path.join(save_dir, 'Final_Optimal_Metrics.xlsx'), index=False)
    
    # 绘制权重变化图
    plt.figure(figsize=(12, 6))
    weight_cols = [c for c in analysis_df.columns if c.startswith('Weight_')]
    for col in weight_cols:
        plt.plot(analysis_df['Threshold'], analysis_df[col], label=col.replace('Weight_', ''))
    plt.xlabel('Threshold')
    plt.ylabel('CRITIC Weight')
    plt.title('Dynamic CRITIC Weights vs Threshold')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, 'Weights_Dynamics.png'), dpi=300)
    plt.close()

# ======================== 5. 执行入口 ========================

classifiers = [
    ('Extra Trees', ExtraTreesClassifier(n_estimators=100, random_state=42)),
    ('Logistic Regression', LogisticRegression(max_iter=1000, random_state=42)),
    ('LightGBM', lgb.LGBMClassifier(random_state=42, verbose=-1)),
    ('CatBoost', cb.CatBoostClassifier(random_state=42, verbose=0, allow_writing_files=False)),
    ('AdaBoost', AdaBoostClassifier(random_state=42))
]

if __name__ == "__main__":
    work_dir = r"D:/卒中预后第二篇论文/2025.11.25-new work/4.1 Non-CPD COPE"
    os.makedirs(work_dir, exist_ok=True)
    
    # 文件配置
    train_file = "训练集-量子计算.xlsx"
    ext_file = "外部验证集-量子计算.xlsx"
    
    train_path = os.path.join(work_dir, train_file)
    ext_path = os.path.join(work_dir, ext_file)
    
    if not os.path.exists(train_path) or not os.path.exists(ext_path):
        print("Error: Input files missing.")
        exit()
        
    # 1. 加载数据
    print("Loading data...")
    d_train = pd.read_excel(train_path)
    d_ext = pd.read_excel(ext_path)
    
    X_int = d_train.drop(columns=['Prognosis'])
    y_int = d_train['Prognosis']
    
    X_ext = d_ext.drop(columns=['Prognosis'])
    y_ext = d_ext['Prognosis']
    
    # 内部再次划分 Train/Test (用于训练基础模型 vs 计算权重)
    # 这里的 X_test_int 将作为 "Internal Validation" 用来计算权重
    X_train_base, X_test_int, y_train_base, y_test_int = train_test_split(
        X_int, y_int, test_size=0.3, stratify=y_int, random_state=42
    )
    
    # 2. 训练基础模型
    print("Training base models...")
    trained_models = train_base_models(X_train_base, y_train_base, classifiers)
    
    # 3. 运行动态阈值分析
    # 使用 X_test_int 计算权重，然后应用到 X_ext
    print("Starting dynamic threshold analysis...")
    run_dynamic_threshold_analysis(trained_models, X_test_int, y_test_int, X_ext, y_ext, work_dir)
    
    print(f"\nAnalysis complete. Results saved to {work_dir}")

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

# ================= 配置区域 =================
work_dir = r"D:/卒中预后第二篇论文/2025.11.25-new work/4.1 Non-CPD COPE"
data_file = os.path.join(work_dir, "Dynamic_Threshold_Analysis.xlsx")
output_dir = os.path.join(work_dir, "Threshold_Dynamics_Plots")

# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)

# 设置绘图风格
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['font.size'] = 12

# ================= 绘图函数 =================

def plot_metric_dynamics(df, metrics, prefix, title_suffix):
    """绘制指标随阈值变化的曲线"""
    plt.figure(figsize=(10, 6))
    
    # 定义线条样式和颜色
    styles = ['-', '--', '-.', ':', '-']
    colors = sns.color_palette("tab10", len(metrics))
    
    for i, metric in enumerate(metrics):
        col_name = f"{prefix}_{metric}"
        if col_name in df.columns:
            plt.plot(df['Threshold'], df[col_name], 
                     label=metric, color=colors[i], linestyle=styles[i%len(styles)], linewidth=2)
    
    plt.xlabel('Threshold Probability', fontsize=14)
    plt.ylabel('Score', fontsize=14)
    plt.title(f'COPE Performance vs Threshold ({title_suffix})', fontsize=16, fontweight='bold')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=10)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.ylim([0, 1.05])
    
    save_path = os.path.join(output_dir, f"{prefix}_Metrics_Dynamics.png")
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    # 保存PDF版本
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()
    print(f"Saved: {save_path}")

def plot_weight_dynamics_stacked(df):
    """绘制权重随阈值变化的堆叠面积图"""
    weight_cols = [c for c in df.columns if c.startswith('Weight_')]
    labels = [c.replace('Weight_', '') for c in weight_cols]
    
    plt.figure(figsize=(12, 7))
    
    # 绘制堆叠图
    plt.stackplot(df['Threshold'], df[weight_cols].T, labels=labels, alpha=0.85, cmap='viridis')
    
    plt.xlabel('Threshold Probability', fontsize=14)
    plt.ylabel('CRITIC Weight Contribution', fontsize=14)
    plt.title('Dynamic Model Weights Allocation vs Threshold', fontsize=16, fontweight='bold')
    plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=11)
    plt.xlim([df['Threshold'].min(), df['Threshold'].max()])
    plt.ylim([0, 1.0])
    plt.grid(True, axis='x', linestyle='--', alpha=0.4)
    
    save_path = os.path.join(output_dir, "Weights_Dynamics_Stacked.png")
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()
    print(f"Saved: {save_path}")

def plot_tradeoff_curves(df, prefix, title_suffix):
    """专门绘制敏感度-特异度权衡图"""
    if f'{prefix}_SEN' not in df.columns or f'{prefix}_SPE' not in df.columns:
        return

    plt.figure(figsize=(8, 6))
    
    plt.plot(df['Threshold'], df[f'{prefix}_SEN'], label='Sensitivity', color='blue', lw=2)
    plt.plot(df['Threshold'], df[f'{prefix}_SPE'], label='Specificity', color='green', lw=2)
    
    # 寻找交叉点（平衡点）
    idx = np.argmin(np.abs(df[f'{prefix}_SEN'] - df[f'{prefix}_SPE']))
    cross_thresh = df.iloc[idx]['Threshold']
    cross_val = df.iloc[idx][f'{prefix}_SEN']
    
    plt.scatter(cross_thresh, cross_val, color='red', zorder=5)
    plt.annotate(f'Balance: {cross_thresh:.2f}', (cross_thresh, cross_val), 
                 xytext=(10, 10), textcoords='offset points', fontsize=10)

    plt.xlabel('Threshold', fontsize=14)
    plt.ylabel('Score', fontsize=14)
    plt.title(f'Sensitivity vs Specificity Trade-off ({title_suffix})', fontsize=16)
    plt.legend()
    plt.grid(True, alpha=0.5)
    
    save_path = os.path.join(output_dir, f"{prefix}_Tradeoff.png")
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.savefig(save_path.replace('.png', '.pdf'))
    plt.close()
    print(f"Saved: {save_path}")

# ================= 主程序 =================

if __name__ == "__main__":
    if not os.path.exists(data_file):
        print(f"Error: File not found at {data_file}")
        print("Please run the previous analysis code first to generate 'Dynamic_Threshold_Analysis.xlsx'.")
        exit()
        
    print(f"Loading data from: {data_file}")
    df = pd.read_excel(data_file)
    
    # 1. 绘制内部验证集指标动态
    metrics_to_plot = ['AUC', 'F1', 'ACC', 'SEN', 'SPE', 'PPV', 'NPV']
    # 筛选存在的列
    metrics_int = [m for m in metrics_to_plot if f'Int_{m}' in df.columns]
    plot_metric_dynamics(df, metrics_int, 'Int', 'Internal Validation')
    
    # 2. 绘制外部验证集指标动态
    metrics_ext = [m for m in metrics_to_plot if f'Ext_{m}' in df.columns]
    plot_metric_dynamics(df, metrics_ext, 'Ext', 'External Validation')
    
    # 3. 绘制权重堆叠图
    plot_weight_dynamics_stacked(df)
    
    # 4. 绘制敏感度-特异度权衡图
    plot_tradeoff_curves(df, 'Int', 'Internal')
    plot_tradeoff_curves(df, 'Ext', 'External')
    
    print("\nAll plots generated successfully in 'Threshold_Dynamics_Plots' folder.")

In [None]:
import pandas as pd
import numpy as np
import shap
import matplotlib.pyplot as plt
import seaborn as sns
import os
import time
import warnings
import matplotlib

# ======================== 修改点 1: 设置 PDF 字体为 Type 42 (可编辑) ========================
matplotlib.use('Agg')  # 非交互式后端
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
# ==========================================================================================

# Sklearn & Models
from sklearn.metrics import (roc_auc_score, f1_score, accuracy_score, precision_score, 
                             recall_score, confusion_matrix)
from sklearn.model_selection import train_test_split
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier, ExtraTreesClassifier
import xgboost as xgb
import catboost as cb
import lightgbm as lgb

# 忽略警告
warnings.filterwarnings('ignore')

# 设置绘图字体
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False

# ======================== 1. 基础训练与权重计算函数 ========================

def train_base_models(X_train, y_train, classifiers):
    """训练基础模型"""
    trained_models = {}
    for name, clf in classifiers:
        try:
            model = clone(clf)
            model.fit(X_train, y_train)
            trained_models[name] = model
            print(f"  Trained {name}")
        except Exception as e:
            print(f"  Failed to train {name}: {e}")
    return trained_models

def get_metrics(y_true, y_prob):
    y_pred = (y_prob >= 0.5).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return {
        'AUC': roc_auc_score(y_true, y_prob),
        'F1': f1_score(y_true, y_pred),
        'ACC': accuracy_score(y_true, y_pred),
        'SEN': recall_score(y_true, y_pred),
        'SPE': tn / (tn + fp + 1e-10)
    }

def critic_weighting(metrics_list):
    """CRITIC 权重计算"""
    df = pd.DataFrame(metrics_list)
    indicators = ['AUC', 'F1', 'ACC', 'SEN', 'SPE']
    Z = df[indicators].values
    
    # 归一化
    Z_norm = (Z - Z.min(axis=0)) / (Z.max(axis=0) - Z.min(axis=0) + 1e-8)
    
    std_dev = np.std(Z_norm, axis=0)
    corr_matrix = np.corrcoef(Z_norm, rowvar=False)
    if np.isnan(corr_matrix).any():
        conflict = np.ones(len(indicators))
    else:
        conflict = np.sum(1 - np.abs(corr_matrix), axis=1)
        
    info = std_dev * conflict
    weights = info / (np.sum(info) + 1e-10)
    
    model_scores = np.sum(Z_norm * weights, axis=1)
    final_weights = model_scores / (np.sum(model_scores) + 1e-10)
    
    return dict(zip(df['Classifier'], final_weights))

# ======================== 2. SHAP 计算函数 ========================

def compute_shap_for_model(model, X_train, X_test):
    """计算单个模型的 SHAP 值"""
    # 降采样背景数据加速 KernelExplainer
    background = shap.sample(X_train, 50) 
    
    if isinstance(model, (lgb.LGBMClassifier, cb.CatBoostClassifier, ExtraTreesClassifier)):
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test)
    else:
        # Logistic Regression, AdaBoost 等用 KernelExplainer
        # 使用 predict_proba 解释概率输出
        explainer = shap.KernelExplainer(model.predict_proba, background)
        shap_values = explainer.shap_values(X_test)

    # 处理 SHAP 输出格式差异
    if isinstance(shap_values, list):
        # Binary classification, take positive class (index 1)
        if len(shap_values) == 2:
            return shap_values[1]
        return shap_values[0]
    elif len(np.shape(shap_values)) == 3:
        # (samples, features, classes) -> take class 1
        return shap_values[:, :, 1]
    
    return shap_values

def compute_cope_shap(trained_models, weights, X_train, X_test):
    """计算加权集成的 COPE SHAP 值"""
    print("  Computing SHAP for ensemble...")
    
    # 初始化
    n_samples, n_features = X_test.shape
    cope_shap = np.zeros((n_samples, n_features))
    
    for name, model in trained_models.items():
        if name not in weights: continue
        w = weights[name]
        
        try:
            sv = compute_shap_for_model(model, X_train, X_test)
            # 确保形状匹配
            if sv.shape == cope_shap.shape:
                cope_shap += w * sv
            else:
                print(f"    Shape mismatch for {name}, skipping.")
        except Exception as e:
            print(f"    Error computing SHAP for {name}: {e}")
            
    return cope_shap

# ======================== 3. 不同阈值下的可视化函数 ========================

def plot_shap_by_threshold_groups(shap_values, X_test, probs, save_dir):
    """
    将样本按预测概率分为低、中、高风险组，分别绘制 SHAP Summary Plot
    """
    groups = {
        'Low_Risk_Only (Prob < 0.3)': probs < 0.3,
        'Medium_Risk_Only (0.3 <= Prob <= 0.6)': (probs >= 0.3) & (probs <= 0.6),
        'High_Risk_Only (Prob > 0.6)': probs > 0.6
    }
    
    for label, mask in groups.items():
        if np.sum(mask) < 5:
            print(f"    Skipping {label}: not enough samples ({np.sum(mask)})")
            continue
            
        plt.figure(figsize=(10, 8))
        shap.summary_plot(shap_values[mask], X_test[mask], show=False)
        plt.title(f"SHAP Summary - {label}", fontsize=16)
        plt.tight_layout()
        plt.savefig(os.path.join(save_dir, f"SHAP_Summary_{label.split()[0]}.pdf"))
        plt.close()

def plot_feature_importance_heatmap_by_threshold(shap_values, X_test, probs, feature_names, save_dir):
    """
    绘制热图：不同风险区间下，各特征的平均绝对SHAP值
    """
    bins = [0, 0.2, 0.4, 0.6, 0.8, 1.0]
    labels = ['0-0.2', '0.2-0.4', '0.4-0.6', '0.6-0.8', '0.8-1.0']
    
    prob_bins = pd.cut(probs, bins=bins, labels=labels)
    
    df_shap = pd.DataFrame(np.abs(shap_values), columns=feature_names)
    df_shap['Bin'] = prob_bins
    
    grouped = df_shap.groupby('Bin').mean()
    
    # 仅选取总重要性排名前 10 的特征
    top_features = df_shap.drop('Bin', axis=1).mean().sort_values(ascending=False).head(10).index
    plot_data = grouped[top_features].T
    
    plt.figure(figsize=(8, 10))
    sns.heatmap(plot_data, cmap='viridis', annot=True, fmt='.3f', linewidths=.5)
    plt.title("Mean |SHAP| Value by Risk Probability Interval", fontsize=14)
    plt.xlabel("Predicted Probability Interval", fontsize=12)
    plt.ylabel("Top 10 Features", fontsize=12)
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, "SHAP_Feature_Importance_Heatmap_by_Threshold.pdf"))
    plt.close()

# ======================== 修改点 2: 合并 Decision Plot 函数 ========================
def plot_decision_plot_combined(shap_values, X_test, probs, save_dir):
    """
    将低风险和高风险样本的 Decision Plot 合并到一张图中 (1x2 Subplots)
    """
    # 按概率排序
    sorted_idx = np.argsort(probs)
    
    # 选几个代表点
    low_idx = sorted_idx[:20] # 最低概率的20个
    high_idx = sorted_idx[-20:] # 最高概率的20个
    
    base_value = np.mean(probs)
    
    # 创建画布：宽20，高8，准备画左右两张图
    fig = plt.figure(figsize=(20, 8))
    
    # --- 左图：Low Risk ---
    # 使用 fig.add_subplot 激活左侧子图区域，shap.decision_plot 会绘制在当前激活的轴上
    ax1 = fig.add_subplot(1, 2, 1)
    shap.decision_plot(
        base_value, 
        shap_values[low_idx], 
        X_test.iloc[low_idx], 
        show=False, 
        link='logit'
    )
    # 为左子图添加标题 (注意：decision_plot可能会有些遮挡，调整y位置)
    ax1.set_title("Decision Path - Lowest Probability Samples", fontsize=16, y=1.02)
    
    # --- 右图：High Risk ---
    ax2 = fig.add_subplot(1, 2, 2)
    shap.decision_plot(
        base_value, 
        shap_values[high_idx], 
        X_test.iloc[high_idx], 
        show=False, 
        link='logit'
    )
    ax2.set_title("Decision Path - Highest Probability Samples", fontsize=16, y=1.02)
    
    plt.tight_layout()
    # 保存为一张合并的 PDF
    plt.savefig(os.path.join(save_dir, "Decision_Plot_Combined_Risk.pdf"), bbox_inches='tight')
    plt.close()
# =================================================================================

# ======================== 4. 主程序 ========================

classifiers = [
    ('Extra Trees', ExtraTreesClassifier(n_estimators=100, random_state=42)),
    ('Logistic Regression', LogisticRegression(max_iter=1000, random_state=42)),
    ('LightGBM', lgb.LGBMClassifier(random_state=42, verbose=-1)),
    ('CatBoost', cb.CatBoostClassifier(random_state=42, verbose=0, allow_writing_files=False)),
    ('AdaBoost', AdaBoostClassifier(random_state=42))
]

if __name__ == "__main__":
    # 1. 设置路径
    work_dir = r"D:\卒中预后第二篇论文\2025.11.25-new work\5.SHAP"
    data_path = os.path.join(work_dir, "训练集-量子计算.xlsx")
    save_dir = os.path.join(work_dir, "Threshold_SHAP_Results")
    os.makedirs(save_dir, exist_ok=True)
    
    if not os.path.exists(data_path):
        print(f"Error: Data file not found at {data_path}")
        exit()

    # 2. 加载数据
    print("Loading data...")
    data = pd.read_excel(data_path)
    X = data.drop(columns=['Prognosis'])
    y = data['Prognosis']
    
    # 处理非数值列 (简单编码)
    for col in X.columns:
        if X[col].dtype == 'object':
            X[col] = X[col].astype('category').cat.codes
            
    # 3. 划分训练/测试
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=42
    )
    
    # 4. 训练基础模型
    print("Training base models...")
    models = train_base_models(X_train, y_train, classifiers)
    
    # 5. 计算 CRITIC 权重 (一次性计算全局权重用于 COPE)
    print("Calculating CRITIC weights...")
    metrics_list = []
    for name, model in models.items():
        p = model.predict_proba(X_test)[:, 1] if hasattr(model, "predict_proba") else model.predict(X_test)
        m = get_metrics(y_test, p)
        m['Classifier'] = name
        metrics_list.append(m)
    weights = critic_weighting(metrics_list)
    
    # 6. 计算 COPE 概率
    print("Calculating COPE predictions...")
    cope_probs = np.zeros(len(X_test))
    total_w = 0
    for name, model in models.items():
        w = weights[name]
        p = model.predict_proba(X_test)[:, 1] if hasattr(model, "predict_proba") else model.predict(X_test)
        cope_probs += w * p
        total_w += w
    cope_probs /= total_w
    
    # 7. 计算 COPE SHAP (加权集成)
    cope_shap = compute_cope_shap(models, weights, X_train, X_test)
    
    # 8. 执行不同阈值下的 SHAP 可视化
    print("Generating threshold-stratified SHAP plots...")
    
    # A. 分组 Summary Plot (Low/Medium/High Risk)
    plot_shap_by_threshold_groups(cope_shap, X_test, cope_probs, save_dir)
    
    # B. 特征重要性热图 (随概率区间变化)
    plot_feature_importance_heatmap_by_threshold(cope_shap, X_test, cope_probs, X.columns, save_dir)
    
    # C. 决策路径图 (已修改为合并图)
    print("  Creating Combined Decision Plot...")
    plot_decision_plot_combined(cope_shap, X_test, cope_probs, save_dir)
    
    # D. 保存全局 SHAP Summary 作为参考
    plt.figure(figsize=(10, 8))
    shap.summary_plot(cope_shap, X_test, show=False)
    plt.title("Global COPE SHAP Summary", fontsize=16)
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, "SHAP_Summary_Global.pdf"))
    plt.close()
    
    print(f"\nAnalysis complete. All plots saved to: {save_dir}")

In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from scipy.stats import mannwhitneyu

# Sklearn imports
from sklearn.cluster import KMeans, MiniBatchKMeans, AgglomerativeClustering, Birch
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from sklearn.decomposition import FastICA
from sklearn.preprocessing import StandardScaler

# 忽略警告
warnings.filterwarnings('ignore')

# ================= 设置绘图风格 =================
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['pdf.fonttype'] = 42 # 确保导出为矢量字体

class StrokeSubphenotypeClustering:
    def __init__(self, work_dir, n_clusters=2, random_state=42):
        self.work_dir = work_dir
        self.n_clusters = n_clusters
        self.random_state = random_state
        self.scaler = StandardScaler()
        
        self.models = {
            'GMM': GaussianMixture(n_components=n_clusters, random_state=random_state),
            'KMeans': KMeans(n_clusters=n_clusters, random_state=random_state),
            'MiniBatchKMeans': MiniBatchKMeans(n_clusters=n_clusters, random_state=random_state),
            'HAC': AgglomerativeClustering(n_clusters=n_clusters),
            'Birch': Birch(n_clusters=n_clusters)
        }
        
        self.output_dir = os.path.join(work_dir, "Clustering_Results")
        os.makedirs(self.output_dir, exist_ok=True)
        
    def load_and_preprocess_data(self, train_file, val_file=None):
        print(f"Loading training data from: {train_file}")
        train_df = pd.read_excel(train_file)
        
        if 'Prognosis' in train_df.columns:
            self.feature_cols = [c for c in train_df.columns if c != 'Prognosis']
            self.label_col = 'Prognosis'
        else:
            self.feature_cols = train_df.columns.tolist()
            self.label_col = None
            
        print(f"Features ({len(self.feature_cols)}): {self.feature_cols}")
        
        X_train = train_df[self.feature_cols].copy()
        X_train = X_train.fillna(X_train.mean())
        X_train_scaled = self.scaler.fit_transform(X_train)
        
        X_val_scaled = None
        val_df = None
        if val_file and os.path.exists(val_file):
            print(f"Loading validation data from: {val_file}")
            val_df = pd.read_excel(val_file)
            X_val = val_df[self.feature_cols].copy()
            X_val = X_val.fillna(X_train.mean())
            X_val_scaled = self.scaler.transform(X_val)
            
        return X_train_scaled, X_val_scaled, train_df, val_df

    def determine_optimal_clusters(self, X, max_k=8):
        print("\nDetermining optimal k...")
        k_range = range(2, max_k + 1)
        scores = {'Silhouette': [], 'Calinski-Harabasz': [], 'Davies-Bouldin': []}
        
        for k in k_range:
            gmm = GaussianMixture(n_components=k, random_state=self.random_state)
            labels = gmm.fit_predict(X)
            scores['Silhouette'].append(silhouette_score(X, labels))
            scores['Calinski-Harabasz'].append(calinski_harabasz_score(X, labels))
            scores['Davies-Bouldin'].append(davies_bouldin_score(X, labels))
            
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        metrics = [('Silhouette', 'bo-'), ('Calinski-Harabasz', 'ro-'), ('Davies-Bouldin', 'go-')]
        
        for i, (name, style) in enumerate(metrics):
            axes[i].plot(k_range, scores[name], style)
            axes[i].set_title(name)
            axes[i].set_xlabel('k')
            
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, "Optimal_K_Selection.pdf"), format='pdf')
        plt.close()
        
        optimal_k = k_range[np.argmax(scores['Silhouette'])]
        print(f"Optimal k: {optimal_k}")
        return optimal_k

    def compare_clustering_methods(self, X):
        print("\nComparing algorithms...")
        results = []
        for name, model in self.models.items():
            try:
                if name == 'GMM':
                    model.fit(X)
                    labels = model.predict(X)
                else:
                    labels = model.fit_predict(X)
                
                results.append({
                    'Method': name,
                    'Silhouette': silhouette_score(X, labels),
                    'Calinski-Harabasz': calinski_harabasz_score(X, labels),
                    'Davies-Bouldin': davies_bouldin_score(X, labels)
                })
            except Exception as e:
                print(f"Error with {name}: {e}")
                
        res_df = pd.DataFrame(results)
        print(res_df)
        res_df.to_excel(os.path.join(self.output_dir, "Method_Comparison.xlsx"), index=False)
        return res_df.sort_values('Silhouette', ascending=False).iloc[0]['Method']

    def visualize_clusters_ica(self, X, labels, title, filename):
        ica = FastICA(n_components=2, random_state=self.random_state)
        X_ica = ica.fit_transform(X)
        
        plt.figure(figsize=(8, 6))
        scatter = plt.scatter(X_ica[:, 0], X_ica[:, 1], c=labels, cmap='viridis', alpha=0.7, s=60)
        plt.colorbar(scatter, label='Cluster')
        plt.title(f'{title} (FastICA)')
        plt.xlabel('ICA Component 1')
        plt.ylabel('ICA Component 2')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, filename), format='pdf')
        plt.close()

    def plot_feature_violins(self, df, labels, prefix):
        """绘制所有指标的小提琴图并进行 Mann-Whitney U 检验"""
        data = df.copy()
        data['Cluster'] = labels
        
        # 需要绘制的所有列：特征 + Prognosis
        plot_cols = self.feature_cols + ([self.label_col] if self.label_col else [])
        
        # 确保只有2个簇才能做 Mann-Whitney U，否则只画图
        unique_clusters = np.unique(labels)
        is_binary = len(unique_clusters) == 2
        
        # 计算行数，每行4个图
        n_cols = 4
        n_rows = int(np.ceil(len(plot_cols) / n_cols))
        
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 5 * n_rows))
        axes = axes.flatten()
        
        for i, col in enumerate(plot_cols):
            ax = axes[i]
            
            # 绘制小提琴图
            sns.violinplot(x='Cluster', y=col, data=data, ax=ax, palette="muted", inner="box")
            ax.set_title(f'{col}')
            ax.set_xlabel('')
            ax.set_ylabel('Value')
            
            # 如果是二分类聚类，进行统计检验
            if is_binary:
                group0 = data[data['Cluster'] == unique_clusters[0]][col]
                group1 = data[data['Cluster'] == unique_clusters[1]][col]
                
                try:
                    stat, p = mannwhitneyu(group0, group1)
                    # 在图上标注 P 值
                    y_max = data[col].max()
                    ax.text(0.5, y_max, f'p={p:.3e}', ha='center', va='bottom', 
                            fontsize=10, color='red' if p < 0.05 else 'black')
                except Exception as e:
                    print(f"Stat error on {col}: {e}")
                    
        # 隐藏多余的子图
        for j in range(i + 1, len(axes)):
            fig.delaxes(axes[j])
            
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, f"{prefix}_Violin_Plots_with_P_Values.pdf"), format='pdf')
        plt.close()

    def analyze_cluster_characteristics(self, df, labels, prefix="Train"):
        data = df.copy()
        data['Cluster'] = labels
        
        # 保存数据
        data.to_excel(os.path.join(self.output_dir, f"{prefix}_Data_with_Clusters.xlsx"), index=False)
        
        # 统计描述
        summary = data.groupby('Cluster').agg(['mean', 'std'])
        summary.to_excel(os.path.join(self.output_dir, f"{prefix}_Feature_Stats.xlsx"))
        
        # 绘制小提琴图 + 检验
        self.plot_feature_violins(df, labels, prefix)

        # 单独绘制 Prognosis 柱状图（更直观）
        if self.label_col in data.columns:
            mortality = data.groupby('Cluster')[self.label_col].mean()
            plt.figure(figsize=(5, 4))
            mortality.plot(kind='bar', color='coral', alpha=0.8, edgecolor='black')
            plt.title(f'{prefix} - Prognosis Rate by Cluster')
            plt.ylabel('Rate')
            plt.xticks(rotation=0)
            plt.tight_layout()
            plt.savefig(os.path.join(self.output_dir, f"{prefix}_Prognosis_Bar.pdf"), format='pdf')
            plt.close()

    def run(self, train_file, val_file):
        # 1. Load
        X_train_scaled, X_val_scaled, df_train, df_val = self.load_and_preprocess_data(train_file, val_file)
        
        # 2. Optimal K
        self.n_clusters = self.determine_optimal_clusters(X_train_scaled)
        for model in self.models.values():
            if hasattr(model, 'n_components'): model.set_params(n_components=self.n_clusters)
            elif hasattr(model, 'n_clusters'): model.set_params(n_clusters=self.n_clusters)
            
        # 3. Best Method
        best_method = self.compare_clustering_methods(X_train_scaled)
        best_model = self.models[best_method]
        
        # 4. Train Cluster
        print(f"\nClustering Train set with {best_method}...")
        if best_method == 'GMM':
            best_model.fit(X_train_scaled)
            train_labels = best_model.predict(X_train_scaled)
        else:
            train_labels = best_model.fit_predict(X_train_scaled)
            
        self.visualize_clusters_ica(X_train_scaled, train_labels, "Train Clusters", "Train_ICA.pdf")
        self.analyze_cluster_characteristics(df_train, train_labels, "Train")
        
        # 5. Validation Cluster
        if X_val_scaled is not None:
            print(f"\nClustering Validation set...")
            if hasattr(best_model, 'predict'):
                val_labels = best_model.predict(X_val_scaled)
            else:
                val_labels = best_model.fit_predict(X_val_scaled)
            
            self.visualize_clusters_ica(X_val_scaled, val_labels, "Validation Clusters", "Val_ICA.pdf")
            self.analyze_cluster_characteristics(df_val, val_labels, "Val")
            
        print(f"\nDone. Results in: {self.output_dir}")

if __name__ == "__main__":
    WORK_DIR = r"D:\卒中预后第一篇论文\2025.11.25-new work\6.无监督聚类"
    TRAIN_FILE = os.path.join(WORK_DIR, "训练集-量子计算.xlsx")
    VAL_FILE = os.path.join(WORK_DIR, "外部验证集-量子计算.xlsx")
    
    clustering = StrokeSubphenotypeClustering(WORK_DIR)
    clustering.run(TRAIN_FILE, VAL_FILE)