In [6]:
# === evaluate_neurofit_topk_cm_fixed.py ===
import os, numpy as np, pandas as pd, matplotlib.pyplot as plt, seaborn as sns, joblib
from collections import Counter
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import load_model

BASE_DIR  = r"C:\Users\sagni\Downloads\Neuro Fit"
CSV_PATH  = os.path.join(BASE_DIR, "archive", "human_cognitive_performance.csv")
H5_PATH   = os.path.join(BASE_DIR, "neurofit_model.h5")
PKL_PATH  = os.path.join(BASE_DIR, "neurofit_preprocess.pkl")
HIST_CSV  = os.path.join(BASE_DIR, "training_history.csv")  # optional

class ColumnSelector(BaseEstimator, TransformerMixin):
    def __init__(self, column): self.column = column
    def fit(self, X, y=None): return self
    def transform(self, X): return X[[self.column]]

class To1DString(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None): return self
    def transform(self, X):
        if isinstance(X, pd.DataFrame):
            return X.iloc[:, 0].astype(str).values
        return np.asarray(X).astype(str).ravel()

class DateTimeExpand(BaseEstimator, TransformerMixin):
    def __init__(self, columns): self.columns = columns
    def fit(self, X, y=None): return self
    def transform(self, X):
        outs = []
        for c in self.columns:
            s = pd.to_datetime(X[c], errors="coerce")
            outs.append(pd.DataFrame({
                f"{c}_year":  s.dt.year.fillna(0).astype(int),
                f"{c}_month": s.dt.month.fillna(0).astype(int),
                f"{c}_day":   s.dt.day.fillna(0).astype(int),
                f"{c}_dow":   s.dt.dayofweek.fillna(0).astype(int),
                f"{c}_hour":  s.dt.hour.fillna(0).astype(int),
            }))
        return pd.concat(outs, axis=1) if outs else np.empty((len(X), 0))

def ensure_dense_if_small(X, max_feats=50000):
    if hasattr(X, "toarray") and X.shape[1] <= max_feats:
        return X.toarray()
    return X

def plot_curve(epochs, y_tr, y_va, title, ylabel, out_path):
    plt.figure(figsize=(7,5))
    plt.plot(epochs, y_tr, label="Train")
    if y_va is not None: plt.plot(epochs, y_va, label="Val")
    plt.title(title); plt.xlabel("Epoch"); plt.ylabel(ylabel)
    plt.legend(); plt.grid(True, linestyle="--", alpha=0.6)
    plt.tight_layout(); plt.savefig(out_path, dpi=150); plt.close()
    print(f"[SAVE] {out_path}")

def plot_cm(cm_mat, labels, title, path, fmt="d", cmap="Blues"):
    plt.figure(figsize=(8,7))
    sns.heatmap(cm_mat, annot=True, fmt=fmt, cmap=cmap,
                xticklabels=labels, yticklabels=labels)
    plt.title(title); plt.xlabel("Predicted"); plt.ylabel("True")
    plt.tight_layout(); plt.savefig(path, dpi=150); plt.close()
    print(f"[SAVE] {path}")

# --- load artifacts ---
if not os.path.exists(PKL_PATH): raise FileNotFoundError(PKL_PATH)
if not os.path.exists(H5_PATH):  raise FileNotFoundError(H5_PATH)
bundle = joblib.load(PKL_PATH)
print("[INFO] Keys in PKL:", list(bundle.keys()))
preprocess     = bundle.get("preprocess")
target_col     = bundle.get("target_col")
datetime_cols  = bundle.get("datetime_cols", [])
label_encoder  = bundle.get("label_encoder", None)
if label_encoder is None:
    print("[WARN] No label_encoder in PKL – will fit one from dataset (order may differ from training)")

# --- load data ---
df = pd.read_csv(CSV_PATH)
for c in datetime_cols:
    if c in df.columns: df[c] = pd.to_datetime(df[c], errors="coerce")
if target_col not in df.columns:
    raise KeyError(f"Target column '{target_col}' not in CSV. Available: {list(df.columns)}")

X_df = df.drop(columns=[target_col])
y_raw = df[target_col].astype(str)
if label_encoder is None: label_encoder = LabelEncoder().fit(y_raw)
y = label_encoder.transform(y_raw)
class_names = [str(c) for c in label_encoder.classes_]
n_classes = len(class_names)

counts = Counter(y)
print("[INFO] #classes:", n_classes, " | #samples:", len(y))
print("[INFO] Class distribution (encoded) – showing first 10:", dict(list(counts.items())[:10]))
use_stratify = min(counts.values()) >= 2 if n_classes > 1 else False
if not use_stratify: print("[WARN] Some classes have <2 samples; using non-stratified split.")

X_train_df, X_test_df, y_train, y_test = train_test_split(
    X_df, y, test_size=0.2, random_state=42, stratify=y if use_stratify else None
)

X_test = preprocess.transform(X_test_df)
X_test = ensure_dense_if_small(X_test)

# >>> FIX: load without compiling (avoids 'mse' deserialization error)
model = load_model(H5_PATH, compile=False)

# --- optional curves ---
if os.path.exists(HIST_CSV):
    hist = pd.read_csv(HIST_CSV)
    epochs = hist["epoch"] if "epoch" in hist.columns else np.arange(1, len(hist)+1)
    acc_col = "accuracy" if "accuracy" in hist.columns else ("acc" if "acc" in hist.columns else None)
    val_acc_col = "val_accuracy" if "val_accuracy" in hist.columns else ("val_acc" if "val_acc" in hist.columns else None)
    if acc_col:
        plot_curve(epochs, hist[acc_col],
                   hist[val_acc_col] if (val_acc_col and val_acc_col in hist.columns) else None,
                   "Model Accuracy", "Accuracy", os.path.join(BASE_DIR, "accuracy_curve.png"))
    if "loss" in hist.columns:
        plot_curve(epochs, hist["loss"],
                   hist["val_loss"] if "val_loss" in hist.columns else None,
                   "Model Loss", "Loss", os.path.join(BASE_DIR, "loss_curve.png"))
else:
    print("[INFO] training_history.csv not found – skipping curves")

# --- predictions + top-k ---
probs = model.predict(X_test, verbose=0)
if probs.ndim == 1:
    probs = np.stack([1 - probs, probs], axis=1)

num_model_classes = probs.shape[1]
if num_model_classes != n_classes:
    print(f"[WARN] Model outputs {num_model_classes} classes, encoder has {n_classes}. Metrics may not align.")

top1_preds = np.argmax(probs, axis=1)
top1_acc = (top1_preds == y_test).mean()

def topk_acc(probs, y_true, k=5):
    k = min(k, probs.shape[1])
    topk = np.argpartition(-probs, kth=k-1, axis=1)[:, :k]
    return np.mean([y_true[i] in topk[i] for i in range(len(y_true))])

top5_acc = topk_acc(probs, y_test, k=5)
print(f"\n=== Top-K Accuracy ===\nTop-1 accuracy: {top1_acc:.4f}\nTop-5 accuracy: {top5_acc:.4f}")
with open(os.path.join(BASE_DIR, "topk_metrics.txt"), "w", encoding="utf-8") as f:
    f.write(f"Top-1 accuracy: {top1_acc:.6f}\nTop-5 accuracy: {top5_acc:.6f}\n")
print(f"[SAVE] {os.path.join(BASE_DIR, 'topk_metrics.txt')}")

# --- Confusion Matrix on Top-N only ---
TOP_N = 20
from sklearn.metrics import confusion_matrix
test_counts = Counter(y_test)
topN_ids = [c for c, _ in test_counts.most_common(TOP_N)]
other_id = max(n_classes, probs.shape[1]) + 1

def map_to_topN(y_arr, top_ids, other_label):
    return np.array([yi if yi in top_ids else other_label for yi in y_arr], dtype=int)

y_test_top = map_to_topN(y_test, topN_ids, other_id)
y_pred_top = map_to_topN(top1_preds, topN_ids, other_id)
labels_for_cm_ids = topN_ids + [other_id]
labels_for_cm_names = [class_names[i] for i in topN_ids] + ["Other"]

cm = confusion_matrix(y_test_top, y_pred_top, labels=labels_for_cm_ids)
cm_norm = cm.astype(float) / (cm.sum(axis=1, keepdims=True) + 1e-12)

plot_cm(cm, labels_for_cm_names, f"Confusion Matrix (Top {TOP_N} + Other)",
        os.path.join(BASE_DIR, "cm_topN_counts.png"), fmt="d", cmap="Blues")
plot_cm(cm_norm, labels_for_cm_names, f"Confusion Matrix (Top {TOP_N} + Other, row-norm)",
        os.path.join(BASE_DIR, "cm_topN_norm.png"), fmt=".2f", cmap="Greens")

# optional: per-class report for Top-N only
mask_top = (y_test_top != other_id)
if mask_top.any():
    rep = classification_report(
        y_test_top[mask_top], y_pred_top[mask_top],
        target_names=[class_names[i] for i in topN_ids],
        zero_division=0
    )
    with open(os.path.join(BASE_DIR, "classification_report_topN.txt"), "w", encoding="utf-8") as f:
        f.write(rep)
    print(rep)
    print(f"[SAVE] {os.path.join(BASE_DIR, 'classification_report_topN.txt')}")
else:
    print("[INFO] No samples from Top-N classes in test split; skipped Top-N report.")


[INFO] Keys in PKL: ['preprocess', 'task_type', 'target_col', 'numeric_cols', 'cat_cols', 'text_cols', 'datetime_cols']
[WARN] No label_encoder in PKL – will fit one from dataset (order may differ from training)
[INFO] #classes: 9492  | #samples: 80000
[INFO] Class distribution (encoded) – showing first 10: {2763: 14, 4646: 8, 2349: 8, 6362: 5, 8183: 4, 2263: 6, 5128: 13, 7931: 14, 4278: 14, 3585: 11}
[WARN] Some classes have <2 samples; using non-stratified split.
[SAVE] C:\Users\sagni\Downloads\Neuro Fit\loss_curve.png
[WARN] Model outputs 1 classes, encoder has 9492. Metrics may not align.

=== Top-K Accuracy ===
Top-1 accuracy: 0.0022
Top-5 accuracy: 0.0022
[SAVE] C:\Users\sagni\Downloads\Neuro Fit\topk_metrics.txt
[SAVE] C:\Users\sagni\Downloads\Neuro Fit\cm_topN_counts.png
[SAVE] C:\Users\sagni\Downloads\Neuro Fit\cm_topN_norm.png
              precision    recall  f1-score   support

       100.0       0.05      1.00      0.09        35
         0.0       0.00      0.00      0.0