In [None]:
#Importar las librerías necesarias
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from scipy.io import loadmat
from scipy.signal import spectrogram
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Bidirectional, LSTM, Activation
from keras.callbacks import EarlyStopping
from keras.utils import to_categorical
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV
from keras.optimizers import Adam
from keras.layers import TimeDistributed
from sklearn.base import BaseEstimator, ClassifierMixin
import os
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.signal as signal
import optuna
import librosa
import librosa.display


In [None]:
pathTrain = "Datasets\Train"
pathTest = "Datasets\Test"

In [None]:
##############################Cargar las matrices por gesto TRAIN#######################################
# Listado de nombres de archivos .mat que quieres cargar
nombres_archivos = ['Reposo', 'Extension', 'Flexion','DesvCubital', 'DesvRadial', 'Agarre','Abduccion', 'Aduccion', 'Supinacion','Pronacion']
matrices_normalizadas_train = []

# Carga los archivos .mat y realiza la normalización de las matrices con z-score
for nombre_archivo in nombres_archivos:
    nombre_mat = nombre_archivo+'.mat'
    ruta_archivo = os.path.join(pathTrain, nombre_mat)  # Ruta completa del archivo
    data = loadmat(ruta_archivo)
    datos = data[nombre_archivo]
    datos = datos[:,[0,2]] #Se cargan los canales 1 y 3
    
    # Calcular la media y la desviación estándar a lo largo del eje deseado (por ejemplo, eje 0)
    media = np.mean(datos, axis=0)
    desviacion_estandar = np.std(datos, axis=0)
    
    # Normalizar los datos utilizando z-score
    datos_normalizados = (datos - media) / desviacion_estandar
    
    #Almacenar las matrices resultantes en una lista
    matrices_normalizadas_train.append(datos_normalizados)

In [None]:
#Se verifica que todas las matrices resultantes tengan los tamaños correspondientes
def obtener_tamanos_matrices(lista_matrices):
    tamanos = []
    for matriz in lista_matrices:
        tamanos.append(matriz.shape)
    return tamanos

tamanos = obtener_tamanos_matrices(matrices_normalizadas_train)
print(tamanos)

In [None]:
def createSpectogram(data, id, folder):
    # Apply Short-Time Fourier Transform (STFT)
    stft = librosa.stft(data)

    # Convert complex STFT to magnitude spectrogram
    spectrogram = np.abs(stft)

    # Convert magnitude spectrogram to dB scale
    spectrogram_db = librosa.amplitude_to_db(spectrogram, ref=np.max)

    spectrogram_matrix = np.array(spectrogram_db)

    # Display the spectrogram
    plt.close()
    plt.figure(figsize=(10, 6))
    librosa.display.specshow(spectrogram_db, sr=2000, x_axis='time', y_axis='log')
    plt.axis('off')  # Remove axis labels for a clean image
    spectrogram_filename = f'{folder}\\spectrogram{id}.png'
    plt.savefig(spectrogram_filename, bbox_inches='tight', pad_inches=0)
    plt.close()

    return spectrogram_matrix

In [None]:
# Crea una lista para almacenar los espectrogramas apilados de cada gesto
stacked_spectrograms_train = []

# Parámetros de la ventana deslizante
window_size = 600  # Tamaño de la ventana deslizante
overlap = 0.5  # Superposición entre ventanas (50%)

idx = 0

for gesture_matrix in matrices_normalizadas_train:
    # Obtén las dimensiones de los espectrogramas
    num_spectrograms = int(np.floor((gesture_matrix.shape[0] - window_size) / (window_size * (1 - overlap)))) + 1
    spectrogram_length = int(window_size / 2) + 1  # Longitud de los espectrogramas (la mitad de la ventana deslizante)

    # Crea una matriz tridimensional para almacenar los espectrogramas apilados de un gesto
    stacked_gesture_spectrograms = np.zeros((num_spectrograms, spectrogram_length, 2))

    
    # Aplica ventanas deslizantes y calcula los espectrogramas
    for i in range(num_spectrograms):
        if(idx<37186):
            idx+=2
        else:
            start = int(i * window_size * (1 - overlap))
            end = start + window_size

            # Aplica la ventana deslizante a las señales EMG
            windowed_signals = gesture_matrix[start:end, :]

            spectrogram_channel1 = createSpectogram(windowed_signals[:,0],idx,"Datasets\\Train_spectogram\\")
            idx+=1
            spectrogram_channel2 = createSpectogram(windowed_signals[:,1],idx,"Datasets\\Train_spectogram\\")
            idx+=1


# La lista stacked_spectrograms ahora contiene las matrices tridimensionales de los espectrogramas apilados de cada gesto
# Cada elemento de la lista representa un gesto y tiene una forma (N, longitud_fija, 2), donde N es el número de espectrogramas y longitud_fija es la longitud común de los espectrogramas

In [None]:
#Se verifica que todas los espectrogramas tengan los tamaños correspondientes
def obtener_tamanos_matrices(lista_matrices):
    tamanos = []
    for matriz in lista_matrices:
        tamanos.append(matriz.shape)
    return tamanos

tamanos = obtener_tamanos_matrices(stacked_spectrograms_train)
print(tamanos)

In [None]:
##############################Cargar las matrices por gesto TEST#######################################
# Listado de nombres de archivos .mat que quieres cargar
nombres_archivos = ['Reposo', 'Extension', 'Flexion','DesvCubital', 'DesvRadial', 'Agarre','Abduccion', 'Aduccion', 'Supinacion','Pronacion']
matrices_normalizadas_test = []

# Carga los archivos .mat y realiza la normalización de las matrices con z-score
for nombre_archivo in nombres_archivos:
    nombre_mat = nombre_archivo+'.mat'
    ruta_archivo = os.path.join(pathTest, nombre_mat)  # Ruta completa del archivo
    data = loadmat(ruta_archivo)
    datos = data[nombre_archivo]
    datos = datos[:,[0,2]] #Se cargan los canales 1 y 3
    
    # Calcular la media y la desviación estándar a lo largo del eje deseado (por ejemplo, eje 0)
    media = np.mean(datos, axis=0)
    desviacion_estandar = np.std(datos, axis=0)
    
    # Normalizar los datos utilizando z-score
    datos_normalizados = (datos - media) / desviacion_estandar
    
    #Almacenar las matrices resultantes en una lista
    matrices_normalizadas_test.append(datos_normalizados)

In [None]:
#Se verifica que todas las matrices resultantes tengan los tamaños correspondientes
def obtener_tamanos_matrices(lista_matrices):
    tamanos = []
    for matriz in lista_matrices:
        tamanos.append(matriz.shape)
    return tamanos

tamanos = obtener_tamanos_matrices(matrices_normalizadas_test)
print(tamanos)

In [None]:
# Crea una lista para almacenar los espectrogramas apilados de cada gesto
stacked_spectrograms_test = []

# Parámetros de la ventana deslizante
window_size = 600  # Tamaño de la ventana deslizante
overlap = 0.5  # Superposición entre ventanas (50%)

idx = 0

for gesture_matrix in matrices_normalizadas_test:
    # Obtén las dimensiones de los espectrogramas
    num_spectrograms = int(np.floor((gesture_matrix.shape[0] - window_size) / (window_size * (1 - overlap)))) + 1
    spectrogram_length = int(window_size / 2) + 1  # Longitud de los espectrogramas (la mitad de la ventana deslizante)

    # Crea una matriz tridimensional para almacenar los espectrogramas apilados de un gesto
    stacked_gesture_spectrograms = np.zeros((num_spectrograms, spectrogram_length, 2))

    # Aplica ventanas deslizantes y calcula los espectrogramas
    for i in range(num_spectrograms):
        start = int(i * window_size * (1 - overlap))
        end = start + window_size

        # Aplica la ventana deslizante a las señales EMG
        windowed_signals = gesture_matrix[start:end, :]

        spectrogram_channel1 = createSpectogram(windowed_signals[:,0],idx,"Datasets\\Test_spectogram\\")
        idx+=1
        spectrogram_channel2 = createSpectogram(windowed_signals[:,1],idx,"Datasets\\Test_spectogram\\")
        idx+=1


# La lista stacked_spectrograms ahora contiene las matrices tridimensionales de los espectrogramas apilados de cada gesto
# Cada elemento de la lista representa un gesto y tiene una forma (N, longitud_fija, 2), donde N es el número de espectrogramas y longitud_fija es la longitud común de los espectrogramas

In [None]:
#Se verifica que todas los espectrogramas tengan los tamaños correspondientes
def obtener_tamanos_matrices(lista_matrices):
    tamanos = []
    for matriz in lista_matrices:
        tamanos.append(matriz.shape)
    return tamanos

tamanos = obtener_tamanos_matrices(stacked_spectrograms_test)
print(tamanos)

In [None]:
# Apilar las matrices de espectrogramas en una sola matriz tridimensional para los datos de entrenamiento
X_train = np.concatenate(stacked_spectrograms_train, axis=0)
print("Dimensiones de X_train:",X_train.shape)

# Apilar las matrices de espectrogramas en una sola matriz tridimensional para los datos de prueba
X_test = np.concatenate(stacked_spectrograms_test, axis=0)
print("Dimensiones de X_test:",X_test.shape)  # Verificar la forma de stacked_gestures_test

T=X_train.shape[2]
F=X_train.shape[1]

In [None]:
#seed = 42  # Set the desired seed value
#np.random.seed(seed)  # Set the seed for random number generation
#shape = X_train.shape
#random_indices = np.random.permutation(shape[axis])
#reordered_matrix = np.take(matrix, random_indices, axis=axis)

In [None]:
num_etiquetas = 10  # Número de etiquetas a asignar
muestras_por_etiqueta_train = 6399  # Número de muestras por etiqueta train
muestras_por_etiqueta_test = 1599  # Número de muestras por etiqueta test

# Codificar las etiquetas en formato one-hot
y_train = np.repeat(np.arange(num_etiquetas), muestras_por_etiqueta_train)
y_train = to_categorical(y_train, num_etiquetas)

y_test = np.repeat(np.arange(num_etiquetas), muestras_por_etiqueta_test)
y_test = to_categorical(y_test, num_etiquetas)

# Verificar las dimensiones de y_train y y_test
print("Dimensiones de y_train:", y_train.shape)
print("Dimensiones de y_test:", y_test.shape)


In [None]:
#### Dividir datos de entrenamiento en entrenamiento y validación
#X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)


In [None]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

In [None]:
# Función objetivo para la optimización de hiperparámetros
def objective():
    units_lstm = 256
    units_oculta = 1024
    dropout = 0.1

    modelo = Sequential()
    modelo.add(LSTM(units=units_lstm, return_sequences=True))
    modelo.add(BatchNormalization())
    modelo.add(Dropout(dropout))
    modelo.add(TimeDistributed(Dense(units=units_oculta, activation='relu')))
    modelo.add(Flatten())
    modelo.add(Dense(units=10, activation='softmax'))

    modelo.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    es = EarlyStopping(monitor='val_loss', patience=3)

    modelo.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=128, callbacks=[es], verbose=1)

    y_pred = modelo.predict(X_test)
    y_pred = np.argmax(y_pred, axis=1)
    accuracy = accuracy_score(np.argmax(y_test, axis=1), y_pred)

    return accuracy

# Ejecutar la función objetivo sin optimización de hiperparámetros
mejor_resultado = objective()

print("Mejor resultado:", mejor_resultado)


In [None]:
# Función objetivo para la optimización de hiperparámetros
def objective():
    units_lstm = 256
    units_oculta = 1024
    dropout = 0.1

    modelo = Sequential()
    modelo.add(Dense(units=units_oculta, activation='relu'))
    modelo.add(Dense(units=units_oculta/2, activation='relu'))
    modelo.add(Dense(units=units_oculta/4, activation='relu'))
    modelo.add(LSTM(units=units_lstm, return_sequences=True))
    modelo.add(BatchNormalization())
    modelo.add(Dropout(dropout))
    modelo.add(TimeDistributed(Dense(units=units_oculta, activation='relu')))
    modelo.add(Flatten())
    modelo.add(Dense(units=10, activation='softmax'))

    modelo.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    es = EarlyStopping(monitor='val_loss', patience=3)

    modelo.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=10, batch_size=128, callbacks=[es], verbose=1)

    y_pred = modelo.predict(X_test)
    y_pred = np.argmax(y_pred, axis=1)
    accuracy = accuracy_score(np.argmax(y_test, axis=1), y_pred)

    return accuracy

# Ejecutar la función objetivo sin optimización de hiperparámetros
mejor_resultado = objective()

print("Mejor resultado:", mejor_resultado)
