In [None]:
# kernel_search_metrics.py
"""
Grid search over kernel sizes (conv1, conv2) with Stratified K-Fold evaluation.
Saves aggregated metrics (mean + std over folds) to kernel_search_results.csv.

Notes:
- Uses user's loader (expects PNG scalograms in the two folders).
- Metrics: balanced accuracy, precision, recall, F1 (binary averaging, schizophrenia = positive class (1)).
- Uses TF/Keras for model; scikit-learn for metrics and cross-validation.
"""

import os
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score, precision_score, recall_score, f1_score
from tensorflow.keras.preprocessing import image
from tensorflow.keras import layers, models, regularizers, callbacks, optimizers
import tensorflow as tf

# Path to uploaded notebook (provided in history) â€” kept here per request.
UPLOADED_NOTEBOOK_PATH = "/mnt/data/DS 340W Project.ipynb"

# ---------------------------
# 0. User-editable config
# ---------------------------
KERNEL_SIZES = [(3,3), (5,5), (7,7)]
N_SPLITS = 5  # Stratified K-Fold splits
RANDOM_SEED = 42
BATCH_SIZE = 16
EPOCHS = 20
DROPOUT_P = 0.1
RESULTS_CSV = "kernel_search_results.csv"
TARGET_SIZE = (224, 224)  # as in your loader
POS_LABEL = 1  # schizophrenia class is positive
VERBOSE = 1

# ---------------------------
# 1. Try to mount Google Drive (Colab)
# ---------------------------
try:
    from google.colab import drive
    drive.mount('/content/drive')
    print("Mounted Google Drive at /content/drive")
except Exception:
    print("google.colab.drive.mount not available. If not running in Colab, ensure the dataset paths below exist.")

# Update these paths if needed (keeps your original)
healthy_save_path = '/content/drive/MyDrive/DS340W/Data/dataset2/healthy'
schizophrenia_save_path = '/content/drive/MyDrive/DS340W/Data/dataset2/schizophrenic'

healthy_folder = healthy_save_path
schizophrenia_folder = schizophrenia_save_path

# ---------------------------
# 2. Load & preprocess images (your loader adapted)
# ---------------------------
def load_images(folder, label_for_folder):
    images, labels = [], []
    if not os.path.exists(folder):
        raise FileNotFoundError(f"Folder not found: {folder}")
    for filename in sorted(os.listdir(folder)):
        if filename.lower().endswith(".png"):
            img = image.load_img(os.path.join(folder, filename), target_size=TARGET_SIZE)
            img_array = image.img_to_array(img) / 255.0
            images.append(img_array)
            labels.append(label_for_folder)
    if len(images) == 0:
        raise ValueError(f"No PNG images found in folder: {folder}")
    return np.array(images), np.array(labels)

print("Loading healthy images...")
X_healthy, y_healthy = load_images(healthy_folder, 0)
print("Loading schizophrenia images...")
X_schizophrenia, y_schizophrenia = load_images(schizophrenia_folder, 1)

# Merge and shuffle
X = np.concatenate((X_healthy, X_schizophrenia), axis=0)
y = np.concatenate((y_healthy, y_schizophrenia), axis=0)
rng = np.random.RandomState(RANDOM_SEED)
perm = rng.permutation(len(y))
X = X[perm]
y = y[perm]

print(f"Loaded dataset: X.shape={X.shape}, y.shape={y.shape}, #HC={np.sum(y==0)}, #SCZ={np.sum(y==1)}")

# ---------------------------
# 3. Model builder
# ---------------------------
from tensorflow.keras import regularizers

def build_model(kernel_size_1=(3,3), kernel_size_2=(3,3), input_shape=None, dropout_p=0.1):
    if input_shape is None:
        input_shape = X.shape[1:]
    inp = layers.Input(shape=input_shape)

    x = layers.Conv2D(filters=4, kernel_size=kernel_size_1, strides=(2,2),
                      padding='same', activation='relu')(inp)
    x = layers.MaxPool2D(pool_size=(2,2))(x)
    x = layers.Dropout(dropout_p)(x)

    x = layers.Conv2D(filters=8, kernel_size=kernel_size_2, strides=(2,2),
                      padding='same', activation='relu', kernel_regularizer=regularizers.l2(0.01))(x)
    x = layers.GlobalMaxPool2D()(x)
    x = layers.Dropout(dropout_p)(x)

    x = layers.Dense(50, activation='relu')(x)
    out = layers.Dense(2, activation='softmax')(x)

    model = models.Model(inputs=inp, outputs=out)
    model.compile(optimizer=optimizers.Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Try to allow GPU memory growth if GPUs available
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for g in gpus:
            tf.config.experimental.set_memory_growth(g, True)
    except Exception:
        pass

# ---------------------------
# 4. Grid search with Stratified K-Fold
# ---------------------------
from sklearn.model_selection import StratifiedKFold
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_SEED)

all_results = []

total_runs = len(KERNEL_SIZES) * len(KERNEL_SIZES)
run_counter = 0

for ks1, ks2 in [(a,b) for a in KERNEL_SIZES for b in KERNEL_SIZES]:
    run_counter += 1
    print(f"\n=== Kernel run {run_counter}/{total_runs}: conv1={ks1}, conv2={ks2} ===")

    # Collect fold metrics
    fold_bal_acc = []
    fold_prec = []
    fold_rec = []
    fold_f1 = []
    fold_epochs = []

    fold_idx = 0
    for train_idx, val_idx in skf.split(X, y):
        fold_idx += 1
        print(f"  Fold {fold_idx}/{N_SPLITS}")

        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        # Keras expects categorical labels
        y_train_cat = tf.keras.utils.to_categorical(y_train, num_classes=2)
        y_val_cat = tf.keras.utils.to_categorical(y_val, num_classes=2)

        # Build model for this fold
        model = build_model(kernel_size_1=ks1, kernel_size_2=ks2, input_shape=X_train.shape[1:], dropout_p=DROPOUT_P)

        # Callbacks
        es = callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=0)

        history = model.fit(
            X_train, y_train_cat,
            validation_data=(X_val, y_val_cat),
            epochs=EPOCHS,
            batch_size=BATCH_SIZE,
            callbacks=[es],
            verbose=VERBOSE
        )

        fold_epochs.append(len(history.history['loss']))

        # Predict
        y_pred_prob = model.predict(X_val, verbose=0)
        y_pred = np.argmax(y_pred_prob, axis=1)

        # Compute metrics (binary averaging; positive label = POS_LABEL)
        bal_acc = balanced_accuracy_score(y_val, y_pred)
        prec = precision_score(y_val, y_pred, pos_label=POS_LABEL, zero_division=0)
        rec = recall_score(y_val, y_pred, pos_label=POS_LABEL, zero_division=0)
        f1 = f1_score(y_val, y_pred, pos_label=POS_LABEL, zero_division=0)

        print(f"    fold bal_acc={bal_acc:.4f}, prec={prec:.4f}, rec={rec:.4f}, f1={f1:.4f}")

        fold_bal_acc.append(bal_acc)
        fold_prec.append(prec)
        fold_rec.append(rec)
        fold_f1.append(f1)

        # Clean up to reduce GPU memory growth between folds
        tf.keras.backend.clear_session()

    # Aggregate per kernel combo (mean + std)
    result = {
        "kernel_size_conv1": f"{ks1[0]}x{ks1[1]}",
        "kernel_size_conv2": f"{ks2[0]}x{ks2[1]}",
        "bal_acc_mean": float(np.mean(fold_bal_acc)),
        "bal_acc_std": float(np.std(fold_bal_acc)),
        "precision_mean": float(np.mean(fold_prec)),
        "precision_std": float(np.std(fold_prec)),
        "recall_mean": float(np.mean(fold_rec)),
        "recall_std": float(np.std(fold_rec)),
        "f1_mean": float(np.mean(fold_f1)),
        "f1_std": float(np.std(fold_f1)),
        "mean_train_epochs": float(np.mean(fold_epochs))
    }
    all_results.append(result)

# ---------------------------
# 5. Save results and print summary
# ---------------------------
df = pd.DataFrame(all_results)
df = df.sort_values(by="f1_mean", ascending=False).reset_index(drop=True)
df.to_csv(RESULTS_CSV, index=False)
print(f"\nGrid search complete. Aggregated results saved to: {RESULTS_CSV}")
print(df)

# Also show the uploaded notebook path (per your developer request)
print("\nUploaded notebook path (from conversation history):")
print(UPLOADED_NOTEBOOK_PATH)
