# Instalación librería

# Carga librerías

In [125]:
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from keras.regularizers import l2
import tensorflow as tf

import numpy as np
import pandas as pd

from SignalProcessor.Filter import Filter
from SignalProcessor.CSPMulticlass import CSPMulticlass
from SignalProcessor.FeatureExtractor import FeatureExtractor
from SignalProcessor.RavelTransformer import RavelTransformer


## Clasificadores LDA y SVM
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.svm import SVC

from sklearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, classification_report, precision_recall_fscore_support
import pickle
from sklearn.preprocessing import label_binarize

from TrialsHandler.TrialsHandler import TrialsHandler
from TrialsHandler.Concatenate import Concatenate
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import roc_curve, auc

# Subida de datos

In [126]:
np.random.seed(42)
tf.random.set_seed(42)

### ********** Cargamos los datos **********
file = "data/sujeto_11/eegdata/sesion1/sn1_ts0_ct0_r1.npy"
eventosFile = "data/sujeto_11/eegdata/sesion1/sn1_ts0_ct0_r1_events.txt"
rawEEG_1 = np.load(file)
eventos_1 = pd.read_csv(eventosFile, sep = ",")

file = "data/sujeto_11/eegdata/sesion2/sn2_ts0_ct0_r1.npy"
eventosFile = "data/sujeto_11/eegdata/sesion2/sn2_ts0_ct0_r1_events.txt"
rawEEG_2 = np.load(file)
eventos_2 = pd.read_csv(eventosFile, sep = ",")

#Creamos objetos para manejar los trials
th_1 = TrialsHandler(rawEEG_1, eventos_1, tinit = 0, tmax = 4, reject=None, sample_rate=250.)
th_2 = TrialsHandler(rawEEG_2, eventos_2, tinit = 0, tmax = 4, reject=None, sample_rate=250.)

dataConcatenada = Concatenate([th_1, th_2])#concatenamos datos

channelsSelected = [0,1,2,3,6,7]

#me quedo con channelsSelected
dataConcatenada.trials = dataConcatenada.trials[:,channelsSelected,:]

# Estas son las clases que quieres mantener
        # "Mano Izquierda",1
        # "Mano Derecha",2
        # "Ambas Manos",3
        # "Pies",4
        # "Rest"5

desired_classes = [1, 2, 3, 4, 5]

num_classes = len(desired_classes)

# Filtramos los ensayos y etiquetas para mantener solo las clases deseadas
filtered_indices = np.isin(dataConcatenada.labels, desired_classes)
trials = dataConcatenada.trials[filtered_indices]
labels = dataConcatenada.labels[filtered_indices]

if 3 not in desired_classes:
  if (5 in desired_classes) and (4 in desired_classes):
    labels[labels == 4] = 3
    labels[labels == 5] = 4
  elif (5 in desired_classes):
    labels[labels == 5] = 3

print(trials.shape)
print(labels.shape)

labels = labels-1

eeg_train, eeg_test, labels_train, labels_test = train_test_split(trials, labels, test_size=0.2, stratify=labels)

### ********** Instanciamos los diferentes objetos que usaremos en el pipeline**********

fm = 250. #frecuencia de muestreo
filtro = Filter(highcut = 30)
csp = CSPMulticlass(n_components=2, method = "ovo", n_classes = num_classes, reg = 0.01)
featureExtractor = FeatureExtractor(method = "welch", sample_rate = fm, axisToCompute=2, band_values=[8,12])
ravelTransformer = RavelTransformer()

Se han extraido 75 trials
Se han extraido 8 canales
Se han extraido 1000 muestras por trial
Se han extraido 75 trials
Se han extraido 8 canales
Se han extraido 1000 muestras por trial
(150, 6, 1000)
(150,)


# CRNN

In [127]:
from sklearn.base import TransformerMixin
from sklearn.preprocessing import StandardScaler
class ChannelScaler(TransformerMixin):
    def __init__(self):
        self.scalers = []

    def fit(self, X, y=None):
        n_channels = X.shape[1]
        self.scalers = [StandardScaler() for _ in range(n_channels)]
        for i in range(n_channels):
            self.scalers[i].fit(X[:, i, :])
        return self

    def transform(self, X, y=None):
        n_channels = X.shape[1]
        X_scaled = np.empty_like(X)
        for i in range(n_channels):
            X_scaled[:, i, :] = self.scalers[i].transform(X[:, i, :])
        return X_scaled

In [128]:
from CNNEEG import CRNN_EEGNet
# Filtramos los ensayos y etiquetas para mantener solo las clases deseadas
from keras.utils import to_categorical

pipeline = Pipeline([
    ('pasabanda', filtro),
    ('standardnormalization', ChannelScaler())])

# crnn_eegnet.fit(X_train, y_train, epochs=7000, validation_data=(X_val, y_val), patience=3000)
pipeline.fit(trials, labels)

transformed_trials = pipeline.transform(trials)
print(transformed_trials.shape)

labels_onehot = to_categorical(labels, num_classes = num_classes)

# # Dividir en conjunto de entrenamiento y validación
X_train, X_val, y_train, y_val = train_test_split(transformed_trials, labels_onehot, test_size=0.3, random_state=42, stratify= labels_onehot)
X_val, X_test, y_val, y_test = train_test_split(X_val, y_val, test_size=0.5, random_state=42, stratify= y_val)

# X_train = np.expand_dims(X_train, axis=-1)
# X_val = np.expand_dims(X_val, axis=-1)
# X_test = np.expand_dims(X_test, axis=-1)

input_shape = X_train.shape[1:]  # Tomar la forma de los datos, excluyendo la dimensión de los ejemplos

crnn_eegnet = CRNN_EEGNet(input_shape= input_shape, num_classes = num_classes)
crnn_eegnet.compile()

fitness = crnn_eegnet.fit(X_train, y_train, epochs = 100, validation_data = (X_val, y_val), patience = 15)

(150, 6, 1000)
Epoch 1/100
Epoch 1: val_loss improved from inf to 1.63480, saving model to best_model.h5
Epoch 2/100


  saving_api.save_model(


Epoch 2: val_loss did not improve from 1.63480
Epoch 3/100
Epoch 3: val_loss did not improve from 1.63480
Epoch 4/100
Epoch 4: val_loss did not improve from 1.63480
Epoch 5/100
Epoch 5: val_loss did not improve from 1.63480
Epoch 6/100
Epoch 6: val_loss did not improve from 1.63480
Epoch 7/100
Epoch 7: val_loss did not improve from 1.63480
Epoch 8/100
Epoch 8: val_loss did not improve from 1.63480
Epoch 9/100
Epoch 9: val_loss did not improve from 1.63480
Epoch 10/100
Epoch 10: val_loss did not improve from 1.63480
Epoch 11/100
Epoch 11: val_loss did not improve from 1.63480
Epoch 12/100
Epoch 12: val_loss did not improve from 1.63480
Epoch 13/100
Epoch 13: val_loss did not improve from 1.63480
Epoch 14/100
Epoch 14: val_loss did not improve from 1.63480
Epoch 15/100
Epoch 15: val_loss did not improve from 1.63480
Epoch 16/100

Epoch 16: val_loss did not improve from 1.63480
Epoch 16: early stopping


In [129]:
# 1. Obtener las predicciones del modelo CRNN_EEGNet
probs = crnn_eegnet.model.predict(X_test)  # Esto nos da las probabilidades

y_pred = np.argmax(probs, axis=1)
y_true = np.argmax(y_test, axis=1)  # Convertir y_test de formato one-hot a índices de clases

print(classification_report(y_true, y_pred), end="\n\n")

cm_crnn = crnn_eegnet
# Matriz de confusión
cm_crnn = confusion_matrix(y_true, y_pred)
print(cm_crnn)

# Métricas individuales
precision_crnn, recall_crnn, f1score_crnn, _ = precision_recall_fscore_support(y_true, y_pred, average='macro')

# Accuracy
acc_crnn = accuracy_score(y_true, y_pred)
acc_crnn = np.round(acc_crnn, decimals=2) * 100
print(f"El accuracy del modelo CRNN_EEGNet es de {acc_crnn}")

# Calcular la curva ROC y el AUC
n_classes = len(np.unique(y_true))
roc_aucs = []

for i in range(n_classes):
    fpr, tpr, _ = roc_curve(y_true == i, probs[:, i])

    if not np.any(np.isnan(tpr)): # Solo calcula AUC si no hay NaNs
        roc_aucs.append(auc(fpr, tpr))

auc_CRNN = np.mean(roc_aucs)
print(f"El AUC promedio del modelo CRNN_EEGNet es de {auc_CRNN:.2f}")

              precision    recall  f1-score   support

           0       0.50      0.25      0.33         4
           1       1.00      0.20      0.33         5
           2       0.75      0.60      0.67         5
           3       0.17      0.25      0.20         4
           4       0.30      0.60      0.40         5

    accuracy                           0.39        23
   macro avg       0.54      0.38      0.39        23
weighted avg       0.56      0.39      0.40        23


[[1 0 0 0 3]
 [0 1 0 3 1]
 [0 0 3 1 1]
 [1 0 0 1 2]
 [0 0 1 1 3]]
El accuracy del modelo CRNN_EEGNet es de 39.0
El AUC promedio del modelo CRNN_EEGNet es de 0.70


# Creación de DataFrames

In [130]:
df = pd.DataFrame(columns=["Accuracy", "Precision", "Recall", "F1-Score", "AUC"])
df.loc["CRNN"] = [acc_crnn,precision_crnn, recall_crnn, f1score_crnn, auc_CRNN]  # We only have accuracy for CRNN in this case

print(df)

# Save and return the best estimator for each pipeline
best_estimators = {}
best_estimators['CRNN'] = crnn_eegnet

#Matrices de confusión

cm = {'CRNN': cm_crnn}

      Accuracy  Precision  Recall  F1-Score       AUC
CRNN      39.0   0.543333    0.38  0.386667  0.703918


In [131]:
# DataFrame with accuracy and other metrics
results_df_path = "Resultados/results_df.csv"
df.to_csv(results_df_path)

# Save best estimators
best_estimators_paths = {
    'CRNN': "Resultados/best_crnn.pkl"
}

for model_name, path in best_estimators_paths.items():
    with open(path, 'wb') as file:
        pickle.dump(best_estimators[model_name], file)

#Guardando matrices de confusión
with open("Resultados/cm.pkl", 'wb') as file:
        pickle.dump(cm, file)