In [None]:
#数据拼接
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
import os
from sklearn.metrics import roc_curve, auc, roc_auc_score
from sklearn.preprocessing import label_binarize
from sklearn.linear_model import Lasso
import matplotlib.pyplot as plt
import numpy as np
import warnings
from sklearn.linear_model import LassoCV
import matplotlib as mpl
from collections import Counter

# 忽略所有警告
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['AR PL UKai CN']  # 用来正常显示中文标签 # 选择已安装的中文字体，这里以 'DejaVu Sans' 为例
plt.rcParams['axes.unicode_minus'] = True  # 解决负号显示问题
# mpl.rcParams['font.sans-serif'] = ['DejaVu Sans']

#将数据进行分类
def data_split(dataframe, random_state, test_size=0.3): #0.2是测试集的比例，用于后面测试的！！！
    # the columns of dataframe: names, labels, radiomics_features...
    X = dataframe.iloc[:,1:]  # 从第2列开始的特征，具体根据自己的数据改
    Y = dataframe['label'] #填自己的标签
    # 使用分层采样
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=test_size, random_state=random_state, stratify=Y)
    
    return X_train, X_test, Y_train, Y_test, X, Y
  


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 假设数据存储在DataFrame中
A = './final_stats/retrospective.xlsx'  # 替换为您的文件路径
# file_path = './final_stats/prospective.xlsx'  # 替换为您的文件路径

data = pd.read_excel(A)  # 读取Excel文件
# data = data.dropna()
columns_to_keep = ['AGE', 'BMI', 'tPSA', 'fPSA', 'f/t', 'p2PSA',
                   'PHI', 'Volume', 'PSAD', 'PHID', 'Blood Neutrophil',
                   'Blood Lymphocyte', 'NLR', 'PI-RADS', 'Urinary leukocytes']

X_A = data[columns_to_keep].round(4)
y_A = data['SPC']
# print(X_A.head())
# X_raw_train, X_raw_test, Y_train, Y_test = train_test_split(X_A, y_A, test_size=0.3, random_state=2024, stratify=y_A)

B = './final_stats/prospective.xlsx'  
data = pd.read_excel(B)
data = data.dropna()
X_B = data[columns_to_keep].round(4)
y_B = data['SPC']
# print(X_B.head())



In [None]:
print(pd.read_excel(A).shape)

In [None]:
# # # 标准化特征
# scaler = StandardScaler()

# # 归一化特征
# scaler = MinMaxScaler()
# scaled_features_train = scaler.fit_transform(X_A)
# X_train = pd.DataFrame(scaled_features_train, columns=X_A.columns)
Y_train = y_A
# X_test = pd.DataFrame(scaler.transform(X_B), columns=X_B.columns)
Y_test = y_B

In [None]:
# 定义混合类型特征预处理
from sklearn.compose import ColumnTransformer
# 定义特征类型
continuous_features = ['AGE', 'BMI', 'tPSA', 'fPSA', 'p2PSA',
                      'PHI', 'Volume', 'PSAD', 'PHID',
                      'Blood Neutrophil', 'Blood Lymphocyte','f/t', 'NLR']
non_trans_features = ['Urinary leukocytes', 'PI-RADS']
# ordinal_features = ['PI-RADS']
# 构建预处理Pipeline
preprocessor = ColumnTransformer(
        transformers=[
                ('continuous', MinMaxScaler(), continuous_features),
                ('non-trans', 'passthrough', non_trans_features),
                # ('ordinal', 'passthrough', ordinal_features)  # 保留有序编码
])
X_train = preprocessor.fit_transform(X_A)
# 将结果转换为 DataFrame
X_train_scaled_df = pd.DataFrame(X_train, columns=continuous_features + non_trans_features)
# 确保列的顺序和原始数据一致
X_train = X_train_scaled_df[continuous_features + non_trans_features]
X_test = preprocessor.transform(X_B)
X_test_scaled_df = pd.DataFrame(X_test, columns=continuous_features + non_trans_features)
X_test = X_test_scaled_df[continuous_features + non_trans_features]



In [None]:
print(X_train.shape)
print(X_test.shape)

In [None]:
# Lasso
index=  X_train.columns
alphas = np.logspace(-7,1,100)
max_iter = 100000
model_lassoCV = LassoCV(alphas=alphas, max_iter=max_iter).fit(X_train, Y_train)
coefs = model_lassoCV.path(X_train, Y_train, alphas=alphas, max_iter=max_iter)[1].T
# #为True的时候打开这行代码,false就注释掉
# coefs = coefs.reshape(coefs.shape[0], (coefs.shape[1]*coefs.shape[2]))
# print(coefs.shape)
alphas = model_lassoCV.alpha_
print("LassoCV回归后的最佳参数值",alphas)

# 计算LassoCV后剩下的非零特征权重
tmp = pd.Series(model_lassoCV.coef_, index=index)
print(model_lassoCV.coef_)
featureImportance = tmp[tmp!=0]
featureImportance.sort_values(ascending=True,inplace=True)
print("LassoCV后的特征数量为",featureImportance.shape[0])
print("对应的特征名称包括",featureImportance.index.values.tolist())
print()

# 图例输出
# 可视化LassoCV过程
plt.figure(figsize=(6, 6))
plt.semilogx(model_lassoCV.alphas_,coefs, '-')
plt.axvline(model_lassoCV.alpha_,color = 'gray',ls="--")
plt.tick_params(axis='x', labelsize=20)  # 设置X轴刻度
plt.tick_params(axis='y', labelsize=20)  # 设置Y轴刻度
plt.xlabel('正则化参数',fontsize=20)
plt.ylabel('回归系数',fontsize=20)
plt.savefig("./final_stats/02LassoCV_01.png",dpi=600,format='eps')

# 可视化非零特征的权重值
plt.figure(figsize=(12,6))
plt.barh(featureImportance.index,featureImportance)
plt.title('Relative Importance of Features',fontdict={'size': 20})
plt.tight_layout()
plt.tick_params(axis='x', labelsize=20)  # 设置X轴刻度
plt.tick_params(axis='y', labelsize=20)  # 设置Y轴刻度
plt.savefig("./final_stats/02LassoCV_02.png",dpi=600,format='eps')

# 可视化不同Lambda值对应的MSE
MSE = model_lassoCV.mse_path_
MSE_mean = np.mean(MSE,axis=1)
MSE_std = np.std(MSE,axis=1)

plt.figure(figsize=(6,6))
plt.errorbar(model_lassoCV.alphas_,MSE_mean        #x, y数据，一一对应
                , yerr=MSE_std                     #y误差范围
                , fmt="o"                          #数据点标记
                , ms=2                             #数据点大小
                , mfc="g"                          #数据点颜色
                , mec="r"                          #数据点边缘颜色
                , ecolor="lightblue"               #误差棒颜色
                , elinewidth=2                     #误差棒线宽
                , capsize=4                        #误差棒边界线长度
                , capthick=1)                      #误差棒边界线厚度
plt.semilogx()
plt.axvline(model_lassoCV.alpha_,color = 'gray',ls="--")
plt.xlabel('正则化参数',fontdict={'size': 20})
plt.ylabel('均方误差',fontdict={'size': 20})
plt.tick_params(axis='x', labelsize=20)  # 设置X轴刻度
plt.tick_params(axis='y', labelsize=20)  # 设置Y轴刻度
plt.savefig("./final_stats/02LassoCV_03.png",dpi=600,format='eps')


# 获取筛选后的临床数据（训练集和测试集）
tmp = pd.Series(model_lassoCV.coef_, index=index)
featureImportance = tmp[tmp!=0]
featureImportance.sort_values(ascending=True,inplace=True)
X_train = X_train.loc[:,featureImportance.index]#根据选取需要的特征
X_test =X_test.loc[:,featureImportance.index]
# X = X.loc[:,featureImportance.index]

In [None]:
model_predictions = {}

In [None]:
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from sklearn.model_selection import GridSearchCV, StratifiedKFold

# 定义参数网格
param_grids = {
    'RandomForest': {
        'n_estimators': [50, 100, 150, 200],
        'max_depth': [5, 10, 15, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt', 0.3, 0.7]
    },
    'DecisionTree': {
        'max_depth': [5, 10, 15],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt', 0.3, 0.7]
    },
    'KNN': {
        'n_neighbors': [3, 5, 7, 9, 11],
        'weights': ['uniform', 'distance']
    },
    'LogisticRegression': {
        'penalty': ['l1', 'l2'],
        'C': [0.01, 0.1, 1, 10, 100],
        'solver': ['liblinear', 'saga']  # 根据penalty动态匹配
    },
    'SVM': {
        'C': [0.01, 0.1, 1, 10, 100],
        'kernel': ['linear', 'poly', 'rbf'],
        'gamma': ['scale', 0.1, 1]
    },
    'XGBoost': {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 6, 9],
        'learning_rate': [0.01, 0.05, 0.1],
        'colsample_bytree': [0.5, 0.7, 1]
    },
    'CatBoost': {
        'iterations': [100, 200],
        'depth': [4, 6, 8],
        'learning_rate': [0.01, 0.05, 0.1]
    },
    'GradientBoosting': {
        'n_estimators': [50, 100, 200],
        'learning_rate': [0.01, 0.1, 0.2],
        'max_depth': [3, 4, 5]
    },
    'MLP': {
        'hidden_layer_sizes': [(50,), (100,), (50,50)],
        'activation': ['relu', 'tanh'],
        'solver': ['adam'],
        'alpha': [0.0001, 0.001, 0.01]
    }
}

# 创建模型实例
models = {
    'RandomForest': RandomForestClassifier(random_state=2024),
    'DecisionTree': DecisionTreeClassifier(random_state=2024),
    # 'KNN': KNeighborsClassifier(),
    'LogisticRegression': LogisticRegression(random_state=2024, max_iter=1000),
    'SVM': SVC(random_state=2024, probability=True),
    'XGBoost': XGBClassifier(random_state=2024, verbosity=0),
    'CatBoost': CatBoostClassifier(random_state=2024, verbose=0),
    # 'GradientBoosting': GradientBoostingClassifier(random_state=2024),
    'MLP': MLPClassifier(max_iter=500, random_state=2024)
}

# 用于存储最佳参数和分数
best_params = {}
best_scores = {}

from sklearn.metrics import make_scorer, f1_score
# 定义一个使用宏平均F1分数的评分器
# macro_f1_scorer = make_scorer(f1_score, average='macro')

# 使用StratifiedKFold进行分层交叉验证
cv = StratifiedKFold(n_splits=5, random_state=2024, shuffle=True)
print(X_train.shape)
# 对每个模型进行GridSearchCV
for model_name in models.keys():
    print(f"Running GridSearchCV for {model_name}...")
    grid_search = GridSearchCV(estimator=models[model_name], param_grid=param_grids[model_name], cv=cv, n_jobs=4)
    grid_search.fit(X_train, Y_train)
    
    best_params[model_name] = grid_search.best_params_
    best_scores[model_name] = grid_search.best_score_
    print(f"Best parameters for {model_name}: {grid_search.best_params_}")
    print(f"Best score for {model_name}: {grid_search.best_score_}\n")

# 输出所有模型的最佳参数和分数
for model_name in best_params.keys():
    print(f"{model_name} - Best Parameters: {best_params[model_name]}")
    print(f"{model_name} - Best Score: {best_scores[model_name]}\n")


In [None]:
# 建立模型池
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.ensemble import VotingClassifier

# 机器学习分类模型建模
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import label_binarize
from sklearn.neural_network import MLPClassifier
from catboost import CatBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.utils import compute_sample_weight
random_seed = 2024
n_classes = [0, 1]

print(best_params)
# best_params = {'RandomForest': {'max_depth': None, 'max_features': 'sqrt', 'min_samples_leaf': 4, 'min_samples_split': 10, 'n_estimators': 200}, 'DecisionTree': {'max_depth': None, 'max_features': 'sqrt', 'min_samples_leaf': 8, 'min_samples_split': 2}, 'KNN': {'algorithm': 'auto', 'n_neighbors': 7, 'weights': 'distance'}, 'LogisticRegression': {'C': 10, 'penalty': 'l2', 'solver': 'liblinear'}, 'SVM': {'C': 100, 'gamma': 'auto', 'kernel': 'rbf'}, 'XGBoost': {'colsample_bytree': 1, 'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 100}, 'CatBoost': {'depth': 8, 'iterations': 100, 'learning_rate': 0.1}, 'GradientBoosting': {'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 200}, 'MLP': {'activation': 'logistic', 'alpha': 0.0001, 'hidden_layer_sizes': (100, 50), 'learning_rate': 'constant', 'solver': 'adam'}}
# best_params = {'RandomForest': {'max_depth': 20, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 10, 'n_estimators': 100},
#             'DecisionTree': {'max_depth': 10, 'max_features': 'sqrt', 'min_samples_leaf': 2, 'min_samples_split': 2},
#             'KNN': {'algorithm': 'auto', 'n_neighbors': 9, 'weights': 'uniform'},
#             'LogisticRegression': {'C': 0.01, 'penalty': 'l2', 'solver': 'liblinear'},
#             'SVM': {'C': 1, 'gamma': 'scale', 'kernel': 'rbf'},
#             'XGBoost': {'colsample_bytree': 1, 'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 50},
#             'CatBoost': {'depth': 6, 'iterations': 100, 'learning_rate': 0.1},
#             'GradientBoosting': {'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 200},
#             'MLP': {'activation': 'relu', 'alpha': 0.0001, 'hidden_layer_sizes': (100,), 'learning_rate': 'constant', 'solver': 'sgd'}}

# 使用最佳参数初始化模型
rf_model = RandomForestClassifier(**best_params['RandomForest'], random_state=2024)
dt_model = DecisionTreeClassifier(**best_params['DecisionTree'], random_state=2024)
# knn_model = KNeighborsClassifier(**best_params['KNN'])
lr_model = LogisticRegression(**best_params['LogisticRegression'], random_state=2024, max_iter=1000)
svm_model = SVC(**best_params['SVM'], random_state=2024, probability=True)  
xgb_model = XGBClassifier(**best_params['XGBoost'], random_state=2024, verbosity=0)
cat_model = CatBoostClassifier(**best_params['CatBoost'], random_state=2024, verbose=0)
# gb_model = GradientBoostingClassifier(**best_params['GradientBoosting'], random_state=2024)
mlp_model = MLPClassifier(**best_params['MLP'], random_state=2024, max_iter=1000)
tuned_models = {
    'RandomForest': RandomForestClassifier(**best_params['RandomForest'], random_state=2024),
    'DecisionTree': DecisionTreeClassifier(**best_params['DecisionTree'], random_state=2024),
    # 'KNN': KNeighborsClassifier(**best_params['KNN']),
    'LogisticRegression': LogisticRegression(**best_params['LogisticRegression'], random_state=2024, max_iter=1000),
    'SVM':SVC(**best_params['SVM'], random_state=2024, probability=True)  ,
    'XGBoost': XGBClassifier(**best_params['XGBoost'], random_state=2024, verbosity=0),
    'CatBoost': CatBoostClassifier(**best_params['CatBoost'], random_state=2024, verbose=0),
    # 'GradientBoosting': GradientBoostingClassifier(**best_params['GradientBoosting'], random_state=2024),
    'MLP': MLPClassifier(**best_params['MLP'], random_state=2024, max_iter=1000)
}

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, f1_score, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import label_binarize
from scipy.interpolate import interp1d


In [None]:
from sklearn.utils import resample
import numpy as np

def compute_bootstrap_ci(metric_func, y_true, y_pred, y_proba=None, n_bootstraps=1000, alpha=0.05):
    """
    使用 bootstrap 方法计算给定指标的置信区间.
    
    参数:
        metric_func: 评估指标函数（如 accuracy_score, f1_score 等）
        y_true: 测试集的真实标签
        y_pred: 测试集的预测标签
        y_proba: 测试集的预测概率（如果需要计算 AUC）
        n_bootstraps: 重采样的次数
        alpha: 显著性水平（0.05 表示 95% 置信区间）
    
    返回:
        均值, (下限, 上限) 的置信区间
    """
    bootstrapped_scores = []
    n_samples = len(y_true)

    # 引入重采样的部分
    for _ in range(n_bootstraps):
        indices = resample(np.arange(n_samples), replace=True, n_samples=n_samples)
        
        # 使用 iloc 来索引
        y_true_sample = y_true.iloc[indices]
        # y_pred_sample = y_pred.iloc[indices]
        y_pred_sample = y_pred[indices]

        # 如果有 y_proba
        if y_proba is not None:
            y_proba_sample = y_proba[indices]

        # 计算指标
        score = metric_func(y_true_sample, y_pred_sample)
        bootstrapped_scores.append(score)

    
    # 计算均值和置信区间
    bootstrapped_scores = np.array(bootstrapped_scores)
    lower = round(np.percentile(bootstrapped_scores, 100 * alpha / 2), 3)
    upper = round(np.percentile(bootstrapped_scores, 100 * (1 - alpha / 2)), 3)
    mean_score = np.mean(bootstrapped_scores)
    return mean_score, (lower, upper)

# 定义计算特异度的函数
def specificity_metric(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    tn = cm[0, 0]  # True negatives
    fp = cm[0, 1]  # False positives
    return tn / (tn + fp)

# 定义计算特异度的函数
def sensitivity_metric(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    tp = cm[1, 1]  # True positives
    fn = cm[1, 0]  # False negatives
    return tp / (tp + fn)

def ppv_metric(y_true, y_pred):
    """计算 Positive Predictive Value (PPV)"""
    cm = confusion_matrix(y_true, y_pred)
    TP = cm[1][1]
    FP = cm[0][1]
    return TP / (TP + FP) if (TP + FP) > 0 else 0

def npv_metric(y_true, y_pred):
    """计算 Negative Predictive Value (NPV)"""
    cm = confusion_matrix(y_true, y_pred)
    TN = cm[0][0]
    FN = cm[1][0]
    return TN / (TN + FN) if (TN + FN) > 0 else 0

from scipy.stats import norm
import numpy as np
from sklearn.metrics import roc_auc_score

def delong_ci(y_true, y_proba, alpha=0.95):
    """使用 DeLong 方法计算 AUC 的置信区间"""
    # auc = roc_auc_score(y_true, y_proba)
    # n1 = np.sum(y_true == 1)  # 正类样本数
    # n0 = len(y_true) - n1     # 负类样本数

    # # DeLong 变异计算
    # v10 = np.var(y_proba[y_true == 1]) / n1
    # v01 = np.var(y_proba[y_true == 0]) / n0
    # se = np.sqrt(v10 + v01)

    # # 计算 Z 值
    # z = norm.ppf(1 - (1 - alpha) / 2)
    # lower = max(0, auc - z * se)
    # upper = min(1, auc + z * se)
    # return auc, (lower, upper)
    auc = roc_auc_score(y_true, y_proba)
    n1 = np.sum(y_true == 1)  # 正类样本数
    n0 = len(y_true) - n1     # 负类样本数
    
    # DeLong方差计算
    v10 = 1 / n1 * np.sum((y_proba[y_true == 1] - auc) ** 2)
    v01 = 1 / n0 * np.sum((y_proba[y_true == 0] - auc) ** 2)
    se = np.sqrt((v10 / n1) + (v01 / n0))
    
    # 计算Z值
    z = norm.ppf(1 - (1 - alpha) / 2)
    lower = auc - z * se
    upper = auc + z * se
    return auc, (max(0, lower), min(1, upper))
    

In [None]:
base_models = [
    ('RandomForest', rf_model),
    ('LogisticRegression', lr_model),
    ('SVM', svm_model),
    ('XGBoost', xgb_model),
    # ('CatBoost', cat_model),
    ('MLP', mlp_model)
]
from sklearn.ensemble import StackingClassifier
tuned_models['stacking'] = StackingClassifier(estimators=base_models, final_estimator=LogisticRegression(), cv=5)
tuned_models['voting'] = VotingClassifier(estimators=base_models, voting='soft')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, f1_score, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import label_binarize

n_classes = 2  # 二分类问题

# 初始化存储每个模型结果的字典
train_results = {}

# 初始化存储每个折叠结果的列表
train_tprs_micro = []
train_aucs_micro = []
train_mean_fpr_micro = np.linspace(0, 1, 100)
train_aucs_95ci = []
train_conf_matrices = []

# 评估模型在测试集上的性能
for model_name, model in tuned_models.items():
    model.fit(X_train, Y_train)
    # 预测测试集上的概率
    y_proba_train = model.predict_proba(X_train)
    

    # 计算ROC曲线和AUC
    fpr, tpr, _ = roc_curve(Y_train, y_proba_train[:, 1])
    roc_auc = auc(fpr, tpr)

    # 计算 micro 平均的ROC曲线和AUC
    interp_tpr_micro = np.interp(train_mean_fpr_micro, fpr, tpr)
    interp_tpr_micro[0] = 0.0  # 初始点为0
    train_tprs_micro.append(interp_tpr_micro)
    train_aucs_micro.append(roc_auc)
    
    # print(model.predict_proba(X_train))
    # 设定新的阈值
    threshold = 0.5

    # 使用新的阈值进行预测
    y_pred_train = (y_proba_train[:, 1] >= threshold).astype(int)
    
    # 计算训练集上的性能指标
    # y_pred_test = model.predict(X_test)
    accuracy_train = accuracy_score(Y_train, y_pred_train)
    f1_train = f1_score(Y_train, y_pred_train, average='weighted')

    # 计算测试集上的混淆矩阵
    cm = confusion_matrix(Y_train, y_pred_train, labels=np.unique(Y_train))
    train_conf_matrices.append(cm)

    # 计算sensitivity和specificity    
    TP = cm[1][1]  # 真阳性的数量
    FN = cm[1][0]  # 假阴性的数量
    TN = cm[0][0]  # 真阴性的数量
    FP = cm[0][1]  # 假阳性的数量
    sensitivity = TP / (TP + FN) if (TP + FN) != 0 else 0
    specificity = TN / (TN + FP) if (TN + FP) != 0 else 0
    ppv = TP / (TP + FP) if (TP + FP) != 0 else 0
    npv = TN / (TN + FN) if (TP + FN) != 0 else 0

    # 计算95%置信区间
    accuracy_ci = compute_bootstrap_ci(accuracy_score, Y_train, y_pred_train)
    f1_ci = compute_bootstrap_ci(lambda y, y_hat: f1_score(y, y_hat, average='weighted'), Y_train, y_pred_train)
    auc_ci = compute_bootstrap_ci(roc_auc_score, Y_train, y_proba_train[:, 1])

    sensitivity_ci = compute_bootstrap_ci(sensitivity_metric, Y_train, y_pred_train)

    specificity_ci = compute_bootstrap_ci(specificity_metric, Y_train, y_pred_train)
    ppv_mean, ppv_ci = compute_bootstrap_ci(ppv_metric, Y_train, y_pred_train)
    # print(f"PPV: {ppv_mean}, 95%CI: {ppv_ci}")

    # 计算 NPV 的置信区间
    npv_mean, npv_ci = compute_bootstrap_ci(npv_metric, Y_train, y_pred_train)

    train_aucs_95ci.append(auc_ci[1])

    # 输出模型的性能指标
    print(f'{model_name} (Internal Test Set):')
    # 输出指标及其95%置信区间
    print(f'  Accuracy = {accuracy_train:.3f}, 95%CI: {accuracy_ci[1]}')
    print(f'  AUC (micro) = {roc_auc:.3f}, 95%CI: {auc_ci[1]}')
    print(f'  F1-score = {f1_train:.3f}, 95%CI: {f1_ci[1]}')
    print(f'  Mean Sensitivity = {sensitivity_ci[0]:.3f}, 95%CI: {sensitivity_ci[1]}')
    print(f'  Mean Specificity = {specificity_ci[0]:.3f}, 95%CI: {specificity_ci[1]}')
    print(f'  PPV = {ppv:.3f}, 95%CI: {ppv_ci}')
    print(f'  NPV = {npv:.3f}, 95%CI: {npv_ci}')
    
    # 存储结果
    train_results[model_name] = {
        'accuracy': accuracy_train,
        'auc_micro': roc_auc,
        'f1_score': f1_train,
        'sensitivity': sensitivity,
        'specificity': specificity,
        'PPV': ppv,
        'NPV': npv,
    }
    
# 绘制 micro 平均的ROC曲线图
plt.figure(figsize=(8, 8))
for i, model_name in enumerate(tuned_models.keys()):
    plt.plot(train_mean_fpr_micro, train_tprs_micro[i], lw=2, label=f'{model_name} (AUC = {train_aucs_micro[i]:.3f} 95%CI: {train_aucs_95ci[i]})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves (Retrospective Set)')
plt.legend(loc='lower right')
plt.gca().set_aspect('equal', adjustable='box')
plt.xlim(0, 1)  # 设置 x 轴范围
plt.ylim(0, 1)  # 设置 y 轴范围
plt.show()

# 绘制每个模型的混淆矩阵图像
n_models = len(tuned_models)  # 模型数量
n_cols = 5  # 每行显示的模型数量
n_rows = (n_models + n_cols - 1) // n_cols  # 计算行数

# plt.figure(figsize=(20, 4 * n_rows))  # 调整整体图像大小

# for i, (model_name, conf_matrix) in enumerate(zip(tuned_models.keys(), train_conf_matrices)):
#     plt.subplot(n_rows, n_cols, i + 1)
#     disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=np.unique(Y_train))
#     disp.plot(cmap=plt.cm.Blues, ax=plt.gca(), colorbar=False)
#     plt.title(f'Confusion Matrix for {model_name}')
#     plt.xlabel('')  # 去掉x轴标签
#     plt.ylabel('')  # 去掉y轴标签

# plt.tight_layout()  # 调整子图间的布局以避免重叠
# plt.show()



In [None]:
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
# 绘制主图
plt.figure(figsize=(8, 8),dpi=600)
for i, model_name in enumerate(tuned_models.keys()):
    plt.plot(train_mean_fpr_micro, train_tprs_micro[i], lw=2,
             label=f'{model_name} (AUC = {train_aucs_micro[i]:.3f} 95%CI: {train_aucs_95ci[i][0]:.3f}-{train_aucs_95ci[i][1]:.3f})',)


# 主图设置
plt.xlabel('假阳性率')
plt.ylabel('真正率')
plt.title('ROC 曲线 (回顾性队列)')
plt.legend(loc='lower right', fontsize=8)
plt.xlim(0, 1)
plt.ylim(0, 1)


# 添加放大子图（调整位置和大小）
ax_inset = inset_axes(plt.gca(), width="40%", height="40%", loc='lower right',
                      bbox_to_anchor=(-0.32, 0.3,1.3,1.3),  # 控制位置和大小
                      bbox_transform=plt.gca().transAxes)
# 添加放大区域的边框线
# plt.gca().indicate_inset_zoom(ax_inset, edgecolor="gray")
# 绘制放大区域的曲线（限定坐标范围）
for i, model_name in enumerate(tuned_models.keys()):
    ax_inset.plot(train_mean_fpr_micro, train_tprs_micro[i], lw=1.5, alpha=0.8)
# 设置放大区域的坐标范围（示例：FPR 0~0.2，TPR 0.8~1）
ax_inset.set_xlim(0, 0.2)
ax_inset.set_ylim(0.8, 1.0)
ax_inset.grid(linestyle='--', alpha=0.5)



plt.show()

In [None]:
# base_models = [
#     ('RandomForest', RandomForestClassifier(**best_params['RandomForest'], random_state=2024)),
#     ('LogisticRegression', LogisticRegression(**best_params['LogisticRegression'], random_state=2024, max_iter=1000)),
#     ('SVM', SVC(**best_params['SVM'], random_state=2024, probability=True)),
#     ('XGBoost', XGBClassifier(**best_params['XGBoost'], random_state=2024, verbosity=0)),
#     ('CatBoost', CatBoostClassifier(**best_params['CatBoost'], random_state=2024, verbose=0)),
#     ('MLP', MLPClassifier(**best_params['MLP'], random_state=2024, max_iter=1000))
# ]
# from sklearn.ensemble import StackingClassifier
# tuned_models['stacking'] = StackingClassifier(estimators=base_models, final_estimator=LogisticRegression(), cv=5)
# tuned_models['voting'] = VotingClassifier(estimators=base_models, voting='soft')

# tuned_models['stacking'].fit(X_train, Y_train)
# tuned_models['voting'].fit(X_train, Y_train)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, f1_score, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import label_binarize

n_classes = 2  # 二分类问题

# 初始化存储每个模型结果的字典
test_results = {}

# 初始化存储每个折叠结果的列表
test_tprs_micro = []
test_aucs_micro = []
test_mean_fpr_micro = np.linspace(0, 1, 100)
test_aucs_95ci = []
test_conf_matrices = []

# 评估模型在测试集上的性能
for model_name, model in tuned_models.items():
    # 预测测试集上的概率
    y_proba_test = model.predict_proba(X_test)
    model_predictions[model_name] = y_proba_test[:, 1]
    
    # 计算ROC曲线和AUC
    fpr, tpr, _ = roc_curve(Y_test, y_proba_test[:, 1])
    roc_auc = auc(fpr, tpr)

    # 计算 micro 平均的ROC曲线和AUC
    interp_tpr_micro = np.interp(test_mean_fpr_micro, fpr, tpr)
    interp_tpr_micro[0] = 0.0  # 初始点为0
    test_tprs_micro.append(interp_tpr_micro)
    test_aucs_micro.append(roc_auc)
    
    # print(model.predict_proba(X_test))
    # 设定新的阈值
    threshold = 0.5

    # 使用新的阈值进行预测
    y_pred_test = (y_proba_test[:, 1] >= threshold).astype(int)
    
    # 计算测试集上的性能指标
    # y_pred_test = model.predict(X_test)
    accuracy_test = accuracy_score(Y_test, y_pred_test)
    f1_test = f1_score(Y_test, y_pred_test, average='weighted')

    # 计算测试集上的混淆矩阵
    cm = confusion_matrix(Y_test, y_pred_test, labels=np.unique(Y_train))
    test_conf_matrices.append(cm)

    # 计算sensitivity和specificity    
    TP = cm[1][1]  # 真阳性的数量
    FN = cm[1][0]  # 假阴性的数量
    TN = cm[0][0]  # 真阴性的数量
    FP = cm[0][1]  # 假阳性的数量
    sensitivity = TP / (TP + FN) if (TP + FN) != 0 else 0
    specificity = TN / (TN + FP) if (TN + FP) != 0 else 0
    ppv = TP / (TP + FP) if (TP + FP) != 0 else 0
    npv = TN / (TN + FN) if (TP + FN) != 0 else 0

    # 计算95%置信区间
    accuracy_ci = compute_bootstrap_ci(accuracy_score, Y_test, y_pred_test)
    f1_ci = compute_bootstrap_ci(lambda y, y_hat: f1_score(y, y_hat, average='weighted'), Y_test, y_pred_test)
    auc_ci = compute_bootstrap_ci(roc_auc_score, Y_test, y_proba_test[:, 1])

    sensitivity_ci = compute_bootstrap_ci(sensitivity_metric, Y_test, y_pred_test)

    specificity_ci = compute_bootstrap_ci(specificity_metric, Y_test, y_pred_test)
    ppv_mean, ppv_ci = compute_bootstrap_ci(ppv_metric, Y_test, y_pred_test)
    # print(f"PPV: {ppv_mean}, 95%CI: {ppv_ci}")

    # 计算 NPV 的置信区间
    npv_mean, npv_ci = compute_bootstrap_ci(npv_metric, Y_test, y_pred_test)

    test_aucs_95ci.append(auc_ci[1])

    # 输出模型的性能指标
    print(f'{model_name} (Internal Test Set):')
    # 输出指标及其95%置信区间
    print(f'  Accuracy = {accuracy_test:.3f}, 95%CI: {accuracy_ci[1]}')
    print(f'  AUC (micro) = {roc_auc:.3f}, 95%CI: {auc_ci[1]}')
    print(f'  F1-score = {f1_test:.3f}, 95%CI: {f1_ci[1]}')
    print(f'  Mean Sensitivity = {sensitivity_ci[0]:.3f}, 95%CI: {sensitivity_ci[1]}')
    print(f'  Mean Specificity = {specificity_ci[0]:.3f}, 95%CI: {specificity_ci[1]}')
    print(f'  PPV = {ppv:.3f}, 95%CI: {ppv_ci}')
    print(f'  NPV = {npv:.3f}, 95%CI: {npv_ci}')
    
    # 存储结果
    test_results[model_name] = {
        'accuracy': accuracy_test,
        'auc_micro': roc_auc,
        'f1_score': f1_test,
        'sensitivity': sensitivity,
        'specificity': specificity,
        'PPV': ppv,
        'NPV': npv,
    }
    

# # 绘制主图
# plt.figure(figsize=(10, 10))
# for i, model_name in enumerate(tuned_models.keys()):
#     plt.plot(test_mean_fpr_micro, test_tprs_micro[i], lw=2, 
#              label=f'{model_name} (AUC = {test_aucs_micro[i]:.3f} 95%CI: {test_aucs_95ci[i]})')

# # 添加放大子图（调整位置和大小）
# ax_inset = inset_axes(plt.gca(), width="40%", height="40%", loc='lower right',
#                       bbox_to_anchor=(0.1, 0.1, 0.9, 0.9),  # 控制位置和大小
#                       bbox_transform=plt.gca().transAxes)

# # 绘制放大区域的曲线（限定坐标范围）
# for i, model_name in enumerate(tuned_models.keys()):
#     ax_inset.plot(test_mean_fpr_micro, test_tprs_micro[i], lw=1.5, alpha=0.8)

# # 设置放大区域的坐标范围（示例：FPR 0~0.2，TPR 0.8~1）
# ax_inset.set_xlim(0, 0.3)
# ax_inset.set_ylim(0.7, 1.0)
# ax_inset.grid(linestyle='--', alpha=0.5)

# # 添加放大区域的边框线
# plt.gca().indicate_inset_zoom(ax_inset, edgecolor="gray")

# # 主图设置
# plt.xlabel('False Positive Rate')
# plt.ylabel('True Positive Rate')
# plt.title('ROC Curves with Zoomed Inset')
# plt.legend(loc='lower right', fontsize=8)
# plt.xlim(0, 1)
# plt.ylim(0, 1)
# plt.show()

# # 绘制 micro 平均的ROC曲线图
# plt.figure(figsize=(8, 8))
# for i, model_name in enumerate(tuned_models.keys()):
#     plt.plot(test_mean_fpr_micro, test_tprs_micro[i], lw=2, label=f'{model_name} (AUC = {test_aucs_micro[i]:.3f} 95%CI: {test_aucs_95ci[i]})')
# plt.xlabel('False Positive Rate')
# plt.ylabel('True Positive Rate')
# plt.title('ROC Curves (Test Set)')
# plt.legend(loc='lower right')
# plt.gca().set_aspect('equal', adjustable='box')
# plt.xlim(0, 1)  # 设置 x 轴范围
# plt.ylim(0, 1)  # 设置 y 轴范围
# plt.show()

# # 绘制每个模型的混淆矩阵图像
# n_models = len(tuned_models)  # 模型数量
# n_cols = 5  # 每行显示的模型数量
# n_rows = (n_models + n_cols - 1) // n_cols  # 计算行数

# plt.figure(figsize=(20, 4 * n_rows))  # 调整整体图像大小

# for i, (model_name, conf_matrix) in enumerate(zip(tuned_models.keys(), test_conf_matrices)):
#     plt.subplot(n_rows, n_cols, i + 1)
#     disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=np.unique(Y_train))
#     disp.plot(cmap=plt.cm.Blues, ax=plt.gca(), colorbar=False)
#     plt.title(f'Confusion Matrix for {model_name}')
#     plt.xlabel('')  # 去掉x轴标签
#     plt.ylabel('')  # 去掉y轴标签

# plt.tight_layout()  # 调整子图间的布局以避免重叠
# plt.show()



In [None]:
# 绘制主图
plt.figure(figsize=(8, 8),dpi=600)
for i, model_name in enumerate(tuned_models.keys()):
    # print(i)
    plt.plot(test_mean_fpr_micro, test_tprs_micro[i], lw=2,
             label=f'{model_name} (AUC = {test_aucs_micro[i]:.3f} 95%CI: {test_aucs_95ci[i][0]:.3f}-{test_aucs_95ci[i][1]:.3f})',)


# 主图设置
plt.xlabel('假阳性率')
plt.ylabel('真正率')
plt.title('ROC 曲线 (前瞻性队列)')
plt.legend(loc='lower right', fontsize=8)
plt.xlim(0, 1)
plt.ylim(0, 1)


# 添加放大子图（调整位置和大小）
ax_inset = inset_axes(plt.gca(), width="40%", height="40%", loc='lower right',
                      bbox_to_anchor=(-0.32, 0.3,1.3,1.3),  # 控制位置和大小
                      bbox_transform=plt.gca().transAxes)
# 添加放大区域的边框线
# plt.gca().indicate_inset_zoom(ax_inset, edgecolor="gray")
# 绘制放大区域的曲线（限定坐标范围）
for i, model_name in enumerate(tuned_models.keys()):
    ax_inset.plot(test_mean_fpr_micro, test_tprs_micro[i], lw=1.5, alpha=0.8)
# 设置放大区域的坐标范围（示例：FPR 0~0.2，TPR 0.8~1）
ax_inset.set_xlim(0.0, 0.3)
ax_inset.set_ylim(0.7, 1.0)
ax_inset.grid(linestyle='--', alpha=0.5)



plt.show()

In [None]:
import matplotlib.pyplot as plt
from sklearn.calibration import CalibrationDisplay
from sklearn.metrics import brier_score_loss
import numpy as np
from scipy.interpolate import make_interp_spline
bin_edges = [0,0.1,0.2,0.5,0.8,1.0]

# 计算模型的校准曲线
def custom_calibration_curve(y_true, y_prob, bins):
    # 初始化存储校准曲线的数据
    fraction_of_positives = []
    mean_predicted_value = []
    
    # 将预测概率分入各个区间
    for i in range(len(bins) - 1):
        bin_start = bins[i]
        bin_end = bins[i + 1]
        
        # 在这个区间内找到所有概率
        # 如果是最后一个分箱，使用 <= 1.0 的条件
        if i == len(bins) - 2:
            bin_mask = (y_prob >= bin_start) & (y_prob <= bin_end)
        else:
            bin_mask = (y_prob >= bin_start) & (y_prob < bin_end)
        bin_true = y_true[bin_mask]
        bin_prob = y_prob[bin_mask]
        
        if len(bin_true) > 0:  # 确保该区间不为空
            fraction_of_positives.append(np.mean(bin_true))  # 该区间的正类比例
            mean_predicted_value.append(np.mean(bin_prob))  # 该区间的平均预测概率
    
    return np.array(mean_predicted_value), np.array(fraction_of_positives)

# 计算每个模型的 Brier score
brier_scores = {}
for model_name, model in tuned_models.items():
    # if model_name == 'stacking' or model_name == 'voting' or model_name == 'RandomForest' or model_name == 'XGBoost' or model_name == 'MLP':
    y_prob = model.predict_proba(X_test)[:, 1]
    brier_score = brier_score_loss(Y_test, y_prob)
    brier_scores[model_name] = brier_score

# 按 Brier score 升序排列模型
# sorted_models = sorted(brier_scores.items(), key=lambda x: x[1])

# 创建图形对象
plt.figure(figsize=(8, 8), dpi=600)

# 设置颜色和线型
colors = plt.cm.viridis(np.linspace(0, 1, len(brier_scores.items())))
# colors='blue'
# linestyles = ['-', '--', '-.', ':', '-', '--', '-.', ':', '-']

# 绘制模型的校准曲线
for (model_name, _), color in zip(brier_scores.items(), colors):
    model = tuned_models[model_name]
    y_prob = model.predict_proba(X_test)[:, 1]
    # CalibrationDisplay.from_estimator(model, X_test, Y_test, n_bins=5, name=f'{model_name} (Brier: {brier_scores[model_name]:.4f})', ax=plt.gca(), color=color, linestyle='--')
    model_brier = brier_score_loss(Y_test, y_prob)
    prob_true, prob_pred = custom_calibration_curve(Y_test, y_prob, bin_edges)
    print(model_name, prob_true.shape, prob_pred.shape)
    # 使用插值方法对校准曲线进行平滑处理
    spl = make_interp_spline(prob_true, prob_pred, k=2)  # k=3 是三次样条插值
    smooth_x = np.linspace(prob_true.min(), prob_true.max(), 10)
    smooth_y = spl(smooth_x)
    stack_brier_ci = compute_bootstrap_ci(brier_score_loss, Y_test, y_prob)
    plt.plot(smooth_x, smooth_y, label=f'{model_name} (Brier Score: {model_brier:.3f})', linestyle='-', marker='')
    # 绘制完美校准的对角线
plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfectly Calibrated')

# 添加标题和标签
plt.title('校准曲线')
plt.xlabel('预测概率')
plt.ylabel('真实频率')
plt.legend(loc='best')
ax = plt.gca()
ax.set_aspect(1)
# 显示图像
plt.tight_layout()
plt.show()


In [None]:
# 各模型DCA曲线
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
print(len(Y_test))
def calculate_net_benefit(y_true, y_pred_prob, thresholds):
    net_benefit = []
    total_samples = len(y_true)
    
    for threshold in thresholds:
        y_pred = (y_pred_prob >= threshold).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        
        net_benefit.append(tp / total_samples - (fp / total_samples) * (threshold / (1 - threshold)))
    
    return net_benefit

thresholds = np.linspace(0, 1, 35)

# selected_models = {
#     # 'RandomForest': models['RandomForest'],
#     # 'LogisticRegression': models['LogisticRegression'],
#     'XGBoost': models['XGBoost']
# }

# 存储每个模型的净收益
net_benefit_models = {}

# 为每个基础模型计算预测概率和净收益
for model_name, model in tuned_models.items():
    # if model_name == 'stacking' or model_name == 'voting' or model_name == 'RandomForest' or model_name == 'XGBoost' or model_name == 'MLP':
    y_pred_prob = model.predict_proba(X_test)[:, 1]  # 针对二分类，选取正类概率
    net_benefit = calculate_net_benefit(Y_test, y_pred_prob, thresholds)
    net_benefit_models[model_name] = net_benefit
    

# 计算全干预线的净收益
full_intervention_net_benefit = []
for threshold in thresholds:
    net_benefit = (35 / len(Y_test)) - (73 / len(Y_test)) * (threshold / (1 - threshold))
    full_intervention_net_benefit.append(net_benefit)
    

calibra_color = plt.cm.viridis(np.linspace(0, 1, len(tuned_models)))
# 绘制 DCA 曲线
plt.figure(figsize=(8, 8), dpi=600)
for model_name, net_benefit in net_benefit_models.items():
    plt.plot(thresholds, net_benefit, label=f"{model_name}")
# plt.plot(thresholds, net_benefit_vote, label="VotingClassifier", color='blue')
# plt.plot(thresholds, net_benefit_stack, label="StackingClassifier", color='red')
plt.plot(thresholds, full_intervention_net_benefit, label="ALL", linestyle='--')
plt.axhline(0, color='grey', linestyle='--', label="None")
plt.ylim(-0.1, 0.4)
plt.xlabel("阈值")
plt.ylabel("净收益")
plt.title("决策曲线分析")
plt.legend()
plt.show()


In [None]:
def compute_bootstrap_ci(metric_func, y_true, y_pred, y_proba=None, n_bootstraps=1000, alpha=0.05):
    """
    使用 bootstrap 方法计算给定指标的置信区间.
    
    参数:
        metric_func: 评估指标函数（如 accuracy_score, f1_score 等）
        y_true: 测试集的真实标签
        y_pred: 测试集的预测标签
        y_proba: 测试集的预测概率（如果需要计算 AUC）
        n_bootstraps: 重采样的次数
        alpha: 显著性水平（0.05 表示 95% 置信区间）
    
    返回:
        均值, (下限, 上限) 的置信区间
    """
    bootstrapped_scores = []
    n_samples = len(y_true)

    # 引入重采样的部分
    for _ in range(n_bootstraps):
        indices = resample(np.arange(n_samples), replace=True, n_samples=n_samples)
        
        # 使用 iloc 来索引
        y_true_sample = y_true.iloc[indices]
        y_pred_sample = y_pred[indices]

        # 如果有 y_proba
        if y_proba is not None:
            y_proba_sample = y_proba[indices]

        # 计算指标
        score = metric_func(y_true_sample, y_pred_sample)
        bootstrapped_scores.append(score)

    
    # 计算均值和置信区间
    bootstrapped_scores = np.array(bootstrapped_scores)
    lower = round(np.percentile(bootstrapped_scores, 100 * alpha / 2), 3)
    upper = round(np.percentile(bootstrapped_scores, 100 * (1 - alpha / 2)), 3)
    mean_score = np.mean(bootstrapped_scores)
    return mean_score, (lower, upper)




In [None]:
import shap
# 选择一个基模型进行SHAP分析，例如XGBoost

stacking_model = tuned_models['stacking']
model_to_explain = stacking_model.named_estimators_['XGBoost']  # XGBoost作为例子
X_train_for_shap = X_train  # 用训练数据来计算SHAP值

# 创建SHAP解释器
explainer = shap.Explainer(model_to_explain, X_train_for_shap)

# 计算SHAP值
shap_values = explainer(X_train_for_shap)

# 绘制SHAP值图
shap.summary_plot(shap_values, X_train_for_shap)

# 获取类别 1 的 SHAP 值
shap_values_test = explainer.shap_values(X_test)
# 绘制SHAP值总结图（Summary Plot）
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
shap.summary_plot(shap_values, X_train, plot_type="bar", show=False)
plt.title("X_train")
plt.xlabel('')  # 移除 x 轴标签避免x轴重叠
plt.xlim(0,2)

plt.subplot(1, 2, 2)
shap.summary_plot(shap_values_test, X_test, plot_type="bar", show=False)
plt.title("X_test")
plt.xlabel('')  
plt.xlim(0,2)

plt.tight_layout()
plt.show()

In [None]:
import shap
import matplotlib.pyplot as plt

# 假设 stacking_model 是你已经训练好的 Stacking 模型
# 假设 X_test 是测试集

# 使用 TreeExplainer (如果是树模型的话) 或 KernelExplainer (对于一般模型)
# explainer = shap.TreeExplainer(stacking_model)  # 如果是树模型
explainer = shap.KernelExplainer(stacking_model.predict_proba, X_train, output='probability')  # 对于其他模型

# 计算 SHAP 值
shap_values = explainer.shap_values(X_test)  # 对测试集进行 SHAP 计算

In [None]:


# 假设你要分析 X_test 中的第一个样本
sample_idx = 3  # 选择第一个
# print(shap_values[1])
# print(explainer.expected_value[1])
sample_shap_values = shap_values[:, :, 1]  # 如果是二分类，1表示正类的 SHAP 值

# print(sample_shap_values)
# print(sample_shap_values[sample_idx])  # 打印期望值
shap.force_plot(explainer.expected_value[1], sample_shap_values[sample_idx], X_A[featureImportance.index.values.tolist()].iloc[sample_idx], matplotlib=True)
plt.show()

In [None]:
# plt.subplot(1, 2, 2)
shap.summary_plot(shap_values[:, :, 1], X_test, plot_type="bar", show=False)
# plt.title("X_test")
plt.xlabel('mean(|SHAP value|) \n(average impact on model output magnitude)')  
plt.xlim(0,0.2)
plt.xticks(np.arange(0, 0.3, 0.1))
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import pandas as pd

file_path = './final_stats/prospective.xlsx'  # 替换为您的文件路径

data = pd.read_excel(file_path)
# data = data.dropna()
columns_to_keep = ['AGE', 'BMI', 'tPSA', 'fPSA', 'f/t', 'p2PSA',
                   'PHI', 'Volume', 'PSAD', 'PHID', 'Blood Neutrophil',
                   'Blood Lymphocyte', 'NLR', 'PI-RADS', 'Urinary leukocytes',
                   'Gleason Score']
X_test_save = data[columns_to_keep]
Y_test = data['SPC']
# X_train_save, X_test_save, Y_train_save, Y_test_save = train_test_split(X, Y, test_size=0.3, random_state=2024, stratify=Y)

# 计算中位数
# median1 = np.mean(y_train_prob_stack[Y_train == 0])
# median2 = np.mean(y_train_prob_stack[Y_train == 1])
# median2 = 0.9

# 划分区间
# def classify_interval(prob):
#     if prob < median1:
#         return 'low'
#     elif median1 <= prob < median2:
#         return 'intermediate'
#     else:
#         return 'high'


# 创建训练集结果的 DataFrame
# train_results_df = pd.DataFrame({
#     'Gleason Score': X_train_save['Gleason Score'],
#     'PSA': X_train_save['tPSA'],
#     'PHID': X_train_save['PHID'],
#     'PI-RADS': X_train_save['PI-RADS'],
#     'Volume': X_train_save['Volume'],
#     'probabilities': y_train_prob_stack,
#     'labels': Y_train,
# })
save_model_name = 'stacking'
# print(tuned_models)
for model_name, model in tuned_models.items():
    if model_name == save_model_name:
        print(model_name)
        y_test_prob_save = model.predict_proba(X_test)[:, 1]
        # 创建测试集结果的 DataFrame
        test_results_df = pd.DataFrame({
            'Gleason Score': X_test_save['Gleason Score'],
            'PSA' : X_test_save['tPSA'],
            'PHI': X_test_save['PHI'],
            'PSAD': X_test_save['PSAD'],
            'PHID': X_test_save['PHID'],
            'PI-RADS': X_test_save['PI-RADS'],
            'Volume': X_test_save['Volume'],
            'probabilities': y_test_prob_save,
            'labels': Y_test,
        })

# 将训练集和测试集的DataFrame合并
# results_df = pd.concat([train_results_df, test_results_df], ignore_index=True)
# print(results_df.shape)
# 将合并后的DataFrame保存到CSV文件
test_results_df.to_excel('./final_stats/stacking_test_predictions_new.xlsx', index=False)




In [None]:
# print(X_test.head)

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, recall_score, precision_score, confusion_matrix

# 1. 获取模型的预测概率
y_probs = tuned_models['stacking'].predict_proba(X_train)[:, 1]  # 取正类的概率
y_true = Y_train  # 真实标签

# 2. 定义不同的阈值
thresholds = np.linspace(0, 1, 1000)  # 生成多个阈值

# 3. 存储性能指标
results = []

for thresh in thresholds:
    y_pred = (y_probs >= thresh).astype(int)  # 应用阈值进行分类
    
    # 计算混淆矩阵
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    
    # 计算各项指标
    # avoid_bispsy = 
    accuracy = accuracy_score(y_true, y_pred)
    auc = roc_auc_score(y_true, y_probs)  # AUC 与阈值无关
    f1 = f1_score(y_true, y_pred)
    sensitivity = recall_score(y_true, y_pred)  # 召回率
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0  # 特异度
    ppv = precision_score(y_true, y_pred) if (tp + fp) > 0 else 0  # 阳性预测值 PPV
    npv = tn / (tn + fn) if (tn + fn) > 0 else 0  # 阴性预测值 NPV

    results.append([thresh, accuracy, auc, f1, sensitivity, specificity, ppv, npv])

# 4. 转换为 DataFrame
df_results = pd.DataFrame(results, columns=["Threshold", "Accuracy", "AUC", "F1-score", "Sensitivity", "Specificity", "PPV", "NPV"])

# 5. 查找 NPV = 100% 的最佳阈值
df_npv_100 = df_results[df_results["NPV"] == 1.0]

if not df_npv_100.empty:
    # 选择 Accuracy 最高的行作为最优阈值
    best_row = df_npv_100.loc[df_npv_100["Accuracy"].idxmax()]
    best_threshold = best_row["Threshold"]

    print(f"最佳阈值 (NPV=100%): {best_threshold:.4f}")
    print(best_row)
else:
    print("找不到 NPV=100% 的阈值，尝试放宽条件。")


df_npv_95 = df_results[(0.96 >=  df_results["NPV"]) & (df_results["NPV"] >= 0.95)]

if not df_npv_95.empty:
    # 选择 Accuracy 最高的行作为最优阈值
    best_row = df_npv_95.loc[df_npv_95["Accuracy"].idxmax()]
    best_threshold = best_row["Threshold"]

    print(f"最佳阈值 (NPV>=95%): {best_threshold:.4f}")
    print(best_row)
else:
    print("找不到 NPV>=95% 的阈值，尝试放宽条件。")
    
df_ppv_100 = df_results[df_results["PPV"] == 1.0]

if not df_ppv_100.empty:
    # 选择 Accuracy 最高的行作为最优阈值
    best_row = df_ppv_100.loc[df_ppv_100["Accuracy"].idxmax()]
    best_threshold = best_row["Threshold"]

    print(f"最佳阈值 (PPV=100%): {best_threshold:.4f}")
    print(best_row)
else:
    print("找不到 PPV=100% 的阈值，尝试放宽条件。")


df_ppv_95 = df_results[(0.96 >=  df_results["PPV"]) & (df_results["PPV"] >= 0.95)]

if not df_ppv_95.empty:
    # 选择 Accuracy 最高的行作为最优阈值
    best_row = df_ppv_95.loc[df_ppv_95["Accuracy"].idxmax()]
    best_threshold = best_row["Threshold"]

    print(f"最佳阈值 (PPV>=95%): {best_threshold:.4f}")
    print(best_row)
else:
    print("找不到 PPV=95% 的阈值，尝试放宽条件。")


In [None]:


X_test_psa_leq_10 = X_B[X_B['tPSA'] <= 10]
X_test_psa_gt_10 = X_B[X_B['tPSA'] > 10]
X_test_psa_leq_10_transformed = preprocessor.transform(X_test_psa_leq_10)
X_test_psa_gt_10_transformed = preprocessor.transform(X_test_psa_gt_10)
# 4. 将结果转换为 DataFrame
X_test_psa_leq_10_scaled_df = pd.DataFrame(X_test_psa_leq_10_transformed, columns=continuous_features + non_trans_features)
X_test_psa_gt_10_scaled_df = pd.DataFrame(X_test_psa_gt_10_transformed, columns=continuous_features + non_trans_features)

# 5. 确保列的顺序和原始数据一致
X_test_psa_leq_10 = X_test_psa_leq_10_scaled_df[continuous_features + non_trans_features]
X_test_psa_gt_10 = X_test_psa_gt_10_scaled_df[continuous_features + non_trans_features]
# 1. 根据 tPSA 划分 X_test 和 Y_test
Y_test_psa_leq_10 = y_B[X_B['tPSA'] <= 10]
Y_test_psa_gt_10 = y_B[X_B['tPSA'] > 10]

X_test_psa_leq_10 = X_test_psa_leq_10.loc[:,featureImportance.index]
X_test_psa_gt_10 = X_test_psa_gt_10.loc[:,featureImportance.index]



In [None]:
X_test = X_test_psa_gt_10
Y_test = Y_test_psa_gt_10
from sklearn.metrics import accuracy_score, f1_score, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay
# 初始化存储每个模型结果的字典
test_results = {}

# 初始化存储每个折叠结果的列表
test_tprs_micro = []
test_aucs_micro = []
test_mean_fpr_micro = np.linspace(0, 1, 100)
test_aucs_95ci = []
test_conf_matrices = []

# 评估模型在测试集上的性能
for model_name, model in tuned_models.items():
    if model_name == 'voting':
        # 预测测试集上的概率
        y_proba_test = model.predict_proba(X_test)
        model_predictions[model_name] = y_proba_test[:, 1]
        
        # 计算ROC曲线和AUC
        fpr, tpr, _ = roc_curve(Y_test, y_proba_test[:, 1])
        # print(fpr, tpr)
        roc_auc = auc(fpr, tpr)

        # 计算 micro 平均的ROC曲线和AUC
        interp_tpr_micro = np.interp(test_mean_fpr_micro, fpr, tpr)
        interp_tpr_micro[0] = 0.0  # 初始点为0
        test_tprs_micro.append(interp_tpr_micro)
        test_aucs_micro.append(roc_auc)
        
        # print(model.predict_proba(X_test))
        # 设定新的阈值
        threshold = 0.5

        # 使用新的阈值进行预测
        y_pred_test = (y_proba_test[:, 1] >= threshold).astype(int)
        
        # 计算测试集上的性能指标
        # y_pred_test = model.predict(X_test)
        accuracy_test = accuracy_score(Y_test, y_pred_test)
        f1_test = f1_score(Y_test, y_pred_test, average='weighted')

        # 计算测试集上的混淆矩阵
        cm = confusion_matrix(Y_test, y_pred_test, labels=np.unique(Y_train))
        test_conf_matrices.append(cm)

        # 计算sensitivity和specificity    
        TP = cm[1][1]  # 真阳性的数量
        FN = cm[1][0]  # 假阴性的数量
        TN = cm[0][0]  # 真阴性的数量
        FP = cm[0][1]  # 假阳性的数量
        sensitivity = TP / (TP + FN) if (TP + FN) != 0 else 0
        specificity = TN / (TN + FP) if (TN + FP) != 0 else 0
        ppv = TP / (TP + FP) if (TP + FP) != 0 else 0
        npv = TN / (TN + FN) if (TP + FN) != 0 else 0

        # 计算95%置信区间
        accuracy_ci = compute_bootstrap_ci(accuracy_score, Y_test, y_pred_test)
        f1_ci = compute_bootstrap_ci(lambda y, y_hat: f1_score(y, y_hat, average='weighted'), Y_test, y_pred_test)
        auc_ci = compute_bootstrap_ci(roc_auc_score, Y_test, y_proba_test[:, 1])

        sensitivity_ci = compute_bootstrap_ci(sensitivity_metric, Y_test, y_pred_test)

        specificity_ci = compute_bootstrap_ci(specificity_metric, Y_test, y_pred_test)
        ppv_mean, ppv_ci = compute_bootstrap_ci(ppv_metric, Y_test, y_pred_test)
        # print(f"PPV: {ppv_mean}, 95%CI: {ppv_ci}")

        # 计算 NPV 的置信区间
        npv_mean, npv_ci = compute_bootstrap_ci(npv_metric, Y_test, y_pred_test)

        test_aucs_95ci.append(auc_ci[1])

        # 输出模型的性能指标
        print(f'{model_name} (Internal Test Set):')
        # 输出指标及其95%置信区间
        print(f'  Accuracy = {accuracy_test:.3f}, 95%CI: {accuracy_ci[1]}')
        print(f'  AUC (micro) = {roc_auc:.3f}, 95%CI: {auc_ci[1]}')
        print(f'  F1-score = {f1_test:.3f}, 95%CI: {f1_ci[1]}')
        print(f'  Mean Sensitivity = {sensitivity_ci[0]:.3f}, 95%CI: {sensitivity_ci[1]}')
        print(f'  Mean Specificity = {specificity_ci[0]:.3f}, 95%CI: {specificity_ci[1]}')
        print(f'  PPV = {ppv:.3f}, 95%CI: {ppv_ci}')
        print(f'  NPV = {npv:.3f}, 95%CI: {npv_ci}')
        
        # 存储结果
        test_results[model_name] = {
            'accuracy': accuracy_test,
            'auc_micro': roc_auc,
            'f1_score': f1_test,
            'sensitivity': sensitivity,
            'specificity': specificity,
            'PPV': ppv,
            'NPV': npv,
        }
    

In [None]:
print(X_test.shape)

In [None]:
import numpy as np
import pandas as pd

file_path = './final_stats/prospective.xlsx'  # 替换为您的文件路径

data = pd.read_excel(file_path)
# data = data.dropna()
columns_to_keep = ['AGE', 'BMI', 'tPSA', 'fPSA', 'f/t', 'p2PSA',
                   'PHI', 'Volume', 'PSAD', 'PHID', 'Blood Neutrophil',
                   'Blood Lymphocyte', 'NLR', 'PI-RADS', 'Urinary leukocytes',
                   'Gleason Score']

# print(data)
Y_test = data['SPC']
save_model_name = 'voting'
Y_test = Y_test[X_B['tPSA'] > 10]
data = data[columns_to_keep]
X_test_save = data[X_B['tPSA'] > 10]

print(X_test_save, X_test_psa_gt_10.head())
# print(X_test_save.head(),X_test.shape)
# print(tuned_models)
for model_name, model in tuned_models.items():
    if model_name == save_model_name:
        print(model_name)
        y_test_prob_save = model.predict_proba(X_test_psa_gt_10)[:, 1]
        # 创建测试集结果的 DataFrame
        test_results_df = pd.DataFrame({
            'Gleason Score': X_test_save['Gleason Score'],
            'PSA' : X_test_save['tPSA'],
            'PHID': X_test_save['PHID'],
            'PI-RADS': X_test_save['PI-RADS'],
            'Volume': X_test_save['Volume'],
            'probabilities': y_test_prob_save,
            'labels': Y_test,
        })

# 将训练集和测试集的DataFrame合并
# results_df = pd.concat([train_results_df, test_results_df], ignore_index=True)
# print(results_df.shape)
# 将合并后的DataFrame保存到CSV文件
test_results_df.to_excel('./final_stats/voting_test_predictions_tpsa>10.xlsx', index=False)

In [None]:
def auc_ci(y_true, y_pred_proba):
    # Bootstrap法计算置信区间
    n_iterations = 1000
    auc_scores = []

    for _ in range(n_iterations):
        # 重采样数据
        indices = resample(np.arange(len(y_true)), n_samples=len(y_true))
        y_true_resampled = y_true[indices]
        y_pred_resampled = y_pred_proba[indices]
        
        # 计算每次采样的AUC
        auc_scores.append(roc_auc_score(y_true_resampled, y_pred_resampled))

    # 计算置信区间
    lower = round(np.percentile(auc_scores, 2.5),3)
    upper = round(np.percentile(auc_scores, 97.5),3)

    print(f"AUC: {auc}")
    print(f"95% Confidence Interval: ({lower}, {upper})")
    return (lower, upper)

In [None]:
from sklearn.metrics import roc_curve, auc
file_path = './final_stats/prospective.xlsx'  # 替换为您的文件路径

external_data = pd.read_excel(file_path)
external_data = external_data.dropna()
columns_to_keep = ['AGE', 'BMI', 'tPSA', 'fPSA', 'f/t', 'p2PSA',
                   'PHI', 'Volume', 'PSAD', 'PHID', 'Blood Neutrophil',
                   'Blood Lymphocyte', 'NLR', 'PI-RADS', 'Urinary leukocytes']
external_X_test = external_data[columns_to_keep]
# external_X_test = external_data[featureImportance.index.values.tolist()]
external_Y_test = external_data['SPC']

# 假设外部验证集为 external_test
scaled_features_external = scaler.transform(external_X_test)

# 将归一化后的数据转换为 DataFrame，保持列名一致
external_X_test = pd.DataFrame(scaled_features_external, columns=external_X_test.columns)

external_X_test = external_X_test[featureImportance.index.values.tolist()]
# print(external_X_test)

# XGBoost模型
# 提取 XGBoost 模型
xgboost_model = models['XGBoost']
# 计算 XGBoost 模型的预测概率
y_proba_test_xgboost = xgboost_model.predict_proba(external_X_test)[:, 1]  # 正类概率

# 计算 XGBoost 模型的 ROC 曲线和 AUC
fpr_xgboost, tpr_xgboost, _ = roc_curve(external_Y_test, y_proba_test_xgboost)
roc_auc_xgboost = auc(fpr_xgboost, tpr_xgboost)

# 应用自定义阈值
y_pred_test_xgboost = (y_proba_test_xgboost >= custom_threshold).astype(int)
test_conf_matrices = []
# 计算 XGBoost 模型的混淆矩阵
cm_xgboost = confusion_matrix(external_Y_test, y_pred_test_xgboost, labels=np.unique(external_Y_test))
test_conf_matrices.append(cm_xgboost)

# 计算 XGBoost 的评估指标
TP = cm_xgboost[1, 1]
FN = cm_xgboost[1, 0]
TN = cm_xgboost[0, 0]
FP = cm_xgboost[0, 1]
PPV = TP / (TP + FP)
NPV = TN / (TN + FN)
sensitivity_xgboost = TP / (TP + FN)
specificity_xgboost = TN / (TN + FP)
accuracy_xgboost = accuracy_score(external_Y_test, y_pred_test_xgboost)
f1_xgboost = f1_score(external_Y_test, y_pred_test_xgboost, average='weighted')
# xg_auc_ci = compute_bootstrap_ci(roc_auc_score, external_Y_test, y_pred_test_xgboost, y_proba=y_proba_test_xgboost)
xg_auc_ci = auc_ci(external_Y_test, y_proba_test_xgboost)
xg_accuracy_ci = compute_bootstrap_ci(accuracy_score, external_Y_test, y_pred_test_xgboost)
xg_f1_ci = compute_bootstrap_ci(lambda y, y_hat: f1_score(y, y_hat, average='weighted'), external_Y_test, y_pred_test_xgboost)
xg_sensitivity_ci = compute_bootstrap_ci(lambda y, y_hat: np.mean(np.diag(confusion_matrix(y, y_hat)) / np.sum(confusion_matrix(y, y_hat), axis=1)),
                                    external_Y_test, y_pred_test_xgboost)
xg_specificity_ci = compute_bootstrap_ci(specificity_metric, external_Y_test, y_pred_test_xgboost)
xg_ppv_mean, xg_ppv_ci = compute_bootstrap_ci(ppv_metric, external_Y_test, y_pred_test_xgboost)
xg_npv_mean, xg_npv_ci = compute_bootstrap_ci(npv_metric, external_Y_test, y_pred_test_xgboost)
# 打印 XGBoost 的结果
model_name_xgboost = 'XGBoost'
print(f'{model_name_xgboost} (External Test Set):')
print(f'Accuracy = {accuracy_xgboost:.4f} 95%CI: {xg_accuracy_ci[1]}')
print(f'F1-score = {f1_xgboost:.4f} 95%CI: {xg_f1_ci[1]}')
print(f'AUC (micro) = {roc_auc_xgboost:.4f} 95%CI: {xg_auc_ci}')
print(f'Sensitivity = {sensitivity_xgboost:.4f} 95%CI: {xg_sensitivity_ci[1]}')
print(f'Specificity = {specificity_xgboost:.4f} 95%CI: {xg_specificity_ci[1]}')
print(f'NPV = {NPV:.4f} 95%CI: {xg_npv_ci}')
print(f'PPV = {PPV:.4f} 95%CI: {xg_ppv_ci}')
print()

model_names = []
test_accuracies = []
test_f1_scores = []
test_aucs_micro = []
test_sensitivities = []
test_specificities = []
# 存储结果
model_names.append(model_name_xgboost)
test_accuracies.append(accuracy_xgboost)
test_f1_scores.append(f1_xgboost)
test_aucs_micro.append(roc_auc_xgboost)
test_sensitivities.append(sensitivity_xgboost)
test_specificities.append(specificity_xgboost)

# Stacking 模型
# 计算 Stacking 模型的预测概率
y_proba_test_stacking = stacking_model.predict_proba(external_X_test)[:, 1]  # 正类概率

# 计算 Stacking 模型的 ROC 曲线和 AUC
fpr_stacking, tpr_stacking, _ = roc_curve(external_Y_test, y_proba_test_stacking)
roc_auc_stacking = auc(fpr_stacking, tpr_stacking)

# 应用自定义阈值
y_pred_test_stacking = (y_proba_test_stacking >= custom_threshold).astype(int)

# 计算 Stacking 模型的混淆矩阵
cm_stacking = confusion_matrix(external_Y_test, y_pred_test_stacking, labels=np.unique(external_Y_test))
test_conf_matrices.append(cm_stacking)

# 计算 Stacking 的评估指标
TP = cm_stacking[1, 1]
FN = cm_stacking[1, 0]
TN = cm_stacking[0, 0]
FP = cm_stacking[0, 1]
PPV = TP / (TP + FP)
NPV = TN / (TN + FN)
sensitivity_stacking = TP / (TP + FN)
specificity_stacking = TN / (TN + FP)
accuracy_stacking = accuracy_score(external_Y_test, y_pred_test_stacking)
f1_stacking = f1_score(external_Y_test, y_pred_test_stacking, average='weighted')
# stack_auc_ci = compute_bootstrap_ci(roc_auc_score, external_Y_test, y_pred_test_stacking, y_proba=y_proba_test_stacking)
stack_auc_ci =   auc_ci(external_Y_test, y_proba_test_stacking)
stack_accuracy_ci = compute_bootstrap_ci(accuracy_score, external_Y_test, y_pred_test_stacking)
stack_f1_ci = compute_bootstrap_ci(lambda y, y_hat: f1_score(y, y_hat, average='weighted'), external_Y_test, y_pred_test_stacking)

stack_sensitivity_ci = compute_bootstrap_ci(sensitivity_metric, external_Y_test, y_pred_test_stacking)

stack_specificity_ci = compute_bootstrap_ci(specificity_metric, external_Y_test, y_pred_test_stacking)
stack_ppv_mean, stack_ppv_ci = compute_bootstrap_ci(ppv_metric, external_Y_test, y_pred_test_stacking)
stack_npv_mean, stack_npv_ci = compute_bootstrap_ci(npv_metric, external_Y_test, y_pred_test_stacking)
# 打印 Stacking 的结果
model_name_stacking = 'Stacking'
print(f'{model_name_stacking} (External Test Set):')
print(f'Accuracy = {accuracy_stacking:.4f} 95%CI: {stack_accuracy_ci[1]}')
print(f'F1-score = {f1_stacking:.4f} 95%CI: {stack_f1_ci[1]}')
print(f'AUC (micro) = {roc_auc_stacking:.4f} 95%CI: {stack_auc_ci}')
print(f'Sensitivity = {sensitivity_stacking:.4f} 95%CI: {stack_sensitivity_ci[1]}')
print(f'Specificity = {specificity_stacking:.4f} 95%CI: {stack_specificity_ci[1]}')
print(f'NPV = {NPV:.4f} 95%CI: {stack_npv_ci}')
print(f'PPV = {PPV:.4f} 95%CI: {stack_ppv_ci}')
print()

# 存储结果
model_names.append(model_name_stacking)
test_accuracies.append(accuracy_stacking)
test_f1_scores.append(f1_stacking)
test_aucs_micro.append(roc_auc_stacking)
test_sensitivities.append(sensitivity_stacking)
test_specificities.append(specificity_stacking)

# Voting 模型
# Predict probabilities for the positive class on test data
y_proba_test_voting = voting_model.predict_proba(external_X_test)[:, 1]  # 只选择正类的概率

# 应用自定义阈值：根据阈值将概率转换为预测类别
y_pred_test_voting_custom_threshold = (y_proba_test_voting >= custom_threshold).astype(int)

# 计算 Voting 模型的 ROC 曲线和 AUC
fpr_voting, tpr_voting, _ = roc_curve(external_Y_test, y_proba_test_voting)
roc_auc_voting = auc(fpr_voting, tpr_voting)

# 绘制 Voting 模型的 ROC 曲线
# plt.plot(fpr_voting, tpr_voting, lw=2, label=f'Voting (AUC = {roc_auc_voting:.4f})')

# 计算 Voting 模型的测试集准确率和 F1-score
# y_pred_test_voting = voting_model.predict(X_test)
y_pred_test_voting = y_pred_test_voting_custom_threshold
accuracy_test_voting = accuracy_score(external_Y_test, y_pred_test_voting)
f1_test_voting = f1_score(external_Y_test, y_pred_test_voting, average='weighted')

# 计算 Voting 模型的混淆矩阵

cm_voting = confusion_matrix(external_Y_test, y_pred_test_voting, labels=np.unique(external_Y_test))
test_conf_matrices.append(cm_voting)


TP = cm_voting[1, 1]  # 真阳性
FN = cm_voting[1, 0]  # 假阴性
TN = cm_voting[0, 0]  # 真阴性
FP = cm_voting[0, 1]  # 假阳性
# 计算sensitivity和specificity
sensitivity_vote = TP / (TP + FN)
specificity_vote = TN / (TN + FP)
PPV = TP / (TP + FP)
NPV = TN / (TN + FN)
# vote_auc_ci = compute_bootstrap_ci(roc_auc_score, external_Y_test, y_pred_test_voting, y_proba=y_proba_test_voting)
vote_auc_ci = auc_ci(external_Y_test, y_proba_test_voting)
vote_accuracy_ci = compute_bootstrap_ci(accuracy_score, external_Y_test, y_pred_test_voting)
vote_f1_ci = compute_bootstrap_ci(lambda y, y_hat: f1_score(y, y_hat, average='weighted'), external_Y_test, y_pred_test_voting)
vote_sensitivity_ci = compute_bootstrap_ci(sensitivity_metric, external_Y_test, y_pred_test_voting)
vote_specificity_ci = compute_bootstrap_ci(specificity_metric, external_Y_test, y_pred_test_voting)
vote_ppv_mean, vote_ppv_ci = compute_bootstrap_ci(ppv_metric, external_Y_test, y_pred_test_voting)
vote_npv_mean, vote_npv_ci = compute_bootstrap_ci(npv_metric, external_Y_test, y_pred_test_voting)

# 打印并存储 Voting 模型的结果
model_name_voting = 'Voting'
print(f'{model_name_voting} (External Test Set):')
print(f'Accuracy = {accuracy_test_voting:.4f} 95%CI: {vote_accuracy_ci[1]}')
print(f'F1-score = {f1_test_voting:.4f} 95%CI: {vote_f1_ci[1]}')
print(f'AUC (micro) = {roc_auc_voting:.4f} 95%CI: {vote_auc_ci}')
print(f'Sensitivity = {sensitivity_vote:.4f} 95%CI: {vote_sensitivity_ci[1]}')
print(f'Specificity = {specificity_vote:.4f} 95%CI: {vote_specificity_ci[1]}')
print(f'NPV = {vote_npv_mean:.4f} 95%CI: {vote_npv_ci}')
print(f'PPV = {vote_ppv_mean:.4f} 95%CI: {vote_ppv_ci}')
print()

model_names.append(model_name_voting)
test_accuracies.append(accuracy_test_voting)
test_f1_scores.append(f1_test_voting)
test_aucs_micro.append(roc_auc_voting)
test_sensitivities.append(sensitivity)
test_specificities.append(specificity)

import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

# Initialize figure for ROC curve
plt.figure(figsize=(8, 8))


# 绘制 Stacking 模型的 ROC 曲线
plt.plot(fpr_stacking, tpr_stacking, lw=2, label=f'Stacking (AUC = {roc_auc_stacking:.3f} 95%CI: {stack_auc_ci})')

# 绘制 Voting 模型的 ROC 曲线
fpr_micro_voting, tpr_micro_voting, _ = roc_curve(external_Y_test, y_proba_test_voting)  # Voting 模型的 fpr 和 tpr
roc_auc_micro_voting = auc(fpr_micro_voting, tpr_micro_voting)  # 计算 AUC
plt.plot(fpr_micro_voting, tpr_micro_voting, lw=2, label=f'Voting (AUC = {roc_auc_voting:.3f} 95%CI: {vote_auc_ci})')

# 绘制 XGBoost 模型的 ROC 曲线
plt.plot(fpr_xgboost, tpr_xgboost, lw=2, label=f'XGBoost (AUC = {roc_auc_xgboost:.3f} 95%CI: {xg_auc_ci})')



# 添加图形细节
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves for Stacking and Voting Models')
plt.legend(loc='lower right')

# 设置轴的范围
plt.xlim([0, 1])
plt.ylim([0, 1])

# 显示最终的图
plt.gca().set_aspect('equal', adjustable='box')
plt.show()

n_models = len(model_names)
n_cols = 3  # 每行展示 3 个混淆矩阵
n_rows = (n_models + n_cols - 1) // n_cols  # 计算需要的行数

plt.figure(figsize=(20, 4 * n_rows))

for i, (model_name, conf_matrix) in enumerate(zip(model_names, test_conf_matrices)):
    plt.subplot(n_rows, n_cols, i + 1)
    disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=np.unique(external_Y_test))
    disp.plot(cmap=plt.cm.Blues, ax=plt.gca(), colorbar=False)
    plt.title(f'Confusion Matrix for {model_name}')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')

plt.tight_layout()
plt.show()

print("Model Names: ", model_names)
print("Number of Models: ", len(model_names))
print("Confusion Matrices: ", test_conf_matrices)
print("Number of Confusion Matrices: ", len(test_conf_matrices))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
print(len(external_Y_test))
def calculate_net_benefit(y_true, y_pred_prob, thresholds):
    net_benefit = []
    total_samples = len(y_true)
    
    for threshold in thresholds:
        y_pred = (y_pred_prob >= threshold).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        
        net_benefit.append(tp / total_samples - (fp / total_samples) * (threshold / (1 - threshold)))
    
    return net_benefit

thresholds = np.linspace(0, 1, 30)

selected_models = {
    # 'RandomForest': models['RandomForest'],
    # 'LogisticRegression': models['LogisticRegression'],
    'CatBoost': models['CatBoost']
}

# 存储每个模型的净收益
net_benefit_models = {}

# 为每个基础模型计算预测概率和净收益
for model_name, model in selected_models.items():
    y_pred_prob = model.predict_proba(external_X_test)[:, 1]  # 针对二分类，选取正类概率
    net_benefit = calculate_net_benefit(external_Y_test, y_pred_prob, thresholds)
    net_benefit_models[model_name] = net_benefit
    
# 预测概率
external_y_pred_prob_vote = voting_model.predict_proba(external_X_test)[:, 1]  # 针对二分类，选取正类概率
external_y_pred_prob_stack = stacking_model.predict_proba(external_X_test)[:, 1]  # 针对二分类，选取正类概率


# 计算净收益
net_benefit_vote = calculate_net_benefit(external_Y_test, external_y_pred_prob_vote, thresholds)
net_benefit_stack = calculate_net_benefit(external_Y_test, external_y_pred_prob_stack, thresholds)

# 计算全干预线的净收益
full_intervention_net_benefit = []
for threshold in thresholds:
    net_benefit = (35 / len(external_Y_test)) - (73 / len(external_Y_test)) * (threshold / (1 - threshold))
    full_intervention_net_benefit.append(net_benefit)
    


# 绘制 DCA 曲线
plt.figure(figsize=(8, 8))

plt.plot(thresholds, net_benefit_stack, label="StackingClassifier")
plt.plot(thresholds, net_benefit_vote, label="VotingClassifier")
for model_name, net_benefit in net_benefit_models.items():
    plt.plot(thresholds, net_benefit, label=f"{model_name}")
plt.plot(thresholds, full_intervention_net_benefit, label="ALL", color='grey', linestyle='--')
plt.axhline(0, color='grey', linestyle='--', label="None")
plt.ylim(-0.4, 0.4)
plt.xlabel("Threshold Probability")
plt.ylabel("Net Benefit")
plt.title("Decision Curve Analysis (DCA)")
plt.legend()
plt.show()


In [None]:
# 校准曲线
import matplotlib.pyplot as plt
from sklearn.calibration import CalibrationDisplay
from sklearn.metrics import brier_score_loss
import numpy as np
from sklearn.calibration import calibration_curve
# 创建图形对象
plt.figure(figsize=(8, 8))
bin_edges = [0,0.1, 0.4, 0.6, 0.9,1.0]  # 自定义区间 
bin_centers = [(bin_edges[i] + bin_edges[i+1]) / 2 for i in range(len(bin_edges) - 1)]  # 计算区间中心
import numpy as np
from scipy.interpolate import make_interp_spline
# 计算模型的校准曲线
def custom_calibration_curve(y_true, y_prob, bins):
    # 初始化存储校准曲线的数据
    fraction_of_positives = []
    mean_predicted_value = []
    
    # 将预测概率分入各个区间
    for i in range(len(bins) - 1):
        bin_start = bins[i]
        bin_end = bins[i + 1]
        
        # 在这个区间内找到所有概率
        bin_mask = (y_prob >= bin_start) & (y_prob < bin_end)
        bin_true = y_true[bin_mask]
        bin_prob = y_prob[bin_mask]
        
        if len(bin_true) > 0:  # 确保该区间不为空
            fraction_of_positives.append(np.mean(bin_true))  # 该区间的正类比例
            mean_predicted_value.append(np.mean(bin_prob))  # 该区间的平均预测概率
    
    return np.array(mean_predicted_value), np.array(fraction_of_positives)

# 绘制 stacking 模型的校准曲线
stack_y_prob = stacking_model.predict_proba(external_X_test)[:, 1]
stacking_model_brier = brier_score_loss(external_Y_test, stack_y_prob)
prob_true, prob_pred = custom_calibration_curve(external_Y_test, stack_y_prob, bin_edges)

# 使用插值方法对校准曲线进行平滑处理
spl = make_interp_spline(prob_true, prob_pred, k=2)  # k=3 是三次样条插值
smooth_x = np.linspace(prob_true.min(), prob_true.max(), 20)
smooth_y = spl(smooth_x)
stack_brier_ci = compute_bootstrap_ci(brier_score_loss, external_Y_test, stack_y_prob)
plt.plot(smooth_x, smooth_y, label=f'Stacking (Brier Score: {stacking_model_brier:.3f} 95%CI: {stack_brier_ci[1]})', linestyle='-', marker='')

# 绘制 voting 模型的校准曲线
vote_y_prob = voting_model.predict_proba(external_X_test)[:, 1]
voting_model_brier = brier_score_loss(external_Y_test, vote_y_prob)
prob_true, prob_pred = custom_calibration_curve(external_Y_test, vote_y_prob, bin_edges)
spl = make_interp_spline(prob_true, prob_pred, k=2)  # k=3 是三次样条插值
smooth_x = np.linspace(prob_true.min(), prob_true.max(), 20)
smooth_y = spl(smooth_x)
vote_brier_ci = compute_bootstrap_ci(brier_score_loss, external_Y_test, vote_y_prob)
plt.plot(smooth_x, smooth_y, label=f'Voting (Brier Score: {voting_model_brier:.3f} 95%CI: {vote_brier_ci[1]})', linestyle='-', marker='')


xgmodel = selected_models['CatBoost']
xgmodel_y_prob = model.predict_proba(external_X_test)[:, 1]  # 针对二分类，选取正类概率
xgmodel_brier = brier_score_loss(external_Y_test, xgmodel_y_prob)
prob_true, prob_pred = custom_calibration_curve(external_Y_test, xgmodel_y_prob, bin_edges)
spl = make_interp_spline(prob_true, prob_pred, k=2)  # k=3 是三次样条插值
smooth_x = np.linspace(prob_true.min(), prob_true.max(), 20)
smooth_y = spl(smooth_x)
xgmodel_brier_ci = compute_bootstrap_ci(brier_score_loss, external_Y_test, xgmodel_y_prob)
plt.plot(smooth_x, smooth_y, label=f'CatBoost (Brier Score: {xgmodel_brier:.3f} 95%CI: {xgmodel_brier_ci[1]})', linestyle='-', marker='')

# 绘制完美校准的对角线
plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfectly Calibrated')

# 添加标题和标签
plt.title('Calibration Curves Sorted by Brier Score')
plt.xlabel('Mean Predicted Probability')
plt.ylabel('Fraction of Positives')
plt.legend(loc='best')

# 设置坐标轴比例
ax = plt.gca()
ax.set_aspect(1)

# 显示图像
plt.tight_layout()
plt.show()

In [None]:
# 外部验证集Waterfall plot
# 将训练集的预测概率和真实标签存储到DataFrame中
# train_results_df = pd.DataFrame({
#     'True_Label': Y_train,
#     'Predicted_Probability': y_train_prob_vote
# })
from matplotlib.colors import Normalize
from matplotlib import cm

file_path = './final_stats/retrospective.xlsx'  # 替换为您的文件路径

external_data = pd.read_excel(file_path)
external_data = external_data.dropna()
columns_to_keep = ['AGE', 'BMI', 'tPSA', 'fPSA', 'f/t', 'p2PSA',
                   'PHI', 'Volume', 'PSAD', 'PHID', 'Blood Neutrophil',
                   'Blood Lymphocyte', 'NLR', 'PI-RADS', 'Urinary leukocytes',
                   'Gleason Score']
external_X_test = external_data[columns_to_keep]

# 将测试集的预测概率和真实标签存储到DataFrame中
test_results_df = pd.DataFrame({
    'Gleason Score': external_X_test['Gleason Score'],
    'PSA': external_X_test['tPSA'],
    'PHID': external_X_test['PHID'],
    'PI-RADS': external_X_test['PI-RADS'],
    'Volume': external_X_test['Volume'],
    'labels': external_Y_test,
    'probabilities': external_y_pred_prob_stack,
})

# 将训练集和测试集的DataFrame合并
# results_df = pd.concat([train_results_df, test_results_df], ignore_index=True)

# 将合并后的DataFrame保存到CSV文件
test_results_df.to_excel('./final_stats/internal_stack_predictions.xlsx', index=False)



In [None]:
# 将预测概率作为一个新特征与 X_test 拼接
X_test_with_prob = X_test.copy()  # 保持原始特征不变
X_test_with_prob['Predicted_Probability'] = y_test_prob_vote  # 将预测概率加入新的列

# 查看拼接后的数据
print(X_test_with_prob.head())