In [6]:
import keras
import keras_tuner as kt
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import glob
from sklearn.preprocessing import StandardScaler, LabelEncoder
import tensorflow as tf
from sklearn.metrics import confusion_matrix,classification_report
from pathlib import Path
from scipy.io import loadmat
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

In [5]:
# Number of EEG samples per second (presumably Hz — confirm against the recording metadata).
SAMPLING_RATE = 128
# Length of one EEG epoch in samples: five times SAMPLING_RATE (640 with the value above).
N_SAMPLES_PER_EPOCH = 5 * SAMPLING_RATE

def get_data_files():
    """Return the ``.mat`` files located directly inside ``./data``.

    ``Path.glob`` yields entries in an arbitrary, filesystem-dependent order.
    The list is sorted so that everything built from it (the concatenated
    frame and the seeded shuffle applied to it) is reproducible across
    machines and runs.

    Returns:
        list[pathlib.Path]: matching paths in ascending order.
    """
    return sorted(Path('./data').glob('*.mat'))

def bis_to_groups(bis):
    """Translate a numeric BIS value into the label of the band containing it.

    Bands (lower bound inclusive):
        [0, 40)   -> '0-40'
        [40, 65)  -> '40-65'
        [65, 85)  -> '65-85'
        [85, 100] -> '85-100'

    Any value outside 0..100 (including NaN) matches no band and yields None.
    """
    if 0 <= bis < 40:
        return '0-40'
    if 40 <= bis < 65:
        return '40-65'
    if 65 <= bis < 85:
        return '65-85'
    if 85 <= bis <= 100:
        return '85-100'
    return None

def mat_to_df(file):
    """Load one ``.mat`` recording and return it as a frame of labelled EEG epochs.

    The file must contain the variables ``EEG`` and ``bis``. The flattened EEG
    signal is cut into consecutive windows of ``N_SAMPLES_PER_EPOCH`` samples,
    and each window is paired with one BIS reading, which ``bis_to_groups``
    turns into a class label.

    NOTE(review): this assumes the i-th BIS reading describes the i-th EEG
    window — confirm against the dataset documentation. Under that assumption
    the previous implementation was wrong whenever a ``-1`` reading was not at
    the very end: it removed the ``-1`` readings *before* pairing, which
    shifted every later label onto an earlier EEG window. Here the pairing is
    done first and invalid pairs are dropped together.

    Args:
        file: path of the ``.mat`` file (anything ``scipy.io.loadmat`` accepts).

    Returns:
        pandas.DataFrame: one row per kept epoch; columns ``0 .. N_SAMPLES_PER_EPOCH-1``
        hold the samples and column ``'bis'`` holds the band label.
    """
    data = loadmat(file)
    eeg_data = data['EEG'].flatten()
    bis_data = data['bis'].flatten()

    # Only complete epochs that also have a BIS reading can be used.
    n_epochs = min(len(eeg_data) // N_SAMPLES_PER_EPOCH, len(bis_data))
    eeg_epochs = eeg_data[:n_epochs * N_SAMPLES_PER_EPOCH].reshape(n_epochs, N_SAMPLES_PER_EPOCH)
    bis_data = bis_data[:n_epochs]

    # -1 marks an unusable BIS reading; drop it together with its EEG epoch.
    valid = bis_data != -1
    df = pd.DataFrame(eeg_epochs[valid])
    df['bis'] = [bis_to_groups(value) for value in bis_data[valid]]
    return df

def get_all_data():
    """Build one frame holding the epochs of every recording from ``get_data_files``.

    Each file is converted with ``mat_to_df`` and the per-file frames are
    stacked row-wise with ``pd.concat`` (row indices are kept as produced per
    file, so they repeat across files).
    """
    frames = []
    for mat_file in get_data_files():
        frames.append(mat_to_df(mat_file))
    return pd.concat(frames)

def standardize_data(x_train,x_valid, x_test):
    """Z-score the three splits column-wise and reshape them for Conv1D input.

    The scaler statistics are estimated on the training split only. Fitting on
    train + validation (as was done before) leaks validation information into
    preprocessing and makes validation-based decisions (checkpoint selection,
    learning-rate schedule) optimistic.

    Args:
        x_train, x_valid, x_test: 2-D arrays with one epoch per row.

    Returns:
        tuple: the three transformed arrays, each reshaped to
        ``(-1, N_SAMPLES_PER_EPOCH, 1)``.
    """
    scaler = StandardScaler()
    scaler.fit(x_train)
    x_train, x_valid, x_test = [
        scaler.transform(x).reshape(-1, N_SAMPLES_PER_EPOCH, 1)
        for x in (x_train, x_valid, x_test)
    ]
    return x_train, x_valid, x_test

def get_train_valid_test_data():
    """Load every recording, shuffle the epochs and split them 70 / 15 / 15.

    The shuffle uses ``random_state=42`` so the split is repeatable. Rows are
    sliced positionally with ``.iloc`` instead of ``np.split``: ``np.split`` on
    a DataFrame goes through ``DataFrame.swapaxes``, which pandas has
    deprecated. The resulting partitions are the same rows in the same order.

    Returns:
        tuple: ``(x_train, x_valid, x_test, y_train, y_valid, y_test)`` where
        the ``x_*`` arrays come from ``standardize_data`` and the ``y_*``
        arrays hold the raw ``'bis'`` labels of each split.
    """
    df = get_all_data()
    shuffled = df.sample(frac=1, random_state=42)
    train_end = int(0.7*len(df))
    valid_end = int(0.85*len(df))
    train = shuffled.iloc[:train_end]
    validate = shuffled.iloc[train_end:valid_end]
    test = shuffled.iloc[valid_end:]

    splits = [train, validate, test]
    x_train, x_valid, x_test = standardize_data(*[part.drop('bis', axis=1).to_numpy() for part in splits])
    y_train, y_valid, y_test = [part['bis'].to_numpy() for part in splits]

    print(train.shape, validate.shape, test.shape)
    print(x_train.shape, x_valid.shape, x_test.shape)
    print(y_train.shape, y_valid.shape, y_test.shape)

    return x_train, x_valid, x_test, y_train, y_valid, y_test

def one_hot_encode(y_train, y_valid, y_test):
    """Integer-encode the labels of all three splits and one-hot them.

    The encoder used to be fit on the *test* labels only, so a class missing
    from the test split made ``transform`` fail on the other splits. It also
    let ``to_categorical`` infer the width per split, which can produce
    matrices of different widths. The encoder is now fit on the labels of all
    splits, and the one-hot width is fixed to the number of known classes.

    Args:
        y_train, y_valid, y_test: 1-D arrays of class labels.

    Returns:
        tuple: ``(y_train, y_valid, y_test, encoder)`` — the three one-hot
        matrices and the fitted ``LabelEncoder``.
    """
    encoder = LabelEncoder()
    encoder.fit(np.concatenate([y_train, y_valid, y_test]))
    num_classes = len(encoder.classes_)
    y_train, y_valid, y_test = [
        keras.utils.to_categorical(encoder.transform(y), num_classes=num_classes)
        for y in (y_train, y_valid, y_test)
    ]
    return y_train, y_valid, y_test, encoder

def save_test_data(x_test, y_test, model_name):
    """Persist a test split to ``./test_data/<model_name>.npz``.

    The ``./test_data`` directory is created when missing. The archive stores
    the two arrays under the keys ``x_test`` and ``y_test``.
    """
    target_dir = Path('./test_data')
    target_dir.mkdir(exist_ok=True)
    np.savez(f'./test_data/{model_name}.npz', x_test=x_test, y_test=y_test)

def load_test_data(model_name):
    """Read back a test split written by ``save_test_data``.

    The ``.npz`` archive is opened in a ``with`` block so the underlying file
    handle is closed once both arrays have been read (it was previously left
    open).

    SECURITY: ``allow_pickle=True`` is kept so object-dtype arrays (such as
    string labels) can be loaded, but unpickling can execute arbitrary code.
    Only load archives produced by this notebook.

    Args:
        model_name: stem of the archive inside ``./test_data``.

    Returns:
        tuple: ``(x_test, y_test)`` arrays stored in the archive.
    """
    with np.load(f'./test_data/{model_name}.npz', allow_pickle=True) as loaded_data:
        x_test_loaded = loaded_data['x_test']
        y_test_loaded = loaded_data['y_test']
    return x_test_loaded, y_test_loaded

In [5]:
def make_model(input_shape, num_classes=4):
    """Assemble the fixed 1-D CNN classifier (uncompiled).

    Architecture: three identical stages of
    ``Conv1D(64 filters, kernel 3, same padding) -> BatchNormalization -> ReLU``,
    followed by global average pooling and a softmax ``Dense`` layer with
    ``num_classes`` units.

    Args:
        input_shape: shape of one input sample, passed to ``keras.layers.Input``.
        num_classes: number of softmax outputs.

    Returns:
        keras.models.Model: the uncompiled model.
    """
    input_layer = keras.layers.Input(input_shape)

    features = input_layer
    # Layers are created in the order conv, batch-norm, ReLU for each stage.
    for _ in range(3):
        features = keras.layers.Conv1D(
            filters=64, kernel_size=3, padding="same")(features)
        features = keras.layers.BatchNormalization()(features)
        features = keras.layers.ReLU()(features)

    pooled = keras.layers.GlobalAveragePooling1D()(features)
    output_layer = keras.layers.Dense(num_classes, activation="softmax")(pooled)

    return keras.models.Model(inputs=input_layer, outputs=output_layer)

def compile_model(model):
    """Compile ``model`` in place for multi-class training and return it.

    Uses the ``"adam"`` optimizer and categorical cross-entropy loss, and
    tracks categorical accuracy, AUC, precision and recall under the names
    ``accuracy``, ``auc``, ``precision`` and ``recall``.
    """
    tracked_metrics = [
        keras.metrics.CategoricalAccuracy(name='accuracy'),
        keras.metrics.AUC(name='auc'),
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall'),
    ]
    model.compile(
        optimizer="adam",
        loss="categorical_crossentropy",
        metrics=tracked_metrics,
    )
    return model


def get_class_weights(y_train):
    """Compute "balanced" class weights from one-hot training labels.

    The weights are keyed by the actual class indices found in ``y_train``.
    The previous ``dict(enumerate(weights))`` numbered the weights 0, 1, 2, ...
    regardless of which classes were present, so a class missing from the
    training split shifted every following weight onto the wrong class.

    Args:
        y_train: 2-D one-hot label matrix (class index taken with ``argmax`` on axis 1).

    Returns:
        dict[int, float]: class index -> weight, suitable for the
        ``class_weight`` argument of ``Model.fit``.
    """
    y_train_labels = np.argmax(y_train, axis=1)
    classes = np.unique(y_train_labels)
    class_weights = compute_class_weight(
        class_weight="balanced",
        classes=classes,
        y=y_train_labels
    )
    # compute_class_weight returns the weights in the order of `classes`.
    class_weights_dict = {
        int(label): float(weight) for label, weight in zip(classes, class_weights)
    }
    print("Class Weights:", class_weights_dict)
    return class_weights_dict


def fit_model(model, model_name,  x_train, y_train, x_valid, y_valid, epochs=10, balance_classes=False):
    """Train ``model`` with checkpointing, LR reduction and CSV logging.

    Side effects:
        * ``./models/<model_name>.keras`` — best model so far by ``val_loss``.
        * ``./training_csv/<model_name>_training.csv`` — per-epoch log (overwritten).

    Both output directories are created if missing, mirroring what
    ``save_test_data`` does for its own directory, so a fresh checkout does not
    fail on the first write.

    Args:
        model: compiled Keras model.
        model_name: stem used for the checkpoint and log file names.
        x_train, y_train: training inputs and one-hot labels.
        x_valid, y_valid: validation inputs and one-hot labels.
        epochs: number of training epochs.
        balance_classes: when True, pass weights from ``get_class_weights(y_train)``
            as ``class_weight``; otherwise no class weighting.

    Returns:
        The object returned by ``model.fit`` (previously nothing was returned;
        existing callers that ignore the result are unaffected).
    """
    class_weights_dict = get_class_weights(y_train) if balance_classes else None

    for out_dir in ('./models', './training_csv'):
        Path(out_dir).mkdir(parents=True, exist_ok=True)

    callbacks = [
        keras.callbacks.ModelCheckpoint(
            f"./models/{model_name}.keras", save_best_only=True, monitor="val_loss"
        ),
        keras.callbacks.ReduceLROnPlateau(
            monitor="val_loss", factor=0.5, patience=10, min_lr=0.0001
        ),
        keras.callbacks.CSVLogger(
            f"./training_csv/{model_name}_training.csv", separator=",", append=False)
    ]

    return model.fit(
        x_train,
        y_train,
        validation_data=(x_valid, y_valid),
        epochs=epochs,
        callbacks=callbacks,
        class_weight=class_weights_dict
    )

def evaluate_model(model, model_name, x_test, y_test):
    """Evaluate ``model`` on the test split and store the metrics as CSV.

    The metrics dictionary from ``model.evaluate`` is written as a single-row
    table to ``./evals/<model_name>_eval.csv``. The ``./evals`` directory is
    created if it does not exist, so the write cannot fail on a fresh checkout.

    Returns:
        dict: the metrics returned by ``model.evaluate`` (previously nothing
        was returned; existing callers are unaffected).
    """
    results = model.evaluate(x_test, y_test, verbose=2, return_dict=True)
    Path('./evals').mkdir(parents=True, exist_ok=True)
    pd.DataFrame([results]).to_csv(f"./evals/{model_name}_eval.csv")
    return results

def plot_confusion_matrix(model, model_name, X_test, y_test, le):
    """Print and save a classification report, then plot and save the confusion matrix.

    Outputs:
        * ``./CMs/<model_name>_CM.csv`` — the classification report (despite the
          ``_CM`` suffix, this file holds the report table, not the matrix).
        * ``./CMs/<model_name>_CM.png`` — heatmap of the confusion matrix.

    The ``./CMs`` directory is created if missing, and the figure is closed
    after being shown so repeated calls do not accumulate open figures.

    Args:
        model: trained Keras model.
        model_name: stem used for the output file names.
        X_test: test inputs passed to ``model.predict``.
        y_test: one-hot test labels.
        le: fitted label encoder providing ``inverse_transform`` and ``classes_``.
    """
    y_predicted = le.inverse_transform(np.argmax(model.predict(X_test), axis=1))
    y_true = le.inverse_transform(np.argmax(y_test, axis=1))

    Path('./CMs').mkdir(parents=True, exist_ok=True)

    report = classification_report(y_true, y_predicted, output_dict=True)
    report_normal = classification_report(y_true, y_predicted)
    print(report_normal)
    df = pd.DataFrame(report).transpose()
    df.to_csv(f"./CMs/{model_name}_CM.csv")

    cm = confusion_matrix(y_true, y_predicted, labels=le.classes_)
    fig = plt.figure(figsize=(10, 10))
    sns.heatmap(cm, annot=True, fmt='d',cmap = 'Blues',  xticklabels=le.classes_, yticklabels=le.classes_)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.savefig(f'./CMs/{model_name}_CM.png')
    plt.show()
    plt.close(fig)

def all_vs_all_classification(epochs=100, balance_classes=False):
    """Run the full four-class experiment with the fixed CNN from ``make_model``.

    Steps: load and split the data, save the raw test split, one-hot encode the
    labels, train a freshly compiled model, reload the checkpoint written
    during training and finally evaluate it and plot its confusion matrix.

    Args:
        epochs: number of training epochs (also part of the run name).
        balance_classes: whether training uses balanced class weights (also
            part of the run name).
    """
    if balance_classes:
        balance = "balanced"
    else:
        balance = "unbalanced"
    model_name = f"all_VS_all_z_norm_{balance}_{epochs}e"

    (x_train, x_valid, x_test,
     raw_y_train, raw_y_valid, raw_y_test) = get_train_valid_test_data()
    save_test_data(x_test, raw_y_test, model_name)
    y_train, y_valid, y_test, encoder = one_hot_encode(raw_y_train, raw_y_valid, raw_y_test)

    network = make_model(x_train.shape[1:], 4)
    network = compile_model(network)
    fit_model(
        network, model_name,
        x_train, y_train, x_valid, y_valid,
        epochs=epochs, balance_classes=balance_classes,
    )

    # Evaluate the checkpoint saved during training, not the last-epoch weights.
    checkpoint = keras.models.load_model(f"./models/{model_name}.keras")
    evaluate_model(checkpoint, model_name, x_test, y_test)
    plot_confusion_matrix(checkpoint, model_name, x_test, y_test, encoder)


In [None]:
# Train and evaluate the fixed CNN twice for 100 epochs: first without class weights,
# then with balanced class weights. Each run writes its own model, logs and plots.
all_vs_all_classification(epochs=100, balance_classes=False)
all_vs_all_classification(epochs=100, balance_classes=True)

In [22]:
def build_model(hp):
    """Hypermodel factory for keras-tuner: build and compile one candidate CNN.

    Search space (registered in this order):
        * ``num_conv_layers``: 1 to 9 convolution stages.
        * per stage ``i``: ``filters_{i}`` in 16..128 (step 16) and
          ``kernel_size_{i}`` in 1..7.
        * ``learning_rate``: one of 1e-2, 1e-3, 1e-4 for Adam.

    The input length is taken from ``N_SAMPLES_PER_EPOCH`` rather than a
    literal ``640``, so the tuned model stays in sync with the epoch length
    used by the data pipeline.

    Args:
        hp: keras-tuner hyperparameter container.

    Returns:
        A compiled Keras model whose loss and only metric are categorical
        cross-entropy (the metric supplies the tuner objective
        ``val_categorical_crossentropy``).
    """
    input_shape = (N_SAMPLES_PER_EPOCH, 1)
    num_classes = 4

    input_layer = keras.layers.Input(input_shape)
    x = input_layer

    for i in range(hp.Int("num_conv_layers", min_value=1, max_value=9)):
        x = keras.layers.Conv1D(
            filters=hp.Int(f"filters_{i}", min_value=16, max_value=128, step=16),
            kernel_size=hp.Int(f"kernel_size_{i}", min_value=1, max_value=7),
            padding="same"
        )(x)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.ReLU()(x)

    x = keras.layers.GlobalAveragePooling1D()(x)

    output_layer = keras.layers.Dense(
        num_classes, activation="softmax"
    )(x)

    model = keras.models.Model(inputs=input_layer, outputs=output_layer)

    model.compile(
        optimizer=keras.optimizers.Adam(
            hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4])
        ),
        loss="categorical_crossentropy",
        metrics=["categorical_crossentropy"]
    )
    return model


def run_hypertuning():
    """Run a Hyperband search over ``build_model`` and print the winning architecture.

    Candidates are scored on the validation split only. The test split used to
    be concatenated into the tuner's validation data, so the test set drove
    model selection and later test metrics for the chosen architecture were
    optimistically biased. The test split is now left untouched.

    Training uses balanced class weights from ``get_class_weights``.

    NOTE(review): the results directory name ends in ``_unbalanced`` although
    class weights are applied. The name is kept because ``get_best_model``
    reads from the same directory. Trials already stored there by earlier runs
    may be reused by keras-tuner — clear the directory to re-tune from scratch
    (confirm against the keras-tuner ``overwrite`` option).
    """
    x_train, x_valid, _, y_train_og, y_valid_og, y_test_og = get_train_valid_test_data()
    # The test labels are still passed so the encoder call matches the other entry points.
    y_train, y_valid, _, _ = one_hot_encode(y_train_og, y_valid_og, y_test_og)

    class_weights = get_class_weights(y_train)
    tuner = kt.Hyperband(
        build_model,
        objective="val_categorical_crossentropy",
        max_epochs=100,
        factor=3,
        directory="hyperparameter_tuning_z_norm_unbalanced",
        project_name="cnn_tuning_with_val_loss"
        )

    tuner.search(
        x_train, y_train,
        validation_data=(x_valid, y_valid),
        epochs=100,
        batch_size=32,
        class_weight=class_weights
        )

    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
    best_model = tuner.hypermodel.build(best_hps)
    # summary() prints by itself and returns None; wrapping it in print() added a stray "None".
    best_model.summary()

In [None]:
# Launch the Hyperband search defined above (trains many candidate models; likely long-running).
run_hypertuning()

In [19]:
def get_best_model(rank=1):
    """Rebuild a model from hyperparameters stored by a previous tuning run.

    A ``kt.Hyperband`` tuner is pointed at the directory/project used by
    ``run_hypertuning`` and asked for its top ``rank + 1`` hyperparameter sets;
    the last one returned is used to build the model.

    NOTE(review): with the default ``rank=1`` this is the *runner-up*, not the
    best trial — exactly what the previous hard-coded ``num_trials=2 ... [-1]``
    did (it falls back to the best trial when only one exists). The default is
    kept for backward compatibility; pass ``rank=0`` for the top-scoring
    configuration and confirm which one is actually intended.

    Args:
        rank: 0-based position in the tuner's best-first ranking.

    Returns:
        The compiled Keras model produced by ``build_model`` for the selected
        hyperparameters (freshly initialised weights).

    Raises:
        ValueError: if ``rank`` is negative.
    """
    if rank < 0:
        raise ValueError("rank must be >= 0")
    tuner = kt.Hyperband(
        build_model,
        objective="val_categorical_crossentropy",
        directory="hyperparameter_tuning_z_norm_unbalanced",
        project_name="cnn_tuning_with_val_loss"
        )
    ranked_hps = tuner.get_best_hyperparameters(num_trials=rank + 1)
    return tuner.hypermodel.build(ranked_hps[-1])

def train_best_model(balanced=True, epochs=100):
    """Train the tuned architecture from ``get_best_model`` and report test results.

    Two defects of the previous version are fixed:

    * The model was re-compiled with the string ``"adam"``, which silently
      replaced the optimizer that ``build_model`` had configured with the
      tuned ``learning_rate``. The existing optimizer instance is now reused
      and only the reporting metrics are added.
    * Evaluation used the last-epoch weights although ``fit_model`` checkpoints
      the best epoch by ``val_loss``. The checkpoint is now reloaded before
      evaluation, as ``all_vs_all_classification`` already does.

    Args:
        balanced: whether training uses balanced class weights (also part of
            the run name).
        epochs: number of training epochs (also part of the run name).
    """
    balance = "balanced" if balanced else "unbalanced"
    best_model_name = f"best_model_z_norm_{balance}_{epochs}e"
    x_train, x_valid, x_test, y_train_og, y_valid_og, y_test_og = get_train_valid_test_data()
    y_train, y_valid, y_test, encoder = one_hot_encode(y_train_og, y_valid_og, y_test_og)
    save_test_data(x_test, y_test_og, best_model_name)

    best_model = get_best_model()
    best_model.compile(
        # Keep the optimizer (and its tuned learning rate) created by build_model.
        optimizer=best_model.optimizer,
        loss="categorical_crossentropy",
        metrics=[keras.metrics.CategoricalAccuracy(name='accuracy'),
                 keras.metrics.AUC(name='auc'),
                 keras.metrics.Precision(name='precision'),
                 keras.metrics.Recall(name='recall')],
    )
    fit_model(best_model, best_model_name, x_train, y_train, x_valid, y_valid, epochs, balance_classes=balanced)

    # Evaluate the best checkpoint written by fit_model rather than the final-epoch weights.
    best_model = keras.models.load_model(f"./models/{best_model_name}.keras")
    evaluate_model(best_model, best_model_name, x_test, y_test)
    plot_confusion_matrix(best_model, best_model_name, x_test, y_test, encoder)

In [None]:
# Train the tuned architecture twice for 100 epochs: first without class weights,
# then with balanced class weights. Requires the tuning results written by run_hypertuning().
train_best_model(balanced=False, epochs=100)
train_best_model(balanced=True, epochs=100)