In [14]:
import numpy as np
import scipy.io
from scipy import signal
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import (
    confusion_matrix,
    accuracy_score,
    recall_score,
    f1_score,
    ConfusionMatrixDisplay,
)
import matplotlib.pyplot as plt
import seaborn as sns
from braindecode.models import EEGNetv4
import copy
import random
import pandas as pd
import tqdm
from pathlib import Path


torch.cuda.is_available()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.cuda.get_device_name(0)

'NVIDIA GeForce GTX 1650'

# Funções de apoio

In [3]:
def plot_learning_curves(train_losses, val_losses, train_accuracies, val_accuracies):
    epochs = range(1, len(train_losses) + 1)

    # Curva de Perda
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, label="Perda Treinamento")
    plt.plot(epochs, val_losses, label="Perda Validação")
    plt.xlabel("Épocas")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Curva de Perda: Treino e validação")

    # Curva de Acurácia
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_accuracies, label="Acurácia Treinamento")
    plt.plot(epochs, val_accuracies, label="Acurácia Validação")
    plt.xlabel("Épocas")
    plt.ylabel("Acurácia (%)")
    plt.legend()
    plt.title("Curva de Acurácia: Treino e validação")

    plt.tight_layout()
    plt.show()

In [4]:
def separar_em_janelas(matriz, tamanho_janela, incluir_ultimo=False):
    # Calcular o número total de linhas
    total_linhas = matriz.shape[0]

    # Calcular o número de grupos necessário
    num_janelas = total_linhas // tamanho_janela

    # Inicializar uma lista vazia para armazenar os grupos
    janelas = []

    # Iterar sobre a matriz, pegando as linhas de tamanho_grupo em tamanho_grupo e armazenando em grupos
    for i in range(0, total_linhas, tamanho_janela):
        janela = matriz[i : i + tamanho_janela]  # Pegar o grupo de tamanho_grupo linhas
        janelas.append(janela)  # Adicionar o grupo à lista

    # Se incluir_ultimo for False e houver linhas restantes, remover o último grupo
    if not incluir_ultimo and total_linhas % tamanho_janela != 0:
        janelas.pop()

    return janelas, num_janelas

In [5]:
def bandpass_filter(
    dados, taxa_amostragem, freq_corte_low, freq_corte_high, ordem_filtro
):
    """
    Filtra dados EEG utilizando um filtro Butterworth passa-banda.
    Parâmetros:
    dados (ndarray): Dados do EEG com formato (número de eletrodos, número de amostras, número de frequências, número de trials).
    taxa_amostragem (int): Frequência de amostragem dos sinais EEG (Hz).
    freq_corte_low (float): Frequência de corte inferior do filtro passa-banda (Hz).
    freq_corte_high (float): Frequência de corte superior do filtro passa-banda (Hz).
    ordem_filtro (int): Ordem do filtro Butterworth.

    Retorna:
    ndarray: Dados EEG filtrados.
    """

    # **Construção do filtro passa-banda**
    # Cria o filtro passa-banda com os parâmetros especificados
    b, a = signal.butter(
        ordem_filtro,
        [freq_corte_low, freq_corte_high],
        btype="bandpass",
        analog=False,
        output="ba",
        fs=taxa_amostragem,
    )

    # **Filtragem dos dados**
    # Realiza o processo de filtragem para todas as frequências, trials e eletrodos
    num_eletrodos, num_amostras, num_freqs, num_trials = dados.shape

    filtered_data = np.zeros_like(dados)
    # Filtra os dados para cada frequência, trial e eletrodo
    for f in range(num_freqs):  # Para cada frequência de estimulação
        for trial in range(num_trials):  # Para cada trial
            for eletrodo in range(num_eletrodos):  # Para cada eletrodo
                # Filtra o sinal com o filtro de fase zero
                eletrodo_filtrado = signal.filtfilt(b, a, dados[eletrodo, :, f, trial])
                # Substitui o dado original pelo filtrado
                filtered_data[eletrodo, :, f, trial] = eletrodo_filtrado

    return filtered_data

In [36]:
def train(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    num_epochs=100,
    device=0,
    save_path="best_model.pth",
):
    best_val_accuracy = 0.0
    model.to(device)
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []
    for epoch in tqdm.notebook.tqdm(range(num_epochs)):
        # Training Phase
        model.train()
        running_loss = 0.0
        train_correct = 0
        train_total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            # eval train
            _, preds = torch.max(outputs, 1)
            train_correct += (preds == labels).sum().item()
            train_total += labels.size(0)

        train_accuracy = train_correct / train_total
        avg_train_loss = running_loss / len(train_loader)

        # eval validation
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.inference_mode():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                # val accuracy
                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

        val_accuracy = val_correct / val_total
        avg_val_loss = val_loss / len(val_loader)

        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

        # Save if best vall acc
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_model = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), save_path)
            # print(f"Best model saved with accuracy: {best_val_accuracy:.4f}")

        print(
            f"Epoch {epoch + 1}/{num_epochs}: "
            f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, "
            f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}"
        )
    plot_learning_curves(train_losses, val_losses, train_accuracies, val_accuracies)
    model.load_state_dict(best_model)
    return model

In [28]:
def evaluate(model, test_loader, chanels):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.inference_mode():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Metrics
    accuracy = accuracy_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds, average="weighted")
    f1 = f1_score(all_labels, all_preds, average="weighted")

    cm = confusion_matrix(all_labels, all_preds)

    print(f"Test set Accuracy: {accuracy:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

    classes = np.unique(np.concatenate((all_labels, all_preds)))

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
    fig, ax = plt.subplots(figsize=(15, 15))  # aumenta a figura
    disp.plot(
        ax=ax, cmap="Blues", xticks_rotation="vertical"
    )  # rotaciona os rótulos para caber melhor
    plt.show()

    # print(all_labels)
    # print(all_preds)

    return accuracy, recall, f1, cm

# Configurações de treino

In [24]:
frequencias_e_fases = scipy.io.loadmat(
    "C:/Users/machi/Documents/Mestrado/repos/data/benchmark/Freq_Phase.mat"
)
frequencias = frequencias_e_fases["freqs"]
frequencias = np.round(frequencias, 2).ravel()
fases = frequencias_e_fases["phases"]

# Parâmetros de filtragem
filter_order = 10
freq_cut_high = 70
freq_cut_low = 6

# Parâmetros de pré-processamento
sample_rate = 250
delay = 160  # 160 amostras, 0,5s (sem estimulação) + 0,14s (latencia para começo da evocação)

# Parâmetros de janelas e sessões
tamanho_da_janela = 1
tamanho_da_janela = int(np.ceil(tamanho_da_janela * sample_rate))

# Eletrodos e frequências de interesse
occipital_electrodes = np.array([47, 53, 54, 55, 56, 57, 60, 61, 62])
frequencias_desejadas = frequencias[:8]
indices = [np.where(frequencias == freq)[0][0] for freq in frequencias_desejadas]

epochs = 2


exp_dir = Path("10_freqs_1s_cm")

In [25]:
print("Frequências de interesse:", frequencias_desejadas)
print("Indices das frequências de interesse:", indices)

Frequências de interesse: [ 8.  9. 10. 11. 12. 13. 14. 15.]
Indices das frequências de interesse: [0, 1, 2, 3, 4, 5, 6, 7]


In [37]:
def load_data_from_all_users():
    all_data = []
    for user in tqdm.notebook.tqdm(range(1, 11), desc="Carregando dados dos usuários"):
        file_path = (
            f"C:/Users/machi/Documents/Mestrado/repos/data/benchmark/S{user}.mat"
        )
        # Carregar e preparar dados
        data = scipy.io.loadmat(file_path)["data"]
        data = bandpass_filter(
            data, sample_rate, freq_cut_low, freq_cut_high, filter_order
        )
        data = data[:, (delay) : (delay + 1250), :, :]

        all_data.append(data)
    return all_data

In [38]:
all_data = load_data_from_all_users()

Carregando dados dos usuários:   0%|          | 0/10 [00:00<?, ?it/s]

# Leave-one-user-out Cross Validation

Deixa um usuário de fora e treina em todos os outros

In [None]:
seed = 42
torch.cuda.manual_seed(seed)
torch.manual_seed(seed)

# Lista para armazenar as métricas de todos os usuários e sessões
metricas_usuarios = []

exp_dir.mkdir(parents=True, exist_ok=True)

# Leave-one-user-out cross-validation
for user in range(1, len(all_data) + 1):
    print(f"Processando Usuário {user}")

    n_freqs_sel = len(indices)

    metricas_crossval = []

    users_train = [u for u in range(1, len(all_data) + 1) if u != user]
    user_test = user

    x_train = []
    labels_train = []

    x_test = []
    labels_test = []

    for u in users_train:
        data = all_data[u - 1]
        for session in range(data.shape[3]):
            for freq in range(len(indices)):
                eeg_trial = data[occipital_electrodes, :, indices[freq], session]

                eeg_trial_janelas, numero_janelas = separar_em_janelas(
                    eeg_trial.T, tamanho_da_janela
                )
                eeg_trial_janelas_array = np.stack(eeg_trial_janelas)
                eeg_trial_janelas_array = eeg_trial_janelas_array.transpose(0, 2, 1)

                # print(eeg_trial_janelas_array.shape)

                x_train.append(eeg_trial_janelas_array)

                # Adiciona um rótulo 'freq' para cada janela gerada
                labels_train.extend([frequencias[freq]] * numero_janelas)
    x_train = np.concatenate(x_train, axis=0)

    data = all_data[user_test - 1]
    for session in range(data.shape[3]):
        for freq in range(len(indices)):

            eeg_trial = data[occipital_electrodes, :, indices[freq], session]

            eeg_trial_janelas, numero_janelas = separar_em_janelas(
                eeg_trial.T, tamanho_da_janela
            )
            eeg_trial_janelas_array = np.stack(eeg_trial_janelas)
            eeg_trial_janelas_array = eeg_trial_janelas_array.transpose(0, 2, 1)
            x_test.append(eeg_trial_janelas_array)
            # Adiciona um rótulo 'freq' para cada janela gerada
            labels_test.extend([frequencias[freq]] * numero_janelas)

    x_test = np.concatenate(x_test, axis=0)

    # Mapeamento de rótulos
    mapeamento = {rotulo: i for i, rotulo in enumerate(sorted(frequencias_desejadas))}
    rotulos_treinamento = torch.tensor(
        [mapeamento[rotulo.item()] for rotulo in labels_train]
    )
    labels_test = torch.tensor([mapeamento[rotulo.item()] for rotulo in labels_test])

    # Converter para tensores
    x_train = torch.from_numpy(x_train.copy()).float().to(device)
    X_test = torch.from_numpy(x_test.copy()).float().to(device)
    Y_treino = rotulos_treinamento.to(torch.long).to(device)
    Y_teste = labels_test.to(torch.long).to(device)

    print("x_train:", x_train.shape)
    print("X_test:", X_test.shape)
    print("Y_treino:", Y_treino.shape)
    print("Y_teste:", Y_teste.shape)

    # Configurar modelo e treinamento
    model = EEGNetv4(
        n_chans=9,
        n_outputs=len(frequencias_desejadas),
        n_times=tamanho_da_janela,
        kernel_length=(tamanho_da_janela // 2),
    )
    model = model.to(device)

    dataset = TensorDataset(x_train, Y_treino)
    train_size = int(0.85 * len(dataset))
    val_size = len(dataset) - train_size

    train_dataset, val_dataset = random_split(
        dataset,
        [train_size, val_size],
        generator=torch.Generator().manual_seed(seed),
    )

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
    test_loader = DataLoader(
        TensorDataset(X_test, Y_teste), batch_size=10, shuffle=False
    )
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    # Treinar
    best_model = train(
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        num_epochs=epochs,
        device=device,
        save_path=exp_dir.joinpath(f"best_model_user_{user}.pth"),
    )

    # Avaliar
    accuracy, recall, f1, cm = evaluate(
        best_model, test_loader, chanels=len(frequencias_desejadas)
    )

    # Armazenar métricas
    metricas_crossval.append(
        {
            "usuario": user,
            "acuracia": accuracy,
            "recall": recall,
            "f1-score": f1,
            "confusion_matrix": cm,
        }
    )

    print(
        f"Usuário {user} Finalizado: Acurácia={accuracy:.4f}, Recall={recall:.4f}, F1={f1:.4f}"
    )

    # Salvar as métricas de cada usuário
    metricas_usuarios.extend(metricas_crossval)
    # =====================================
    # Salvar resultados finais em CSV
    # =====================================

    # Criar um DataFrame com as métricas
    df_metricas = pd.DataFrame(metricas_usuarios)

    # Salvar como CSV
    df_metricas.to_csv(exp_dir.joinpath(f"{exp_dir}_metricas.csv"), index=False)

    print("-" * 50)

# Leave one-session-out cross validation (DNN and TRCANet style)
Train a general model on 5 sessions, then from there fine-tune a model for each user using his 5 session again and validate on the sixth

In [30]:
frequencias_e_fases = scipy.io.loadmat(
    "C:/Users/machi/Documents/Mestrado/repos/data/benchmark/Freq_Phase.mat"
)
frequencias = frequencias_e_fases["freqs"]
frequencias = np.round(frequencias, 2).ravel()
fases = frequencias_e_fases["phases"]

# Parâmetros de filtragem
filter_order = 10
freq_cut_high = 70
freq_cut_low = 6

# Parâmetros de pré-processamento
sample_rate = 250
delay = 160  # 160 amostras, 0,5s (sem estimulação) + 0,14s (latencia para começo da evocação)

# Parâmetros de janelas e sessões
tamanho_da_janela = 1
tamanho_da_janela = int(np.ceil(tamanho_da_janela * sample_rate))

# Eletrodos e frequências de interesse
occipital_electrodes = np.array([47, 53, 54, 55, 56, 57, 60, 61, 62])
frequencias_desejadas = frequencias[:8]
indices = [np.where(frequencias == freq)[0][0] for freq in frequencias_desejadas]

epochs_general = 3
epochs_fine_tune = 2

exp_dir = Path("leave-one-session-out-10_freqs_1s")

In [None]:
seed = 42
torch.cuda.manual_seed(seed)
torch.manual_seed(seed)

exp_dir.mkdir(parents=True, exist_ok=True)
general_models_dir = exp_dir.joinpath("general_models")
general_models_dir.mkdir(parents=True, exist_ok=True)
fine_tuned_models_dir = exp_dir.joinpath("fine_tuned_models")
fine_tuned_models_dir.mkdir(parents=True, exist_ok=True)

# Lista para armazenar as métricas de todos os usuários e sessões
metricas_usuarios = []

num_sessions = 6
num_users = len(all_data)

for sessao_teste in range(num_sessions):  # Leave-one-session-out
    print(f"Sessão de teste: {sessao_teste}")

    # 1. Treinar modelo geral em 5 sessões de todos os usuários
    x_train_general = []
    y_train_general = []

    for user in range(num_users):
        data = all_data[user]
        for sessao in range(num_sessions):
            if sessao == sessao_teste:
                continue
            for freq in range(len(indices)):
                eeg_trial = data[occipital_electrodes, :, indices[freq], sessao]
                eeg_trial_janelas, numero_janelas = separar_em_janelas(
                    eeg_trial.T, tamanho_da_janela
                )
                eeg_trial_janelas_array = np.stack(eeg_trial_janelas).transpose(0, 2, 1)
                x_train_general.append(eeg_trial_janelas_array)
                y_train_general.extend([frequencias[freq]] * numero_janelas)

    x_train_general = np.concatenate(x_train_general, axis=0)

    # Mapeamento de rótulos
    mapeamento = {rotulo: i for i, rotulo in enumerate(sorted(frequencias_desejadas))}
    y_train_general_tensor = torch.tensor(
        [mapeamento[rotulo.item()] for rotulo in y_train_general]
    )

    X_train_general = torch.from_numpy(x_train_general.copy()).float().to(device)
    Y_train_general = y_train_general_tensor.to(torch.long).to(device)

    # Treinar modelo geral
    model_general = EEGNetv4(
        n_chans=9,
        n_outputs=len(frequencias_desejadas),
        n_times=tamanho_da_janela,
        kernel_length=(tamanho_da_janela // 2),
    ).to(device)

    dataset_general = TensorDataset(X_train_general, Y_train_general)
    train_size = int(0.85 * len(dataset_general))
    val_size = len(dataset_general) - train_size
    train_dataset, val_dataset = random_split(
        dataset_general,
        [train_size, val_size],
        generator=torch.Generator().manual_seed(seed),
    )
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model_general.parameters(), lr=0.0001)

    print("Treinando modelo geral...")
    best_model_general = train(
        model_general,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        num_epochs=epochs_general,
        device=device,
        save_path=general_models_dir.joinpath(
            f"best_general_model_session_{sessao_teste}.pth"
        ),
    )

    # 2. Para cada usuário, fine-tune e validação na sessão deixada de fora
    for user in range(num_users):
        print(f"Usuário {user+1} - Fine-tuning e validação na sessão {sessao_teste}")
        data = all_data[user]

        # Fine-tune: usar as 5 sessões do usuário exceto a de teste
        x_finetune = []
        y_finetune = []
        for sessao in range(num_sessions):
            if sessao == sessao_teste:
                continue
            for freq in range(len(indices)):
                eeg_trial = data[occipital_electrodes, :, indices[freq], sessao]
                eeg_trial_janelas, numero_janelas = separar_em_janelas(
                    eeg_trial.T, tamanho_da_janela
                )
                eeg_trial_janelas_array = np.stack(eeg_trial_janelas).transpose(0, 2, 1)
                x_finetune.append(eeg_trial_janelas_array)
                y_finetune.extend([frequencias[freq]] * numero_janelas)
        x_finetune = np.concatenate(x_finetune, axis=0)
        y_finetune_tensor = torch.tensor(
            [mapeamento[rotulo.item()] for rotulo in y_finetune]
        )

        X_finetune = torch.from_numpy(x_finetune.copy()).float().to(device)
        Y_finetune = y_finetune_tensor.to(torch.long).to(device)

        # Teste: usar a sessão deixada de fora
        x_test = []
        y_test = []
        for freq in range(len(indices)):
            eeg_trial = data[occipital_electrodes, :, indices[freq], sessao_teste]
            eeg_trial_janelas, numero_janelas = separar_em_janelas(
                eeg_trial.T, tamanho_da_janela
            )
            eeg_trial_janelas_array = np.stack(eeg_trial_janelas).transpose(0, 2, 1)
            x_test.append(eeg_trial_janelas_array)
            y_test.extend([frequencias[freq]] * numero_janelas)
        x_test = np.concatenate(x_test, axis=0)
        y_test_tensor = torch.tensor([mapeamento[rotulo.item()] for rotulo in y_test])

        X_test = torch.from_numpy(x_test.copy()).float().to(device)
        Y_test = y_test_tensor.to(torch.long).to(device)

        # Fine-tune: carregar pesos do modelo geral
        model_finetune = EEGNetv4(
            n_chans=9,
            n_outputs=len(frequencias_desejadas),
            n_times=tamanho_da_janela,
            kernel_length=(tamanho_da_janela // 2),
        ).to(device)
        model_finetune.load_state_dict(best_model_general.state_dict())

        dataset_finetune = TensorDataset(X_finetune, Y_finetune)
        train_size = int(0.85 * len(dataset_finetune))
        val_size = len(dataset_finetune) - train_size
        train_dataset, val_dataset = random_split(
            dataset_finetune,
            [train_size, val_size],
            generator=torch.Generator().manual_seed(seed),
        )
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
        test_loader = DataLoader(
            TensorDataset(X_test, Y_test), batch_size=10, shuffle=False
        )

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model_finetune.parameters(), lr=0.00005)

        print("Fine-tuning modelo para o usuário...")
        best_model_finetune = train(
            model_finetune,
            train_loader,
            val_loader,
            criterion,
            optimizer,
            num_epochs=epochs_fine_tune,
            device=device,
        )

        # Avaliar
        accuracy, recall, f1, cm = evaluate(
            best_model_finetune, test_loader, chanels=len(frequencias_desejadas)
        )

        # Armazenar métricas
        metricas_usuarios.append(
            {
                "usuario": user + 1,
                "sessao_teste": sessao_teste,
                "acuracia": accuracy,
                "recall": recall,
                "f1-score": f1,
                "confusion_matrix": cm,
            }
        )

        print(
            f"Usuário {user+1} - Sessão {sessao_teste} Finalizada: Acurácia={accuracy:.4f}, Recall={recall:.4f}, F1={f1:.4f}"
        )

    print("-" * 50)

    # =====================================
    # Salvar resultados finais em CSV
    # =====================================

    df_metricas = pd.DataFrame(metricas_usuarios)
    df_metricas.to_csv(
        exp_dir.joinpath(f"{exp_dir}_metricas_leave_one_session_out_finetune.csv"),
        index=False,
    )