In [48]:
from sklearn.model_selection import KFold
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import json
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras import layers, models, regularizers, optimizers, callbacks
from tensorflow.keras.metrics import BinaryAccuracy, AUC, Precision, Recall, Metric

In [49]:
# Project root is the parent of the current working directory; every other
# location is derived from it by appending a fixed suffix.
MAIN_PATH = os.path.dirname(os.getcwd())
DATA_PATH, MODEL_PATH, LOG_PATH = (
    MAIN_PATH + suffix for suffix in ("/data/numpy", "/models", "/logs")
)

# Input-pipeline and cross-validation settings.
BATCH_SIZE, SHUFFLE_BUFFER_SIZE, NUM_FOLDS = 32, 1024, 5

# Run state shared across folds (read and updated by Train_fold).
BEST_VAL_SCORE = 0   # best validation score seen so far
BEST_MODEL = None    # path of the checkpoint that achieved it
HISTORY = []         # one history dict per trained fold

In [50]:
def plot_history_metrics(history_dict: dict):
    """Plot every series in ``history_dict`` in its own subplot.

    Args:
        history_dict: Mapping of a metric name to a sequence of values
            (for example a Keras ``History.history`` dict). Each value is
            plotted against its index.

    The grid has ``len(history_dict) // 2`` columns (but never fewer than
    one) and as many rows as are needed to fit every entry. An empty mapping
    is a no-op.
    """
    total_plots = len(history_dict)
    if total_plots == 0:
        # Nothing to draw; avoids opening an empty figure.
        return

    # A single-entry dict would otherwise give 0 columns and a division by zero.
    cols = max(1, total_plots // 2)
    rows = (total_plots + cols - 1) // cols  # ceiling division

    plt.figure(figsize=(15, 10))
    for position, (key, value) in enumerate(history_dict.items(), start=1):
        plt.subplot(rows, cols, position)
        plt.plot(range(len(value)), value)
        plt.title(str(key))
    plt.show()

In [51]:
def load_df():
    """Read ``data/result_df.csv`` under ``MAIN_PATH`` and return it as a DataFrame."""
    return pd.read_csv(MAIN_PATH + "/data/result_df.csv")

In [52]:
def Clean_missing_values(numpy_data):
    """Drop NaN-labelled samples from every split stored in ``numpy_data``.

    Args:
        numpy_data: Dict holding the ``x_*``/``y_*`` array pairs for the
            ``train``, ``val``, ``test_1`` and ``test_2`` splits.

    Returns:
        The same dict object, with each pair replaced in place by its
        filtered version.
    """
    split_keys = (
        ('x_train', 'y_train'),
        ('x_val', 'y_val'),
        ('x_test_1', 'y_test_1'),
        ('x_test_2', 'y_test_2'),
    )
    for x_key, y_key in split_keys:
        numpy_data[x_key], numpy_data[y_key] = Remove_missing_values(
            numpy_data[x_key], numpy_data[y_key]
        )
    return numpy_data

def Remove_missing_values(x_data, y_data):
    """Keep only the samples whose label is not NaN.

    Args:
        x_data: Array of samples, indexed along its first axis.
        y_data: Array of labels aligned with ``x_data``.

    Returns:
        ``(x_clean, y_clean)``: both arrays indexed with the boolean mask of
        non-NaN entries of ``y_data``.
    """
    keep_mask = np.logical_not(np.isnan(y_data))
    return x_data[keep_mask], y_data[keep_mask]

In [53]:
def gather_numpy_files(data_path):
    """Load every ``.npy`` file in ``data_path`` and drop NaN-labelled samples.

    Args:
        data_path: Directory to scan (non-recursively) for ``.npy`` files.

    Returns:
        Dict mapping each file name without its ``.npy`` suffix to the loaded
        array, after passing through ``Clean_missing_values``.
    """
    numpy_data = {
        entry[:-4]: np.load(os.path.join(data_path, entry))  # strip ".npy"
        for entry in os.listdir(data_path)
        if entry.endswith('.npy')
    }

    # Remove the rows whose y_* label is missing.
    return Clean_missing_values(numpy_data)

In [54]:
def calculate_class_weights(df, label_column):
    """Compute a per-class weight of ``1 - class_frequency``.

    Args:
        df: DataFrame containing the labels.
        label_column: Name of the column holding the class labels.

    Returns:
        Dict mapping each distinct label to ``1 - count / total``, in order
        of first appearance. The dict is also printed.
    """
    counts = {}
    for label in df[label_column]:
        counts[label] = counts.get(label, 0) + 1

    total = sum(counts.values())
    weight_dict = {}
    for label, count in counts.items():
        weight_dict[label] = 1 - (count / total)

    print(f"Weight dict for model: {weight_dict}")
    return weight_dict

In [55]:
def create_model():
    """Build the uncompiled 1-D convolutional binary classifier.

    Architecture, in order:
      * input of shape ``(32, 1)``
      * three Conv1D (32, 64, 128 filters) -> BatchNormalization ->
        MaxPooling1D stages
      * Flatten
      * two Dense (512, 256 units) -> Dropout(0.5) stages
      * a single sigmoid output unit

    Every Conv1D and hidden Dense kernel carries an L2 penalty of 0.001.

    Returns:
        The ``keras.Model`` connecting the input to the sigmoid output.
    """
    l2_strength = 0.001

    inputs = keras.Input(shape=(32, 1))
    features = inputs

    # Convolutional feature extractor: each stage halves the sequence length.
    for n_filters in (32, 64, 128):
        features = layers.Conv1D(
            filters=n_filters,
            kernel_size=3,
            activation="relu",
            padding="same",
            kernel_regularizer=regularizers.l2(l2_strength),
        )(features)
        features = layers.BatchNormalization()(features)
        features = layers.MaxPooling1D(pool_size=2)(features)

    features = layers.Flatten()(features)

    # Fully connected head with dropout after each hidden layer.
    for n_units in (512, 256):
        features = layers.Dense(
            n_units,
            activation="relu",
            kernel_regularizer=regularizers.l2(l2_strength),
        )(features)
        features = layers.Dropout(0.5)(features)

    outputs = layers.Dense(1, activation="sigmoid")(features)

    return keras.Model(inputs=inputs, outputs=outputs)


In [56]:
class F1Score(Metric):
    """F1 score computed from wrapped ``Precision`` and ``Recall`` metrics.

    The metric keeps no variables of its own: it forwards every update to a
    ``Precision`` and a ``Recall`` instance and combines their results as
    ``2 * p * r / (p + r + epsilon)``.
    """

    def __init__(self, name='f1_score', **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision = Precision()
        self.recall = Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate a batch into both underlying metrics."""
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def reset_state(self):
        """Clear the accumulated precision and recall statistics.

        ``reset_state`` is the hook current Keras calls between epochs; the
        older plural spelling is kept below as an alias.
        """
        self.precision.reset_state()
        self.recall.reset_state()

    def reset_states(self):
        """Deprecated spelling of :meth:`reset_state`, kept for existing callers."""
        self.reset_state()

    def result(self):
        """Return the F1 score for everything accumulated since the last reset."""
        precision = self.precision.result()
        recall = self.recall.result()
        # epsilon keeps the division defined when precision and recall are both 0.
        return 2 * ((precision * recall) / (precision + recall + tf.keras.backend.epsilon()))

In [57]:
def Compile_model():
    """Create a fresh model and compile it for binary classification.

    Uses Adam (AMSGrad variant, learning rate 0.001) with binary
    cross-entropy, and tracks binary accuracy, AUC, precision, recall and
    the custom ``F1Score`` metric.

    Returns:
        The compiled model.
    """
    model = create_model()
    model.compile(
        optimizer=keras.optimizers.Adam(amsgrad=True, learning_rate=0.001),
        loss=keras.losses.BinaryCrossentropy(),
        metrics=[
            keras.metrics.BinaryAccuracy(name='binary_accuracy'),
            keras.metrics.AUC(name='auc'),
            keras.metrics.Precision(name='precision'),
            keras.metrics.Recall(name='recall'),
            F1Score(name='f1_score'),
        ],
    )
    return model

In [58]:
def SplitDatasetForFolds(train_index, validation_index, fold_nr, numpy_data):
    """Build the batched ``tf.data`` datasets for one cross-validation fold.

    Both the training and the validation part of the fold are taken from
    ``numpy_data['x_train']`` / ``numpy_data['y_train']`` using the index
    arrays produced by the K-fold splitter, so the validation samples are
    exactly the ones held out of training for this fold.

    Args:
        train_index: Indices into the training arrays used for fitting.
        validation_index: Indices into the training arrays held out for
            validation in this fold.
        fold_nr: Fold number, used only for logging.
        numpy_data: Dict with ``x_train``, ``y_train``, ``x_test_1``,
            ``y_test_1``, ``x_test_2`` and ``y_test_2`` arrays.

    Returns:
        ``(train_dataset, validation_dataset, test_dataset_subject1,
        test_dataset_subject2)``, each batched with ``BATCH_SIZE``. Only the
        training dataset is shuffled.
    """
    print(f"Training fold {fold_nr}...")

    x_train, y_train = numpy_data['x_train'], numpy_data['y_train']

    # Partition the training arrays into this fold's train / held-out parts.
    x_train_fold, y_train_fold = x_train[train_index], y_train[train_index]
    x_validation_fold = x_train[validation_index]
    y_validation_fold = y_train[validation_index]

    print(f"Fold {fold_nr} train shapes: x={x_train_fold.shape}, y={y_train_fold.shape}")
    print(f"Fold {fold_nr} validation shapes: x={x_validation_fold.shape}, y={y_validation_fold.shape}")

    # Create tf.data.Datasets
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train_fold, y_train_fold))
    validation_dataset = tf.data.Dataset.from_tensor_slices((x_validation_fold, y_validation_fold))
    test_dataset_subject1 = tf.data.Dataset.from_tensor_slices((numpy_data['x_test_1'], numpy_data['y_test_1']))
    test_dataset_subject2 = tf.data.Dataset.from_tensor_slices((numpy_data['x_test_2'], numpy_data['y_test_2']))

    # Only the training data is shuffled; evaluation sets are read in order.
    train_dataset = train_dataset.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
    validation_dataset = validation_dataset.batch(BATCH_SIZE)
    test_dataset_subject1 = test_dataset_subject1.batch(BATCH_SIZE)
    test_dataset_subject2 = test_dataset_subject2.batch(BATCH_SIZE)

    return train_dataset, validation_dataset, test_dataset_subject1, test_dataset_subject2


In [59]:
def convert_to_native(data):
    """Recursively convert numpy types to native python types.

    Args:
        data: Arbitrarily nested structure of dicts, lists, tuples, numpy
            arrays and scalars.

    Returns:
        A structure of the same shape in which every numpy array has become
        a (nested) list and every numpy scalar (float, integer, bool, ...)
        has become the matching Python scalar. Dict keys and values of any
        other type are returned unchanged.
    """
    if isinstance(data, dict):
        return {key: convert_to_native(value) for key, value in data.items()}
    if isinstance(data, list):
        return [convert_to_native(item) for item in data]
    if isinstance(data, tuple):
        # Tuples can carry numpy scalars too; keep the container type.
        return tuple(convert_to_native(item) for item in data)
    if isinstance(data, np.ndarray):
        return data.tolist()  # tolist() already yields native scalars
    if isinstance(data, np.generic):
        # np.generic is the base of every numpy scalar type, not only the
        # 32/64-bit floats.
        return data.item()
    return data

In [60]:
def save_history_to_json(history, fold_number, best_model):
    """Save the training history and best model path to a JSON file.

    The file is written to ``LOG_PATH/history_fold_<fold_number>.json``;
    ``LOG_PATH`` is created if it does not exist yet.

    Args:
        history: Object exposing a ``history`` dict of per-epoch metrics
            (as returned by ``model.fit``).
        fold_number: Fold identifier used in the file name.
        best_model: Value stored under the ``"best_model"`` key.

    Saving is best-effort: any failure is reported on stdout and not raised,
    so a logging problem never aborts training.
    """
    try:
        history_data = {
            "history": convert_to_native(history.history),
            "best_model": best_model
        }

        # Without this, a missing log directory silently drops the history.
        os.makedirs(LOG_PATH, exist_ok=True)

        history_file_path = os.path.join(LOG_PATH, f"history_fold_{fold_number}.json")
        with open(history_file_path, 'w') as json_file:
            json.dump(history_data, json_file)  # Write the history and best model to a JSON file
        print(f"History and best model saved to {history_file_path}")
    except Exception as e:
        print(f"Error saving history: {e}")

In [61]:
def Train_fold(train_index, val_index, fold_number, numpy_data, weight_dict, epochs=15):
    """Train one cross-validation fold and update the global best-model state.

    Args:
        train_index: Indices of the training samples for this fold.
        val_index: Indices of the validation samples for this fold.
        fold_number: Fold identifier, used for file names and logging.
        numpy_data: Dict of arrays passed on to ``SplitDatasetForFolds``.
        weight_dict: Class weights forwarded to ``model.fit``.
        epochs: Number of training epochs (default 15).

    Side effects:
        * Writes ``best_model_fold_<fold_number>.keras`` under ``MODEL_PATH``
          holding the epoch with the highest ``val_f1_score``.
        * Appends the fold's history dict to ``HISTORY``.
        * Updates ``BEST_VAL_SCORE`` / ``BEST_MODEL`` when this fold's best
          ``val_f1_score`` beats the current best.
        * Writes the fold history (with the up-to-date best model path) to
          JSON via ``save_history_to_json``.
    """
    global BEST_VAL_SCORE, BEST_MODEL

    # Validation uses the dataset returned by the splitter for this fold.
    train_dataset, validation_dataset, _test_sj1, _test_sj2 = SplitDatasetForFolds(
        train_index, val_index, fold_number, numpy_data
    )

    # Create and compile the model
    model = Compile_model()

    checkpoint_path = os.path.join(MODEL_PATH, f"best_model_fold_{fold_number}.keras")

    # Named `fold_callbacks` so the imported `callbacks` module is not shadowed.
    # The checkpoint monitors the same metric that ranks the folds below, so
    # the recorded score describes the file actually saved. `mode` is given
    # explicitly because a higher F1 is better.
    fold_callbacks = [
        keras.callbacks.ModelCheckpoint(
            checkpoint_path, save_best_only=True, monitor="val_f1_score", mode="max"
        ),
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
    ]

    # Train the model
    history = model.fit(
        train_dataset,
        epochs=epochs,
        validation_data=validation_dataset,
        callbacks=fold_callbacks,
        class_weight=weight_dict
    )

    # Append history
    HISTORY.append(history.history)

    # Score of the checkpointed epoch: the best val_f1_score, not the last one.
    fold_score = max(history.history['val_f1_score'])
    if fold_score > BEST_VAL_SCORE:
        BEST_VAL_SCORE = fold_score
        BEST_MODEL = checkpoint_path

    # Save after updating BEST_MODEL so the JSON records the current best.
    save_history_to_json(history, fold_number, BEST_MODEL)

    print(f"Training fold {fold_number} completed\n")

In [62]:
def Cross_validation_training(numpy_data, weight_dict):
    """Run ``NUM_FOLDS``-fold cross-validation training over ``x_train``.

    Args:
        numpy_data: Dict of arrays; ``numpy_data['x_train']`` is what gets
            split into folds.
        weight_dict: Class weights forwarded to ``Train_fold``.

    Folds are numbered from 1 and the split is shuffled with a fixed
    ``random_state`` of 42. The current fold number is kept in the
    module-level ``fold_number`` variable, as before.

    Any exception raised while training is caught so the caller keeps
    running, but the failing fold and the full traceback are printed.
    """
    global fold_number
    import traceback  # only used to report failures

    fold_number = 1

    # Initialize KFold with the number of splits
    kfold = KFold(n_splits=NUM_FOLDS, shuffle=True, random_state=42)
    try:
        for train_index, val_index in kfold.split(numpy_data['x_train']):
            print(f"Train indices: {train_index}")
            print(f"Validation indices: {val_index}")
            print(f"Max validation index: {max(val_index)}")

            # Train fold
            Train_fold(train_index, val_index, fold_number, numpy_data, weight_dict)
            fold_number += 1

        print("Cross-validation training completed.\n")
    except Exception as e:
        print(f"An error occurred during cross-validation training: {e}")
        # The message alone rarely locates the problem; show where it came from.
        print(f"Failure occurred while processing fold {fold_number}.")
        traceback.print_exc()


In [63]:
def main():
    """Run the whole pipeline: load data, derive class weights, train all folds.

    Steps, in order:
      1. Load the label DataFrame and the ``.npy`` arrays under ``DATA_PATH``.
      2. Compute class weights from the ``downsampled_label`` column.
      3. Build one model only to print its summary.
      4. Run cross-validation training.

    Returns:
        The dict of loaded (and NaN-filtered) arrays.
    """
    labels_df = load_df()
    numpy_data = gather_numpy_files(DATA_PATH)

    # Class weights come from the label distribution in the DataFrame.
    weight_dict = calculate_class_weights(labels_df, 'downsampled_label')

    # This instance is only inspected; each fold builds its own model.
    create_model().summary()

    Cross_validation_training(numpy_data, weight_dict)

    return numpy_data

In [None]:
# Run the full pipeline and keep the returned dict of arrays under the name
# `numpy`. The numpy package is imported as `np`, so that alias is unaffected.
numpy = main()