# Model B: Liver Cirrhosis Analysis & Leakage Correction with Optimization
본 노트북은 기존 모델의 **Target Leakage(Stage-specific Winsorization)** 문제를 해결하고, 
신뢰할 수 있는 파이프라인(3-way Split, Optimized Pipeline)을 구축하여 최종 성능을 검증하는 과정을 담고 있습니다.

## 주요 목차 (Contents)
1.  **EDA (Exploratory Data Analysis)**: Feature 분포 및 Stage별 차이 시각화
2.  **Preprocessing & Leakage Fix**: Winsorization 문제점 시각화 및 대안(RobustScaler) 적용
3.  **Feature Engineering**: 의학적 비율 변수 생성
4.  **Modeling & Tuning**: 과적합 방지를 위한 Regularization & Optuna Tuning
5.  **Calibration & Evaluation**: 확률 보정 및 최종 성능 평가
6.  **Interpretation**: SHAP을 이용한 중요 변수 분석


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.preprocessing import LabelEncoder, RobustScaler, FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import log_loss, accuracy_score, classification_report
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.isotonic import IsotonicRegression
from xgboost import XGBClassifier
import shap

# Visualization Settings
sns.set_theme(style="whitegrid")
plt.rcParams['font.family'] = 'Malgun Gothic' # For Windows Korean support
plt.rcParams['axes.unicode_minus'] = False
warnings.filterwarnings('ignore')

print("Libraries imported successfully.")

In [None]:
# Load Data
fpath = '../data/liver_cirrhosis_deduped.csv' # 경로 확인 필요
df = pd.read_csv(fpath)

# Filter Target (1, 2, 3) & Drop ID
if 'ID' in df.columns:
    df = df.drop('ID', axis=1)
    
df = df[df['Stage'].isin([1, 2, 3])].copy()
target_col = 'Stage'

print(f"Data Shape: {df.shape}")
print(df[target_col].value_counts().sort_index())
df.head()

## 1. EDA: Feature Distributions by Stage
주요 수치형 변수들이 Stage(1, 2, 3)에 따라 어떻게 분포가 달라지는지 확인합니다.


In [None]:
features_to_plot = ['Bilirubin', 'Albumin', 'Prothrombin', 'Copper', 'Age', 'Platelets']

fig, axes = plt.subplots(2, 3, figsize=(18, 10))
axes = axes.flatten()

for i, col in enumerate(features_to_plot):
    if col in df.columns:
        sns.boxplot(data=df, x='Stage', y=col, ax=axes[i], palette='viridis')
        axes[i].set_title(f'{col} by Stage')
        
plt.tight_layout()
plt.show()

### 해석
*   **Bilirubin**: Stage가 높아질수록 수치가 상승하는 경향이 뚜렷합니다.
*   **Albumin**: 간 기능 저하로 인해 Stage 3에서 낮아지는 경향이 보입니다.
*   **Platelets (혈소판)**: 간경변이 진행될수록 감소하는 추세입니다.


## 2. Leakage Diagnosis: Winsorization vs RobustScaler
**기존 오류**: Stage별로 이상치를 처리함 -> 정답 정보를 전처리에 사용 (Leakage).
**개선안**: 전체 데이터 기준으로 `RobustScaler` 사용 또는 Pipeline 내 처리.


In [None]:
# 시각화: Leakage가 있는 경우 vs 없는 경우의 분포 변화 비교 (가상 예시)

# 1. Leakage Case (Stage-specific)
df_leak = df.copy()
for stage in [1, 2, 3]:
    mask = df_leak['Stage'] == stage
    q_low = df_leak.loc[mask, 'Bilirubin'].quantile(0.05)
    q_high = df_leak.loc[mask, 'Bilirubin'].quantile(0.95)
    df_leak.loc[mask, 'Bilirubin'] = df_leak.loc[mask, 'Bilirubin'].clip(q_low, q_high)

# 2. Correct Case (Robust Scaler - Global)
df_correct = df.copy()
rs = RobustScaler()
df_correct['Bilirubin'] = rs.fit_transform(df_correct[['Bilirubin']])

fig, ax = plt.subplots(1, 2, figsize=(14, 5))

sns.kdeplot(data=df_leak, x='Bilirubin', hue='Stage', fill=True, palette='viridis', ax=ax[0])
ax[0].set_title("Problem (Leakage): Artificial Separation by Stage-specific Clip")

sns.kdeplot(data=df_correct, x='Bilirubin', hue='Stage', fill=True, palette='viridis', ax=ax[1])
ax[1].set_title("Correct Model (RobustScaler): Natural Overlap Preserved")

plt.show()

**왼쪽 그래프(Leakage)**를 보면 Stage별 분포가 인위적으로 모이거나 분리되는 경향이 생길 수 있습니다. 이는 모델이 쉽게 정답을 맞추게 도와줍니다.
**오른쪽 그래프(Correct)**는 실제 데이터의 중첩(Overlap)을 그대로 보여주며, 이것이 모델이 풀어야 할 **진짜 난이도**입니다.


## 3. Feature Engineering & Splitting
과적합을 막고 진짜 신호를 찾기 위해 의학적 비율 변수를 추가하고, **3-way Split (Train/Calib/Test)**을 수행합니다.


In [None]:
# 1. Variable Generation
df_eng = df.copy()
df_eng['Bili_Alb_Ratio'] = df_eng['Bilirubin'] / (df_eng['Albumin'] + 1e-6)
df_eng['SGOT_Platelets_Ratio'] = df_eng['SGOT'] / (df_eng['Platelets'] + 1e-6)
df_eng['Age_Decade'] = (df_eng['Age'] / 365.25 / 10).astype(int)

# 2. Encoding Target
le = LabelEncoder()
df_eng['target'] = le.fit_transform(df_eng['Stage']) # 0, 1, 2

X = df_eng.drop(['Stage', 'target'], axis=1)
y = df_eng['target']

# 3. 3-way Split
# Train(60%), Calib(20%), Test(20%)
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.4, random_state=42)
train_idx, eval_idx = next(sss1.split(X, y))
X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
X_eval, y_eval = X.iloc[eval_idx], y.iloc[eval_idx]

sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=42)
calib_idx, test_idx = next(sss2.split(X_eval, y_eval))
X_calib, y_calib = X_eval.iloc[calib_idx], y.iloc[calib_idx]
X_test, y_test = X_eval.iloc[test_idx], y.iloc[test_idx]

print(f"Train: {X_train.shape}, Calib: {X_calib.shape}, Test: {X_test.shape}")

## 4. Optimized Model Training
Optuna를 통해 찾은(또는 사전 정의된) **최적 하이퍼파라미터**를 사용하여 과적합을 제어합니다.
*   **Key**: `max_depth` 제한, `reg_alpha`(L1) 강화


In [None]:
# Optimized Params (Example from Phase 2)
best_params = {
    'n_estimators': 300,
    'max_depth': 3,
    'learning_rate': 0.05,
    'min_child_weight': 10,
    'gamma': 1.0,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'reg_alpha': 5.0,  # Strong L1
    'reg_lambda': 5.0  # Strong L2
}

# Preprocessing Pipeline
num_cols = X_train.select_dtypes(include=[np.number]).columns
cat_cols = X_train.select_dtypes(exclude=[np.number]).columns

num_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', RobustScaler()) # Robust against outliers
])

cat_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

preprocessor = ColumnTransformer([
    ('num', num_transformer, num_cols),
    ('cat', cat_transformer, cat_cols)
])

# XGBoost Pipeline
clf = XGBClassifier(
    random_state=42, use_label_encoder=False, eval_metric='mlogloss',
    **best_params
)

pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('model', clf)
])

print("Training Optimized Pipeline...")
pipeline.fit(X_train, y_train)
print("Training Complete.")

## 5. Calibration & Evaluation
Calib Set을 사용하여 예측 확률을 보정(Isotonic Regression)하고, Test Set에서 최종 ECE(Calibration Error)를 확인합니다.


In [None]:
# Manual Calibration Class (Simplified)
class CalibrationWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, base_estimator):
        self.base_estimator = base_estimator
        self.calibrators = []
        self.classes_ = None
        
    def fit(self, X, y):
        # Assume base estimator is already fitted
        self.classes_ = self.base_estimator.classes_
        raw_probs = self.base_estimator.predict_proba(X)
        
        for i in range(len(self.classes_)):
            y_binary = (y == i).astype(int)
            iso = IsotonicRegression(out_of_bounds='clip')
            iso.fit(raw_probs[:, i], y_binary)
            self.calibrators.append(iso)
        return self
        
    def predict_proba(self, X):
        raw_probs = self.base_estimator.predict_proba(X)
        calibrated_probs = np.zeros_like(raw_probs)
        for i, cal in enumerate(self.calibrators):
            calibrated_probs[:, i] = cal.transform(raw_probs[:, i])
        
        # Normalize
        calibrated_probs /= calibrated_probs.sum(axis=1, keepdims=True)
        return calibrated_probs
    
    def predict(self, X):
        return np.argmax(self.predict_proba(X), axis=1)

# Apply Calibration
calib_model = CalibrationWrapper(pipeline)
calib_model.fit(X_calib, y_calib)

# Final Eval
y_pred = calib_model.predict(X_test)
y_prob = calib_model.predict_proba(X_test)

acc = accuracy_score(y_test, y_pred)
loss = log_loss(y_test, y_prob)

print(f"Final Test Accuracy: {acc:.4f}")
print(f"Final Test Log Loss: {loss:.4f}")
print(classification_report(y_test, y_pred, target_names=['Stage 1', 'Stage 2', 'Stage 3']))


## 6. Interpretation (SHAP)
모델이 어떤 변수를 중요하게 보는지 SHAP Value를 통해 분석합니다.


In [None]:
# SHAP Preparation
preprocessor = pipeline.named_steps['preprocessor']
model = pipeline.named_steps['model']

# Transform Test Data for SHAP
X_test_transformed = preprocessor.transform(X_test)

# Get Feature Names
if hasattr(preprocessor, 'get_feature_names_out'):
    feature_names = preprocessor.get_feature_names_out()
else:
    feature_names = [f'f{i}' for i in range(X_test_transformed.shape[1])]

# SHAP Explainer
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test_transformed)

# Summary Plot (Class 0 - Stage 1 기준 예시, 또는 전체)
print("SHAP Summary Plot")
shap.summary_plot(shap_values, X_test_transformed, feature_names=feature_names)