# 實驗 17：滑動窗口 5-Fold 交叉驗證模型比較

## 與實驗 13 的差異
1. **資料**：使用滑動窗口資料集（13,514 筆樣本 vs 6,056 筆）
2. **交叉驗證策略**：StratifiedGroupKFold（以 patient_id 分組）
3. **防止資料洩漏**：同一患者不會同時出現在訓練集與測試集

## 參考文獻
- Wang et al. (2024, PLoS ONE)：使用類似的滑動窗口方法

## 日期：2026-01-12
## 執行時間：約 8 分 26 秒（5-Fold CV 主訓練）

In [None]:
# 匯入套件
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

from sklearn.model_selection import StratifiedGroupKFold
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.metrics import (
    roc_auc_score, recall_score, precision_score, f1_score,
    balanced_accuracy_score, confusion_matrix, roc_curve
)
import xgboost as xgb
from scipy import stats

# Configure matplotlib fonts so the Chinese labels used in the figures render,
# and turn off the Unicode minus glyph for axis tick labels.
plt.rcParams.update({
    'font.sans-serif': ['Microsoft JhengHei', 'Arial Unicode MS'],
    'axes.unicode_minus': False,
})

print("套件載入完成")

In [None]:
# Load the sliding-window dataset and report its size.
df = pd.read_csv('../../data/01_primary/SUA/processed/SUA_sliding_window.csv')

n_rows = len(df)
n_patients = df['patient_id'].nunique()

print(f"資料載入完成：{n_rows:,} 筆樣本，來自 {n_patients:,} 位患者")
print(f"每位患者平均樣本數：{n_rows / n_patients:.2f}")
print(f"\n欄位：{list(df.columns)}")

In [None]:
# Show how many samples start at each window position, ordered by position.
window_start_counts = df['window_start'].value_counts().sort_index()
print("窗口起始點分佈：")
print(window_start_counts)

In [None]:
# Define the feature matrix, the three binary targets and the CV grouping key.
demographic_cols = ['sex', 'Age']
# Measurements at the Y-2 time point
tinput1_cols = [
    'FBG_Tinput1', 'TC_Tinput1', 'Cr_Tinput1', 'UA_Tinput1',
    'GFR_Tinput1', 'BMI_Tinput1', 'SBP_Tinput1', 'DBP_Tinput1',
]
# Measurements at the Y-1 time point
tinput2_cols = [
    'FBG_Tinput2', 'TC_Tinput2', 'Cr_Tinput2', 'UA_Tinput2',
    'GFR_Tinput2', 'BMI_Tinput2', 'SBP_Tinput2', 'DBP_Tinput2',
]
# Delta (change) features
delta_cols = [
    'Delta_FBG', 'Delta_TC', 'Delta_Cr', 'Delta_UA',
    'Delta_GFR', 'Delta_BMI', 'Delta_SBP', 'Delta_DBP',
]
feature_cols = demographic_cols + tinput1_cols + tinput2_cols + delta_cols

# Display name of each disease -> target column in the dataset.
target_cols = {
    '高血壓': 'hypertension_target',
    '高血糖': 'hyperglycemia_target',
    '高血脂': 'dyslipidemia_target'
}

# Grouping column handed to the grouped cross-validation splitter.
groups = df['patient_id']

X = df[feature_cols].copy()

# Recode targets from 1 = normal / 2 = diseased to 0 = normal / 1 = diseased.
# Any value other than 2 maps to 0.
targets = {
    name: (df[col] == 2).astype(int)
    for name, col in target_cols.items()
}

section_rule = "=" * 60

print(section_rule)
print("特徵資訊")
print(section_rule)
print(f"特徵總數：{len(feature_cols)}")
print("  - 人口學特徵：2")
print("  - Y-2 時間點：8")
print("  - Y-1 時間點：8")
print("  - Δ 變化量：8")

print("\n" + section_rule)
print("類別不平衡狀況（滑動窗口資料）")
print(section_rule)
for name, y in targets.items():
    n_pos = (y == 1).sum()
    n_neg = (y == 0).sum()
    # Report an infinite ratio rather than dividing by zero when a target
    # has no positive samples.
    if n_pos > 0:
        ratio = n_neg / n_pos
    else:
        ratio = float('inf')
    prevalence_pct = y.mean() * 100
    print(f"{name}：盛行率 {prevalence_pct:.2f}%（負:正 = {ratio:.1f}:1）")

In [None]:
# Model definitions (8 classifiers in total)
def get_models(random_state=42):
    """Build a fresh set of the eight unfitted classifiers being compared.

    Args:
        random_state: Seed forwarded to every estimator constructed with a
            ``random_state`` argument (all except GaussianNB and
            LinearDiscriminantAnalysis).

    Returns:
        dict mapping a short model label to an unfitted estimator, in the
        order LR, NB, LDA, DT, RF, XGB, SVM, MLP.
    """
    logistic = LogisticRegression(
        class_weight='balanced',
        max_iter=1000,
        random_state=random_state,
    )
    naive_bayes = GaussianNB()
    lda = LinearDiscriminantAnalysis()
    tree = DecisionTreeClassifier(
        max_depth=10,
        min_samples_split=20,
        min_samples_leaf=10,
        class_weight='balanced',
        random_state=random_state,
    )
    forest = RandomForestClassifier(
        n_estimators=100,
        max_depth=15,
        min_samples_split=10,
        class_weight='balanced',
        random_state=random_state,
        n_jobs=-1,
    )
    boosted = xgb.XGBClassifier(
        n_estimators=100,
        max_depth=5,
        learning_rate=0.1,
        scale_pos_weight=5,
        random_state=random_state,
        eval_metric='logloss',
        verbosity=0,
    )
    svm = SVC(
        kernel='rbf',
        class_weight='balanced',
        probability=True,
        random_state=random_state,
    )
    mlp = MLPClassifier(
        hidden_layer_sizes=(64, 32, 16),
        activation='relu',
        solver='adam',
        learning_rate_init=0.001,
        max_iter=500,
        random_state=random_state,
        early_stopping=True,
        validation_fraction=0.1,
    )

    # Insertion order is the order in which models are trained and reported.
    return {
        'LR': logistic,
        'NB': naive_bayes,
        'LDA': lda,
        'DT': tree,
        'RF': forest,
        'XGB': boosted,
        'SVM': svm,
        'MLP': mlp,
    }

print("已定義模型：")
for name in get_models():
    print(f"  - {name}")

In [None]:
def evaluate_model_cv_grouped(X, y, groups, model, n_splits=5, random_state=42):
    """
    Evaluate a classifier with StratifiedGroupKFold cross-validation.

    Rows that share a group label (patient_id in this experiment) are kept in
    the same fold, so no group contributes to both the training and the test
    split of any fold. Features are standardized per fold with a scaler fitted
    on the training split only.

    Args:
        X: Feature DataFrame; rows are selected positionally with ``.iloc``.
        y: Binary target Series aligned with ``X`` (0 = negative, 1 = positive).
        groups: Series of group labels aligned with ``X``.
        model: Unfitted estimator exposing ``fit``, ``predict`` and
            ``predict_proba``. It is cloned for every fold; the object passed
            in is never fitted.
        n_splits: Number of folds.
        random_state: Seed for the shuffled fold assignment.

    Returns:
        dict containing, for each metric in ``auc``, ``sensitivity``,
        ``specificity``, ``f1`` and ``balanced_acc``:
            ``<metric>_mean``      mean over folds
            ``<metric>_std``       ``np.std`` over folds (NumPy default ddof)
            ``<metric>_folds``     list of per-fold values
            ``<metric>_ci_lower``  lower bound of the 95% t-interval over folds
            ``<metric>_ci_upper``  upper bound of the 95% t-interval over folds
        plus ``y_true_all`` and ``y_prob_all``: the test labels and predicted
        positive-class probabilities of all folds, concatenated in fold order.

    Raises:
        AssertionError: If any group appears in both the training and the
            test split of a fold.
    """
    # Imported once per call instead of once per fold.
    from sklearn.base import clone

    cv = StratifiedGroupKFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    fold_metrics = {
        'auc': [], 'sensitivity': [], 'specificity': [],
        'f1': [], 'balanced_acc': [], 'y_true': [], 'y_prob': []
    }

    for fold, (train_idx, test_idx) in enumerate(cv.split(X, y, groups)):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        # Verify there is no leakage: the two splits must not share a group.
        train_patients = set(groups.iloc[train_idx])
        test_patients = set(groups.iloc[test_idx])
        assert len(train_patients & test_patients) == 0, f"Fold {fold} 存在資料洩漏！"

        # Standardize using statistics from the training split only.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Work on a fresh, unfitted copy of the estimator.
        model_clone = clone(model)

        # Estimators exposing scale_pos_weight (XGBoost here) get the
        # negative:positive ratio of this training fold. If the fold has no
        # positives the ratio is undefined, so the configured value is kept.
        if hasattr(model_clone, 'scale_pos_weight'):
            n_pos = (y_train == 1).sum()
            if n_pos > 0:
                scale = (y_train == 0).sum() / n_pos
                model_clone.set_params(scale_pos_weight=scale)

        # Train
        model_clone.fit(X_train_scaled, y_train)

        # Predict: positive-class probability for AUC, hard labels for the rest.
        y_prob = model_clone.predict_proba(X_test_scaled)[:, 1]
        y_pred = model_clone.predict(X_test_scaled)

        # Metrics
        fold_metrics['auc'].append(roc_auc_score(y_test, y_prob))

        # labels=[0, 1] pins the matrix to 2x2 even when a class is absent
        # from both y_test and y_pred, so the 4-way unpack cannot fail.
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=[0, 1]).ravel()
        fold_metrics['sensitivity'].append(tp / (tp + fn) if (tp + fn) > 0 else 0)
        fold_metrics['specificity'].append(tn / (tn + fp) if (tn + fp) > 0 else 0)
        fold_metrics['f1'].append(f1_score(y_test, y_pred, zero_division=0))
        fold_metrics['balanced_acc'].append(balanced_accuracy_score(y_test, y_pred))

        fold_metrics['y_true'].extend(y_test.tolist())
        fold_metrics['y_prob'].extend(y_prob.tolist())

    # Aggregate the per-fold values.
    result = {}
    for metric in ['auc', 'sensitivity', 'specificity', 'f1', 'balanced_acc']:
        values = fold_metrics[metric]
        result[f'{metric}_mean'] = np.mean(values)
        result[f'{metric}_std'] = np.std(values)
        result[f'{metric}_folds'] = values

        # 95% confidence interval from a t-distribution with (folds - 1)
        # degrees of freedom, centred on the fold mean.
        ci = stats.t.interval(0.95, len(values)-1, loc=np.mean(values), scale=stats.sem(values))
        result[f'{metric}_ci_lower'] = ci[0]
        result[f'{metric}_ci_upper'] = ci[1]

    result['y_true_all'] = fold_metrics['y_true']
    result['y_prob_all'] = fold_metrics['y_prob']

    return result

print("評估函式定義完成（含 GroupKFold）")

In [None]:
# Run the 5-fold StratifiedGroupKFold evaluation for every disease/model pair.
banner = "=" * 80
print(banner)
print("執行 5-Fold 交叉驗證（StratifiedGroupKFold）")
print("（防止資料洩漏：同一患者不會同時出現在訓練集與測試集）")
print(banner)

all_results = []       # one flat summary row per (disease, model)
detailed_results = {}  # disease -> model -> full result dict

for disease_name, y in targets.items():
    section_rule = '=' * 60
    print(f"\n{section_rule}")
    print(f"{disease_name}")
    print(f"{section_rule}")

    detailed_results[disease_name] = {}

    # Fresh estimators for every disease.
    models = get_models()
    for model_name, model in models.items():
        print(f"  訓練 {model_name}...", end=" ")

        result = evaluate_model_cv_grouped(X, y, groups, model)
        detailed_results[disease_name][model_name] = result

        auc_ci_text = f"({result['auc_ci_lower']:.3f}, {result['auc_ci_upper']:.3f})"
        summary_row = {
            '疾病': disease_name,
            '模型': model_name,
            'AUC': result['auc_mean'],
            'AUC_std': result['auc_std'],
            'AUC_CI': auc_ci_text,
            'Sensitivity': result['sensitivity_mean'],
            'Specificity': result['specificity_mean'],
            'F1': result['f1_mean'],
            'Balanced_Acc': result['balanced_acc_mean'],
        }
        all_results.append(summary_row)

        print(f"AUC = {result['auc_mean']:.3f} ± {result['auc_std']:.3f}")

results_df = pd.DataFrame(all_results)
print("\n5-Fold 交叉驗證（GroupKFold）執行完成！")

In [None]:
# Print one result table per disease, sorted by mean AUC (best first).
wide_rule = "=" * 100
print(wide_rule)
print("5-Fold 交叉驗證結果（滑動窗口 + GroupKFold）")
print(wide_rule)

report_columns = [
    '模型', 'AUC', 'AUC_std', 'AUC_CI',
    'Sensitivity', 'Specificity', 'F1', 'Balanced_Acc',
]

for disease in targets:
    print(f"\n--- {disease} ---")
    is_disease = results_df['疾病'] == disease
    disease_df = results_df[is_disease].sort_values('AUC', ascending=False)

    # Round the numeric columns to 3 decimals for display.
    display_df = disease_df[report_columns].round(3)
    print(display_df.to_string(index=False))

In [None]:
# Compare against the fixed-window results of Experiment 13.
print("=" * 80)
print("比較：滑動窗口 vs 固定窗口")
print("=" * 80)

# Mean AUCs from Experiment 13, hard-coded here for comparison.
original_results = {
    '高血壓': {'LR': 0.754, 'RF': 0.791, 'XGB': 0.789},
    '高血糖': {'LR': 0.932, 'RF': 0.930, 'XGB': 0.918},
    '高血脂': {'LR': 0.867, 'RF': 0.857, 'XGB': 0.857}
}

print("\n| 疾病 | 模型 | 固定窗口 | 滑動窗口 | 差異 |")
print("|------|------|---------|---------|------|")

for disease in targets:
    # A disease without any baseline entry is treated as "no baseline"
    # instead of raising KeyError.
    baseline_by_model = original_results.get(disease, {})
    for model in ['LR', 'RF', 'XGB']:
        new_row = results_df[(results_df['疾病'] == disease) & (results_df['模型'] == model)]
        if len(new_row) == 0:
            continue

        new = new_row['AUC'].values[0]
        orig = baseline_by_model.get(model)
        # Format the baseline and the difference as text up front: applying
        # the ':.3f' format spec to a '-' placeholder would raise ValueError.
        if orig is None:
            orig_str = '-'
            diff_str = '-'
        else:
            orig_str = f"{orig:.3f}"
            diff_str = f"{new - orig:+.3f}"
        print(f"| {disease:4} | {model:5} | {orig_str} | {new:.3f} | {diff_str} |")

In [None]:
# Persist the summary table.
output_csv = '../../results/sliding_window_5fold_cv.csv'
results_df.to_csv(output_csv, index=False)
print("已儲存：results/sliding_window_5fold_cv.csv")

# Experiment summary. Dataset figures are computed from df.
# NOTE: the "key findings" lines are fixed text and are not recomputed from
# results_df; update them by hand if the results change.
n_samples = len(df)
n_patients = df['patient_id'].nunique()
summary_text = f"""
資料集：
  - 樣本數：{n_samples:,}（固定窗口為 6,056）
  - 患者數：{n_patients:,}
  - 每位患者平均樣本數：{n_samples / n_patients:.2f}

交叉驗證策略：
  - StratifiedGroupKFold（n_splits=5）
  - 分組依據：patient_id
  - 無資料洩漏：已驗證

重點發現：
  - 高血壓：RF 最佳（AUC 0.743），整體較固定窗口下降約 0.03-0.05
  - 高血糖：LR 最佳（AUC 0.938），整體與固定窗口相近或略升
  - 高血脂：LR 最佳（AUC 0.867），整體與固定窗口一致
  - NB/LDA 與 LR 表現接近，但缺少 class_weight 導致 Sensitivity 偏低
"""

summary_rule = "=" * 80
print("\n" + summary_rule)
print("實驗摘要")
print(summary_rule)
print(summary_text)

In [None]:
# Visualization: horizontal bar chart of mean AUC per model, one panel per disease.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

diseases = list(targets.keys())
model_names = list(get_models().keys())
colors = plt.cm.Set2(np.linspace(0, 1, len(model_names)))
# Fixed colour per model so a model looks the same in every panel even though
# the bar order differs between panels.
color_by_model = dict(zip(model_names, colors))

for idx, disease in enumerate(diseases):
    ax = axes[idx]
    # Ascending sort puts the best model at the top of a horizontal bar chart.
    disease_data = results_df[results_df['疾病'] == disease].sort_values('AUC', ascending=True)
    auc_means = disease_data['AUC']
    auc_stds = disease_data['AUC_std']

    y_pos = np.arange(len(disease_data))
    bar_colors = [color_by_model[m] for m in disease_data['模型']]
    bars = ax.barh(y_pos, auc_means, xerr=auc_stds,
                   color=bar_colors,
                   capsize=3, alpha=0.8)

    ax.set_yticks(y_pos)
    ax.set_yticklabels(disease_data['模型'])
    ax.set_xlabel('AUC', fontsize=12)
    ax.set_title(f'{disease}\n（滑動窗口）', fontsize=14)
    ax.set_xlim(0.4, 1.0)
    # Reference lines at AUC = 0.7 and AUC = 0.8.
    ax.axvline(x=0.7, color='gray', linestyle='--', alpha=0.5)
    ax.axvline(x=0.8, color='green', linestyle='--', alpha=0.5)

    # Write the mean AUC just to the right of each error bar.
    for row, (auc_mean, auc_std) in enumerate(zip(auc_means, auc_stds)):
        ax.text(auc_mean + auc_std + 0.01, row, f'{auc_mean:.3f}', va='center', fontsize=9)

plt.tight_layout()
plt.savefig('../../results/sliding_window_auc_comparison.png', dpi=150, bbox_inches='tight')
plt.show()
print("\n已儲存：results/sliding_window_auc_comparison.png")