In [None]:
# Importación de librerías
import numpy as np
import matplotlib.pyplot as plt
import os

# Configuración Google Drive
from google.colab import drive
drive.mount('/gdrive')

# Rutas de los archivos
ruta = '/gdrive/MyDrive/TEG-EEG/dataset_bonn/'
ruta_F = ruta + 'F/'
ruta_N = ruta + 'N/'
ruta_O = ruta + 'O/'
ruta_S = ruta + 'S/'
ruta_Z = ruta + 'Z/'

# Categorías de los subsets
LABEL_ZO = 0 # Sujetos normales (sets Z y O). 200 en total
LABEL_NF = 1 # Sujetos anormales señales epilépticas interictales (N y F). 200 en total
LABEL_S = 2  # Sujetos anormales señales epilépticas ictales (S). 100 en total

def leer_datos():
    datos_0 = []
    datos_1 = []
    datos_2 = []
    N = 0        # Cantidad de archivos

    # Leer subset F
    print('Leyendo subset F... (categoría 1')
    for filename in os.listdir(ruta_F):
        dato = np.loadtxt(ruta_F + filename)
        dato = dato[:-1] # Tomar los primeros 4.096 datos, no incluir el dato 4.097
        datos_1.append([dato, np.array([LABEL_NF])])
        N += 1

    # Leer subset N
    print('Leyendo subset N... (categoría 1')
    for filename in os.listdir(ruta_N):
        dato = np.loadtxt(ruta_N + filename)
        dato = dato[:-1] # Tomar los primeros 4.096 datos, no incluir el dato 4.097
        datos_1.append([dato, np.array([LABEL_NF])])
        N += 1

    # Leer subset Z
    print('Leyendo subset Z... (categoría 0')
    for filename in os.listdir(ruta_Z):
        dato = np.loadtxt(ruta_Z + filename)
        dato = dato[:-1] # Tomar los primeros 4.096 datos, no incluir el dato 4.097
        datos_0.append([dato, np.array([LABEL_ZO])])
        N += 1

    # Leer subset O
    print('Leyendo subset O... (categoría 0')
    for filename in os.listdir(ruta_O):
        dato = np.loadtxt(ruta_O + filename)
        dato = dato[:-1] # Tomar los primeros 4.096 datos, no incluir el dato 4.097
        datos_0.append([dato, np.array([LABEL_ZO])])
        N += 1

    # Leer subset S
    print('Leyendo subset S...(categoría 2)')
    for filename in os.listdir(ruta_S):
        dato = np.loadtxt(ruta_S + filename)
        dato = dato[:-1] # Tomar los primeros 4.096 datos, no incluir el dato 4.097
        datos_2.append([dato, np.array([LABEL_S])])
        N += 1

    print(f'Lectura terminada. {N} registros en total')

    return datos_0, datos_1, datos_2

datos_0, datos_1, datos_2 = leer_datos()


# Convertir datos_0, datos_1 y datos_2 de listas
# a arreglos de NumPy

# Los arreglos resultantes tendran N (datos) x 4.097 columnas.
# Las primeras 4.096 columnas son la señal EEG, la última columna es la
# categoría

def list_to_array(lista):
    N = len(lista)
    nx = 4096
    ny = 1

    array = np.zeros((N,nx+ny),dtype='float32')
    for i, item in enumerate(lista):
        x = item[0] # Señal EEG
        y = item[1] # Categoría
        xy = np.concatenate((x,y), axis=0)
        array[i] = xy

    return array

datos_0_arr = list_to_array(datos_0)
datos_1_arr = list_to_array(datos_1)
datos_2_arr = list_to_array(datos_2)

print(datos_0_arr.shape)
print(datos_1_arr.shape)
print(datos_2_arr.shape)


# Extraer subsets de entrenamiento, validación y prueba
def extraer_subsets(datos, pct_trn=0.8, pct_val=0.1, pct_tst=0.1, seed=23):
    # Definir número de datos a extraer por cada subset
    N = datos.shape[0]  # Cantidad total de datos
    Ntrn = int(pct_trn*N) # Cantidad de datos de entrenamiento
    Nval = int(pct_val*N) # Cantidad de datos de validación
    Ntst = N-Ntrn-Nval    # Cantidad de datos de prueba

    # Mezclar aleatoriamente las filas del set de datos
    np.random.seed(seed)
    np.random.shuffle(datos)

    # Seleccionar los subsets
    datos_trn = datos[0:Ntrn]
    datos_val = datos[Ntrn:Ntrn+Nval]
    datos_tst = datos[Ntrn+Nval:]

    return datos_trn, datos_val, datos_tst

# Ejecutar la función
train_0, val_0, test_0 = extraer_subsets(datos_0_arr)
train_1, val_1, test_1 = extraer_subsets(datos_1_arr)
train_2, val_2, test_2 = extraer_subsets(datos_2_arr)

print(train_0.shape)
print(val_0.shape)
print(test_0.shape)
print(train_1.shape)
print(val_1.shape)
print(test_1.shape)
print(train_2.shape)
print(val_2.shape)
print(test_2.shape)


# Concatenar los arreglos train_0, 1 y 2 en train y lo mismo para validación
# y entrenamiento

train = np.concatenate((train_0,train_1,train_2))
val = np.concatenate((val_0, val_1, val_2))
test = np.concatenate((test_0, test_1, test_2))
print(train.shape)
print(val.shape)
print(test.shape)

# Conformar los sets x_train/y_train, x_val/y_val, x_test/y_test

def sets_xy(datos, seed=23):
    # Mezclar datos aleatoriamente
    # no hace falta mezclar
    #np.random.seed(seed)
    #np.random.shuffle(datos)
    # get all columns except the last one
    x = datos[:,0:-1]
    y = datos[:,-1]

    return x,y

x_train, y_train = sets_xy(train)
x_val, y_val = sets_xy(val)
x_test, y_test = sets_xy(test)

# Estandarización

def estandarizar(X, mu=None, sigma=None):

    if mu and sigma:
        X_s = (X-mu)/sigma
    else:
        # Calcular media y desviación
        mu = np.mean(X)
        sigma = np.std(X)

        # Estandarizar
        X_s = (X - mu)/sigma

    return X_s, mu, sigma

# Estandarizar los sets
x_train_s, mu, sigma = estandarizar(x_train)
x_val_s, _, _ = estandarizar(x_val, mu, sigma)
x_test_s, _, _ = estandarizar(x_test, mu, sigma)

print(x_test_s.mean())
print(x_test_s.std())

# Ventaneo
def ventanear(X,Y):
    X_v = []
    Y_v = []

    wsize = 128
    #nwind = 4096/wsize

    for x, y in zip(X, Y):
        # Ventanear el registro "x"
        # en teoria como son 400 registros por 16 ventaneos da un total 6400
        x_v = np.reshape(x,(32,wsize))      # 16x256
        y_v = np.repeat(y,32).reshape(32,1) # 16x1

        X_v.append(x_v)
        Y_v.append(y_v)

    # Y convertir las listas X_v, Y_v a arreglos NumPy
    X_v = np.vstack(X_v)
    Y_v = np.vstack(Y_v)

    np.random.shuffle(X_v)
    np.random.shuffle(Y_v)

    # flatten lo convierte en 1 dimension

    return X_v, Y_v.flatten()


print(x_train_s.shape)
print(y_train.shape)

x_train_v, y_train_v = ventanear(x_train_s,y_train)
x_val_v, y_val_v = ventanear(x_val_s,y_val)
x_test_v, y_test_v = ventanear(x_test_s,y_test)

print(x_train_v.shape)
print(x_val_v.shape)
print(x_test_v.shape)
print(y_train_v.shape)
print(y_val_v.shape)
print(y_test_v.shape)


# Además se convertirán Y_v_train, Y_v_test al formato one-hot
from tensorflow.keras.utils import to_categorical

y_train_v_oh = to_categorical(y_train_v,3)
y_val_v_oh = to_categorical(y_val_v,3)
y_test_v_oh = to_categorical(y_test_v,3)

print(y_train_v_oh.shape)
print(y_val_v_oh.shape)
print(y_test_v_oh.shape)

# La función np.random.normal permite añadir el ruido Gaussiano
# con las características deseadas a cada segmento de la señal

# Se debe añadir ruido con el mismo promedio de la señal original
# (loc=0.0) y con una desviación del 10% (0.1) de la señal
# original (scale=0.1)
noise_train = np.random.normal(loc=0.0, scale=0.1, size=x_train_v.shape)
noise_val = np.random.normal(loc=0.0, scale=0.1, size=x_val_v.shape)
# al de prueba investigar si se a;ade ruido
noise_test = np.random.normal(loc=0.0, scale=0.1, size=x_test_v.shape)

x_train_n = x_train_v + noise_train
x_val_n = x_val_v + noise_val
#x_test_n = x_test_v + noise_test
x_test_n = x_test_v

print(x_train_n.shape)
#print(x_train_n[2])

x_train_debug = x_train_n[:10]
print(x_train_debug.shape)


# 30-07-23 with all hyperparameters ok
import tensorflow as tf

func1 = 'relu'
func2 = 'linear'

tf.random.set_seed(157) # Reproducibilidad del entrenamiento

# Definimos la entrada
entrada = tf.keras.layers.Input(shape=(128,))

# Agregamos una capa de convolución con 32 filtros, tamaño de kernel de 3, y activación relu
entrada_convolucional = tf.keras.layers.Reshape((128, 1))(entrada)
conv1 = tf.keras.layers.Conv1D(16, 3, padding='same', activation=func1)(entrada_convolucional)

# Agregamos una capa de pooling con tamaño de pool de 2
maxpool1 = tf.keras.layers.MaxPooling1D(2)(conv1)
maxpool1 = tf.keras.layers.Dropout(0.05)(maxpool1)

encoder = tf.keras.layers.Dense(128, activation = func1,
                                name='bottleneck')(maxpool1)
encoder = tf.keras.layers.ActivityRegularization(l1=0.004)(encoder)

# Aplicamos dropout para intentar reducir el overfitting
encoder = tf.keras.layers.Dropout(0.05)(encoder)


# Agregamos una capa de Reshape para aplanar la salida y pasarla a la capa conv1d_transpose_52
transpuesta1 = tf.keras.layers.Conv1DTranspose(16, 3, padding='same', activation=func1)(encoder)
transpuesta1 = tf.keras.layers.UpSampling1D(2)(transpuesta1)

decoder_output = tf.keras.layers.Conv1DTranspose(1, 3, padding='same', activation=func2)(transpuesta1)

# Aplicamos dropout para intentar reducir el overfitting
decoder_output = tf.keras.layers.Dropout(0.05)(decoder_output)
decoder_output = tf.keras.layers.Reshape((128,))(decoder_output)
decoder_output = tf.keras.layers.Dense(128, activation=func2)(decoder_output)

# Definimos el autoencoder como el modelo que toma la entrada y devuelve la salida del decoder
autoencoder = tf.keras.models.Model(inputs=entrada, outputs=decoder_output)
autoencoder.summary()

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

autoencoder.compile(
    loss='mse',
    optimizer=optimizer
    )

historial = autoencoder.fit(
    x = x_train_n,
    y = x_train_n,
    batch_size=128,
    epochs = 30,
    validation_data = (x_val_n,x_val_n),
    verbose=2
)

import matplotlib.pyplot as plt
plt.plot(historial.history['loss'], label='Train')
plt.plot(historial.history['val_loss'], label='Validation')
plt.legend();

In [None]:
# Find the bottleneck layer index
bottleneck_index = [index for index, layer in enumerate(autoencoder.layers) if layer.name == 'bottleneck'][0]

# Create the encoder model
encoder_model = tf.keras.Model(inputs=autoencoder.input, outputs=autoencoder.layers[bottleneck_index].output)

# Define the classification model
classifier_input = tf.keras.Input(shape=encoder_model.output_shape[1:])
x = classifier_input

# If the encoder output is 3D, flatten it for the Dense layer
x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Dense(3, activation='softmax')(x)  # Assuming 3 classes
classifier_model = tf.keras.Model(inputs=classifier_input, outputs=x)

# Compile the classifier model
classifier_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

classifier_model.summary()

# Prepare the training data (encode the features using the encoder model)
x_train_encoded = encoder_model.predict(x_train_n)
x_test_encoded = encoder_model.predict(x_val_n)

# Train the classifier
classifier_history = classifier_model.fit(x_train_encoded, y_train_v_oh, 
                                          epochs=50,batch_size=128,
                                          validation_data=(x_test_encoded, y_val_v_oh))


In [None]:
# using LSTM layer:

# Define the classification model
classifier_input = tf.keras.Input(shape=encoder_model.output_shape[1:])
x = classifier_input

# LSTM Layer
x = tf.keras.layers.LSTM(64, return_sequences=True)(x)
x = tf.keras.layers.BatchNormalization()(x)

# Flatten the output for the Dense layer (if necessary)
x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dropout(0.5)(x)

# Output layer
x = tf.keras.layers.Dense(3, activation='softmax')(x)  # Assuming 3 classes
classifier_model = tf.keras.Model(inputs=classifier_input, outputs=x)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Compile the classifier model
classifier_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

classifier_model.summary()

# Prepare the training data (encode the features using the encoder model)
x_train_encoded = encoder_model.predict(x_train_n)
x_test_encoded = encoder_model.predict(x_val_n)

# Train the classifier
classifier_history = classifier_model.fit(x_train_encoded, y_train_v_oh, 
                                          epochs=50,batch_size=128,
                                          validation_data=(x_test_encoded, y_val_v_oh))
