In [None]:
import pandas as pd
import numpy as np
import keras
import matplotlib.pyplot as plt
from keras.models import  Model
from keras.layers import LSTM, Dense, Dropout, BatchNormalization, Input, Bidirectional, GRU
from keras.callbacks import LearningRateScheduler, ModelCheckpoint
from sklearn.model_selection import train_test_split
from keras.optimizers import Adam
from sklearn.metrics import classification_report, accuracy_score
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import butter, lfilter
from keras.utils import to_categorical

def butter_highpass(cutoff, sampFreq, order=5):
    nyquist = 0.5 * sampFreq
    normal_cutoff = cutoff / nyquist
    if normal_cutoff <= 0 or normal_cutoff >= 1:
        raise ValueError("Cutoff frequency must be between 0 and Nyquist frequency")
    b, a = butter(order, normal_cutoff, btype='high', analog=False)
    return b, a

def butter_lowpass(cutoff, sampFreq, order=5):
    nyquist = 0.5 * sampFreq
    normal_cutoff = cutoff / nyquist
    if normal_cutoff <= 0 or normal_cutoff >= 1:
        raise ValueError("Cutoff frequency must be between 0 and Nyquist frequency")
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

def applyLowpassFilter(data, cutoff, sampFreq, filter_type='low', order=5):
    if filter_type == 'low':
        b, a = butter_lowpass(cutoff, sampFreq, order=order)
    else:
        raise ValueError("Invalid filter type. Use 'low' or 'high'.")
    return lfilter(b, a, data)

def applyHighpassFilter(data, cutoff, sampFreq, filter_type='high', order=5):
    if filter_type == 'high':
        b, a = butter_highpass(cutoff, sampFreq, order=order)
    else:
        raise ValueError("Invalid filter type. Use 'low' or 'high'.")
    return lfilter(b, a, data)

# Function to prepare data for LSTM input
def prepare_data(chunk, timeStep):
    scaler = MinMaxScaler(feature_range=(0,1))
    # Features and targets
    x = chunk.iloc[:, 1:-2].values
    arousal = chunk['Arousal Score'].values
    anxiety = chunk['Anxiety Score'].values
    
    if np.isnan(arousal).any() or np.isnan(anxiety).any():
        arousal = np.nan_to_num(arousal, nan=np.nanmean(arousal))
        anxiety = np.nan_to_num(anxiety, nan=np.nanmean(anxiety))
        
    arousal = np.digitize(arousal, bins=[3, 6], right=True) 
    anxiety = np.digitize(anxiety, bins=[3, 6], right=True)
    
    #Low-High Pass filter
    xFiltered = applyLowpassFilter(x, 63, 128,filter_type="low")
    xFiltered = applyHighpassFilter(xFiltered, 0.5, 128,filter_type="high")
    
    xNormalised = scaler.fit_transform(xFiltered)
    
    xReshaped = np.array([xNormalised[i:i + timeStep] for i in range(len(xNormalised) - timeStep)], dtype=np.float32)
    
    arousalReshaped = to_categorical(arousal[timeStep:].astype(int), num_classes=3)
    anxietyReshaped = to_categorical(anxiety[timeStep:].astype(int), num_classes=3)
    
     
    return xReshaped, arousalReshaped, anxietyReshaped

# Learning rate scheduler
def lr_scheduler(epoch, lr):
    return lr * 0.95 if epoch > 1 else lr

# Function to train the model in chunks
def chunkedTraining(model, outputFilePathS1, chunk_size, timeStep):
    scheduler = LearningRateScheduler(lr_scheduler)
    checkpoint = ModelCheckpoint('best_triclass_model.keras', save_best_only=True, monitor='val_loss', mode='min')

    # Placeholder for validation and test data
    xVal, arousalVal, anxietyVal = None, None, None
    xTest, arousalTest, anxietyTest = None, None, None
    
    train_loss_history = []
    val_loss_history = []
    train_arousal_accuracy_history = []
    val_arousal_accuracy_history = []
    train_anxiety_accuracy_history = []
    val_anxiety_accuracy_history = []
    
    for i, chunk in enumerate(pd.read_csv(outputFilePathS1, chunksize=chunk_size)):
        if i == 0:
            # Split the first chunk into test, validation, and training sets
            chunkTrain, chunkTemp = train_test_split(chunk, test_size=0.2)
            chunkVal, chunkTest = train_test_split(chunkTemp, test_size=0.5)
            
            # Prepare validation data
            xVal, arousalVal, anxietyVal = prepare_data(chunkVal, timeStep)
            
            # Prepare test data and save for later
            xTest, arousalTest, anxietyTest = prepare_data(chunkTest, timeStep)
            
            chunk = chunkTrain
            
        
        # Prepare training data
        xTrain, arousalTrain, anxietyTrain = prepare_data(chunk, timeStep)
        
        arousalTrain = np.reshape(arousalTrain, (-1, 1))
        anxietyTrain = np.reshape(anxietyTrain, (-1, 1))
        arousalVal = np.reshape(arousalVal, (-1, 1))
        anxietyVal = np.reshape(anxietyVal, (-1, 1))
        # Skip empty chunks
        if len(xTrain) == 0:
            print(f"Skipping empty chunk at index {i}.")
            continue
        
        # Train the model
        history = model.fit(xTrain, 
                  {'arousal': arousalTrain, 'anxiety': anxietyTrain}, 
                  epochs=5, batch_size=64, verbose=1, 
                  validation_data=(xVal, 
                                   {'arousal': arousalVal, 
                                    'anxiety': anxietyVal}), 
                  callbacks=[scheduler, checkpoint])
    
        train_loss_history.extend(history.history['loss'])
        val_loss_history.extend(history.history['val_loss'])
        if 'arousal_accuracy' in history.history:
            train_arousal_accuracy_history.extend(history.history['arousal_accuracy'])
            val_arousal_accuracy_history.extend(history.history['val_arousal_accuracy'])

        if 'anxiety_accuracy' in history.history:
            train_anxiety_accuracy_history.extend(history.history['anxiety_accuracy'])
            val_anxiety_accuracy_history.extend(history.history['val_anxiety_accuracy'])
        
    plt.figure(figsize= (10,6))    
    plt.plot(train_loss_history, label = 'Training Loss')
    plt.plot(val_loss_history, label = 'Validation Loss')
    plt.title('Model Loss During Training')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='upper right')
    plt.show()
    
    # Plotting Arousal Accuracy
    if train_arousal_accuracy_history and val_arousal_accuracy_history:
        plt.figure(figsize=(10, 6))
        plt.plot(train_arousal_accuracy_history, label='Training Arousal Accuracy')
        plt.plot(val_arousal_accuracy_history, label='Validation Arousal Accuracy')
        plt.title('Arousal Accuracy Over Training')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend(loc='lower right')
        plt.show()

# Plotting Anxiety Accuracy
    if train_anxiety_accuracy_history and val_anxiety_accuracy_history:
        plt.figure(figsize=(10, 6))
        plt.plot(train_anxiety_accuracy_history, label='Training Anxiety Accuracy')
        plt.plot(val_anxiety_accuracy_history, label='Validation Anxiety Accuracy')
        plt.title('Anxiety Accuracy Over Training')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend(loc='lower right')
        plt.show()
    # Return the model and the saved test data
    return model, xTest, arousalTest, anxietyTest

def evaluate_model(model, xTest, arousalTest, anxietyTest):
    arousalTest = arousalTest.reshape(-1, 1)  # Reshape to (9872, 1)
    anxietyTest = anxietyTest.reshape(-1, 1)
    # Evaluate the model on the test data
    results = model.evaluate(xTest, {'arousal': arousalTest, 'anxiety': anxietyTest}, verbose=1)
    
    # Print loss and accuracy
    print(f"Test Loss (Overall): {results[0]}")
    print(f"Test Loss (Arousal): {results[1]}")
    print(f"Test Loss (Anxiety): {results[2]}")
    print(f"Test Accuracy (Arousal): {results[3]}")
    print(f"Test Accuracy (Anxiety): {results[4]}")
    
    # Make predictions on the test set
    predictions = model.predict(xTest)
    arousalPreds = predictions[0].argmax(axis=1) 
    anxietyPreds = predictions[1].argmax(axis=1)  

    arousalTrue = arousalTest.argmax(axis=1)
    anxietyTrue = anxietyTest.argmax(axis=1)
    
    # Classification reports for arousal and anxiety
    print("\nClassification Report for Arousal:")
    print(classification_report(arousalTrue, arousalPreds, target_names=['Low', 'Mid', 'High']))
    
    print("Classification Report for Anxiety:")
    print(classification_report(anxietyTrue, anxietyPreds, target_names=['Low', 'Mid', 'High']))

    # Additional Metrics
    print(f"Arousal Accuracy: {accuracy_score(arousalTrue, arousalPreds)}")
    print(f"Anxiety Accuracy: {accuracy_score(anxietyTrue, anxietyPreds)}")

In [None]:
def createLstmModel(inputShape):
    modelInput = Input(shape=inputShape)
    
    x = LSTM(128, return_sequences=True, kernel_regularizer='l2')(modelInput)
    x = Dropout(0.3)(x)
    x = LSTM(128, return_sequences=True, kernel_regularizer='l2')(x)
    x = Dropout(0.3)(x)
    x = LSTM(64)(x)
    x = BatchNormalization()(x)
        
    # Output layers
    output_arousal = Dense(3, activation='softmax', name='arousal')(x)
    output_anxiety = Dense(3, activation='softmax', name='anxiety')(x)
    
    # Create and compile model
    model = Model(inputs=modelInput, outputs=[output_arousal, output_anxiety])
    model.compile(optimizer=Adam(learning_rate=0.001), 
                  loss={'arousal': 'categorical_crossentropy', 
                        'anxiety': 'categorical_crossentropy'}, 
                  metrics={'arousal': ['accuracy'], 'anxiety': ['accuracy']})
    return model

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000
outputFilePathS1 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS1.csv'
outputFilePathS2 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS2.csv'
outputFilePathS3 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS3.csv'

model = createLstmModel((timeStep, input_dim))
model.summary()

trainedLstmModel, xTest, arousalTest, anxietyTest = chunkedTraining(model, outputFilePathS1, chunk_size, timeStep)

In [None]:
trainedLstmModel.summary()
evaluate_model(trainedLstmModel, xTest, arousalTest, anxietyTest)

In [None]:
del xTest, arousalTest, anxietyTest 
import gc
gc.collect()

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000

trainedLstmModel, xTest, arousalTest, anxietyTest = chunkedTraining(trainedLstmModel, outputFilePathS2, chunk_size, timeStep)

In [None]:
trainedLstmModel.summary()

evaluate_model(trainedLstmModel, xTest, arousalTest, anxietyTest)

In [None]:
del xTest, arousalTest, anxietyTest 
gc.collect()

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000

trainedModelFull, xTest, arousalTest, anxietyTest = chunkedTraining(trainedLstmModel, outputFilePathS3, chunk_size, timeStep)

In [None]:
trainedModelFull.summary()

evaluate_model(trainedModelFull, xTest, arousalTest, anxietyTest)

In [None]:
del trainedLstmModel

In [None]:
def createGruModel(inputShape):
    modelInput = Input(shape=inputShape)
    
    x = GRU(128, return_sequences=True, kernel_regularizer='l2')(modelInput)
    x = Dropout(0.3)(x)
    x = GRU(128, return_sequences=True, kernel_regularizer='l2')(x)
    x = Dropout(0.3)(x)
    x = GRU(64)(x)
    x = BatchNormalization()(x)
    
    # Output layers
    output_arousal = Dense(3, activation='softmax', name='arousal')(x)
    output_anxiety = Dense(3, activation='softmax', name='anxiety')(x)
    
    # Create and compile model
    model = Model(inputs=modelInput, outputs=[output_arousal, output_anxiety])
    model.compile(optimizer=Adam(learning_rate=0.001), 
                  loss={'arousal': 'categorical_crossentropy', 
                        'anxiety': 'categorical_crossentropy'}, 
                  metrics={'arousal': ['accuracy'], 'anxiety': ['accuracy']})
    return model

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000
outputFilePathS1 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS1.csv'
outputFilePathS2 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS2.csv'
outputFilePathS3 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS3.csv'

model = createGruModel((timeStep, input_dim))
model.summary()

trainedGruModel, xTest, arousalTest, anxietyTest = chunkedTraining(model, outputFilePathS1, chunk_size, timeStep)

In [None]:
trainedGruModel.summary()
evaluate_model(trainedGruModel, xTest, arousalTest, anxietyTest)

In [None]:
del xTest, arousalTest, anxietyTest 
gc.collect()

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000

trainedGruModel, xTest, arousalTest, anxietyTest = chunkedTraining(trainedGruModel, outputFilePathS2, chunk_size, timeStep)

In [None]:
trainedGruModel.summary()
evaluate_model(trainedGruModel, xTest, arousalTest, anxietyTest)

In [None]:
del xTest, arousalTest, anxietyTest 
gc.collect()

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000

trainedGruModel, xTest, arousalTest, anxietyTest = chunkedTraining(trainedGruModel, outputFilePathS3, chunk_size, timeStep)

In [None]:
trainedGruModel.summary()
evaluate_model(trainedGruModel, xTest, arousalTest, anxietyTest)

In [None]:
del trainedGruModel, xTest, arousalTest, anxietyTest 
gc.collect()

In [None]:
def createBiLstmModel(inputShape):
    modelInput = Input(shape=inputShape)
    
    x = Bidirectional(LSTM(128, return_sequences=True, kernel_regularizer='l2'))(modelInput)
    x = Dropout(0.25)(x)
    x = Bidirectional(LSTM(128, return_sequences=True, kernel_regularizer='l2'))(x)
    x = Dropout(0.25)(x)
    x = Bidirectional(LSTM(64))(x)
    x = BatchNormalization()(x)
    
    # Output layers
    output_arousal = Dense(3, activation='softmax', name='arousal')(x)
    output_anxiety = Dense(3, activation='softmax', name='anxiety')(x)
    
    # Create and compile model
    model = Model(inputs=modelInput, outputs=[output_arousal, output_anxiety])
    model.compile(optimizer=Adam(learning_rate=0.001), 
                  loss={'arousal': 'categorical_crossentropy', 
                        'anxiety': 'categorical_crossentropy'}, 
                  metrics={'arousal': ['accuracy'], 'anxiety': ['accuracy']})
    return model

In [None]:
timeStep = 128
input_dim = 32
chunk_size = 100000
outputFilePathS1 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS1.csv'
outputFilePathS2 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS2.csv'
outputFilePathS3 = f'C:\\Users\\izaak\Desktop\\VRETDataAnalysis\\Test 1\\totalRawWithLikertS3.csv'

model = createBiLstmModel((timeStep, input_dim))
model.summary()

trainedBilstmModel, xTest, arousalTest, anxietyTest = chunkedTraining(model, outputFilePathS1, chunk_size, timeStep)

In [None]:
trainedBilstmModel.summary()
evaluate_model(trainedBilstmModel, xTest, arousalTest, anxietyTest)

In [None]:
del xTest, arousalTest, anxietyTest 
gc.collect()

In [None]:
timeStep = 128
chunk_size = 100000

trainedBilstmModel, xTest, arousalTest, anxietyTest = chunkedTraining(trainedBilstmModel, outputFilePathS2, chunk_size, timeStep)

In [None]:
trainedBilstmModel.summary()
evaluate_model(trainedBilstmModel, xTest, arousalTest, anxietyTest)

In [None]:
del xTest, arousalTest, anxietyTest 
gc.collect()

In [None]:
timeStep = 128
chunk_size = 100000

trainedBilstmModel, xTest, arousalTest, anxietyTest = chunkedTraining(trainedBilstmModel, outputFilePathS3, chunk_size, timeStep)

In [None]:
trainedBilstmModel.summary()
evaluate_model(trainedBilstmModel, xTest, arousalTest, anxietyTest)

In [None]:
del trainedBilstmModel, xTest, arousalTest, anxietyTest 
gc.collect()