In [None]:
# NOTE: bath normalization and residual connection are especially important for deep networks

# TODO: do I only need to normalize the test data? 
#       Or only take the mean and std of the training data and apply it to all data?

# TODO: maybe data augmentation helps to improve the model performance

# TODO: one hot encode the labels, convert input data into a tensor

### Import libraries and load data

In [None]:
import nibabel as nib
import nilearn as nil
import scipy.ndimage as ndi
import matplotlib.pyplot as plt
import os
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers, callbacks
import keras_tuner as kt

In [None]:
# path to data
#  - on Windows: C:/Users/tahendry/Desktop/Masterthesis_Reto/
#  - on Linux:   ../data/

try:
    print(os.uname())
    data_path = "../data/"
except:
    print("Windows")
    data_path = "C:/Users/tahendry/Desktop/Masterthesis_Reto/"

In [None]:
# read in the excel-file with the labels
label_file = "Conn_IDs_Matching.xlsx"

# read excel with only the first three columns
label_df = (pd.read_excel(os.path.join(data_path, label_file),
                            usecols=[0, 1, 2])
            .replace({"Cond": {1: 0}})
            .replace({"Cond": {2: 1}})
            )

label_df.head()

In [None]:
# read MVPA data
path_content = os.listdir(os.path.join(data_path, "Denoised_Data_6mm", "MVPA_data"))

# make two lists with pre (Condition002) and post (Condition003) data of first component
comp1_pre = sorted([x for x in path_content 
                    if "Component001" in x 
                    and "Condition002" in x])
comp1_post = sorted([x for x in path_content 
                    if "Component001" in x 
                    and "Condition003" in x])

print(comp1_pre[:5])

### Prepare data for keras

In [None]:
# create a dataset with the difference of pre and post data
comp1_diff = []
for pre, post in zip(comp1_pre, comp1_post):
    pre_vol = nib.load(os.path.join(data_path, "Denoised_Data_6mm", "MVPA_data", pre))
    post_vol = nib.load(os.path.join(data_path, "Denoised_Data_6mm", "MVPA_data", post))
    pre_vol_data = pre_vol.get_fdata()
    post_vol_data = post_vol.get_fdata()
    diff_vol_data = post_vol_data - pre_vol_data
    comp1_diff.append(diff_vol_data)

# check the shape of the data
print(comp1_diff[0].shape)

# check the type of the data
print(type(comp1_diff[0]))

# takes about 4 mins to run on Dell

In [None]:
# stack the data to later use it as input for the CNN
# note: the first dimension is the number of samples
print(f"shape of one list element before stacking: {comp1_diff[0].shape=}")
inpt_comp1_diff = np.stack(comp1_diff, axis=0)

# expand the dimensions to fit the input shape of the CNN
# note: the last dimension is the number of channels
inpt_comp1_diff = np.expand_dims(inpt_comp1_diff, axis=-1)

# normalize the input data (zero mean, unit variance)
# note: a CNN works best with normalized data
inpt_comp1_diff = (inpt_comp1_diff - inpt_comp1_diff.mean()) / inpt_comp1_diff.std()

print(f"{inpt_comp1_diff.shape=}",
      f"{inpt_comp1_diff.mean()=}",
      f"{inpt_comp1_diff.std()=}", sep="\n")

In [None]:
# split the data into training and test data
# use 80% of the data for training and 20% for testing
x_train, x_test, y_train, y_test = train_test_split(inpt_comp1_diff, 
                                                    label_df["Cond"], 
                                                    test_size=0.2,
                                                    random_state=42)

# check the shape of the data
print(f"{x_train.shape=}",
        f"{x_test.shape=}",
        f"{y_train.shape=}",
        f"{y_test.shape=}", sep="\n")

# prepare for k-fold cross-validation
# note: StratifiedKFold ensures that the proportion of samples of each class is the same in each fold
nbr_of_folds = 12
kfold = StratifiedKFold(n_splits=nbr_of_folds, shuffle=True, random_state=42)

### Built the first model

In [None]:
# built a keras CNN model with functional API
def build_and_compile_model(summary=False):
    inputs = keras.Input(shape=(inpt_comp1_diff.shape[1:]))
    x = layers.Conv3D(filters=16, kernel_size=3, 
                        kernel_regularizer=regularizers.l2(0.002),
                        activation="relu") (inputs)
    x = layers.MaxPooling3D(pool_size=3, 
                            strides=3, 
                            padding='valid') (x)
    x = layers.Conv3D(filters=32, kernel_size=3, 
                        kernel_regularizer=regularizers.l2(0.002), 
                        activation="relu") (x)
    # x = layers.MaxPooling3D(pool_size=3, 
    #                         strides=3, 
    #                         padding='valid')(x)
    # x = layers.Conv3D(filters=32, kernel_size=3, 
    #                     kernel_regularizer=regularizers.l2(0.002), 
    #                     activation="relu")(x)
    x = layers.Flatten() (x)
    x = layers.Dense(128, activation="relu",
                        kernel_regularizer=regularizers.l2(0.002)) (x)
    x = layers.Dense(64, activation="relu",
                        kernel_regularizer=regularizers.l2(0.002)) (x)
    outputs = layers.Dense(1, activation="sigmoid") (x)

    # define the model
    model = keras.Model(inputs=inputs, outputs=outputs)

    if summary == True:
        model.summary()

    # compile the model
    my_optimizer = keras.optimizers.RMSprop(learning_rate=0.0001)
    model.compile(optimizer=my_optimizer,
                    loss='binary_crossentropy',
                    metrics=["Accuracy"])

    return model

### cross validation

In [None]:
# callback to stop training when loss does not improve anymore and save the best model
callback_list = [callbacks.EarlyStopping(monitor="val_loss", patience=5),
                 callbacks.ModelCheckpoint(filepath="./MVPA_CNN_model_cv.keras", 
                                            monitor="val_loss",
                                            save_best_only=True)]

# create empty lists to store the results of the k-fold cross-validation
val_loss = []
val_acc = []
history_list = []

# k-fold cross-validation
for fold, (train_idx, test_idx) in enumerate(kfold.split(x_train, y_train)):
    if fold == 0:
        print(f"cross validation started...", 
                f"size of the training data: {len(train_idx)}",
                f"size of the test data: {len(test_idx)}", 
                sep="\n")

    # create the folds
    print(f"Fold {fold+1} of {nbr_of_folds}")
    x_train_fold = x_train[train_idx]
    x_test_fold = x_train[test_idx]
    y_train_fold = y_train.iloc[train_idx]
    y_test_fold = y_train.iloc[test_idx]

    # print shape of the folds
    # print(f"{x_train_fold.shape=}",
    #     f"{x_test_fold.shape=}",
    #     f"{y_train_fold.shape=}",
    #     f"{y_test_fold.shape=}", sep="\n")

    # build and compile the model
    model = build_and_compile_model()

    # train the model
    history = model.fit(x_train_fold, y_train_fold,
                        epochs=20,
                        batch_size=4,
                        validation_data=(x_test_fold, y_test_fold),
                        callbacks=callback_list,
                        verbose=1)
    history_list.append(history)

    # evaluate the model
    fold_loss = model.evaluate(x_test_fold, y_test_fold, verbose=0)[0]
    fold_acc = model.evaluate(x_test_fold, y_test_fold, verbose=0)[1]
    val_loss.append(fold_loss)
    val_acc.append(fold_acc)
    print(f"Loss: {fold_loss}", f"Accuracy: {fold_acc}", sep="\n")

# print the mean of the validation loss and accuracy
print(f"Mean validation loss: {np.mean(val_loss)}",
        f"Mean validation accuracy: {np.mean(val_acc)}", sep="\n")
        
# save the model
# model.save("X:/MasterThesis_Reto/3d_cnn_model")

In [None]:
# create subplot to show all folds from cross-validation
fig, axs = plt.subplots(3, 4, figsize=(12, 6))
axs = axs.ravel()
for i, history in enumerate(history_list):
    axs[i].plot(history.history["loss"][1:])
    axs[i].plot(history.history["val_loss"][1:])
    axs[i].plot(history.history["Accuracy"][1:])
    axs[i].plot(history.history["val_Accuracy"][1:])
    axs[i].set_title(f"Fold {i+1}")
    axs[i].set_ylabel("loss")
    axs[i].set_xlabel("epoch")
    axs[i].grid()

fig.tight_layout()
fig.legend(["loss", "val_loss", "accuracy", "val_Accuracy"])


### train model with the entire test data (no val. data)

In [None]:
# train with entire training data (no validation data)
# callback to stop training when loss does not improve anymore and save the best model
callback_list = [callbacks.EarlyStopping(monitor="loss", patience=5),
                 callbacks.ModelCheckpoint(filepath="./MVPA_CNN_model.keras", 
                                            monitor="loss",
                                            save_best_only=True)]

# build and compile the model
model = build_and_compile_model()

# train the model
history = model.fit(x_train, y_train,
                    epochs=20,
                    batch_size=4,
                    callbacks=callback_list,
                    verbose=1)

In [None]:
# load the best model
model = keras.models.load_model("./MVPA_CNN_model.keras")

# evaluate the model
model.evaluate(x_test, y_test, verbose=0)

# plot the training history
fig = plt.figure(figsize=(8, 4))
plt.plot(history.history["loss"])
# plt.plot(history.history["val_loss"])
plt.plot(history.history["Accuracy"])
# plt.plot(history.history["val_Accuracy"])
plt.title("training history")
plt.ylabel("loss")
plt.xlabel("epoch")
plt.legend(["loss", "val_loss", "accuracy", "val_Accuracy"], loc="upper left")
plt.grid()
plt.show()

### hyperparameter tuning

In [None]:
# create a function to build and compile the model for hyperparameter tuning

def build_and_compile_model_test(hp):
    """
    Builds model and sets up hyperparameter space to search. Model is compiled.
    
    Parameters
    ----------
    hp : HyperParameter object
        Configures hyperparameters to tune.
        
    Returns
    -------
    model : model
        Compiled keras model with hyperparameters to tune.
    """
    inputs = keras.Input(shape=(inpt_comp1_diff.shape[1:]))
    
    # Tune the number of hidden CNN layers (pair of Conv3D and MaxPool) and filters + kernel_size in each hidden layer.
    # Number of hidden layers: 1 - 5
    # Number of filters: 16 - 512 with stepsize of 32
    # Kernel size: 3 - 5 with stepsize of 2
        
    x = keras.layers.Conv3D(
        filters=hp.Int("filters", min_value=16, max_value=512, step=32),
        kernel_size=3,
        activation="relu", 
        padding="same")(inputs)

    x = keras.layers.MaxPooling3D(pool_size=(2, 2, 2))(x)
    
    # Flatten the output of the last hidden layer
    x = keras.layers.Flatten()(x)

    # Tune the number and units in the dense layer including dropout
    # Number of dense layers: 1 - 3
    # Number of units: 16 - 512 with stepsize of 32
    x = keras.layers.Dense(units=64,
                            activation="relu")(x)

    # Add output layer
    outputs = keras.layers.Dense(1, activation="sigmoid")(x)
    # TODO: maybe try with a softmax activation function

    # define the model
    model = keras.Model(inputs=inputs, outputs=outputs)
    
    # Define optimizer, loss, and metrics
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001),
                  loss='binary_crossentropy',
                  metrics=["Accuracy"])
    
    return model

In [None]:
# create a function to build and compile the model for hyperparameter tuning

def build_and_compile_model(hp):
    """
    Builds model and sets up hyperparameter space to search. Model is compiled.
    
    Parameters
    ----------
    hp : HyperParameter object
        Configures hyperparameters to tune.
        
    Returns
    -------
    model : model
        Compiled keras model with hyperparameters to tune.
    """
    inputs = keras.Input(shape=(inpt_comp1_diff.shape[1:]))
    
    # Tune the number of hidden CNN layers (pair of Conv3D and MaxPool) and filters + kernel_size in each hidden layer.
    # Number of hidden layers: 1 - 5
    # Number of filters: 16 - 512 with stepsize of 32
    # Kernel size: 3 - 5 with stepsize of 2
    for i in range(1, hp.Int("num_layers", 2, 6)):
        
        x = keras.layers.Conv3D(
            filters=hp.Int("filters_" + str(i), min_value=16, max_value=512, step=32),
            kernel_size=hp.Int("kernel_size_" + str(i), min_value=3, max_value=5, step=2),
            # kernel_regularizer=hp.Float("l2_regularization_" + str(i), min_value=0.0001, max_value=0.01, step=0.0001),
            activation="relu", 
            padding="same")(inputs)

        x = keras.layers.MaxPooling3D(pool_size=(2, 2, 2))(x)

        # tune dropout layer with values from 0 - 0.3 with stepsize of 0.1
        x = keras.layers.Dropout(hp.Float("dropout_" + str(i), 
                                    min_value=0, max_value=0.3, step=0.1))(x)
    
    # Flatten the output of the last hidden layer
    x = keras.layers.Flatten()(x)

    # Tune the number and units in the dense layer including dropout
    # Number of dense layers: 1 - 3
    # Number of units: 16 - 512 with stepsize of 32
    for i in range(1, hp.Int("num_dense_layers", 2, 4)):
        x = keras.layers.Dense(units=hp.Int("units_" + str(i), min_value=16, max_value=512, step=32),
                               activation="relu")(x)
        
        # tune dropout layer with values from 0 - 0.3 with stepsize of 0.1
        x = keras.layers.Dropout(hp.Float("dense_dropout_" + str(i), 
                                    min_value=0, max_value=0.3, step=0.1))(x)

    # Add output layer
    outputs = keras.layers.Dense(1, activation="sigmoid")(x)
    # TODO: maybe try with a softmax activation function

    # define the model
    model = keras.Model(inputs=inputs, outputs=outputs)

    # Tune learning rate for Adam optimizer with values from 0.01, 0.001, or 0.00001
    hp_learning_rate = hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-5])
    
    # Define optimizer, loss, and metrics
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                  loss='binary_crossentropy',
                  metrics=["Accuracy"])
    
    return model

In [None]:
# Instantiate the tuner
tuner = kt.BayesianOptimization(build_and_compile_model_test,
                                objective="val_Accuracy",
                                max_trials=10,
                                executions_per_trial=2,
                                directory="kt_dir",
                                project_name="kt_BayesianOptimization",
                                overwrite=True)

# Display search space summary
tuner.search_space_summary()

In [None]:
# start the search for the best hyperparameters

stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_Accuracy', patience=5)

tuner.search(x_train, y_train, 
                epochs=20, 
                batch_size=4,
                validation_data=(x_test, y_test),
                callbacks=[stop_early], 
                verbose=2)

In [None]:
# Get the optimal hyperparameters from the results
best_hps = tuner.get_best_hyperparameters()[0]

# Build model
h_model = tuner.hypermodel.build(best_hps)

# Train the hypertuned model
h_model.fit(x_train, y_train, epochs=20, validation_split=0.2, callbacks=[stop_early], verbose=2)

# Evaluate model on test set
h_model.evaluate(x_test, y_test)


In [None]:
best_hps = tuner.get_best_hyperparameters()
best_hps

In [None]:
# train with entire training data (no validation data)
# callback to stop training when loss does not improve anymore and save the best model
callback_list = [callbacks.EarlyStopping(monitor="loss", patience=5),
                 callbacks.ModelCheckpoint(filepath="./MVPA_CNN_model.keras", 
                                            monitor="loss",
                                            save_best_only=True)]

# build and compile the model
model = build_and_compile_model()

# train the model
history = model.fit(x_train, y_train,
                    epochs=20,
                    batch_size=4,
                    callbacks=callback_list,
                    verbose=1)