In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, RandomizedSearchCV, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, label_binarize
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import VarianceThreshold, RFE
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_auc_score, roc_curve, precision_recall_curve, auc
from sklearn.ensemble import StackingClassifier, RandomForestClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from imblearn.over_sampling import SMOTE
from boruta import BorutaPy
import random
import matplotlib.pyplot as plt
import warnings
from scipy import stats

warnings.filterwarnings("ignore")

# Improved fitness function combining accuracy and AUC-ROC

class XGBFitness:
    def __init__(self, X, y, cv=10, **xgb_params):
        self.X = X
        self.y = y
        self.cv = cv
        self.xgb_params = xgb_params

    def __call__(self, binary_mask):
        if np.count_nonzero(binary_mask) == 0:
            return 1.0
        X_selected = self.X[:, binary_mask == 1]
        clf = XGBClassifier(
            eval_metric='logloss',
            verbosity=0,
            use_label_encoder=False,
            **self.xgb_params
        )
        accuracy_scores = cross_val_score(clf, X_selected, self.y, cv=self.cv, scoring='accuracy', n_jobs=-1)
        auc_scores = cross_val_score(clf, X_selected, self.y, cv=self.cv, scoring='roc_auc', n_jobs=-1)
        return 1 - (0.7 * np.mean(accuracy_scores) + 0.3 * np.mean(auc_scores))


# Binary Grey Wolf Optimizer with Differential Evolution (RBGWO-DE)

class RBGWO_DE:
    def __init__(self, obj_func, trans_func, problem_size=50, domain_range=(-1, 1),
                 log=True, epoch=100, pop_size=10, lsa_epoch=20):
        self.obj_func = obj_func
        self.trans_func = trans_func
        self.problem_size = problem_size
        self.domain_range = domain_range
        self.log = log
        self.epoch = epoch
        self.pop_size = pop_size
        self.lsa_epoch = lsa_epoch
        self.solution = None
        self.best_fitness = float("inf")

    def _binary_conversion(self, pos):
        transfer_val = abs(np.tanh(pos))  # v-shape transfer
        return np.where(np.random.rand(*pos.shape) < transfer_val, 1, 0)

    def _de_local_search(self, position):
        best_pos = position.copy()
        best_fit = self.obj_func(self._binary_conversion(best_pos))
        for _ in range(self.lsa_epoch):
            scale = np.random.uniform(0.2, 0.8)
            mutant = best_pos + scale * (np.random.uniform(-1, 1, self.problem_size))
            mutant = np.clip(mutant, self.domain_range[0], self.domain_range[1])
            bin_mutant = self._binary_conversion(mutant)
            fit = self.obj_func(bin_mutant)
            if fit < best_fit:
                best_pos = mutant
                best_fit = fit
        return best_pos, best_fit

    def train(self):
        print("Starting RBGWO-DE optimization...")
        pop = np.random.uniform(self.domain_range[0], self.domain_range[1],
                                (self.pop_size, self.problem_size))
        bin_pop = np.array([self._binary_conversion(ind) for ind in pop])
        fitness = np.array([self.obj_func(ind) for ind in bin_pop])
        alpha, beta, delta = [np.zeros(self.problem_size)] * 3
        alpha_score, beta_score, delta_score = [float("inf")] * 3

        for epoch in range(self.epoch):
            a = 2 - epoch * (2 / self.epoch)
            for i in range(self.pop_size):
                if fitness[i] < alpha_score:
                    delta, delta_score = beta, beta_score
                    beta, beta_score = alpha, alpha_score
                    alpha, alpha_score = pop[i].copy(), fitness[i]
                elif fitness[i] < beta_score:
                    delta, delta_score = beta, beta_score
                    beta, beta_score = pop[i].copy(), fitness[i]
                elif fitness[i] < delta_score:
                    delta, delta_score = pop[i].copy(), fitness[i]

            for i in range(self.pop_size):
                r1, r2 = random.random(), random.random()
                A1, C1 = 2 * a * r1 - a, 2 * r2
                D_alpha = abs(C1 * alpha - pop[i])
                X1 = alpha - A1 * D_alpha
                r1, r2 = random.random(), random.random()
                A2, C2 = 2 * a * r1 - a, 2 * r2
                D_beta = abs(C2 * beta - pop[i])
                X2 = beta - A2 * D_beta
                r1, r2 = random.random(), random.random()
                A3, C3 = 2 * a * r1 - a, 2 * r2
                D_delta = abs(C3 * delta - pop[i])
                X3 = delta - A3 * D_delta
                new_pos = (X1 + X2 + X3) / 3.0
                new_pos = np.clip(new_pos, self.domain_range[0], self.domain_range[1])
                new_pos, new_fit = self._de_local_search(new_pos)
                pop[i] = new_pos
                fitness[i] = new_fit

            best_idx = np.argmin(fitness)
            self.solution = self._binary_conversion(pop[best_idx])
            self.best_fitness = fitness[best_idx]
            if self.log:
                print(f"Epoch {epoch+1}/{self.epoch}, Best Fitness: {self.best_fitness}")

        print("RBGWO-DE optimization finished.")
        return self.solution, self.best_fitness


# Main processing

print("1. Loading dataset...")
try:
    data = pd.read_csv("/kaggle/input/alzheimers-disease-dataset/alzheimers_disease_data.csv", encoding='utf-8', sep=',')
except Exception as e:
    print(f"Error reading CSV file: {e}")
    raise

print("Dataset columns:", data.columns.tolist())

print("2. Dropping unnecessary columns...")
for col_to_drop in ['DoctorInCharge', 'PatientID']:
    if col_to_drop in data.columns:
        print(f"Dropping '{col_to_drop}'")
        data = data.drop(columns=[col_to_drop])

print("3. Checking class distribution...")
print(data['Diagnosis'].value_counts())

# Define numerical and categorical columns
numerical_cols = [
    'Age', 'BMI', 'AlcoholConsumption', 'PhysicalActivity', 'DietQuality',
    'SleepQuality', 'SystolicBP', 'DiastolicBP', 'CholesterolTotal',
    'CholesterolLDL', 'CholesterolHDL', 'CholesterolTriglycerides',
    'MMSE', 'FunctionalAssessment', 'ADL'
]
categorical_cols = [
    'Gender', 'Ethnicity', 'EducationLevel', 'Smoking', 'FamilyHistoryAlzheimers',
    'CardiovascularDisease', 'Diabetes', 'Depression', 'HeadInjury', 'Hypertension',
    'MemoryComplaints', 'BehavioralProblems', 'Confusion', 'Disorientation',
    'PersonalityChanges', 'DifficultyCompletingTasks', 'Forgetfulness'
]

# Verify columns
missing_numerical = [col for col in numerical_cols if col not in data.columns]
missing_categorical = [col for col in categorical_cols if col not in data.columns]
if missing_numerical or missing_categorical:
    print(f"Missing numerical columns: {missing_numerical}")
    print(f"Missing categorical columns: {missing_categorical}")
    raise SystemExit("Dataset doesn't contain expected columns.")

print("4. Handling missing values...")
num_imputer = SimpleImputer(strategy='mean')
cat_imputer = SimpleImputer(strategy='most_frequent')
data[numerical_cols] = num_imputer.fit_transform(data[numerical_cols])
data[categorical_cols] = cat_imputer.fit_transform(data[categorical_cols])

# Check for outliers
print("5. Checking for outliers...")
z_scores = stats.zscore(data[numerical_cols])
data = data[(z_scores < 3.5).all(axis=1)]
print(f"Dataset size after outlier removal: {data.shape}")

print("6. Separating features and target...")
X = data.drop(columns=['Diagnosis'])
y = data['Diagnosis']

print("7. Preprocessing pipeline...")
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='mean')),
            ('scaler', StandardScaler()),
            ('variance', VarianceThreshold(threshold=0.01))
        ]), numerical_cols),
        ('cat', OneHotEncoder(drop='first', handle_unknown='ignore'), categorical_cols)
    ])
X_processed = preprocessor.fit_transform(X)

print("8. Splitting data into train and test sets...")
X_train, X_test, y_train, y_test = train_test_split(X_processed, y, test_size=0.2, random_state=42, stratify=y)

print("9. Balancing training data with SMOTE...")
smote = SMOTE(random_state=42)
X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

print("10. BorutaPy feature selection...")
rf_boruta = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
boruta_selector = BorutaPy(rf_boruta, n_estimators='auto', perc=70, verbose=2, random_state=42, max_iter=30)
boruta_selector.fit(X_train_balanced, y_train_balanced)

boruta_mask = boruta_selector.support_ | boruta_selector.support_weak_
X_boruta = X_train_balanced[:, boruta_mask]
X_test_boruta = X_test[:, boruta_mask]
selected_features = np.array(preprocessor.get_feature_names_out())[boruta_mask]
print(f"Boruta selected {X_boruta.shape[1]} features (including tentative):")
print(selected_features)

print("11. Defining fitness function for XGBoost...")
xgb_params = {
    'n_estimators': 100,
    'max_depth': 5,
    'learning_rate': 0.1,
    'reg_lambda': 1.0,
    'gamma': 0.1
}
fitness_function = XGBFitness(X_boruta, y_train_balanced, cv=10, **xgb_params)  # cv reduced for speed

print("12. Initializing and running RBGWO-DE optimizer...")
optimizer = RBGWO_DE(
    obj_func=fitness_function,
    trans_func=lambda x: abs(np.tanh(x)),
    problem_size=X_boruta.shape[1],
    domain_range=(-10, 10),
    epoch=5,        # keep small for testing; increase for production
    pop_size=20,
    lsa_epoch=10,
    log=True
)
best_solution, best_fitness = optimizer.train()

print("13. Applying RFE on selected features...")
X_train_selected = X_boruta[:, best_solution == 1]
n_features_rfe = min(10, X_train_selected.shape[1])
if n_features_rfe < 1:
    raise SystemExit("No features selected by optimizer/Boruta/RFE pipeline.")
xgb_base = XGBClassifier(**xgb_params)
rfe = RFE(estimator=xgb_base, n_features_to_select=n_features_rfe)
X_train_rfe = rfe.fit_transform(X_train_selected, y_train_balanced)
rfe_mask = rfe.support_
X_test_rfe = X_test_boruta[:, best_solution == 1][:, rfe_mask]
final_features = selected_features[best_solution == 1][rfe_mask]
print(f"RFE selected {X_train_rfe.shape[1]} features:")
print(final_features)


# 14. Hyperparameter tuning for XGBoost

xgb_param_dist = {
    'n_estimators': [50, 100, 200, 300, 500],
    'max_depth': [3, 5, 7, 9, 12],
    'learning_rate': [0.01, 0.05, 0.1, 0.3, 0.5],
    'reg_lambda': [0.01, 0.1, 1.0, 10.0, 100.0],
    'gamma': [0, 0.1, 0.5, 1.0],
    'subsample': [0.5, 0.6, 0.8, 1.0],
    'colsample_bytree': [0.5, 0.6, 0.8, 1.0]
}
xgb = XGBClassifier(eval_metric='logloss', verbosity=0, use_label_encoder=False, random_state=42)
xgb_search = RandomizedSearchCV(xgb, xgb_param_dist, n_iter=50, cv=10, scoring='accuracy', n_jobs=-1, random_state=42)
xgb_search.fit(X_train_rfe, y_train_balanced)
best_xgb = xgb_search.best_estimator_
print("Best XGB params:", xgb_search.best_params_)


# 15. Hyperparameter tuning for RandomForest

rf_param_dist = {
    'n_estimators': [50, 100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}
rf = RandomForestClassifier(random_state=42)
rf_search = RandomizedSearchCV(rf, rf_param_dist, n_iter=50, cv=10, scoring='accuracy', n_jobs=-1, random_state=42)
rf_search.fit(X_train_rfe, y_train_balanced)
best_rf = rf_search.best_estimator_
print("Best RF params:", rf_search.best_params_)


# 16. Hyperparameter tuning for SVM

svm_param_dist = {
    'C': [0.1, 1, 10, 100],
    'kernel': ['rbf', 'linear'],
    'gamma': ['scale', 'auto', 0.01, 0.1, 1.0]
}
svm = SVC(probability=True, random_state=42)
svm_search = RandomizedSearchCV(svm, svm_param_dist, n_iter=50, cv=10, scoring='accuracy', n_jobs=-1, random_state=42)
svm_search.fit(X_train_rfe, y_train_balanced)
best_svm = svm_search.best_estimator_
print("Best SVM params:", svm_search.best_params_)


# 17. Hyperparameter tuning for LightGBM

lgbm_param_dist = {
    'n_estimators': [50, 100, 200],
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.05, 0.1],
    'reg_lambda': [0.1, 1.0, 10.0],
    'subsample': [0.6, 0.8, 1.0],
    'colsample_bytree': [0.6, 0.8, 1.0]
}
lgbm = LGBMClassifier(random_state=42)
lgbm_search = RandomizedSearchCV(lgbm, lgbm_param_dist, n_iter=50, cv=10, scoring='accuracy', n_jobs=-1, random_state=42)
lgbm_search.fit(X_train_rfe, y_train_balanced)
best_lgbm = lgbm_search.best_estimator_
print("Best LGBM params:", lgbm_search.best_params_)


# 18. Training StackingClassifier ensemble

stacking_clf = StackingClassifier(
    estimators=[
        ('xgb', best_xgb),
        ('rf', best_rf),
        ('svm', best_svm),
        ('lgbm', best_lgbm)
    ],
    final_estimator=XGBClassifier(eval_metric='logloss', use_label_encoder=False, random_state=42),
    cv=10,
    n_jobs=-1,
    passthrough=False
)
stacking_clf.fit(X_train_rfe, y_train_balanced)


# 19. Evaluating ensemble model

# Train set evaluation
y_train_pred = stacking_clf.predict(X_train_rfe)
train_accuracy = accuracy_score(y_train_balanced, y_train_pred)
print("Train Accuracy:", train_accuracy)

# Test set evaluation
y_test_pred = stacking_clf.predict(X_test_re := X_test_rfe)  # assign for clarity
test_accuracy = accuracy_score(y_test, y_test_pred)
print("Test Accuracy:", test_accuracy)
print("Classification Report:\n", classification_report(y_test, y_test_pred))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_test_pred))


# AUC-ROC and Precision-Recall (binary & multiclass safe)

print("Calculating AUC / ROC / Precision-Recall...")
# predict_proba (StackingClassifier uses final estimator; base estimators must provide predict_proba)
if not hasattr(stacking_clf, "predict_proba"):
    raise RuntimeError("stacking_clf has no predict_proba; ensure base estimators and final estimator support predict_proba.")

y_pred_proba = stacking_clf.predict_proba(X_test_rfe)
print("Shapes -> y_test:", y_test.shape, ", y_pred_proba:", y_pred_proba.shape)

classes = stacking_clf.classes_
n_classes = y_pred_proba.shape[1]

if n_classes == 2:
    # binary case: make y_true binary vector for the positive class (classes[1])
    positive_class = classes[1]
    y_true_bin = (y_test == positive_class).astype(int)
    auc_roc = roc_auc_score(y_true_bin, y_pred_proba[:, 1])
    fpr, tpr, _ = roc_curve(y_true_bin, y_pred_proba[:, 1])
    precision_vals, recall_vals, _ = precision_recall_curve(y_true_bin, y_pred_proba[:, 1])

    # ROC
    plt.figure()
    plt.plot(fpr, tpr, label=f'ROC curve (AUC = {auc_roc:.4f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve (binary)')
    plt.legend(loc="lower right")
    plt.savefig('roc_curve.png')
    plt.close()

    # Precision-Recall
    plt.figure()
    plt.plot(recall_vals, precision_vals, label='Precision-Recall curve')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve (binary)')
    plt.legend(loc="lower left")
    plt.savefig('precision_recall_curve.png')
    plt.close()

    print(f"Binary AUC-ROC: {auc_roc:.4f}")

else:
    # multiclass case: binarize y_test using same class order as stacking_clf.classes_
    y_test_bin = label_binarize(y_test, classes=classes)
    auc_roc_macro = roc_auc_score(y_test_bin, y_pred_proba, multi_class='ovr', average='macro')
    auc_roc_weighted = roc_auc_score(y_test_bin, y_pred_proba, multi_class='ovr', average='weighted')
    print(f"Multi-class AUC-ROC (macro OVR): {auc_roc_macro:.4f}")
    print(f"Multi-class AUC-ROC (weighted OVR): {auc_roc_weighted:.4f}")

    # Per-class ROC
    fpr = dict(); tpr = dict(); roc_auc_dict = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_pred_proba[:, i])
        roc_auc_dict[i] = auc(fpr[i], tpr[i])

    # micro-average
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test_bin.ravel(), y_pred_proba.ravel())
    roc_auc_dict["micro"] = auc(fpr["micro"], tpr["micro"])

    # Plot ROC
    plt.figure()
    plt.plot(fpr["micro"], tpr["micro"], label=f'micro-average ROC (AUC = {roc_auc_dict["micro"]:.4f})', linewidth=2)
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], label=f'Class {classes[i]} (AUC = {roc_auc_dict[i]:.4f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve (multi-class)')
    plt.legend(loc="lower right")
    plt.savefig('roc_curve.png')
    plt.close()

    # Precision-Recall per class + AUC(PR)
    precision = dict(); recall = dict(); pr_auc = dict()
    plt.figure()
    for i in range(n_classes):
        precision[i], recall[i], _ = precision_recall_curve(y_test_bin[:, i], y_pred_proba[:, i])
        pr_auc[i] = auc(recall[i], precision[i])
        plt.plot(recall[i], precision[i], label=f'Class {classes[i]} (AUC = {pr_auc[i]:.4f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve (multi-class)')
    plt.legend(loc="lower left")
    plt.savefig('precision_recall_curve.png')
    plt.close()

print("All steps completed successfully.")

# Additional evaluation: confusion matrices, train/val curves (XGB/LGBM), stacking final eval

import itertools
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay, PrecisionRecallDisplay
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os

if not os.path.isdir("eval_plots"):
    os.makedirs("eval_plots")

def plot_confusion(y_true, y_pred, labels, title, filename):
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    fig, ax = plt.subplots(figsize=(6,6))
    disp.plot(ax=ax, cmap=plt.cm.Blues, values_format='d')
    ax.set_title(title)
    plt.tight_layout()
    fig.savefig(filename)
    plt.close(fig)

def plot_train_val_curves_from_xgb(clf, X_tr, y_tr, X_val, y_val, label_prefix, filename_prefix):
    # clf should be an XGBClassifier with n_estimators set
    eval_set = [(X_tr, y_tr), (X_val, y_val)]
    # clone params to ensure verbosity off, etc.
    params = clf.get_xgb_params()
    # train a fresh xgb to capture evals_result (use same params)
    import xgboost as xgb
    dtrain = xgb.DMatrix(X_tr, label=y_tr)
    dval = xgb.DMatrix(X_val, label=y_val)
    num_boost_round = clf.get_params().get("n_estimators", 100)
    xgb_params = params.copy()
    # ensure objective appropriate for multiclass/binary
    if len(np.unique(y_tr)) > 2:
        xgb_params.setdefault("objective", "multi:softprob")
        xgb_params["num_class"] = len(np.unique(y_tr))
        eval_metric = "mlogloss"
    else:
        xgb_params.setdefault("objective", "binary:logistic")
        eval_metric = "logloss"
    xgb_params["verbosity"] = 0
    # use early stopping to find best_iteration
    bst = xgb.train(xgb_params, dtrain, num_boost_round=num_boost_round,
                    evals=[(dtrain, "train"), (dval, "validation")],
                    early_stopping_rounds=20, evals_result={}, verbose_eval=False)
    results = bst.eval_history if hasattr(bst, "eval_history") else bst.attributes()
    # safer: get evals_result via bst.eval_set if present
    ev = bst.eval_result() if hasattr(bst, "eval_result") else None

    evals_result = bst.eval_result() if hasattr(bst, "eval_result") else getattr(bst, "evals_result", None)
    # xgboost python API versions differ; robustly try to get evals_result
    try:
        evals_result = bst.eval_result()
    except Exception:
        try:
            evals_result = bst.evals_result()
        except Exception:
            evals_result = getattr(bst, "evals_result", None)

    if evals_result is None:
        # fallback: use sklearn wrapper with learning_monitor
        print(f"Could not extract evals_result for {label_prefix}; skipping curve plot.")
        return None

    # determine metric name (first key in evals_result['train'])
    train_metrics = evals_result.get("train", {})
    val_metrics = evals_result.get("validation", {})
    metric_name = list(train_metrics.keys())[0]
    train_vals = train_metrics[metric_name]
    val_vals = val_metrics[metric_name]

    epochs = list(range(1, len(train_vals)+1))

    # Plot metric (logloss or mlogloss)
    fig, ax = plt.subplots()
    ax.plot(epochs, train_vals, label=f"train {metric_name}")
    ax.plot(epochs, val_vals, label=f"val {metric_name}")
    ax.axvline(bst.best_iteration+1 if hasattr(bst, "best_iteration") else None, color='grey', linestyle='--',
               label=f"best_iteration={getattr(bst, 'best_iteration', 'NA')}")
    ax.set_xlabel("Boosting round")
    ax.set_ylabel(metric_name)
    ax.set_title(f"{label_prefix} {metric_name} per boosting round")
    ax.legend()
    plt.tight_layout()
    fig.savefig(f"{filename_prefix}_{metric_name}.png")
    plt.close(fig)
    return getattr(bst, "best_iteration", None)

def plot_train_val_curves_from_lgbm(clf, X_tr, y_tr, X_val, y_val, label_prefix, filename_prefix):
    # fit a fresh lgb model with early stopping to capture evals_result
    import lightgbm as lgb
    params = clf.get_params().copy()
    num_boost_round = params.get("n_estimators", 100)
    lgb_train = lgb.Dataset(X_tr, label=y_tr)
    lgb_val = lgb.Dataset(X_val, label=y_val, reference=lgb_train)
    # adapt objective
    if len(np.unique(y_tr)) > 2:
        params.setdefault("objective", "multiclass")
        params["num_class"] = len(np.unique(y_tr))
        metric = "multi_logloss"
    else:
        params.setdefault("objective", "binary")
        metric = "binary_logloss"
    params["verbose"] = -1
    evals_result = {}
    bst = lgb.train(params, lgb_train, num_boost_round=num_boost_round,
                    valid_sets=[lgb_train, lgb_val],
                    valid_names=["train","validation"],
                    early_stopping_rounds=20,
                    evals_result=evals_result,
                    verbose_eval=False)
    # extract
    metric_name = list(evals_result["train"].keys())[0]
    train_vals = evals_result["train"][metric_name]
    val_vals = evals_result["validation"][metric_name]
    epochs = list(range(1, len(train_vals)+1))

    fig, ax = plt.subplots()
    ax.plot(epochs, train_vals, label=f"train {metric_name}")
    ax.plot(epochs, val_vals, label=f"val {metric_name}")
    ax.axvline(bst.best_iteration, color='grey', linestyle='--', label=f"best_iteration={bst.best_iteration}")
    ax.set_xlabel("Boosting round")
    ax.set_ylabel(metric_name)
    ax.set_title(f"{label_prefix} {metric_name} per boosting round")
    ax.legend()
    plt.tight_layout()
    fig.savefig(f"{filename_prefix}_{metric_name}.png")
    plt.close(fig)
    return bst.best_iteration

# 1) Confusion matrices for stacking (train & test)
print("Plotting confusion matrices for stacking classifier (train & test)...")
labels = stacking_clf.classes_
plot_confusion(y_train_balanced, y_train_pred, labels=labels, title="Confusion Matrix - Train (Stacking)",
               filename="eval_plots/confusion_train_stacking.png")
plot_confusion(y_test, y_test_pred, labels=labels, title="Confusion Matrix - Test (Stacking)",
               filename="eval_plots/confusion_test_stacking.png")
print("Saved confusion matrices to eval_plots/")

# 2) Retrain best_xgb and best_lgbm (on X_train_rfe) with a small validation split to record curves
print("Training XGBoost and LightGBM again with validation split to capture training curves and best_iteration...")
X_tr_sub, X_val_sub, y_tr_sub, y_val_sub = train_test_split(X_train_rfe, y_train_balanced, test_size=0.2, random_state=42, stratify=y_train_balanced)

best_iter_xgb = None
try:
    if isinstance(best_xgb, XGBClassifier):
        print("Retraining best_xgb to extract evals_result...")
        best_iter_xgb = plot_train_val_curves_from_xgb(best_xgb, X_tr_sub, y_tr_sub, X_val_sub, y_val_sub,
                                                       label_prefix="XGBoost (best_xgb)",
                                                       filename_prefix="eval_plots/xgb")
        print("XGBoost best_iteration:", best_iter_xgb)
except Exception as e:
    print("XGBoost curve plotting failed:", e)

best_iter_lgbm = None
try:
    import lightgbm as lgb
    if isinstance(best_lgbm, LGBMClassifier):
        print("Retraining best_lgbm to extract evals_result...")
        best_iter_lgbm = plot_train_val_curves_from_lgbm(best_lgbm, X_tr_sub, y_tr_sub, X_val_sub, y_val_sub,
                                                         label_prefix="LightGBM (best_lgbm)",
                                                         filename_prefix="eval_plots/lgbm")
        print("LightGBM best_iteration:", best_iter_lgbm)
except Exception as e:
    print("LightGBM curve plotting failed:", e)

# 3) If final estimator of stacking is XGB, retrain final estimator to inspect its curves as well
try:
    final_est = stacking_clf.final_estimator_
    if isinstance(final_est, XGBClassifier):
        print("Retraining stacking final estimator (XGB) on same X_train_rfe to capture its curves...")
        # For stacking final, train on whole X_train_rfe (could also use sub-split)
        fi_best_iter = plot_train_val_curves_from_xgb(final_est, X_tr_sub, y_tr_sub, X_val_sub, y_val_sub,
                                                      label_prefix="Stacking final XGB",
                                                      filename_prefix="eval_plots/stacking_final_xgb")
        print("Stacking final XGB best_iteration:", fi_best_iter)
except Exception as e:
    print("Stacking final estimator retrain failed or is not XGB:", e)

# 4) ROC and PR plots already computed earlier, but let's create nice displays for stacking predictions
print("Plotting ROC and Precision-Recall for stacking predictions...")
y_pred_proba = stacking_clf.predict_proba(X_test_rfe)
n_classes = y_pred_proba.shape[1]

if n_classes == 2:
    # binary
    positive_class = stacking_clf.classes_[1]
    y_true_bin = (y_test == positive_class).astype(int)
    RocCurveDisplay.from_predictions(y_true_bin, y_pred_proba[:,1])
    plt.title("ROC - Stacking (binary)")
    plt.savefig("eval_plots/roc_stacking_binary.png")
    plt.close()

    PrecisionRecallDisplay.from_predictions(y_true_bin, y_pred_proba[:,1])
    plt.title("Precision-Recall - Stacking (binary)")
    plt.savefig("eval_plots/pr_stacking_binary.png")
    plt.close()
else:
    # multiclass: plot per-class ROC and PR
    y_test_bin = label_binarize(y_test, classes=stacking_clf.classes_)
    # ROC per class
    fig = plt.figure(figsize=(8,6))
    for i, cls in enumerate(stacking_clf.classes_):
        RocCurveDisplay.from_predictions(y_test_bin[:, i], y_pred_proba[:, i], name=f"Class {cls}")
    plt.title("ROC per class - Stacking (multiclass)")
    plt.legend()
    plt.savefig("eval_plots/roc_stacking_multiclass.png")
    plt.close()

    # PR per class
    fig = plt.figure(figsize=(8,6))
    for i, cls in enumerate(stacking_clf.classes_):
        PrecisionRecallDisplay.from_predictions(y_test_bin[:, i], y_pred_proba[:, i], name=f"Class {cls}")
    plt.title("Precision-Recall per class - Stacking (multiclass)")
    plt.legend()
    plt.savefig("eval_plots/pr_stacking_multiclass.png")
    plt.close()

print("Saved ROC/PR plots to eval_plots/")

# 5) Print and save classification reports
from sklearn.metrics import classification_report
train_report = classification_report(y_train_balanced, y_train_pred, digits=4)
test_report = classification_report(y_test, y_test_pred, digits=4)
print("Train Classification Report:\n", train_report)
print("Test Classification Report:\n", test_report)
with open("eval_plots/classification_reports.txt", "w", encoding="utf-8") as f:
    f.write("Train Classification Report\n")
    f.write(train_report + "\n\n")
    f.write("Test Classification Report\n")
    f.write(test_report + "\n\n")

print("All additional evaluation artifacts saved in ./eval_plots/")


In [None]:

# Show & save plots (confusion, ROC/PR, train/val curves, boxplots)

import os, sys
import matplotlib.pyplot as plt
from IPython import get_ipython
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay, PrecisionRecallDisplay

# helper: show only if running inside an interactive env (not blocking scripts)
def maybe_show():
    try:
        if get_ipython() is not None:
            plt.show()
        else:
            # likely script / headless: skip showing to avoid blocking
            pass
    except Exception:
        pass

output_dir = "eval_plots"
os.makedirs(output_dir, exist_ok=True)

# 1) Confusion matrices (train & test) — save + show
labels = stacking_clf.classes_

# train confusion
cm_train_fig, ax = plt.subplots(figsize=(6,6))
disp_train = ConfusionMatrixDisplay.from_predictions(y_train_balanced, y_train_pred, display_labels=labels, ax=ax, cmap=plt.cm.Blues)
ax.set_title("Confusion Matrix - Train (Stacking)")
plt.tight_layout()
train_cm_path = os.path.join(output_dir, "confusion_train_stacking.png")
cm_train_fig.savefig(train_cm_path)
maybe_show()
plt.close(cm_train_fig)

# test confusion
cm_test_fig, ax = plt.subplots(figsize=(6,6))
disp_test = ConfusionMatrixDisplay.from_predictions(y_test, y_test_pred, display_labels=labels, ax=ax, cmap=plt.cm.Blues)
ax.set_title("Confusion Matrix - Test (Stacking)")
plt.tight_layout()
test_cm_path = os.path.join(output_dir, "confusion_test_stacking.png")
cm_test_fig.savefig(test_cm_path)
maybe_show()
plt.close(cm_test_fig)

print(f"Saved confusion matrices to {output_dir}/")

# 2) ROC & Precision-Recall (save + show)
y_pred_proba = stacking_clf.predict_proba(X_test_rfe)
n_classes = y_pred_proba.shape[1]

if n_classes == 2:
    positive_class = stacking_clf.classes_[1]
    y_true_bin = (y_test == positive_class).astype(int)

    roc_fig = plt.figure()
    RocCurveDisplay.from_predictions(y_true_bin, y_pred_proba[:,1])
    plt.title("ROC - Stacking (binary)")
    roc_path = os.path.join(output_dir, "roc_stacking_binary.png")
    plt.tight_layout()
    plt.savefig(roc_path)
    maybe_show()
    plt.close()

    pr_fig = plt.figure()
    PrecisionRecallDisplay.from_predictions(y_true_bin, y_pred_proba[:,1])
    plt.title("Precision-Recall - Stacking (binary)")
    pr_path = os.path.join(output_dir, "pr_stacking_binary.png")
    plt.tight_layout()
    plt.savefig(pr_path)
    maybe_show()
    plt.close()
else:
    y_test_bin = label_binarize(y_test, classes=stacking_clf.classes_)

    # ROC per class
    roc_fig = plt.figure(figsize=(8,6))
    for i, cls in enumerate(stacking_clf.classes_):
        RocCurveDisplay.from_predictions(y_test_bin[:, i], y_pred_proba[:, i], name=f"Class {cls}")
    plt.title("ROC per class - Stacking (multiclass)")
    plt.legend()
    roc_path = os.path.join(output_dir, "roc_stacking_multiclass.png")
    plt.tight_layout()
    plt.savefig(roc_path)
    maybe_show()
    plt.close()

    # PR per class
    pr_fig = plt.figure(figsize=(8,6))
    for i, cls in enumerate(stacking_clf.classes_):
        PrecisionRecallDisplay.from_predictions(y_test_bin[:, i], y_pred_proba[:, i], name=f"Class {cls}")
    plt.title("Precision-Recall per class - Stacking (multiclass)")
    plt.legend()
    pr_path = os.path.join(output_dir, "pr_stacking_multiclass.png")
    plt.tight_layout()
    plt.savefig(pr_path)
    maybe_show()
    plt.close()

print(f"Saved ROC/PR plots to {output_dir}/")

# 3) Train/validation curves for XGBoost
# If the earlier retrain functions returned best iterations saved as variables, we can re-run them here.
# In case those functions weren't called earlier (or failed), try best-effort to retrain briefly to get curves.

from sklearn.model_selection import train_test_split
X_tr_sub, X_val_sub, y_tr_sub, y_val_sub = train_test_split(X_train_rfe, y_train_balanced, test_size=0.2, random_state=42, stratify=y_train_balanced)

def safe_plot_xgb_curves(clf, X_tr, y_tr, X_val, y_val, fname_prefix):
    try:
        import xgboost as xgb
        params = clf.get_xgb_params()
        num_round = clf.get_params().get("n_estimators", 100)
        dtrain = xgb.DMatrix(X_tr, label=y_tr)
        dval = xgb.DMatrix(X_val, label=y_val)
        if len(np.unique(y_tr)) > 2:
            params.setdefault("objective", "multi:softprob")
            params["num_class"] = len(np.unique(y_tr))
        else:
            params.setdefault("objective", "binary:logistic")
        params["verbosity"] = 0
        evals_result = {}
        bst = xgb.train(params, dtrain, num_boost_round=num_round,
                        evals=[(dtrain, "train"), (dval, "validation")],
                        early_stopping_rounds=20,
                        evals_result=evals_result,
                        verbose_eval=False)
        # plot metric
        metric = list(evals_result["train"].keys())[0]
        train_vals = evals_result["train"][metric]
        val_vals = evals_result["validation"][metric]
        epochs = range(1, len(train_vals)+1)
        fig, ax = plt.subplots()
        ax.plot(epochs, train_vals, label=f"train {metric}")
        ax.plot(epochs, val_vals, label=f"val {metric}")
        if hasattr(bst, "best_iteration"):
            ax.axvline(bst.best_iteration+1, color='grey', linestyle='--', label=f"best_iter={bst.best_iteration+1}")
        ax.set_xlabel("Boosting round")
        ax.set_ylabel(metric)
        ax.set_title("XGBoost train/val")
        ax.legend()
        path = os.path.join(output_dir, f"{fname_prefix}_xgb_{metric}.png")
        fig.savefig(path)
        maybe_show()
        plt.close(fig)
        print("Saved XGBoost curve:", path)
    except Exception as e:
        print("XGBoost curve plotting skipped/failed:", e)

def safe_plot_lgbm_curves(clf, X_tr, y_tr, X_val, y_val, fname_prefix):
    try:
        import lightgbm as lgb
        params = clf.get_params().copy()
        num_round = params.get("n_estimators", 100)
        ltrain = lgb.Dataset(X_tr, label=y_tr)
        lval = lgb.Dataset(X_val, label=y_val, reference=ltrain)
        if len(np.unique(y_tr)) > 2:
            params.setdefault("objective", "multiclass")
            params["num_class"] = len(np.unique(y_tr))
        else:
            params.setdefault("objective", "binary")
        params["verbose"] = -1
        evals_result = {}
        bst = lgb.train(params, ltrain, num_boost_round=num_round,
                        valid_sets=[ltrain, lval],
                        valid_names=["train","validation"],
                        early_stopping_rounds=20,
                        evals_result=evals_result,
                        verbose_eval=False)
        metric = list(evals_result["train"].keys())[0]
        train_vals = evals_result["train"][metric]
        val_vals = evals_result["validation"][metric]
        epochs = range(1, len(train_vals)+1)
        fig, ax = plt.subplots()
        ax.plot(epochs, train_vals, label=f"train {metric}")
        ax.plot(epochs, val_vals, label=f"val {metric}")
        ax.axvline(bst.best_iteration, color='grey', linestyle='--', label=f"best_iter={bst.best_iteration}")
        ax.set_xlabel("Boosting round")
        ax.set_ylabel(metric)
        ax.set_title("LightGBM train/val")
        ax.legend()
        path = os.path.join(output_dir, f"{fname_prefix}_lgbm_{metric}.png")
        fig.savefig(path)
        maybe_show()
        plt.close(fig)
        print("Saved LightGBM curve:", path)
    except Exception as e:
        print("LightGBM curve plotting skipped/failed:", e)

# call safe plotting on best models if available
try:
    if 'best_xgb' in globals():
        safe_plot_xgb_curves(best_xgb, X_tr_sub, y_tr_sub, X_val_sub, y_val_sub, "best_xgb")
    if 'best_lgbm' in globals():
        safe_plot_lgbm_curves(best_lgbm, X_tr_sub, y_tr_sub, X_val_sub, y_val_sub, "best_lgbm")
    # if final estimator is xgb, try it as well
    if isinstance(stacking_clf.final_estimator_, XGBClassifier):
        safe_plot_xgb_curves(stacking_clf.final_estimator_, X_tr_sub, y_tr_sub, X_val_sub, y_val_sub, "stacking_final_xgb")
except Exception as e:
    print("Error while plotting train/val curves:", e)

# 4) Boxplots for final selected features (grouped by class) — save + show
try:
    if len(final_features) > 0:
        df_features = pd.DataFrame(X_train_rfe, columns=final_features)
        df_features['label'] = y_train_balanced
        # per-feature boxplot grouped by label
        for feat in final_features:
            fig = plt.figure(figsize=(6,4))
            # pandas boxplot grouped by label
            box = df_features.boxplot(column=feat, by='label')
            plt.title(f"{feat} by class")
            plt.suptitle("")  # remove automatic suptitle
            plt.xlabel("Class")
            plt.ylabel(feat)
            path = os.path.join(output_dir, f"boxplot_{feat}.png")
            fig = plt.gcf()
            fig.savefig(path)
            maybe_show()
            plt.close()
        print("Saved boxplots for final features to", output_dir)
    else:
        print("No final_features available for boxplots.")
except Exception as e:
    print("Boxplot generation failed:", e)

# 5) Save classification reports
from sklearn.metrics import classification_report
train_report = classification_report(y_train_balanced, y_train_pred, digits=4)
test_report = classification_report(y_test, y_test_pred, digits=4)
with open(os.path.join(output_dir, "classification_reports.txt"), "w", encoding="utf-8") as f:
    f.write("Train Classification Report\n")
    f.write(train_report + "\n\n")
    f.write("Test Classification Report\n")
    f.write(test_report + "\n\n")

print("All additional evaluation artifacts saved in ./" + output_dir + "/")
