In [None]:
# Mount Google Drive so the project folder is reachable from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive/')
import os
# Work from the project folder so relative paths used later (./base, ./dataset) resolve against it.
os.chdir('/content/drive/MyDrive/eeg_data')
import sys
# Make the project folder importable; a later cell imports from the `base` package located there.
sys.path.append('/content/drive/MyDrive/eeg_data')

Mounted at /content/drive/


In [None]:
!ls ./base

Config.py		  ExractFeatures.py	       __pycache__	tensorPCA.py
config_trial_spikes.yalm  load_files_trials_spikes.py  session_data.py	utils.py
eeg.py			  MyRC_ESN.py		       subject.py


In [None]:
!pip install mne
!pip install yasa
!pip install dotmap
!pip install nilearn
!pip install mne_connectivity
!pip install optuna

Collecting mne
  Downloading mne-1.7.1-py3-none-any.whl (7.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: mne
Successfully installed mne-1.7.1
Collecting yasa
  Downloading yasa-0.6.5-py2.py3-none-any.whl (33.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.8/33.8 MB[0m [31m40.8 MB/s[0m eta [36m0:00:00[0m
Collecting antropy (from yasa)
  Downloading antropy-0.1.6.tar.gz (17 kB)
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tensorpac>=0.6.5 (from yasa)
  Downloading tensorpac-0.6.5-py3-none-any.whl (423 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m423.6/423.6 kB[0m [31m43.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyriemann>=0.2.7 (from yasa)
  Downloading pyriemann-0.6-py2.py3-none-any.

In [None]:
# @title Listado ficheros datasets creados de los datos raw
import os
import numpy as np

def read_files_and_show (directory = 'dataset'):
    """
    Print the array shape of every ``.npy`` file found directly inside ``directory``.

    A message is printed (and nothing else happens) when the directory does not
    exist or holds no ``.npy`` file. A file that cannot be loaded is reported and
    skipped, so one bad file does not stop the listing.

    Args:
        directory (str): Folder to scan. Defaults to 'dataset'.

    Returns:
        None
    """
    # Guard: nothing to list without the folder
    if os.path.exists (directory):
        npy_names = list (filter (lambda name: name.endswith ('.npy'), os.listdir (directory)))
    else:
        print(f"El directorio {directory} no existe.")
        return

    # Guard: folder present but without any .npy file
    if len (npy_names) == 0:
        print(f"No se encontraron archivos .npy en el directorio {directory}.")
        return

    # Load each array and report its dimensions
    for npy_name in npy_names:
        full_path = os.path.join (directory, npy_name)
        try:
            loaded = np.load (full_path)
            print(f"El archivo {npy_name} tiene dimensiones: {loaded.shape}")
        except Exception as err:
            print(f"No se pudo leer el archivo {npy_name}: {err}")

# Run the listing on the default 'dataset' directory
read_files_and_show ()

El archivo dt_Younger_s_ICA_features_30_4.npy tiene dimensiones: (23, 360, 31)
El archivo dt_Older_s_ICA_features_30_4.npy tiene dimensiones: (24, 360, 31)
El archivo dt_Younger_s_ICA_30_4.npy tiene dimensiones: (23, 39680, 17)
El archivo dt_Older_s_ICA_30_4.npy tiene dimensiones: (24, 39680, 17)
El archivo dt_Younger_s_features_30_4.npy tiene dimensiones: (23, 360, 31)
El archivo dt_Older_s_features_30_4.npy tiene dimensiones: (24, 360, 31)
El archivo dt_Younger_s_30_4.npy tiene dimensiones: (23, 39680, 17)
El archivo dt_Older_s_30_4.npy tiene dimensiones: (24, 39680, 17)
El archivo dt_Younger_z_ICA_features_30_4.npy tiene dimensiones: (23, 296, 31)
El archivo dt_Older_z_ICA_features_30_4.npy tiene dimensiones: (24, 296, 31)
El archivo dt_Younger_z_ICA_30_4.npy tiene dimensiones: (23, 39680, 9)
El archivo dt_Older_z_ICA_30_4.npy tiene dimensiones: (24, 39680, 9)
El archivo dt_Younger_z_features_30_4.npy tiene dimensiones: (23, 296, 31)
El archivo dt_Older_z_features_30_4.npy tiene dim

In [None]:
# @title Búsqueda de parámetros óptimos del modelo MyRC para cada dataset creado
import re
import os
import sys
import optuna
import argparse
import warnings
import numpy as np
import matplotlib.pyplot as plt

from nilearn import plotting
from scipy.stats import norm
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score
from sklearn.metrics.pairwise import cosine_similarity
# Import specific functions/classes/modules
from base.eeg import (
    all_channels, eeg_channels, eog_channels, process_eeg_data_with_ica,
    process_eeg_data_without_ica
)

from base.MyRC_ESN import (
    MyRC, MyESN
)

def standardize_eeg_signals_opt( X ):
    """
    Standardize the EEG signals of every channel of every subject.

    Each subject's 2-D slice ``X[i]`` is handed to a fresh ``StandardScaler``,
    which scales every column (channel) independently from statistics taken
    over that subject's samples. This yields the same per-channel scaling as
    fitting one scaler per channel, without the reshape/flatten round trip.

    Args:
        X (np.ndarray): Input data of shape (n_subjects, n_samples, n_channels).

    Returns:
        np.ndarray: Standardized data with the same shape as ``X``. Floating
        dtypes are kept; any other dtype is returned as float64 so the scaled
        values are not truncated back to integers when stored.
    """
    X = np.asarray( X )

    # The output buffer must be floating point: writing scaled values into a
    # copy of an integer array would silently truncate them.
    out_dtype = X.dtype if np.issubdtype( X.dtype, np.floating ) else np.float64
    X_standardized = np.empty( X.shape, dtype=out_dtype )

    for i in range( X.shape[0] ):
        # One scaler per subject; columns (channels) are scaled independently
        X_standardized[i] = StandardScaler().fit_transform( X[i] )

    return X_standardized

def minmax_normalize_eeg_signals_opt( X ):
    """
    Min-Max normalize the EEG signals of every channel of every subject.

    Each subject's 2-D slice ``X[i]`` is handed to a fresh ``MinMaxScaler``,
    which rescales every column (channel) independently using that subject's
    own minimum and maximum. This yields the same per-channel scaling as
    fitting one scaler per channel, without the reshape/flatten round trip.

    Args:
        X (np.ndarray): Input data of shape (n_subjects, n_samples, n_channels).

    Returns:
        np.ndarray: Normalized data with the same shape as ``X``. Floating
        dtypes are kept; any other dtype is returned as float64 so the scaled
        values are not truncated back to integers when stored.
    """
    X = np.asarray( X )

    # The output buffer must be floating point: writing scaled values into a
    # copy of an integer array would silently truncate them.
    out_dtype = X.dtype if np.issubdtype( X.dtype, np.floating ) else np.float64
    X_normalized = np.empty( X.shape, dtype=out_dtype )

    for i in range( X.shape[0] ):
        # One scaler per subject; columns (channels) are scaled independently
        X_normalized[i] = MinMaxScaler().fit_transform( X[i] )

    return X_normalized

def check_and_load_files( file_path, verbose=False ):
    """
    Load one ``.npy`` dataset and label it from the group named in its file name.

    The group is looked up case-insensitively in the file name only (not in the
    folders leading to it), so names such as ``dt_Younger_...`` and
    ``dt_Older_...`` are recognised. One label is produced per entry along the
    first axis of the loaded array.

    NOTE(review): a second function named ``check_and_load_files`` with a
    different signature is defined later in this cell; once the cell has run,
    that later definition is the one bound to the name.

    Args:
        file_path (str): Path of the file to load.
        verbose (bool): When True, print the shape of the loaded array.

    Returns:
        tuple: ``(dataset, labels)`` where ``labels`` is an integer array filled
        with 0 for a "younger" file or 1 for an "older" file.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file name contains neither 'younger' nor 'older'.
    """
    # Check if the file exists
    if not os.path.exists( file_path ):
        raise FileNotFoundError( f"El fichero {file_path} no se encuentra disponible." )

    # Load the dataset
    dataset = np.load( file_path )

    if verbose: print( f'dataset: {dataset.shape}' )

    # Decide the label from the file name, ignoring case
    file_name = os.path.basename( file_path ).lower()
    if 'younger' in file_name:
        label = 0
    elif 'older' in file_name:
        label = 1
    else:
        raise ValueError( "Filename must contain 'younger' or 'older' to create labels." )

    # One label per subject actually present in the file
    labels = np.full( dataset.shape[0], label, dtype=int )

    return dataset, labels

def train_reservoir( config, X, Y, verbose=False ):
    """
    Fit the reservoir model on standardized inputs and hand back its outputs.

    The input is standardized with ``standardize_eeg_signals_opt``, a ``MyESN``
    is built from ``config`` and wrapped in ``MyRC``, and ``fit`` is called on
    the standardized data.

    Args:
        config (dict): Model configuration, passed to both ``MyESN`` and ``MyRC``.
        X (np.ndarray): Input data.
        Y (np.ndarray): Labels; returned untouched.
        verbose (bool): When True, print the configuration and the shapes involved.

    Returns:
        tuple: ``(rc_state, input_repr, Y)``, i.e. elements 0 and 2 of the value
        returned by ``MyRC.fit`` followed by the labels that were passed in.
    """
    if verbose:
        print( f"config: {config}" )

    esn = MyESN( config )
    X_scaled = standardize_eeg_signals_opt( X )

    if verbose:
        print( f'X: {X.shape}' )
        print( f'X_: {X_scaled.shape}' )

    fit_output = MyRC( esn, config ).fit( X_scaled )

    # Positional layout of the fit result, as consumed here
    rc_state            = fit_output[0]
    rc_dim_states       = fit_output[1]
    input_repr          = fit_output[2]
    output_redout_layer = fit_output[3]

    if verbose:
        for text in (
            f'config: {config}',
            f'rc_state: {rc_state.shape}',
            f'rc_dim_states: {rc_dim_states.shape}',
            f'input_repr: {input_repr.shape}',
            f'output_redout_layer: {output_redout_layer.shape}',
        ):
            print( text )

    return rc_state, input_repr, Y
# No-op with a warn-like signature, intended for ignoring FutureWarning messages
def warn (*args, **kwargs):
    """Accept any positional and keyword arguments and do nothing."""
    return None
def calculate_kmeans_and_metrics (X, Y, verbose):
    """
    Cluster the samples into two groups in two different ways.

    First K-Means is fitted on the pairwise cosine-similarity matrix of ``X``
    (with its diagonal set to 0), then the same estimator is refitted on ``X``
    itself.

    Args:
        X (np.ndarray): 2-D array with one row per sample.
        Y: Not used by this function.
        verbose: Not used by this function.

    Returns:
        tuple: ``(labels_from_similarity, labels_from_X)``, the cluster ids
        assigned by each of the two fits.
    """
    pairwise_cosine = cosine_similarity (X)
    # Zero the self-similarity entries before clustering
    np.fill_diagonal (pairwise_cosine, 0)

    clusterer = KMeans (n_clusters = 2)

    # Each fit stores a new labels_ array, so the first result survives the refit
    labels_from_similarity = clusterer.fit (pairwise_cosine).labels_
    labels_from_X          = clusterer.fit (X).labels_

    return labels_from_similarity, labels_from_X

def log_results(file_log, trial, config, report, accuracy_value, f1_score, auc_roc_value):
    """
    Append the outcome of one trial to a plain-text log file.

    The folder that holds ``file_log`` is created when it is missing, because
    opening a file in append mode creates the file but not its parent folders.

    Args:
        file_log (str): Path of the log file; entries are appended to it.
        trial: Object exposing a ``number`` attribute (the trial index).
        config: Configuration / parameters to record for the trial.
        report: Classification report to record.
        accuracy_value: Accuracy to record.
        f1_score: F1 score to record.
        auc_roc_value: ROC AUC to record.

    Returns:
        None
    """
    log_dir = os.path.dirname (file_log)
    if log_dir:
        os.makedirs (log_dir, exist_ok = True)

    entry = (
        f"Trial {trial.number}\n"
        f"Configuration: {config}\n"
        f"Report: {report}\n"
        f"accu: {accuracy_value}\n"
        f"f1_score: {f1_score}\n"
        f"auc_roc_value: {auc_roc_value}\n"
        "\n"
    )

    with open (file_log, "a") as f:
        f.write (entry)

def evaluate_accuracy(trial, X, Y, state, file_prefix, verbose=False):
    """
    Cluster ``X``, score both clusterings against ``Y`` and log the scores.

    Two label vectors come from ``calculate_kmeans_and_metrics`` (one from the
    cosine-similarity matrix, one from ``X`` directly). For each of them a
    classification report, weighted F1, ROC AUC and accuracy are computed and
    appended to ``./optim_param/{file_prefix}_similarity.txt`` and
    ``./optim_param/{file_prefix}_X.txt`` respectively.

    NOTE(review): the cluster ids are compared to ``Y`` as they come out of
    K-Means; confirm whether they should first be aligned with the 0/1 labels.

    Args:
        trial: Current trial; its ``params`` are written to the logs.
        X (np.ndarray): 2-D representation to cluster, one row per sample.
        Y (np.ndarray): Reference labels.
        state: Only echoed in the verbose output.
        file_prefix (str): Prefix of the two log file names.
        verbose (bool): When True, print labels, predictions and scores.

    Returns:
        tuple: ``(accuracy_from_similarity, accuracy_from_X)``.
    """
    predicted_labels_similarity, predicted_labels_X = calculate_kmeans_and_metrics(X, Y, verbose)

    if verbose:
        print(f'trial: {trial}')
        print(f'\t labels: {Y}')
        print(f'\t predicted_labels_similarity: {predicted_labels_similarity}')
        print(f'\t predicted_labels_X: {predicted_labels_X}')

    def score_and_log (predicted, suffix):
        """Score one label vector against Y, append the scores to its log file and return (f1, auc, accuracy)."""
        report = classification_report (Y, predicted, output_dict = True)
        f1     = report ['weighted avg']['f1-score']
        auc    = roc_auc_score (Y, predicted)
        acc    = accuracy_score (Y, predicted)
        log_results (f"./optim_param/{file_prefix}_{suffix}.txt", trial, trial.params, report, acc, f1, auc)
        return f1, auc, acc

    # Same order as the logs are written: similarity-based clustering first, then X
    f1_sim, auc_sim, acc_sim = score_and_log (predicted_labels_similarity, "similarity")
    f1_raw, auc_raw, acc_raw = score_and_log (predicted_labels_X, "X")

    if verbose:
        for text in (
            f'\t f1_score_similarity::state = {state}: {f1_sim}',
            f'\t auc_roc_value_similarity:state = {state}; {auc_sim}',
            f'\t accuracy_value_similarity:state = {state}; {acc_sim}',
            f'\t f1_score_X::state = {state}: {f1_raw}',
            f'\t accuracy_value_X:state = {state}; {acc_raw}',
            f'\t auc_roc_value_X:state = {state}; {auc_raw}',
        ):
            print(text)

    return acc_sim, acc_raw


def train_and_evaluate_model ( trial, X, Y, file_prefix, verbose = False ):
    """
    Build a reservoir configuration from an Optuna trial, train it and score it.

    The configuration mixes fixed values with hyper-parameters requested from
    ``trial``. The reservoir is trained with ``train_reservoir`` and two of its
    outputs (the flattened reservoir state and the input representation) are
    each scored with ``evaluate_accuracy``.

    Args:
        trial (optuna.Trial): Current Optuna trial.
        X (np.ndarray): Input data; ``X.shape[2]`` is used as the input size.
        Y (np.ndarray): Labels.
        file_prefix (str): Prefix used to build the log file names.
        verbose (bool): When True, downstream helpers print detailed information.

    Returns:
        float: The highest of the four accuracies returned by the two
        ``evaluate_accuracy`` calls.
    """

    # Entries built with trial.suggest_* are the searched hyper-parameters; all others are fixed.
    config = {
        'seed':0,
        'init_type':trial.suggest_categorical('init_type', ['orthogonal', 'trunc_normal', 'binorm']),
        'init_std':trial.suggest_float('init_std', 0.01, 0.05, log = True),
        'init_mean':0,
        'input_size': X.shape[2],
        # Reservoir size is tied to the input width: ten units per input channel
        'n_internal_units': X.shape[2]*10,
        'spectral_radius': trial.suggest_float('spectral_radius', 0.96, 1.00),
        'leak': trial.suggest_float('leak', 0.38, 0.96),
        'input_scaling': 0.10,
        'nonlinearity': trial.suggest_categorical('nonlinearity', ['relu', 'tanh']),
        'connectivity': trial.suggest_float('connectivity', 0.1, 0.5),
        'noise_level': trial.suggest_float('noise_level', 0.1, 10.00),
        # n_drop is only sampled when the boolean 'n_drop_present' comes out True; otherwise it is None
        'n_drop': trial.suggest_int('n_drop', 0, 100) if trial.suggest_categorical('n_drop_present', [True, False]) else None,
        'washout':'init',
        'w_ridge_embedding':trial.suggest_float('w_ridge_embedding', 10.0, 20.0, log = True),
        'mts_rep': trial.suggest_categorical('mts_rep', ['reservoir', 'output', 'id','last','mean','state']),
        'bidir':'True', # trial.suggest_categorical('bidir', ['True', 'False']),
        'circle': False,
        'dimred_method':  trial.suggest_categorical('dimred_method', ['tenpca', None]),
        # 'dimred_method' is requested again under the same name to decide whether n_dim is searched;
        # presumably Optuna returns the value already drawn for this trial - TODO confirm
        'n_dim': trial.suggest_int('n_dim', 30, 75) if trial.suggest_categorical('dimred_method', ['tenpca', None]) == 'tenpca' else 30,
        'use_input_bias':True,
        'use_input_layer':'True', #trial.suggest_categorical('use_input_layer', ['True', 'False']),
        'readout_type': None,
        'threshold':0.3,
        'svm_kernel': 'linear',
        'svm_gamma': 0.005,
        'svm_C': 5.0,
        'w_ridge': 5.0,
        'mlp_w_l2': 0.01,
        'mlp_num_epochs': 2000,
        'mlp_layout': (10, 10),
        'mlp_batch_size': 32,
        'mlp_learning_rate': 0.01,
        'mlp_learning_rate_type': 'constant',
        'plasticity_synaptic':None, # 'hebb'.'oja', 'covariance','bcm'
        'theta_m':0.01,
        'learning_rate':0.1,
        'plasticity_intrinsic':None, # 'excitability', 'activation_function'
        'new_activation_function':'tanh',
        'excitability_factor':0.01,
        'device': 'cpu'
    }

    # 'bidir' and 'use_input_layer' are held as the strings 'True'/'False' above; turn them into booleans
    config ['bidir']           = True if config ['bidir'] == 'True' else False
    config ['use_input_layer'] = True if config ['use_input_layer'] == 'True' else False

    # With dimensionality reduction enabled, n_dim may not exceed the number of reservoir units
    if config['dimred_method'] is not None:
        config['n_dim'] = min(config['n_dim'], config ['n_internal_units'])

    # The washout mode is only searched when n_drop was sampled (is not None)
    if config['n_drop'] is not None:
        config['washout'] = trial.suggest_categorical ('washout', ['init', 'rand'])

    rc_state, input_repr, Y = train_reservoir (config, X, Y, verbose = verbose)

    # Flatten everything after the first axis so each sample becomes a single row
    dt_rc_state = rc_state.reshape (rc_state.shape[0], -1)

    # Separate log prefixes for the reservoir-state and input-representation evaluations
    file_prefix_state = f"results_state_{file_prefix}"
    file_prefix_input = f"results_inter_{file_prefix}"

    accuracy_value_state_similarity, accuracy_value_state_X = evaluate_accuracy (trial, dt_rc_state, Y,
                                                                                 state = True, file_prefix = file_prefix_state, verbose = verbose)
    accuracy_value_input_similarity, accuracy_value_input_X = evaluate_accuracy (trial, input_repr, Y,
                                                                                 state = False, file_prefix = file_prefix_input, verbose = verbose)

    # The trial is scored by the best of the four accuracies
    return max (accuracy_value_state_similarity, accuracy_value_input_similarity, accuracy_value_state_X, accuracy_value_input_X)



def extract_parameters(filename):
    """
    Parse a dataset file name of the form
    ``dt_<Older|Younger>_<channels>_[ICA_][features_]<freq>_<decim>.npy``.

    The whole name must match: the extension dot is matched literally and
    nothing may follow ``.npy``.

    Args:
        filename (str): File name (without folders) to parse.

    Returns:
        dict or None: ``None`` when the name does not follow the pattern,
        otherwise a dict with the keys
        ``'selectChannels'`` (str; ``'default_value'`` when the channel part is empty),
        ``'freq'`` and ``'decim'`` (digit strings as found in the name),
        ``'ica'`` and ``'features'`` (bool, True when the tag is present).
    """
    pattern = r'dt_(Older|Younger)_([a-zA-Z]*)_(ICA_)?(features_)?(\d+)_(\d+)\.npy'
    match = re.fullmatch(pattern, filename)

    if match is None:
        return None

    _group, channels, ica_tag, features_tag, freq, decim = match.groups()

    return {
        # Placeholder returned when the channel part of the name is empty
        'selectChannels': channels if channels else 'default_value',
        'freq': freq,
        'decim': decim,
        'ica': bool(ica_tag),
        'features': bool(features_tag)
    }

def create_objective (X, Y, file_prefix, verbose = True):
    """
    Build the objective function and the pruner for an Optuna study.

    Args:
        X (np.ndarray): Input data captured by the objective.
        Y (np.ndarray): Labels captured by the objective.
        file_prefix (str): Log-file prefix captured by the objective.
        verbose (bool): Forwarded to ``train_and_evaluate_model``.

    Returns:
        tuple: ``(objective, pruner)`` where ``objective(trial)`` returns the
        score of ``train_and_evaluate_model`` for that trial and ``pruner`` is
        a ``MedianPruner`` with 5 start-up trials and 10 warm-up steps.
    """
    def objective (trial):
        # Train and score the model for this trial on the captured data
        return train_and_evaluate_model (trial, X, Y, file_prefix, verbose = verbose)

    median_pruner = optuna.pruners.MedianPruner (n_startup_trials = 5, n_warmup_steps = 10)

    return objective, median_pruner

def check_and_load_files (channels = 's', ica = True, features = False,
                          freq = 30, decim = 4, verbose = False):
    """
    Load the "Younger" and "Older" datasets of one configuration and label them.

    Files are read from ``./dataset`` and are named
    ``dt_<Younger|Older>_<channels>[_ICA][_features]_<freq>_<decim>.npy``:
    the ``ICA`` tag is driven by ``ica`` and the ``features`` tag by ``features``.
    The younger array comes first in the result and is labelled 0; the older
    array follows and is labelled 1. One label is produced per entry along the
    first axis of each array.

    The output folders ``./optim_param_results`` and ``./optim_param`` (the
    latter is where the trial logs of this notebook are written) are created
    when missing.

    NOTE(review): an earlier function in this cell has the same name; this
    definition comes later and is the one bound to the name after the cell runs.

    Args:
        channels (str): Channel-selection tag used in the file name.
        ica (bool): Load the ICA variant of the dataset.
        features (bool): Load the extracted-features variant of the dataset.
        freq (int): Frequency tag used in the file name.
        decim (int): Decimation tag used in the file name.
        verbose (bool): When True, print file names and shapes.

    Returns:
        tuple: ``(X, Y)`` with ``X`` the two arrays concatenated on axis 0 and
        ``Y`` the matching integer label array.

    Raises:
        FileNotFoundError: If ``./dataset`` or either dataset file is missing.
    """
    # Check if the directory exists, if not, raise an exception
    if not os.path.exists("./dataset"):
        raise FileNotFoundError("El directorio de datasets no se encuentra disponible.")

    # Build both file names from the shared suffix
    suffix    = f"{channels}{'_ICA_' if ica else '_'}{'features_' if features else ''}{freq}_{decim}.npy"
    file_RC_y = f"./dataset/dt_Younger_{suffix}"
    file_RC_o = f"./dataset/dt_Older_{suffix}"

    if verbose: print (f'file_RC_y: {file_RC_y}')
    if verbose: print (f'file_RC_o: {file_RC_o}')

    # Both files are required
    if not os.path.exists(file_RC_y) or not os.path.exists(file_RC_o):
        raise FileNotFoundError("Los ficheros de datasets no se encuentran disponibles.")

    # Load the datasets
    dataset_RC_y = np.load (file_RC_y)
    dataset_RC_o = np.load (file_RC_o)

    if verbose: print(f'dataset_RC_y: {dataset_RC_y.shape}')
    if verbose: print(f'dataset_RC_o: {dataset_RC_o.shape}')

    # Younger subjects first, older subjects second
    dt_classifier = np.concatenate ((dataset_RC_y, dataset_RC_o), axis = 0)

    # One label per subject actually loaded: 0 = younger, 1 = older
    dt_labels = np.concatenate ((np.zeros (dataset_RC_y.shape[0], dtype = int),
                                 np.ones  (dataset_RC_o.shape[0], dtype = int)))

    if verbose: print ("Forma de los datos concatenados:", dt_classifier.shape)
    if verbose: print ("Forma de las etiquetas concatenadas:", dt_labels.shape)

    # Make sure the folders used for hyper-parameter and metric records exist
    for directory in ("./optim_param_results", "./optim_param"):
        os.makedirs (directory, exist_ok = True)

    return dt_classifier, dt_labels

def main(args):
    """
    Run one Optuna study per dataset combination and record the best trial.

    Every combination of channel selection ('s', 'z', 'd', 'i', 'eeg'), ICA flag
    and features flag is tried. For each one the dataset is loaded, a study
    maximising the objective from ``create_objective`` is run for
    ``args.trials`` trials, and the best parameters and score are appended to
    ``<file_prefix>_pt.txt``. A combination whose files are missing, or whose
    study fails, is reported and skipped so the remaining ones still run.

    Note: ``args.selectChannels``, ``args.icaFlag`` and ``args.featuresFlag``
    are only echoed in the verbose header; the grid above is always explored.
    Datasets are read by ``check_and_load_files`` from ``./dataset``.

    Args:
        args (argparse.Namespace): Needs the attributes ``freq``, ``decim``,
            ``selectChannels``, ``direct_files``, ``icaFlag``, ``featuresFlag``,
            ``verbose`` and ``trials``.

    Raises:
        FileNotFoundError: If ``args.direct_files`` is not an existing directory.
    """
    if args.verbose:
        print("Running with the following parameters:")
        print(f"Frequency: {args.freq}")
        print(f"Decimation: {args.decim}")
        print(f"Select Channels: {args.selectChannels}")
        print(f"direct_files: {args.direct_files}")
        print(f"ICA: {'Yes' if args.icaFlag else 'No'}")
        print(f"Features: {'Yes' if args.featuresFlag else 'No'}")
        print(f"Verbose: {args.verbose}")

    # Values taken from the arguments (freq/decim are no longer overwritten by literals)
    freq           = args.freq
    decim          = args.decim
    verbose        = args.verbose
    n_trials       = args.trials
    directory_path = args.direct_files

    # Fail early when the data directory given on the command line is missing
    if not os.path.isdir (directory_path):
        raise FileNotFoundError (f"El directorio {directory_path} no se encuentra disponible.")

    selectChannels_options = ['s','z', 'd', 'i','eeg']
    ICAflag_options        = [True, False]
    feature_options        = [True, False]

    for selectChannels in selectChannels_options:
        for ICAflag in ICAflag_options:
            for featuresFlag in feature_options:
                # The prefix is built from the combination being processed in this iteration
                file_prefix = f"dt_result_{'features_' if featuresFlag else ''}{selectChannels}_eeg_{'ica' if ICAflag else 'noica'}_{freq}_{decim}"
                if verbose: print (f'file_prefix: {file_prefix}')

                try:
                    X, y_labels = check_and_load_files (channels = selectChannels, ica = ICAflag,
                                           features = featuresFlag, freq = freq, decim = decim, verbose = verbose)
                except FileNotFoundError as e:
                    # Not every combination has a dataset on disk; skip the missing ones
                    print (f"{file_prefix}: {e}")
                    continue

                try:
                    # Define objective function for Optuna
                    objective, pruner = create_objective (X, y_labels, file_prefix, verbose = verbose)
                    study             = optuna.create_study (direction = 'maximize', pruner = pruner)
                    study.optimize (objective, n_trials = n_trials)

                    best_trial = study.best_trial
                    with open (f"{file_prefix}_pt.txt", "a") as f:
                        f.write ("\n---\n")
                        f.write ("Mejor configuración:\n")
                        f.write (f"{best_trial.params}\n")
                        # The objective returns an accuracy, so label the score accordingly
                        f.write (f"Mejor puntuación (accuracy): {best_trial.value}\n")
                        for key, value in best_trial.params.items():
                            f.write ( "    {}: {}\n".format( key, value ) )

                    if verbose: print ("Mejor configuración:", best_trial.params)
                    if verbose: print ("Mejor puntuación:", best_trial.value)
                except Exception as e:
                    # Best effort: report which combination failed and why, then move on
                    print (f"{file_prefix}: {type(e).__name__}: {e}")
# Entry point: fixed arguments under an IPython kernel, command-line arguments otherwise
if __name__ == "__main__":
    if 'ipykernel' in sys.modules:
        # Running inside a notebook kernel: use these settings instead of parsing the command line
        args = argparse.Namespace(
            direct_files   ='./dataset',
            freq           = 30,
            decim          = 4,
            selectChannels = 's',  # s, z, d, i, eeg
            trials         = 1000,
            icaFlag        = True,
            featuresFlag   = True,
            verbose        = True
        )
    else:
        # Parse command-line arguments
        parser = argparse.ArgumentParser(description='EEG Clustering Optimization')
        parser.add_argument ('--direct_files',   type = str, default ='./dataset', help='Directory for EEG data files')
        parser.add_argument ('--freq',           type = int, default = 30, help='Frequency for EEG data processing')
        parser.add_argument ('--decim',          type = int, default = 4, help='Decimation factor for EEG data processing')
        parser.add_argument ('--selectChannels', type = str, default = 's', help='Select channels for EEG data')
        # %(default)s keeps the help text in step with the actual default value
        parser.add_argument ('--trials',         type = int, default = 1, help = "El número de iteraciones (por defecto es %(default)s).")
        parser.add_argument ('--icaFlag',      action = 'store_true', help = 'Flag to apply ICA on EEG data')
        parser.add_argument ('--featuresFlag', action = 'store_true', help = 'Flag to use features for EEG data')
        parser.add_argument ('--verbose',      action = 'store_true', help = 'Flag to enable verbose output')
        args = parser.parse_args()

    main(args)

ModuleNotFoundError: No module named 'optuna'