In [None]:
import pandas as pd
import os

# data_dir = 'Synthetic_total/'
data_dir = 'Please enter the dataset root path here'

excel_file = 'Please enter the dataset file here'
train = pd.read_csv(os.path.join(data_dir,excel_file))
train.rename(columns={'ID': 'patient_id', 'REAL_STONE':'target'}, inplace=True)
columns = ['HR', 'BT', 'AGE', 'DUCT_DILIATATION_10MM', 'Hb','PLT','WBC','ALP', 'ALT', 'AST', 'TOTAL_BILIRUBIN',  'target']
train = train[columns]

print(train['target'].value_counts())
print(train.isna().sum())  # 컬럼별 NaN 개수 확인
print(len(train))  # 전체 데이터셋에 NaN이 하나라도 있는지 확인
print(len(train.dropna()))  # 전체 데이터셋에 NaN이 하나라도 있는지 확인

In [None]:
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split

data_dir = 'Please enter the dataset root path here'
excel_file = 'Please enter the dataset file path here'

data = pd.read_csv(os.path.join(data_dir, excel_file))

train_df, test_data = train_test_split(data, test_size=0.3, stratify=data['target'], random_state=123, shuffle=True)
val_df, test = train_test_split(test_data, test_size=0.4, stratify=test_data['target'], random_state=123,shuffle=True)
test_origin = test.copy()

train_df.drop(columns=['patient_id'],inplace = True)
val_df.drop(columns=['patient_id'],inplace = True)
test.drop(columns=['patient_id'],inplace = True)

train = pd.concat([train, train_df, val_df],axis=0)
train.reset_index(inplace=True, drop=True)
test.reset_index(inplace=True, drop=True)
print(len(train))
print(len(test))

In [None]:
from pycaret.classification import *

exp_clf = setup(data = train, target = 'target', session_id=11, n_jobs=5)
best = compare_models(sort='F1', fold=10, n_select=5)
tuned_model = [tune_model(i, choose_better=True) for i in best]
blended_model = blend_models(estimator_list = tuned_model)
stack_model = stack_models(estimator_list = tuned_model, fold = 5)
final_model = finalize_model(stack_model)

evaluate_model(final_model)

In [None]:
prediction = predict_model(tuned_model[0], data = test)

df = prediction

categories = ['ERCP', 'Stone']

for category in categories:
    column = 'prediction_label'
    
    if category == 'ERCP':
        yes_count = (df[column] == 1).sum()
        total_count = len(df)
    else:  # Stone
        yes_count = (df[df[column] == 1]['target'] == 1).sum()
        total_count = (df[df[column] == 1]['target'] == 1).sum() + (df[df[column] == 1]['target'] == 0).sum()

    yes_percent = round(yes_count / total_count * 100, 2)
    no_count = total_count - yes_count
    no_percent = round(100 - yes_percent, 2)
    ercp_no_stone_yes = ((df[column] == 0) & (df['target'] == 1)).sum()
    total_ercp_no = (df[column] == 0).sum()
    percent = round(ercp_no_stone_yes / total_ercp_no * 100, 2)

    print(f'{category} Yes: {yes_count} ({yes_percent}%)')
    print(f'{category} No: {no_count} ({no_percent}%)')
    print(f'ERCP No, REAL_STONE Yes: {ercp_no_stone_yes} ({percent}%)')

In [None]:
# import os
# from pytz import timezone
# from datetime import datetime

# log_dir = 'logs/'
# mode = 'test'
# modality = 'tabular'
# seoul_timezone = timezone('Asia/Seoul')
# today_seoul = datetime.now(seoul_timezone)

# directory_name = today_seoul.strftime("%Y-%m-%d-%H-%M")

# log_dir = log_dir+directory_name + '-' + mode + '-' + modality

# if os.path.exists(log_dir):
#     pass
# else:
#     os.makedirs(log_dir)
    
    
# save_model(final_model, os.path.join(log_dir,'ensemble'))

In [None]:
from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix, auc, roc_curve
import numpy as np

def cal(df):
    # AUROC
    auroc = roc_auc_score(df["target"], df["prediction_prob"])

    # Accuracy
    accuracy = accuracy_score(df["target"], df["prediction_label"])

    # Confusion Matrix
    tn, fp, fn, tp = confusion_matrix(df["target"], df["prediction_label"]).ravel()

    # Sensitivity (Recall)
    sensitivity = tp / (tp + fn)

    # Specificity
    specificity = tn / (tn + fp)

    # PPV (Positive Predictive Value, Precision)
    ppv = tp / (tp + fp) if (tp + fp) > 0 else 0

    # NPV (Negative Predictive Value)
    npv = tn / (tn + fn) if (tn + fn) > 0 else 0

   
    # 결과 출력
    print(f"AUROC: {auroc:.2f}")
    print(f"Accuracy: {accuracy:.2f}")
    print(f"Sensitivity, Recall: {sensitivity:.2f}")
    print(f"Specificity: {specificity:.2f}")
    print(f"Precision, PPV: {ppv:.2f}")
    print(f"NPV: {npv:.2f}")
    
prediction = predict_model(final_model, data = test)

# target 1에 대한 확률로 바꾸기
prediction['prediction_prob'] = np.where(prediction['prediction_label'] == 1, prediction['prediction_score'], 1 - prediction['prediction_score'])    
cal(prediction)

# 결과를 CSV 파일로 저장
prediction.to_csv('DUMC_analysis/ExtraTreesClassifier.csv', index=False)
# prediction.to_csv('DUMC_analysis/CatBoostClassifier_external.csv', index=False)

In [None]:
prediction = predict_model(tuned_model[0], data = test)

interpret_model(tuned_model[0], save = True)
plot_model(tuned_model[0], plot = 'feature')
# interpret_model(tuned_model[0], plot = 'correlation')
# interpret_model(tuned_model[0], plot = 'msa')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, roc_auc_score, precision_score, accuracy_score, confusion_matrix
from numpy import *

# Seaborn 스타일 설정
sns.set(style="whitegrid")

def compute_youden_index_cutoff(y_true, y_prob):
    """
    Calculate the optimal cutoff using Youden Index (J).
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    J = tpr - fpr
    # ix = argmax(J)
    ix = np.argsort(J)[::-1][4]
    
    best_threshold = thresholds[ix]
    sensitivity = tpr[ix]
    specificity = 1 - fpr[ix]
    y_pred = (y_prob >= best_threshold).astype(int)
    precision = precision_score(y_true, y_pred)
    
    return best_threshold, sensitivity, specificity, precision, fpr, tpr, thresholds, ix, y_pred

def bootstrap_roc(y_true, y_prob, n_bootstraps=1000, random_seed=42):
    """
    Calculate the ROC confidence intervals using bootstrapping.
    """
    rng = np.random.default_rng(random_seed)
    bootstrapped_tprs = []
    base_fpr = np.linspace(0, 1, 100)
    
    for _ in range(n_bootstraps):
        indices = rng.choice(len(y_true), size=len(y_true), replace=True)
        y_true_boot = y_true[indices]
        y_prob_boot = y_prob[indices]
        fpr, tpr, _ = roc_curve(y_true_boot, y_prob_boot)
        tpr_interp = np.interp(base_fpr, fpr, tpr)
        tpr_interp[0] = 0.0
        bootstrapped_tprs.append(tpr_interp)

    bootstrapped_tprs = np.array(bootstrapped_tprs)
    mean_tpr = bootstrapped_tprs.mean(axis=0)
    std_tpr = bootstrapped_tprs.std(axis=0)
    tpr_lower = np.maximum(mean_tpr - 1.96 * std_tpr, 0)
    tpr_upper = np.minimum(mean_tpr + 1.96 * std_tpr, 1)
    
    return base_fpr, mean_tpr, tpr_lower, tpr_upper

# Assuming y_true and y_prob are your data inputs
y_true = prediction['target'].values
y_prob = prediction['prediction_prob'].values

# Calculate the optimal cutoff using Youden Index
best_threshold, sensitivity, specificity, precision, fpr, tpr, thresholds, optimal_idx, y_pred = compute_youden_index_cutoff(y_true, y_prob)

# Calculate AUC
roc_auc = roc_auc_score(y_true, y_prob)

# Bootstrap for 95% CI
base_fpr, mean_tpr, tpr_lower, tpr_upper = bootstrap_roc(y_true, y_prob)

# Calculate additional metrics
accuracy = accuracy_score(y_true, y_pred)

# Confusion matrix for NPV calculation
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

# Specific metrics
sensitivity = tp / (tp + fn)  # Recall (True Positive Rate)
specificity = tn / (tn + fp)  # True Negative Rate
precision = tp / (tp + fp)    # Positive Predictive Value
npv = tn / (tn + fn)          # Negative Predictive Value

# Plot the ROC curve
plt.figure(figsize=(8, 6))

# Plot the mean ROC curve
sns.lineplot(x=base_fpr, y=mean_tpr, color="skyblue", label="Mean ROC Curve (AUC = %.2f)" % roc_auc)

# Plot the 95% confidence interval
plt.fill_between(base_fpr, tpr_lower, tpr_upper, color="blue", alpha=0.2, label="95% Confidence Interval")

# Diagonal line
plt.plot([0, 1], [0, 1], linestyle='--', color='gray')

# Highlight the optimal cutoff point
plt.scatter(fpr[optimal_idx], tpr[optimal_idx], marker='+', s=100, color='red', 
            label='Optimal Cutoff = %.3f\nSensitivity = %.3f\nSpecificity = %.3f' % 
                  (best_threshold, sensitivity, specificity))

# Axis labels
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve with 95% Confidence Interval')

# Add legend
plt.legend(loc='lower right')

# Show the plot
plt.show()

# Print metrics for the optimal cutoff
print(f"Optimal Cutoff Using Youden Index = {best_threshold:.3f}")
print(f"AUROC: {roc_auc:.3f}")
print(f"Accuracy: {accuracy:.3f}")
print(f"Sensitivity (Recall): {sensitivity:.3f}")
print(f"Specificity: {specificity:.3f}")
print(f"Precision (PPV): {precision:.3f}")
print(f"Negative Predictive Value (NPV): {npv:.3f}")


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, average_precision_score
from sklearn.calibration import calibration_curve
plt.rcParams["font.family"] = "DejaVu Sans"

def decision_curve_analysis(y_true, y_prob, thresholds=None):
    if thresholds is None:
        thresholds = np.arange(0.1, 0.7, 0.1)

    net_benefit_model = []
    net_benefit_all = []
    net_benefit_none = 0  # Net benefit of treat-none is always 0

    for threshold in thresholds:
        treat_model = (y_prob >= threshold).astype(int)
        tp_model = np.sum((y_true == 1) & (treat_model == 1))
        fp_model = np.sum((y_true == 0) & (treat_model == 1))
        prob_tp = tp_model / len(y_true)
        prob_fp = fp_model / len(y_true)
        net_benefit = prob_tp - (prob_fp * threshold / (1 - threshold))

        if net_benefit >= 0:
            net_benefit_model.append(net_benefit)
            net_benefit_all.append((np.sum(y_true == 1) / len(y_true)) - (np.sum(y_true == 0) / len(y_true)) * (threshold / (1 - threshold)))
        else:
            net_benefit_model.append(None)
            net_benefit_all.append(None)

    return {
        'thresholds': thresholds,
        'net_benefit_model': net_benefit_model,
        'net_benefit_all': net_benefit_all,
        'net_benefit_none': [net_benefit_none] * len(thresholds)
    }

def plot_roc_curve(ax, models_data):
    for model_data in models_data:
        fpr, tpr, _ = roc_curve(model_data['y_true'], model_data['y_prob'])
        roc_auc = roc_auc_score(model_data['y_true'], model_data['y_prob'])
        ax.plot(fpr, tpr, label=f'{model_data["label"]} (AUC = {roc_auc:.2f})', color=model_data['color'])
    ax.plot([0, 1], [0, 1], linestyle='--', color='gray')  # Diagonal line (random model)
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curve')
    ax.legend(loc='lower right')

def plot_dca(ax, models_data):
    for model_data in models_data:
        dca_results = decision_curve_analysis(model_data['y_true'], model_data['y_prob'])
        filtered_thresholds = [t for t, nb in zip(dca_results['thresholds'], dca_results['net_benefit_model']) if nb is not None]
        filtered_net_benefit_model = [nb for nb in dca_results['net_benefit_model'] if nb is not None]
        filtered_net_benefit_all = [nb for nb in dca_results['net_benefit_all'] if nb is not None]
        
        ax.plot(filtered_thresholds, filtered_net_benefit_model, label=f'{model_data["label"]}', color=model_data['color'])
    ax.plot(filtered_thresholds, filtered_net_benefit_all, linestyle='--', color='green', label='Treat All')
    ax.axhline(0, color='red', linestyle='--', label='Treat None')
    ax.set_xlabel('Threshold Probability')
    ax.set_ylabel('Net Benefit')
    ax.set_title('Decision Curve Analysis (DCA)')
    ax.legend(loc='lower right')

def plot_pr_curve(ax, models_data):
    for model_data in models_data:
        precision, recall, _ = precision_recall_curve(model_data['y_true'], model_data['y_prob'])
        ap = average_precision_score(model_data['y_true'], model_data['y_prob'])
        ax.plot(recall, precision, label=f'{model_data["label"]} (AP = {ap:.2f})', color=model_data['color'])
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision-Recall (PR) Curve')
    ax.legend(loc='lower left')

def plot_all(csv_list):
    fig, axs = plt.subplots(1, 3, figsize=(20, 6))

    models_data = []
    for idx, csv_file in enumerate(csv_list):
        data = pd.read_csv(csv_file)
        y_true = data['target']
        data['prediction_prob'] = np.where(data['prediction_label'] == 1, data['prediction_score'], 1 - data['prediction_score'])    
        y_prob = data['prediction_prob']
        experiment_name = csv_file.split("/")[-1].split(".")[0]
        models_data.append({'y_true': y_true, 'y_prob': y_prob, 'label': experiment_name, 'color': plt.cm.viridis(idx / len(csv_list))})

    # Plot ROC, DCA, and PR Curve
    plot_roc_curve(axs[0], models_data)
    plot_dca(axs[1], models_data)
    plot_pr_curve(axs[2], models_data)

    plt.tight_layout()
    plt.savefig("model_performance_comparison.png", dpi=400)
    plt.show()

# 사용 예시
csv_list = [
    # 'DUMC_analysis/ExtraTreesClassifier.csv',
    # 'DUMC_analysis/XGBClassifier.csv',
    # 'DUMC_analysis/RandomForestClassifier.csv',
    # 'DUMC_analysis/CatBoostClassifier.csv',
    # 'DUMC_analysis/LGBMClassifier.csv',
    
    # 'DUMC_analysis/ExtraTreesClassifier_external.csv',
    # 'DUMC_analysis/CatBoostClassifier_external.csv',  
    # 'DUMC_analysis/XGBClassifier_external.csv',
    # 'DUMC_analysis/RandomForestClassifier_external.csv',
    # 'DUMC_analysis/LGBMClassifier_external.csv',
]

plot_all(csv_list)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.calibration import calibration_curve
from sklearn.metrics import mean_squared_error
import pandas as pd 

# Seaborn 스타일 적용
sns.set(style="whitegrid")

def plot_calibration_curve(ax, models_data):
    for model_data in models_data:
        # 보정 곡선 데이터 생성
        fraction_of_positives, mean_predicted_value = calibration_curve(
            model_data['y_true'], model_data['y_prob'], n_bins=10
        )

        
        # 보정 곡선 그리기
        # ax.scatter(mean_predicted_value, fraction_of_positives, 
        #         color=model_data['color'], marker='o', label=f'{model_data["label"]}')
        ax.scatter(mean_predicted_value, fraction_of_positives, 
                color=model_data['color'], marker='o')
    
    # 이상적인 보정선
    ax.plot([0, 1], [0, 1], linestyle='--', color='black', lw=1.5, label='Ideal Calibration')
    
    # 그래프 레이블 및 제목 설정
    ax.set_xlabel('Mean Predicted Probability')
    ax.set_ylabel('Fraction of Positives')
    ax.set_title('Calibration Curve')
    ax.legend(loc='best', fontsize=10)

def plot_all(csv_list):
    # 플롯 설정
    fig, axs = plt.subplots(1, 1, figsize=(8, 6))  # 단일 보정 곡선 그래프
    models_data = []
    
    # 데이터 처리 및 모델 데이터 준비
    for idx, csv_file in enumerate(csv_list):
        data = pd.read_csv(csv_file)
        y_true = data['target']
        
        # target이 1일 확률로 변환
        data['prediction_prob'] = np.where(data['prediction_label'] == 1, 
                                           data['prediction_score'], 
                                           1 - data['prediction_score'])    
        y_prob = data['prediction_prob']
        experiment_name = csv_file.split("/")[-1].split(".")[0]
        
        # 모델 정보 저장
        models_data.append({
            'y_true': y_true, 
            'y_prob': y_prob, 
            # 'label': experiment_name, 
            'color': sns.color_palette("Set2", len(csv_list))[idx]
        })
    
    # 보정 곡선 플롯
    plot_calibration_curve(axs, models_data)
    
    # 레이아웃 및 출력
    plt.tight_layout()
    plt.show()

# Example usage with multiple CSV files
csv_list = [
    # 'DUMC_analysis/Our_model.csv',
    # 'DUMC_analysis/Our_model_external.csv',
]

plot_all(csv_list)
