In [18]:
import numpy as np
import os
import scipy.io
from scipy.signal import welch
from tensorflow.keras.utils import to_categorical

# === Constants ===
sampling_rate = 250
segment_duration = 3  # in seconds
segment_length = sampling_rate * segment_duration
data_dir = "C:/Users/kavis/helloji/1/"  # EEG Session 1 folder
label_map = [1, 2, 3, 0, 2, 0, 0, 1, 0, 1, 2, 1, 1, 1, 2, 3, 2, 2, 3, 3, 0, 3, 0, 3]  # Session 1 labels


In [19]:
X_all, y_all, segment_meta = [], [], []

for filename in os.listdir(data_dir):
    if filename.endswith(".mat"):
        mat = scipy.io.loadmat(os.path.join(data_dir, filename))
        subject_id = int(filename.split('_')[0])
        prefix = [k.split('_')[0] for k in mat.keys() if '_eeg1' in k][0]

        for trial in range(1, 25):
            key = f"{prefix}_eeg{trial}"
            if key in mat:
                eeg = mat[key]
                label = label_map[trial - 1]
                for seg_idx in range(eeg.shape[1] // segment_length):
                    segment = eeg[:, seg_idx * segment_length:(seg_idx + 1) * segment_length]
                    if segment.shape[1] == segment_length:
                        X_all.append(segment)
                        y_all.append(label)
                        segment_meta.append([subject_id, trial])

X_all = np.array(X_all, dtype=np.float32)
y_all = np.array(y_all)
segment_meta = np.array(segment_meta)
y_cat = to_categorical(y_all, num_classes=4)

np.save("segment_meta.npy", segment_meta)


In [23]:
def compute_de_features(segment):
    bands = {'delta': (1, 4), 'theta': (4, 8), 'alpha': (8, 14), 'beta': (14, 31), 'gamma': (31, 50)}
    features = []
    for ch in segment:
        freqs, psd = welch(ch, fs=250, nperseg=256)
        band_de = []
        for band in bands.values():
            idx = (freqs >= band[0]) & (freqs <= band[1])
            power = np.trapz(psd[idx], freqs[idx])
            de = 0.5 * np.log2(2 * np.pi * np.e * power) if power > 0 else 0
            band_de.append(de)
        features.append(band_de)
    return np.array(features)

def compute_psd_features(X):
    bands = {'delta': (1, 4), 'theta': (4, 8), 'alpha': (8, 13), 'beta': (13, 30), 'gamma': (30, 50)}
    psd_feats = np.zeros((X.shape[0], 62, len(bands)))
    for i in range(X.shape[0]):
        for ch in range(62):
            freqs, psd = welch(X[i, ch], fs=250, nperseg=256)
            for j, (low, high) in enumerate(bands.values()):
                idx = (freqs >= low) & (freqs <= high)
                psd_feats[i, ch, j] = np.trapz(psd[idx], freqs[idx])
    return psd_feats



# Load precomputed DE and PSD feature files
X_de = np.load("X_de4.npy")     # Shape: (samples, 62, 5)
X_psd = np.load("X_psd4.npy")   # Shape: (samples, 62, 5)

# Concatenate along the last axis → shape: (samples, 62, 10)
X_feat_all = np.concatenate([X_de, X_psd], axis=-1)


In [24]:
X_eye_trial = np.load("X_eye_session1.npy")  # shape (120, 19)
segment_meta = np.load("segment_meta.npy")

X_eye_segment = np.zeros((segment_meta.shape[0], X_eye_trial.shape[1]))
for i, (subj, trial) in enumerate(segment_meta):
    trial_index = (subj - 1) * 24 + (trial - 1)
    X_eye_segment[i] = X_eye_trial[trial_index]

np.save("X_eye_4520.npy", X_eye_segment)


In [25]:
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    x = LayerNormalization(epsilon=1e-6)(inputs)
    x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(x, x)
    x = Dropout(dropout)(x)
    res = Add()([x, inputs])
    x = LayerNormalization(epsilon=1e-6)(res)
    x = Dense(ff_dim, activation="relu")(x)
    x = Dropout(dropout)(x)
    x = Dense(inputs.shape[-1])(x)
    return Add()([x, res])

def channel_attention_block(x):
    attn = Dense(1, activation='tanh')(x)
    attn = Softmax(axis=1)(attn)
    return Multiply()([x, attn])

def build_triple_branch_model(input_shape_eeg=(62, 750),
                               input_shape_feat=(62, 10),
                               input_shape_eye=(19,),
                               num_classes=4):
    # Branch 1: Raw EEG
    eeg_input = Input(shape=input_shape_eeg, name="eeg_input")
    x1 = Permute((2, 1))(eeg_input)
    x1 = SeparableConv1D(64, 5, padding='same', activation='relu')(x1)
    x1 = BatchNormalization()(x1)
    x1 = MaxPooling1D(pool_size=2)(x1)
    x1 = SeparableConv1D(128, 3, padding='same', activation='relu')(x1)
    x1 = BatchNormalization()(x1)
    x1 = MaxPooling1D(pool_size=2)(x1)
    x1 = Bidirectional(GRU(64, return_sequences=True))(x1)
    x1 = transformer_encoder(x1, head_size=64, num_heads=4, ff_dim=128, dropout=0.1)
    attn = Dense(1, activation='tanh')(x1)
    attn = Softmax(axis=1)(attn)
    x1 = Multiply()([x1, attn])
    x1 = GlobalAveragePooling1D()(x1)
    x1 = Dropout(0.3)(x1)

    # Branch 2: EEG Features
    feat_input = Input(shape=input_shape_feat, name="eeg_feat_input")
    x2 = Dense(64, activation='relu')(feat_input)
    x2 = channel_attention_block(x2)
    x2 = Flatten()(x2)
    x2 = Dropout(0.3)(x2)

    # Branch 3: Eye Features
    eye_input = Input(shape=input_shape_eye, name="eye_input")
    x3 = Dense(32, activation='relu')(eye_input)
    x3 = Dense(32, activation='relu')(x3)
    x3 = Dropout(0.3)(x3)

    # Fusion
    x = Concatenate()([x1, x2, x3])
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.4)(x)
    output = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=[eeg_input, feat_input, eye_input], outputs=output)
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
                  metrics=['accuracy'])
    return model


In [22]:
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import KFold
from tensorflow.keras.utils import Sequence

class EEGTripleInputGenerator(Sequence):
    def __init__(self, X1, X2, X3, y, indices, batch_size=32, shuffle=True):
        self.X1, self.X2, self.X3, self.y = X1, X2, X3, y
        self.indices = np.array(indices)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, i):
        batch_idx = self.indices[i*self.batch_size:(i+1)*self.batch_size]
        return {
            "eeg_input": self.X1[batch_idx],
            "eeg_feat_input": self.X2[batch_idx],
            "eye_input": self.X3[batch_idx],
        }, self.y[batch_idx]

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

def run_kfold_training(X_eeg, X_feat, X_eye, y_cat, n_splits=5):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    val_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(np.arange(len(y_cat)))):
        print(f"\n Fold {fold + 1}/{n_splits}")
        train_gen = EEGTripleInputGenerator(X_eeg, X_feat, X_eye, y_cat, train_idx)
        val_gen = EEGTripleInputGenerator(X_eeg, X_feat, X_eye, y_cat, val_idx, shuffle=False)

        model = build_triple_branch_model()
        es = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)
        lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)

        model.fit(train_gen, validation_data=val_gen, epochs=30, callbacks=[es, lr], verbose=1)

        val_acc = model.evaluate(val_gen, verbose=0)[1]
        val_accuracies.append(val_acc)
        print(f" Fold {fold + 1} Accuracy: {val_acc:.4f}")

    print(f"\n Average Validation Accuracy over {n_splits} folds: {np.mean(val_accuracies):.4f}")


In [14]:
run_kfold_training(X_all, X_feat_all, X_eye_segment, y_cat, n_splits=5)



 Fold 1/5
Epoch 1/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m293s[0m 807ms/step - accuracy: 0.3219 - loss: 6.2680 - val_accuracy: 0.4521 - val_loss: 1.8381 - learning_rate: 0.0010
Epoch 2/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m323s[0m 813ms/step - accuracy: 0.3880 - loss: 3.0519 - val_accuracy: 0.4830 - val_loss: 1.3030 - learning_rate: 0.0010
Epoch 3/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m268s[0m 790ms/step - accuracy: 0.4328 - loss: 1.4790 - val_accuracy: 0.5667 - val_loss: 1.1620 - learning_rate: 0.0010
Epoch 4/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m257s[0m 758ms/step - accuracy: 0.4886 - loss: 1.4804 - val_accuracy: 0.6390 - val_loss: 1.1552 - learning_rate: 0.0010
Epoch 5/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m91s[0m 266ms/step - accuracy: 0.5308 - loss: 1.2866 - val_accuracy: 0.6822 - val_loss: 1.0054 - learning_rate: 0.0010
Epoch 6/30
[1m339/339[0m [32m━━━━━━━━

In [15]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import matplotlib.pyplot as plt
import os


In [18]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import matplotlib.pyplot as plt
import os

def run_kfold_training(X_eeg, X_feat, X_eye, y_cat, n_splits=5):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    val_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(np.arange(len(y_cat)))):
        print(f"\n Fold {fold + 1}/{n_splits}")

        train_gen = EEGTripleInputGenerator(X_eeg, X_feat, X_eye, y_cat, train_idx)
        val_gen = EEGTripleInputGenerator(X_eeg, X_feat, X_eye, y_cat, val_idx, shuffle=False)

        model = build_triple_branch_model()
        es = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)
        lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)

        model.fit(train_gen, validation_data=val_gen, epochs=30, callbacks=[es, lr], verbose=1)

        val_acc = model.evaluate(val_gen, verbose=0)[1]
        val_accuracies.append(val_acc)
        print(f" Fold {fold + 1} Accuracy: {val_acc:.4f}")

        # ✅ Compute confusion matrix and save
        y_true, y_pred = [], []
        for batch in val_gen:
            inputs, labels = batch
            preds = model.predict(inputs, verbose=0)
            y_true.extend(np.argmax(labels, axis=1))
            y_pred.extend(np.argmax(preds, axis=1))

        cm = confusion_matrix(y_true, y_pred)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Neutral", "Sad", "Fear", "Happy"])
        disp.plot(cmap='Blues', values_format='d')
        os.makedirs("confusion_matrices", exist_ok=True)
        plt.title(f"Confusion Matrix - Fold {fold + 1}")
        plt.tight_layout()
        plt.savefig(f"confusion_matrices/conf_matrix_fold_{fold+1}.png", dpi=300)
        plt.close()

        # Optional: Print precision/recall/F1
        print("\nClassification Report:")
        print(classification_report(y_true, y_pred, target_names=["Neutral", "Sad", "Fear", "Happy"]))

    print(f"\n ✅ Average Validation Accuracy over {n_splits} folds: {np.mean(val_accuracies):.4f}")


In [1]:
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
from itertools import cycle


In [26]:
import os
import numpy as np
import tensorflow as tf
from sklearn.model_selection import KFold
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import Sequence
import matplotlib.pyplot as plt
import seaborn as sns

os.makedirs("results", exist_ok=True)  # Create results dir

class EEGTripleInputGenerator(Sequence):
    def __init__(self, X1, X2, X3, y, indices, batch_size=32, shuffle=True):
        self.X1, self.X2, self.X3, self.y = X1, X2, X3, y
        self.indices = np.array(indices)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, i):
        batch_idx = self.indices[i*self.batch_size:(i+1)*self.batch_size]
        return {
            "eeg_input": self.X1[batch_idx],
            "eeg_feat_input": self.X2[batch_idx],
            "eye_input": self.X3[batch_idx],
        }, self.y[batch_idx]

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

def plot_training_curves(history, fold):
    plt.figure(figsize=(10, 4))
    plt.plot(history.history['accuracy'], label='Train Acc')
    plt.plot(history.history['val_accuracy'], label='Val Acc')
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.title(f"Training and Validation Accuracy (Fold {fold})")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f"results/train_curves_fold{fold}.png")
    plt.close()

def plot_confusion(y_true, y_pred, fold, labels):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=labels, yticklabels=labels)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title(f"Confusion Matrix - Fold {fold}")
    plt.tight_layout()
    plt.savefig(f"results/conf_matrix_fold{fold}.png")
    plt.close()

def run_kfold_training(X_eeg, X_feat, X_eye, y_cat, n_splits=5):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    val_accuracies = []
    labels = ["Neutral", "Sad", "Fear", "Happy"]

    for fold, (train_idx, val_idx) in enumerate(kf.split(np.arange(len(y_cat)))):
        print(f"\n📂 Fold {fold + 1}/{n_splits}")
        train_gen = EEGTripleInputGenerator(X_eeg, X_feat, X_eye, y_cat, train_idx)
        val_gen = EEGTripleInputGenerator(X_eeg, X_feat, X_eye, y_cat, val_idx, shuffle=False)

        model = build_triple_branch_model()

        es = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)
        lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)

        history = model.fit(train_gen,
                            validation_data=val_gen,
                            epochs=30,
                            callbacks=[es, lr],
                            verbose=1)

        # Save training curve
        plot_training_curves(history, fold + 1)

        # Evaluate
        val_acc = model.evaluate(val_gen, verbose=0)[1]
        val_accuracies.append(val_acc)
        print(f"✅ Fold {fold + 1} Accuracy: {val_acc:.4f}")

        # Save model
        model.save(f"results/model_fold{fold+1}.keras")

        # Predict labels
        y_true = np.argmax(y_cat[val_idx], axis=1)
        y_pred = []

        for batch_x, _ in val_gen:
            preds = model.predict(batch_x, verbose=0)
            y_pred.extend(np.argmax(preds, axis=1))

        y_pred = np.array(y_pred)

        # Save classification report
        report = classification_report(y_true, y_pred, target_names=labels, digits=4)
        with open(f"results/classification_report_fold{fold+1}.txt", "w") as f:
            f.write(report)

        print(report)

        # Save confusion matrix
        plot_confusion(y_true, y_pred, fold + 1, labels)

    print(f"\n🏁 Average Validation Accuracy over {n_splits} folds: {np.mean(val_accuracies):.4f}")


In [27]:
run_kfold_training(X_all, X_feat_all, X_eye_segment, y_cat, n_splits=5)



📂 Fold 1/5
Epoch 1/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m99s[0m 249ms/step - accuracy: 0.3319 - loss: 7.1817 - val_accuracy: 0.4882 - val_loss: 1.9791 - learning_rate: 0.0010
Epoch 2/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m138s[0m 238ms/step - accuracy: 0.4277 - loss: 2.2072 - val_accuracy: 0.5516 - val_loss: 1.2409 - learning_rate: 0.0010
Epoch 3/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 257ms/step - accuracy: 0.4743 - loss: 1.4234 - val_accuracy: 0.5623 - val_loss: 1.1492 - learning_rate: 0.0010
Epoch 4/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m84s[0m 249ms/step - accuracy: 0.5224 - loss: 1.2817 - val_accuracy: 0.6206 - val_loss: 1.0493 - learning_rate: 0.0010
Epoch 5/30
[1m339/339[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m83s[0m 245ms/step - accuracy: 0.5654 - loss: 1.1425 - val_accuracy: 0.6656 - val_loss: 0.9720 - learning_rate: 0.0010
Epoch 6/30
[1m339/339[0m [32m━━━━━━━━━━

UnboundLocalError: cannot access local variable 'batch_outputs' where it is not associated with a value