# Labo 3 - ARN

In [None]:
import pandas as pd

# Load the two EEG recordings and stack them row-wise into one frame.
mice_eeg = pd.read_csv('./pw3_data/EEG_mouse_data_1.csv')
mice_eeg2 = pd.read_csv('./pw3_data/EEG_mouse_data_2.csv')
# ignore_index=True renumbers the rows 0..n-1, so the positional indices
# produced by KFold later on can also be used as row labels.
df = pd.concat([mice_eeg, mice_eeg2], ignore_index=True)

# The competition test file is not loaded here; it is read in the
# "Test data predictions" cell at the end of the notebook.
#mice_eeg3 = pd.read_csv('./pw3_data/EEG_mouse_data_test.csv')

## 1. Introduction

We keep only the first 25 features, as they contain the largest occurrence counts.

In [None]:
# Feature matrix: the 25 columns at positions 1..25 of the combined frame,
# converted to a NumPy array so it can be indexed with the KFold indices.
# NOTE(review): presumably column 0 holds the 'state' label and is skipped
# on purpose - confirm against the CSV header.
input_data = df.iloc[:, 1:26].to_numpy()
input_data

# Normalisation


In [None]:
# Cross-validation setup: 3 folds, i.e. every round trains on 2/3 of the
# samples and validates on the remaining 1/3.
# Plan: shuffle first, then normalise, then split into folds.

import keras
from keras import layers
from sklearn.model_selection import KFold
import matplotlib.pyplot as pl

pl.clf()

# One seed shared by Keras (Python / NumPy / backend RNGs) and the fold shuffling.
SEED = 123
keras.utils.set_random_seed(SEED)

# random_state must be a fixed integer here: with shuffle=True and no
# random_state, every call to kf.split() draws a new permutation. The
# training and evaluation cells each call kf.split() again, so without a
# fixed seed the models would be evaluated on folds that differ from the ones
# they were validated on (and that overlap with their training samples).
kf = KFold(n_splits=3, shuffle=True, random_state=SEED)
for i, (train_index, test_index) in enumerate(kf.split(df)):
    print(str(i) + ' ' + str(train_index) + ' ' + str(test_index))
# See training 2 cells below

In [None]:
# Configure and setup our MLP
# Source: From the notebook 5. See 'Changed: '
def create_model(n_features=25, hidden_units=20, n_classes=3):
  """Build and compile a fresh, untrained single-hidden-layer MLP classifier.

  Args:
    n_features: Number of input features per sample (default 25, the number
      of EEG columns kept in this notebook).
    hidden_units: Number of neurons in the hidden layer (default 20).
      Try different values here (e.g. 2, 4, 8, 32, 128).
    n_classes: Number of output classes (default 3); the output layer is a
      softmax over this many units, so targets must be one-hot encoded.

  Returns:
    A compiled `keras.Sequential` model with newly initialised weights.
  """
  mlp = keras.Sequential([
      layers.Input(shape=(n_features,)),  # Changed: one input per feature instead of 2
      layers.Dense(hidden_units, activation="leaky_relu"),
      # Changed: todo try to change the amount of layers and perceptrons per layer
      layers.Dense(n_classes, activation="softmax"),  # output
  ])

  # Experiment with hyperparameters here:
  # momentum: [0, 0.8, 0.9, 0.99]
  # learning_rate: [0.1, 0.01, 0.001, 0.0001]
  # (to tune them, replace the "rmsprop" string with a configured
  # keras.optimizers.RMSprop instance)
  mlp.compile(
      optimizer="rmsprop",
      loss=keras.losses.CategoricalFocalCrossentropy(),
  )

  return mlp

# Build one model with the default sizes just to display its architecture.
mlp = create_model()
mlp.summary()


In [None]:
# Per-fold results, indexed by fold number.
history_list = []  # Keras History object of each fold
trained_mlp = []   # trained model of each fold

from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
scaler = StandardScaler()

# One-hot encode the 'state' column; the encoder is fitted once on all rows
# so that every fold shares the same class-to-column mapping.
encoder = OneHotEncoder(sparse_output=False)
encoded_states = encoder.fit_transform(pd.DataFrame(df['state']))  # Transform and encode the states

print(encoded_states)
for i, (train_index, test_index) in enumerate(kf.split(input_data)):
  print(str(i) + ' ' + str(train_index) + ' ' + str(test_index))

  # We need to create a new model every time otherwise fit will continue previous training
  mlp = create_model()

  # Fit the scaler on the training fold only, then apply those same
  # statistics to the validation fold. Re-fitting on the validation fold
  # would scale it with its own mean/std, i.e. differently from the data the
  # model is trained on and from how the evaluation cell scales it.
  train_input = scaler.fit_transform(input_data[train_index])
  test_input = scaler.transform(input_data[test_index])

  history = mlp.fit(
      x=train_input,
      y=encoded_states[train_index],  # Use the encoded states for training
      validation_data=(test_input, encoded_states[test_index]),
      epochs=100
  )

  history_list.append(history)
  trained_mlp.append(mlp)


In [None]:
# Display the model trained on the first fold, as a quick check that the
# training cell populated the list.
trained_mlp[0]

# Plot training history

In [None]:
import numpy as np

# Stack the per-fold Keras histories into (n_folds, n_epochs) arrays.
train_losses = np.array([fold_history.history['loss'] for fold_history in history_list])
val_losses = np.array([fold_history.history['val_loss'] for fold_history in history_list])

# Per-epoch mean and standard deviation across the folds.
mean_train_loss = train_losses.mean(axis=0)
std_train_loss = train_losses.std(axis=0)
mean_val_loss = val_losses.mean(axis=0)
std_val_loss = val_losses.std(axis=0)

# Draw each curve as a mean line surrounded by a +/- one-std band,
# training first and validation second.
curves = (
    (mean_train_loss, std_train_loss, 'Training Loss (Mean)', 'Training Loss (Std)'),
    (mean_val_loss, std_val_loss, 'Validation Loss (Mean)', 'Validation Loss (Std)'),
)
for mean_curve, std_curve, mean_label, std_label in curves:
    epochs = range(len(mean_curve))
    pl.plot(mean_curve, label=mean_label)
    pl.fill_between(
        epochs,
        mean_curve - std_curve,
        mean_curve + std_curve,
        alpha=0.3,
        label=std_label,
    )

# Axis titles and legend.
pl.xlabel('Epochs')
pl.ylabel('Loss')
pl.legend()

# Render the figure.
pl.show()

# Performances

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix, f1_score
import seaborn as sns

def plot_confusion_matrix(confusion_matrix, title, class_names=("n-rem", "rem", "awake")):
    """Draw a confusion matrix as an annotated heatmap in a new figure.

    Args:
        confusion_matrix: Square array of counts, rows = true class,
            columns = predicted class. Values are cast to int for display.
        title: Figure title.
        class_names: Tick labels, in class-index order. The default follows
            the index order used when decoding predictions in this notebook
            (0 -> 'n', 1 -> 'r', 2 -> 'w'); the previous hard-coded labels
            had the first two swapped.
            NOTE(review): assumes 'n' = n-rem, 'r' = rem, 'w' = awake - confirm.
    """
    tick_labels = list(class_names)
    # Plot confusion matrix
    pl.figure(figsize=(8, 6))
    sns.heatmap(confusion_matrix.astype(int), annot=True, fmt="d", cmap="Blues", cbar=False,
                xticklabels=tick_labels, yticklabels=tick_labels)
    pl.title(title)
    pl.xlabel('Predicted')
    pl.ylabel('True')
    pl.show()

f1_scores = []
# Accumulates the per-fold matrices (a sum over folds, shown as the global matrix).
mean_confusion_matrix = np.zeros((3, 3))

# NOTE(review): this loop assumes kf.split() yields the same folds as in the
# training cell, which only holds when KFold has a fixed random_state.
for i, (train_index, test_index) in enumerate(kf.split(input_data)):

    # Normalize: statistics come from the training fold only.
    train_input = scaler.fit_transform(input_data[train_index])
    test_input = scaler.transform(input_data[test_index])

    # Transform target variable
    train_output = encoder.transform(df['state'].iloc[train_index].values.reshape(-1, 1))
    test_output = encoder.transform(df['state'].iloc[test_index].values.reshape(-1, 1))

    # Labelled views of the one-hot targets (kept for inspection; not used below).
    feature_names = encoder.get_feature_names_out(['state'])
    train_output_df = pd.DataFrame(train_output, columns=feature_names)
    test_output_df = pd.DataFrame(test_output, columns=feature_names)

    # Evaluate the trained model on the test fold
    predictions = trained_mlp[i].predict(test_input)
    true_labels = np.argmax(test_output, axis=1)  # Get the index of the maximum value as the true labels
    predicted_labels = np.argmax(predictions, axis=1)  # Get the index of the maximum value as the predicted labels

    # Compute confusion matrix. labels= pins the shape to 3x3 even when a
    # class is absent from both the true and predicted labels of a fold;
    # otherwise the accumulation below would fail on a shape mismatch.
    cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1, 2])
    mean_confusion_matrix += cm

    # Plot this fold's confusion matrix
    plot_confusion_matrix(cm, f'Confusion Matrix - Fold {i + 1}')

    # Compute F1 score
    f1 = f1_score(true_labels, predicted_labels, average='macro')  # Use macro averaging for multi-class classification
    f1_scores.append(f1)
    print(f"F1 Score - Fold {i + 1}: {f1}")

# Plot the confusion matrix accumulated over all folds
plot_confusion_matrix(mean_confusion_matrix, 'Global confusion matrix')

# Calculate and display the mean F1 score across all folds
mean_f1_score = np.mean(f1_scores)
print(f"Mean F1 Score across all folds: {mean_f1_score}")


# Test data predictions
Running the best of our 3 models on test data for the competition.

In [None]:
testdf = pd.read_csv('./pw3_data/EEG_mouse_data_test.csv')

# Take the first 25 columns only
# NOTE(review): the training features are df.iloc[:, 1:26] while positions
# 0..24 are used here - presumably the test file has no leading label
# column; confirm against the CSV header.
testdf = testdf.iloc[:, 0:25]

# Take the best model based on the maximum f1_score
best_model_index = f1_scores.index(max(f1_scores))
print("best_model_index ", best_model_index)

# Normalize the test set with the statistics of the training fold of the
# selected model, so it sees inputs on the same scale as during training.
# The shared `scaler` cannot be reused for this: it holds the statistics of
# whichever fold was fitted last, which is not necessarily the best model's.
# NOTE(review): kf.split() reproduces the model's own training fold only when
# KFold was created with a fixed random_state.
fold_splits = list(kf.split(input_data))
best_train_index = fold_splits[best_model_index][0]
test_scaler = StandardScaler().fit(input_data[best_train_index])
test_data = test_scaler.transform(testdf.to_numpy())

# Run predictions on test data
print("Running predictions on test data")
test_predictions = trained_mlp[best_model_index].predict(test_data)

# Transform predictions into label indices and then into labels
print(test_predictions)
predicted_labels = np.argmax(test_predictions, axis=1)  # Get the index of the maximum value as the predicted labels
print(predicted_labels)
# Read the index -> state mapping from the fitted encoder instead of
# hard-coding it (previously deduced by hand as n at index 0, r at index 1
# and w at index 2).
labels = encoder.categories_[0].tolist()
predicted_labels_state = [labels[i] for i in predicted_labels]
# Printing 20 values in the middle to check for state coherence
print(predicted_labels[1500:1520])
print(predicted_labels_state[1500:1520])

# Write predictions nparray to file
with open('test_pred.npy', 'wb') as f:
    np.save(f, predicted_labels_state)

print("file test_pred.npy saved !")