In [5]:
# -*- coding: utf-8 -*-
"""
NeuroWell – MRI (2D slice) classifier using explicit class folder paths (Unicode-safe).
- Uses your four class folders directly (no tf.io directory scanning)
- Unicode-safe OpenCV loader via np.fromfile + cv2.imdecode
- Trains EfficientNetB0 (transfer learning)
- Saves artifacts: .h5, .pkl, .yaml, .json (plus the best-epoch .keras checkpoint)

Run:
  python neurowell_build_artifacts_paths.py
"""

import os
import sys
import json
import yaml
import time
import glob
import pickle
import random
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

# ---- ML / CV
import cv2
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# -------------------------------------------------------
# 0) YOUR CLASS FOLDERS (exact Windows paths you provided)
# -------------------------------------------------------
# One folder per class. The last path component of each entry is what
# collect_dataset() uses as the class label.
# NOTE: the parent folder name contains a typographic apostrophe (U+2019), so
# these paths are non-ASCII; images are therefore read through the
# Unicode-safe loader rather than cv2.imread.
CLASS_DIRS = [
    r"C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\archive\OriginalDataset\VeryMildDemented",
    r"C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\archive\OriginalDataset\NonDemented",
    r"C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\archive\OriginalDataset\ModerateDemented",
    r"C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\archive\OriginalDataset\MildDemented",
]

# Output directory: every artifact (checkpoint, H5 model, pickle, YAML, JSON)
# is written here. It is created up front so later saves cannot fail on a
# missing folder.
OUT_DIR = Path(r"C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ---------------------------
# 1) Config / Hyperparams
# ---------------------------
# Single source of truth for the run's settings; the hyperparameters are also
# recorded next to the saved artifacts.
CFG = dict(
    seed=42,
    img_size=224,
    batch_size=32,
    epochs=5,  # increase to 10–20 for better results
    val_split=0.2,
    learning_rate=1e-4,
    label_smoothing=0.0,
    augment=True,
    model_name="EfficientNetB0",
    artifact_prefix="neurowell",
)

# Seed Python, NumPy and TensorFlow RNGs (in that order) from the same value.
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(CFG["seed"])

# ---------------------------
# 2) Collect files & labels
# ---------------------------
def list_images(folder: str) -> List[str]:
    """Return the sorted paths of the image files located directly in *folder*.

    A file counts as an image when its name ends (case-insensitively) with one
    of: .jpg, .jpeg, .png, .bmp, .tif, .tiff. The search is not recursive.

    The folder is listed directly instead of being spliced into a glob
    pattern, so folder names containing glob metacharacters (``[``, ``]``,
    ``*``, ``?``) work, upper-case extensions are found on case-sensitive
    filesystems, and sub-directories that merely look like image names are
    ignored.

    Args:
        folder: Directory to scan. An empty string means the current directory.

    Returns:
        Sorted list of ``os.path.join(folder, name)`` strings. The list is
        empty when the folder is missing, unreadable, or holds no images.
    """
    image_suffixes = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")
    try:
        with os.scandir(folder or os.curdir) as entries:
            names = [
                entry.name
                for entry in entries
                # Dot-files are skipped to match glob's "*" semantics.
                if not entry.name.startswith(".")
                and entry.name.lower().endswith(image_suffixes)
                and entry.is_file()
            ]
    except OSError:
        # Same best-effort contract as glob: an unusable folder yields nothing.
        return []
    return sorted(os.path.join(folder, name) for name in names)

def collect_dataset(class_dirs: List[str]) -> Tuple[List[str], List[str], Dict[str, int]]:
    """Gather image paths and their class labels from the given class folders.

    The label of every image is the name (last path component) of the folder
    it was found in.

    Args:
        class_dirs: One directory per class.

    Returns:
        A ``(paths, labels, counts)`` tuple: image paths, the parallel list of
        class names, and a mapping from class name to the number of images
        found in its folder.
    """
    all_paths: List[str] = []
    all_labels: List[str] = []
    per_class: Dict[str, int] = {}
    for class_dir in class_dirs:
        class_name = Path(class_dir).name  # class name from folder
        found = list_images(class_dir)
        per_class[class_name] = len(found)
        all_paths.extend(found)
        all_labels.extend([class_name] * len(found))
    return all_paths, all_labels, per_class

paths, labels, class_counts = collect_dataset(CLASS_DIRS)
# Check the collected paths themselves rather than the per-class counts, so the
# emptiness test reflects exactly what will be fed to the split below.
if not paths:
    raise FileNotFoundError(
        "[ERROR] No images found. Check that your folders contain JPG/PNG/BMP/TIF files."
    )

# An empty or mistyped class folder would otherwise go unnoticed: the class
# would still get an output unit but never a single training example.
for _cname, _count in class_counts.items():
    if _count == 0:
        print(f"[WARN] No images found for class '{_cname}'; check its folder path.")

# Deterministic class order
class_names_sorted = sorted(class_counts)
le = LabelEncoder().fit(class_names_sorted)
y = le.transform(labels)  # numeric labels

# Train/val split (stratified so each class keeps its share in both subsets)
train_paths, val_paths, train_y, val_y = train_test_split(
    paths, y, test_size=CFG["val_split"], random_state=CFG["seed"], stratify=y
)

print("[INFO] Classes:", list(le.classes_))
print("[INFO] Class counts:", class_counts)
print("[INFO] Train/Val sizes:", len(train_paths), len(val_paths))

# ---------------------------
# 3) Unicode-safe image reader
# ---------------------------
# cv2.imread has issues with Unicode on Windows; use np.fromfile + cv2.imdecode
def cv2_imread_unicode(path: str) -> "np.ndarray | None":
    """Read an image from a path that may contain non-ASCII characters.

    The file bytes are loaded with NumPy and decoded in memory, which avoids
    passing the path to OpenCV's own file-opening code.

    Args:
        path: Location of the image file.

    Returns:
        The decoded image as a 3-channel BGR array, or ``None`` when the file
        is empty or its contents cannot be decoded.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
    """
    data = np.fromfile(path, dtype=np.uint8)
    if data.size == 0:
        # cv2.imdecode rejects an empty buffer with an assertion error; report
        # it the same way as any other undecodable file instead.
        return None
    return cv2.imdecode(data, cv2.IMREAD_COLOR)  # BGR

def py_load_image(path_str: str, target_hw=(224, 224)) -> np.ndarray:
    """Load an image file as a resized float32 RGB array.

    Args:
        path_str: Path of the image file.
        target_hw: Desired output size as ``(height, width)``.

    Returns:
        Array of shape ``(height, width, 3)``, dtype float32, values in 0..255
        (no normalisation is applied here).

    Raises:
        FileNotFoundError: If the image could not be read or decoded.
    """
    img = cv2_imread_unicode(path_str)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path_str}")
    # Decide on the conversion *before* converting: a single-channel array
    # cannot go through BGR2RGB, so the grayscale case has to be checked first.
    if img.ndim == 2:
        img_rgb = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    else:
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # cv2.resize expects (width, height), i.e. the reverse of target_hw.
    img_resized = cv2.resize(img_rgb, target_hw[::-1], interpolation=cv2.INTER_AREA)
    return img_resized.astype(np.float32)  # 0..255 float32

# Square (height, width) resolution shared by the image loader and the model input.
_side = CFG["img_size"]
IMG_SIZE = (_side, _side)

def tf_load_and_preprocess(path, label):
    """tf.data map function: turn a file path into decoded image pixels.

    Decoding happens in Python (via ``py_load_image``) wrapped in
    ``tf.py_function``; the label is passed through untouched.

    Args:
        path: Scalar string tensor holding the image path.
        label: Label element associated with the path.

    Returns:
        ``(image, label)`` where image is float32 in 0..255 with static shape
        ``(IMG_SIZE[0], IMG_SIZE[1], 3)``.
    """
    def _read_pixels(raw_path):
        # Accept plain bytes-like values as well as tensors; a tensor has to be
        # unwrapped with .numpy() before its bytes can be decoded to str.
        is_raw_bytes = isinstance(raw_path, (bytes, bytearray, np.bytes_))
        encoded = raw_path if is_raw_bytes else raw_path.numpy()
        return py_load_image(encoded.decode("utf-8"), target_hw=IMG_SIZE)  # float32 0..255

    height, width = IMG_SIZE
    pixels = tf.py_function(func=_read_pixels, inp=[path], Tout=tf.float32)
    # py_function loses shape information, so restore the static shape here.
    pixels.set_shape((height, width, 3))
    return pixels, label

def tf_augment(img, label):
    """tf.data map function applying light augmentation when CFG["augment"] is set.

    Augmentation is a random horizontal flip followed by a random brightness
    shift of at most 8 intensity levels, clipped back into 0..255. With
    augmentation disabled the inputs are returned unchanged.
    """
    if not CFG["augment"]:
        return img, label
    flipped = tf.image.random_flip_left_right(img)
    jittered = tf.image.random_brightness(flipped, max_delta=8.0)  # 0..255 space
    return tf.clip_by_value(jittered, 0.0, 255.0), label

# ---------------------------
# 4) Build tf.data pipelines
# ---------------------------
num_classes = len(le.classes_)
BATCH = CFG["batch_size"]


def _one_hot_label(image, class_id):
    """Replace the integer class id with a float32 one-hot vector."""
    return image, tf.one_hot(class_id, depth=num_classes, dtype=tf.float32)


train_ds = tf.data.Dataset.from_tensor_slices((train_paths, train_y))
val_ds = tf.data.Dataset.from_tensor_slices((val_paths, val_y))

# Training pipeline: shuffle the (cheap) path/label pairs first, then decode,
# augment, one-hot encode, batch and prefetch.
train_ds = train_ds.shuffle(
    buffer_size=len(train_paths),
    seed=CFG["seed"],
    reshuffle_each_iteration=True,
)
train_ds = train_ds.map(tf_load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.map(tf_augment, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.map(_one_hot_label, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.batch(BATCH).prefetch(tf.data.AUTOTUNE)

# Validation pipeline: same decoding and encoding, but no shuffle and no augmentation.
val_ds = val_ds.map(tf_load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
val_ds = val_ds.map(_one_hot_label, num_parallel_calls=tf.data.AUTOTUNE)
val_ds = val_ds.batch(BATCH).prefetch(tf.data.AUTOTUNE)

# ---------------------------
# 5) Build model
# ---------------------------
def build_model(num_classes: int) -> keras.Model:
    """Create and compile the EfficientNetB0-based classifier.

    The backbone is frozen and topped with global average pooling, dropout
    (0.2) and a softmax layer. ImageNet weights are used when they can be
    loaded; otherwise the backbone falls back to random initialisation.

    Args:
        num_classes: Number of output classes.

    Returns:
        A compiled model (Adam, categorical cross-entropy, accuracy metric)
        taking float32 images of shape ``(IMG_SIZE[0], IMG_SIZE[1], 3)``.
    """
    side_h, side_w = IMG_SIZE
    image_in = layers.Input(shape=(side_h, side_w, 3), dtype=tf.float32)

    # Do NOT add extra Rescaling here; EfficientNet includes preprocessing layers internally.
    backbone_kwargs = dict(include_top=False, input_tensor=image_in)
    try:
        backbone = EfficientNetB0(weights="imagenet", **backbone_kwargs)
    except Exception as e:
        # Deliberate best-effort: e.g. no network access to fetch the weights.
        print(f"[WARN] Could not load ImageNet weights ({e}); using random init.")
        backbone = EfficientNetB0(weights=None, **backbone_kwargs)

    backbone.trainable = False  # warmup

    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    regularized = layers.Dropout(0.2)(pooled)
    class_probs = layers.Dense(num_classes, activation="softmax")(regularized)

    classifier = keras.Model(image_in, class_probs, name="neurowell_efficientnetb0")
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=CFG["learning_rate"]),
        loss=keras.losses.CategoricalCrossentropy(label_smoothing=CFG["label_smoothing"]),
        metrics=["accuracy"],
    )
    return classifier

model = build_model(num_classes)
model.summary()

# ---------------------------
# 6) Train
# ---------------------------
# Persist the epoch with the highest validation accuracy.
best_checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=str(OUT_DIR / f"{CFG['artifact_prefix']}_best.keras"),
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1,
)
# Stop after 3 epochs without improvement and roll back to the best weights.
early_stopping_cb = keras.callbacks.EarlyStopping(
    monitor="val_accuracy",
    patience=3,
    restore_best_weights=True,
    verbose=1,
)
callbacks = [best_checkpoint_cb, early_stopping_cb]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=CFG["epochs"],
    callbacks=callbacks,
)

# ---------------------------
# 7) Evaluate & sample preds
# ---------------------------
# evaluate() returns [loss, accuracy] for the single compiled metric.
eval_res = model.evaluate(val_ds, verbose=0)
metrics = dict(val_loss=float(eval_res[0]), val_accuracy=float(eval_res[1]))

# small sample of predictions for the report: first validation batch only
sample_imgs, sample_onehot = next(iter(val_ds))
probs = model.predict(sample_imgs, verbose=0)
pred_ids = probs.argmax(axis=1)
true_ids = sample_onehot.numpy().argmax(axis=1)

# At most 12 records: true class, predicted class, and the winning probability.
sample_records = [
    {
        "true": le.classes_[int(true_id)],
        "pred": le.classes_[int(pred_id)],
        "confidence": float(np.max(prob_row)),
    }
    for true_id, pred_id, prob_row in zip(true_ids[:12], pred_ids[:12], probs[:12])
]

# ---------------------------
# 8) Save artifacts: h5 / pkl / yaml / json
# ---------------------------

# 8a) H5 model (weights + architecture only; optimizer state is not needed for inference)
h5_path = OUT_DIR / f"{CFG['artifact_prefix']}_model.h5"
model.save(h5_path, include_optimizer=False)
print(f"[SAVE] Model (H5): {h5_path}")

# 8b) PKL (class names + both lookup directions) — keep for inference
pkl_path = OUT_DIR / "label_encoder.pkl"
classes_list_np = le.classes_                # numpy array (numpy.str_)
classes_list = [str(name) for name in classes_list_np.tolist()]  # -> pure Python str
index_to_class = dict(enumerate(classes_list))
label_payload = {
    "classes_": classes_list,
    "class_to_index": {name: idx for idx, name in index_to_class.items()},
    "index_to_class": index_to_class,
}
with open(pkl_path, "wb") as f:
    pickle.dump(label_payload, f)
print(f"[SAVE] Label encoder (PKL): {pkl_path}")

# ---- sanitize everything for YAML/JSON (no NumPy/TensorFlow types) ----
class_counts_py = {str(k): int(v) for k, v in class_counts.items()}
metrics_py = {str(k): float(v) for k, v in metrics.items()}
# ensure sample records are python-native
# numpy.str_ is a subclass of str, so an isinstance(..., str) guard would let it
# through unconverted; str() is applied unconditionally to get a plain str.
sample_records_py = [
    {
        "true": str(r["true"]),
        "pred": str(r["pred"]),
        "confidence": float(r["confidence"]),
    }
    for r in sample_records
]

# 8c) YAML (config + paths + classes)
# Each hyperparameter is cast to a plain Python type; the tuple order is the
# order in which the keys appear in the file.
_hparam_casts = (
    ("img_size", int),
    ("batch_size", int),
    ("epochs", int),
    ("learning_rate", float),
    ("val_split", float),
    ("augment", bool),
    ("model_name", str),
    ("label_smoothing", float),
)
_artifact_locations = {
    "model_h5": str(h5_path),
    "best_checkpoint": str(OUT_DIR / f"{CFG['artifact_prefix']}_best.keras"),
    "label_encoder_pkl": str(pkl_path),
    "report_json": str(OUT_DIR / "report.json"),
}
yaml_obj = {
    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
    "class_dirs": [str(p) for p in CLASS_DIRS],
    "out_dir": str(OUT_DIR),
    "classes": classes_list,          # <- pure Python strings
    "class_counts": class_counts_py,  # <- pure Python ints
    "hyperparameters": {key: cast(CFG[key]) for key, cast in _hparam_casts},
    "artifacts": _artifact_locations,
    "notes": "Dataset loaded via Python/OpenCV to avoid Unicode issues in tf.io.",
}
yaml_path = OUT_DIR / "config.yaml"
with open(yaml_path, "w", encoding="utf-8") as f:
    # sort_keys=False keeps the insertion order above; allow_unicode keeps
    # non-ASCII path characters readable instead of escaped.
    yaml.safe_dump(yaml_obj, f, sort_keys=False, allow_unicode=True)
print(f"[SAVE] Config (YAML): {yaml_path}")

# 8d) JSON report (also keep it Python-native)
report = dict(
    project="NeuroWell",
    timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
    class_dirs=[str(p) for p in CLASS_DIRS],
    classes=classes_list,
    class_counts=class_counts_py,
    metrics=metrics_py,
    samples=sample_records_py,
    notes="Prototype; not for clinical use.",
)
json_path = OUT_DIR / "report.json"
with open(json_path, "w", encoding="utf-8") as f:
    # ensure_ascii=False writes non-ASCII path characters as-is.
    f.write(json.dumps(report, indent=2, ensure_ascii=False))
print(f"[SAVE] Report (JSON): {json_path}")

# Final summary of everything written by section 8.
print("\n[DONE] Artifacts generated:")
for _artifact in (h5_path, pkl_path, yaml_path, json_path):
    print(f"  - {_artifact}")


[INFO] Classes: ['MildDemented', 'ModerateDemented', 'NonDemented', 'VeryMildDemented']
[INFO] Class counts: {'VeryMildDemented': 2240, 'NonDemented': 3200, 'ModerateDemented': 64, 'MildDemented': 896}
[INFO] Train/Val sizes: 5120 1280
Model: "neurowell_efficientnetb0"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 rescaling_5 (Rescaling)     (None, 224, 224, 3)          0         ['input_3[0][0]']             
                                                                                                  
 normalization_2 (Normaliza  (None, 224, 224, 3)          7         ['rescaling_5[0][0]']         
 tion)                               

  saving_api.save_model(


[SAVE] Model (H5): C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\neurowell_model.h5
[SAVE] Label encoder (PKL): C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\label_encoder.pkl
[SAVE] Config (YAML): C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\config.yaml
[SAVE] Report (JSON): C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\report.json

[DONE] Artifacts generated:
  - C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\neurowell_model.h5
  - C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\label_encoder.pkl
  - C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\config.yaml
  - C:\Users\NXTWAVE\Downloads\Alzheimer’s Detection\report.json
