## Biblioteca

In [None]:
!pip install mne
!pip install mne_bids
!pip install openneuro-py
!pip install dash
!pip install dash-bootstrap-components
!pip install captum
import openneuro
from mne_bids import BIDSPath, read_raw_bids
import os
from mne.datasets import sample
from mne.preprocessing import ICA
from mne.channels import make_standard_montage
import matplotlib.pyplot as plt
from mne.datasets import sample
import pandas as pd
import seaborn as sns
import openneuro
import numpy as np
from numpy.fft import fft, ifft, fftfreq
from mne import make_fixed_length_epochs
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn as nn
from sklearn.model_selection import train_test_split
from captum.attr import IntegratedGradients


## Banco de dados

In [None]:

# Definir o ID do dataset
dataset = "ds002778"

# Definir o diretório onde o dataset será salvo
bids_root = os.path.join(os.path.dirname(sample.data_path()), dataset)

# Criar o diretório, se ele ainda não existir
if not os.path.isdir(bids_root):
    os.makedirs(bids_root)

# Baixar todos os dados do dataset
openneuro.download(dataset=dataset, target_dir=bids_root)

In [None]:
# Diretório base dos dados no formato BIDS
bids_root = '/root/mne_data/ds002778'

# Lista para armazenar os dados carregados
eeg_data = {}

# Iterar pelos participantes
for participant_id in os.listdir(bids_root):
    if participant_id.startswith("sub-"):  # Apenas participantes
        print(f"Carregando dados para {participant_id}...")

        participant_dir = os.path.join(bids_root, participant_id)
        sessions = os.listdir(participant_dir)
        print(f"  Sessões disponíveis: {sessions}")

        try:
            # Verificar o tipo de participante: Grupo Controle (HC) ou Parkinson (PD)
            if "ses-hc" in sessions:  # Grupo Controle
                bids_path_hc = BIDSPath(root=bids_root, subject=participant_id.replace("sub-", ""),
                                        session="hc", task="rest", datatype="eeg", extension=".bdf")
                print(f"  Tentando carregar: {bids_path_hc}")
                if bids_path_hc.fpath.exists():
                    raw_hc = read_raw_bids(bids_path_hc)
                    eeg_data[f"{participant_id}_hc"] = raw_hc
                    print(f"  Dados carregados para {participant_id} - HC.")
                else:
                    print(f"  Arquivo não encontrado para {participant_id} - HC.")

            elif "ses-on" in sessions and "ses-off" in sessions:  # Grupo Parkinson
                # Carregar estado OFF
                bids_path_off = BIDSPath(root=bids_root, subject=participant_id.replace("sub-", ""),
                                         session="off", task="rest", datatype="eeg", extension=".bdf")
                print(f"  Tentando carregar: {bids_path_off}")
                if bids_path_off.fpath.exists():
                    raw_off = read_raw_bids(bids_path_off)
                    eeg_data[f"{participant_id}_off"] = raw_off
                    print(f"  Dados carregados para {participant_id} - OFF.")
                else:
                    print(f"  Arquivo não encontrado para {participant_id} - OFF.")

                # Carregar estado ON
                bids_path_on = BIDSPath(root=bids_root, subject=participant_id.replace("sub-", ""),
                                        session="on", task="rest", datatype="eeg", extension=".bdf")
                print(f"  Tentando carregar: {bids_path_on}")
                if bids_path_on.fpath.exists():
                    raw_on = read_raw_bids(bids_path_on)
                    eeg_data[f"{participant_id}_on"] = raw_on
                    print(f"  Dados carregados para {participant_id} - ON.")
                else:
                    print(f"  Arquivo não encontrado para {participant_id} - ON.")

        except Exception as e:
            print(f"Erro ao carregar dados para {participant_id}: {str(e)}")

print(f"Carregamento de dados concluído! Total de participantes carregados: {len(eeg_data)}")


In [None]:
# Extrair IDs únicos dos pacientes
unique_patients = set(participant.split("_")[0] for participant in eeg_data.keys())

# Contar o número total de pacientes únicos
total_unique_patients = len(unique_patients)

# Exibir os IDs únicos e o número total de pacientes únicos
print("Pacientes únicos carregados:")
for patient in unique_patients:
    print(patient)
print(f"\nTotal de pacientes únicos: {total_unique_patients}")

## Pré-processamento


#### Aplicar filtro Notch para remover ruídos de linha elétrica.

In [None]:
# Frequências para o filtro Notch (60 Hz e 120 Hz)
notch_freqs = [60, 120]

# Iterar pelos dados carregados
for participant, raw_data in eeg_data.items():
    try:
        print(f"Carregando dados para {participant} na memória...")
        raw_data.load_data()  # Carregar dados na memória

        print(f"Aplicando filtro Notch para {participant}...")
        raw_notch = raw_data.notch_filter(freqs=notch_freqs)
        eeg_data[participant] = raw_notch  # Atualizar os dados no dicionário

        print(f"Filtro Notch aplicado para {participant}.")
    except Exception as e:
        print(f"Erro ao aplicar filtro Notch para {participant}: {e}")

print("Filtro Notch aplicado a todos os participantes.")


#### Passa-Banda

In [None]:
# Definir os limites do filtro passa-banda
low_freq = 0.5  # Limite inferior
high_freq = 50  # Limite superior

# Aplicar o filtro para cada conjunto de dados
for participant_id, raw_data in eeg_data.items():
    try:
        print(f"Aplicando filtro passa-banda ({low_freq}-{high_freq} Hz) para {participant_id}...")

        # Certificar-se de que os dados estão na memória
        raw_data.load_data()

        # Aplicar o filtro passa-banda
        raw_filtered = raw_data.filter(l_freq=low_freq, h_freq=high_freq)

        # Substituir os dados antigos pelos filtrados no dicionário
        eeg_data[participant_id] = raw_filtered
        print(f"Filtro passa-banda aplicado para {participant_id}.")

    except Exception as e:
        print(f"Erro ao aplicar filtro passa-banda para {participant_id}: {e}")

print("Filtro passa-banda aplicado a todos os participantes.")

#### Verificação dos Dados Pós-Filtro

In [None]:


# Diretório temporário para salvar os gráficos
output_dir = "/mnt/data/output_graphics"
os.makedirs(output_dir, exist_ok=True)

# Função para verificar o sinal e a PSD e salvar os gráficos
def verificar_sinal_psd_salvar(raw_data, title, save_path):
    # Garantir que o diretório de salvamento existe
    os.makedirs(save_path, exist_ok=True)

    # Visualizar e salvar o sinal de EEG
    signal_path = os.path.join(save_path, f"{title.replace(' ', '_')}_signal.png")
    plt.figure(figsize=(15, 5))
    raw_data.plot(start=0, duration=10, n_channels=10, title=title, show=False)
    plt.savefig(signal_path, dpi=300)
    plt.close()

    # Visualizar e salvar a PSD
    psd_path = os.path.join(save_path, f"{title.replace(' ', '_')}_psd.png")
    plt.figure(figsize=(10, 5))
    raw_data.plot_psd(fmin=0.5, fmax=50, average=True, show=False)
    plt.title(f"PSD: {title}")
    plt.savefig(psd_path, dpi=300)
    plt.close()

    return signal_path, psd_path

# Simulação de estrutura de `eeg_data` e exemplo de uso
try:

    # Criar um dicionário para análise
    samples = {
        "PD_OFF - sub-pd17_off": eeg_data["sub-pd17_off"],
        "PD_ON - sub-pd17_on": eeg_data["sub-pd17_on"],
        "HC - sub-hc24_hc": eeg_data["sub-hc24_hc"]
    }

    # Verificar os dados e salvar gráficos
    saved_files = []
    for group, raw in samples.items():
        print(f"Salvando gráficos para {group}...")
        files = verificar_sinal_psd_salvar(raw, f"Sinal e PSD - {group}", save_path=output_dir)
        saved_files.extend(files)

    print("Gráficos salvos nos seguintes arquivos:")
    for file in saved_files:
        print(file)

except Exception as e:
    print(f"Erro ao processar os dados: {e}")

#### Remoção de artefatos

In [None]:


# Configuração geral de ICA e salvamento dos gráficos
def aplicar_ica_e_salvar_graficos(eeg_data, channels_to_remove, output_dir, montage_type="standard_1020"):
    dados_limpos = {}
    montage = make_standard_montage(montage_type)  # Configurar montagem padrão

    # Criar diretório de saída, se não existir
    os.makedirs(output_dir, exist_ok=True)

    for participant_id, raw_data in eeg_data.items():
        try:
            print(f"Iniciando ICA para {participant_id}...")

            # Verificar e remover canais específicos
            raw_data.drop_channels([ch for ch in channels_to_remove if ch in raw_data.info['ch_names']])

            # Aplicar montagem e carregar dados na memória
            raw_data.set_montage(montage)
            raw_data.load_data()

            # Configurar ICA
            ica = ICA(n_components=20, random_state=42, max_iter='auto')

            # Ajustar ICA aos dados filtrados
            print(f"Ajustando ICA para {participant_id}...")
            ica.fit(raw_data.copy().pick_types(eeg=True))

            # Identificar artefatos relacionados a EOG usando o canal frontal (Fp1)
            print(f"Identificando artefatos relacionados a EOG para {participant_id}...")
            eog_indices, eog_scores = ica.find_bads_eog(raw_data, ch_name='Fp1')

            # Exibir os índices dos componentes relacionados a EOG
            print(f"Componentes relacionados a EOG para {participant_id}: {eog_indices}")

            # Marcar os componentes identificados para exclusão
            ica.exclude = eog_indices

            # Aplicar ICA para limpar os dados
            print(f"Aplicando ICA para limpar os dados de {participant_id}...")
            raw_cleaned = ica.apply(raw_data.copy())

            # Adicionar dados limpos ao dicionário
            dados_limpos[participant_id] = raw_cleaned

            # Criar diretório específico para o participante
            participant_dir = os.path.join(output_dir, participant_id)
            os.makedirs(participant_dir, exist_ok=True)

            # Salvar o gráfico dos componentes ICA
            plot_path = os.path.join(participant_dir, f"{participant_id}_ica_components.png")
            print(f"Salvando gráfico de componentes ICA para {participant_id} em {plot_path}...")
            ica.plot_components(show=False)
            plt.savefig(plot_path)
            plt.close()

            print(f"ICA aplicada e gráfico salvo com sucesso para {participant_id}.\n")
        except Exception as e:
            print(f"Erro ao processar {participant_id}: {e}")

    return dados_limpos

# Lista de canais a serem removidos
channels_to_remove = ['EXG1', 'EXG2', 'EXG3', 'EXG4', 'EXG5', 'EXG6', 'EXG7', 'EXG8', 'Status']

# Diretório para salvar os gráficos
output_dir = "/mnt/data/ica_plots"

# Aplicar ICA para todos os participantes e salvar gráficos
dados_limpos = aplicar_ica_e_salvar_graficos(eeg_data, channels_to_remove, output_dir)

In [None]:
# Identificar os dados limpos para cada condição
psd_on = dados_limpos.get("sub-pd9_on")   # Substitua por identificador correto para condição ON
psd_off = dados_limpos.get("sub-pd9_off") # Substitua por identificador correto para condição OFF
psd_hc = dados_limpos.get("sub-hc31_hc")  # Substitua por identificador correto para HC

# Verificar se os dados estão disponíveis
dados_psd = {
    "on": psd_on,
    "off": psd_off,
    "hc": psd_hc
}

# Iterar pelas condições e calcular o PSD
for condition, dados  in dados_psd.items():
    if dados is not None:
        try:
            print(f"Calculando PSD para a condição {condition}...")

            # Calcular o PSD (com limites de frequência ajustados)
            psd = dados.copy().pick_types(eeg=True).compute_psd(fmin=0.5, fmax=50.0)

            # Plotar o PSD
            plt.clf()  # Limpar gráficos anteriores
            psd.plot()

            # Melhorar o título
            plt.suptitle(f"Densidade Espectral de Potência (PSD) - Condição {condition.upper()}",
                         fontsize=16,
                         fontweight='bold',
                         y=1.05)  # Centralizado e ajustado acima do gráfico

            plt.show()

        except Exception as e:
            print(f"Erro ao calcular ou plotar o PSD para {condition}: {e}")
    else:
        print(f"Dados para a condição '{condition}' não encontrados. Certifique-se de que os dados estão disponíveis.")

## Normalização dos dados limpos

In [None]:
# Função para normalizar os dados (Z-score normalization)
def z_score_normalization(data):


    mean = np.mean(data, axis=1, keepdims=True)  # Média por canal
    std = np.std(data, axis=1, keepdims=True)    # Desvio padrão por canal
    return (data - mean) / std

# Aplicar normalização aos dados limpos
dados_normalizados = {}
for participant_id, raw_cleaned in dados_limpos.items():
    data = raw_cleaned.get_data()  # Extrair os dados em formato de array [n_channels, n_samples]
    normalized_data = z_score_normalization(data)  # Normalizar os dados
    dados_normalizados[participant_id] = normalized_data  # Armazenar os dados normalizados

    print(f"Normalização aplicada para {participant_id}. Dimensões: {normalized_data.shape}")


## Segmentação

In [None]:
# Parâmetros de segmentação
segment_duration = 2.0  # Duração de cada segmento em segundos
overlap = 0.2    # Proporção de sobreposição (exemplo: 50%)
sfreq = 256  # Frequência de amostragem (ajuste para os seus dados)

# Função para realizar a segmentação
def segmentar_dados(data, sfreq, segment_duration, overlap):
    n_channels, n_samples = data.shape
    segment_length = int(segment_duration * sfreq)  # Comprimento de cada segmento em pontos
    step = int(segment_length * (1 - overlap))      # Passo entre segmentos

    # Validação do parâmetro 'step'
    if step <= 0:
        raise ValueError("O parâmetro 'overlap' deve ser menor que 1.0.")

    segments = []
    for start in range(0, n_samples - segment_length + 1, step):
        end = start + segment_length
        segments.append(data[:, start:end])

    # Incluir o último segmento, se necessário
    if n_samples % step != 0 and (n_samples - segment_length) % step != 0:
        segments.append(data[:, -segment_length:])

    return np.array(segments)  # [n_segments, n_channels, n_samples_per_segment]

# Aplicar segmentação aos dados normalizados
dados_segmentados = {}
for participant_id, normalized_data in dados_normalizados.items():
    try:
        segmentos = segmentar_dados(normalized_data, sfreq, segment_duration, overlap)
        dados_segmentados[participant_id] = segmentos  # Armazenar os segmentos
        print(f"Segmentação concluída para {participant_id}. Segmentos: {segmentos.shape}")
    except Exception as e:
        print(f"Erro ao processar {participant_id}: {e}")

In [None]:

# Nomes reais dos canais (já obtidos de raw.info['ch_names'])
nomes_canais = [
    'Fp1', 'AF3', 'F7', 'F3', 'FC1', 'FC5', 'T7', 'C3', 'CP1', 'CP5',
    'P7', 'P3', 'Pz', 'PO3', 'O1', 'Oz', 'O2', 'PO4', 'P4', 'P8',
    'CP6', 'CP2', 'C4', 'T8', 'FC6', 'FC2', 'F4', 'F8', 'AF4', 'Fp2',
    'Fz', 'Cz'
]

# Exemplo de estrutura de pacientes
pacientes = {
    "ON": dados_segmentados["sub-pd6_on"],  # Substitua com seus dados reais
    "OFF": dados_segmentados["sub-pd6_off"],  # Substitua com seus dados reais
    "HC": dados_segmentados["sub-hc10_hc"]  # Substitua com seus dados reais
}

# Ajustando para visualizar todos os canais e mais segmentos
n_canais = len(nomes_canais)  # Total de canais disponíveis no arquivo
n_segmentos = 1  # Visualizar apenas 1 segmento por classe

# Criar o gráfico novamente
fig, axes = plt.subplots(n_segmentos, len(pacientes), figsize=(15, 15), sharex=True, sharey=True)

# Garantir que `axes` seja uma matriz bidimensional mesmo com 1 segmento
if n_segmentos == 1:
    axes = np.expand_dims(axes, axis=0)

for col, (classe, segmentos) in enumerate(pacientes.items()):
    for row in range(n_segmentos):
        # Selecionar o segmento
        segmento = segmentos[row, :n_canais, :]  # Seleciona os canais e amostras do segmento

        # Criar vetor de tempo
        tempo = np.linspace(0, segmento.shape[1] / sfreq, segmento.shape[1])

        # Plotar cada canal do segmento
        for canal in range(segmento.shape[0]):
            axes[row, col].plot(tempo, segmento[canal, :] + canal * 10, label=nomes_canais[canal])  # Deslocamento para visualização

        # Ajustar título e eixos
        axes[row, col].set_title(f"{classe} - Segmento {row + 1}")
        axes[row, col].set_xlabel("Tempo (s)")
        axes[row, col].set_ylabel("Amplitude (μV)")

        # Adicionar legenda apenas no primeiro segmento de cada classe
        if row == 0:
            axes[row, col].legend(loc="upper right", fontsize=8)

# Ajustar layout e salvar a figura
fig.suptitle("Segmentos dos Canais de EEG (ON, OFF, HC)", fontsize=16)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])

# Salvar a figura
fig_path = "/mnt/data/segmentos_eeg_reais_raw.png"
plt.savefig(fig_path, dpi=300)  # Salvar com alta qualidade
plt.show()

# Informar o caminho do arquivo salvo
print(f"Figura salva em: {fig_path}")




In [None]:
output_dir = "segmentos_plotados"
os.makedirs(output_dir, exist_ok=True)

# Plotar e salvar gráficos dos segmentos
for participant_id, segments in dados_segmentados.items():
    print(f"Plotando segmentos para {participant_id}...")
    num_segments = min(3, segments.shape[0])  # Mostrar até 3 segmentos
    for i in range(num_segments):
        plt.figure(figsize=(10, 4))
        plt.plot(segments[i, 0, :])  # Plotar apenas o primeiro canal
        plt.title(f"{participant_id} - Segmento {i+1}")
        plt.xlabel("Tempo (amostras)")
        plt.ylabel("Amplitude")
        plt.grid(True)
        # Salvar o gráfico
        output_path = os.path.join(output_dir, f"{participant_id}_segmento_{i+1}.png")
        plt.savefig(output_path)
        plt.close()
        print(f"Gráfico salvo em: {output_path}")


## Treino e Teste

In [None]:
# Configurar o dispositivo (GPU ou CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")

# Função para combinar dados e rótulos para HC, OFF e ON
def preparar_dados_segmentados(dados_segmentados):
    X = []
    y = []
    for participant_id, segmentos in dados_segmentados.items():
        if "hc" in participant_id:
            label = 0  # HC
        elif "off" in participant_id:
            label = 1  # OFF
        elif "on" in participant_id:
            label = 2  # ON
        else:
            raise ValueError(f"Rótulo desconhecido para o participante {participant_id}")

        X.append(segmentos)
        y.extend([label] * segmentos.shape[0])

    X = np.concatenate(X, axis=0)  # Combinar todos os segmentos
    y = np.array(y)  # Criar array de rótulos
    return X, y

# Preparar os dados segmentados
X, y = preparar_dados_segmentados(dados_segmentados)

# Dividir os dados em 90% treino e 10% teste
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42, stratify=y)

# Exibir tamanhos dos conjuntos
print(f"Treino: {X_train.shape}, Teste: {X_test.shape}")

# Converter para tensores PyTorch e transferir para o dispositivo
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)

# Criar TensorDataset e DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Exibir informações dos DataLoaders
print(f"Número de batches - Treino: {len(train_loader)}, Teste: {len(test_loader)}")

## Modelo Transformer para classficação



In [None]:

# Classe PositionalEncoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        if x.size(1) > self.encoding.size(1):
            raise ValueError(
                f"Comprimento da entrada ({x.size(1)}) excede o máximo permitido ({self.encoding.size(1)}). "
                "Ajuste `max_len` na PositionalEncoding."
            )
        return x + self.encoding[:, :x.size(1), :].to(x.device)

# Modelo Transformer para EEG
class TransformerEEG(nn.Module):
    def __init__(self, n_channels, n_classes, seq_len, d_model=128, nhead=8, num_layers=3, dim_feedforward=256, dropout=0.3):
        super(TransformerEEG, self).__init__()
        self.input_proj = nn.Linear(n_channels, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len=seq_len)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, n_classes)

    def forward(self, x):
        x = self.input_proj(x.transpose(1, 2))  # Projeção dos canais
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)  # Pooling global
        x = self.fc(x)
        return x

# Configuração do modelo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seq_len = 1024
n_classes = 3
model = TransformerEEG(
    n_channels=32,
    n_classes=n_classes,
    seq_len=seq_len,
    d_model=128,
    nhead=8,
    num_layers=3,
    dim_feedforward=256,
    dropout=0.3
).to(device)

# Função de perda e otimizador
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0003)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# Treinamento
n_epochs = 15
train_losses = []

for epoch in range(n_epochs):
    model.train()
    train_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    train_losses.append(train_loss)

    print(f"Época {epoch + 1}/{n_epochs} - Loss Treino: {train_loss:.4f}")

# Avaliação no conjunto de teste
model.eval()
correct = 0
total = 0
y_pred = []
y_true = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X)
        _, predicted = torch.max(outputs, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(batch_y.cpu().numpy())
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

accuracy = correct / total
print(f"Acurácia no conjunto de teste: {accuracy:.4f}")

# Relatório de classificação
print("\nRelatório de Classificação:")
print(classification_report(y_true, y_pred))

In [None]:

plt.figure(figsize=(10, 6))
plt.plot(range(1, len(train_losses) + 1), train_losses, label='Perda de Treinamento')
plt.xlabel('Épocas')
plt.ylabel('Perda')
plt.title('Perda de Treinamento')
plt.legend()
plt.show()



### Validação do modelo

#### Teste

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Avaliação no conjunto de teste
model.eval()
correct = 0
total = 0
y_pred = []
y_true = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X)
        _, predicted = torch.max(outputs, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(batch_y.cpu().numpy())
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

# Acurácia
accuracy = correct / total
print(f"Acurácia no conjunto de teste: {accuracy:.4f}")

# Relatório de classificação
print("\nRelatório de Classificação:")
print(classification_report(y_true, y_pred))

# Matriz de Confusão
conf_matrix = confusion_matrix(y_true, y_pred)


# Plotar a Matriz de Confusão
classes = np.unique(y_true)  # Nomes das classes
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title(" Matriz de confusão ")
plt.show()


#### Treino

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Avaliação no conjunto de treinamento
model.eval()
correct = 0
total = 0
y_pred = []
y_true = []

with torch.no_grad():
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X)
        _, predicted = torch.max(outputs, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(batch_y.cpu().numpy())
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

# Acurácia no conjunto de treinamento
accuracy = correct / total
print(f"Acurácia no conjunto de treinamento: {accuracy:.4f}")

# Relatório de classificação
print("\nRelatório de Classificação no Conjunto de Treinamento:")
print(classification_report(y_true, y_pred))

# Matriz de Confusão
conf_matrix = confusion_matrix(y_true, y_pred)

# Plotar a Matriz de Confusão
classes = np.unique(y_true)  # Nomes das classes
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Matriz de Confusão no Conjunto de Treinamento")
plt.show()

## Captum

In [None]:


# Certifique-se de que o modelo está em modo de avaliação
model.eval()

# Função para calcular as importâncias com Integrated Gradients
def calcular_importancias_ig(model, X_test, y_test):
    ig = IntegratedGradients(model)
    all_importances = []

    for i in range(X_test.shape[0]):
        input_tensor = X_test[i].unsqueeze(0)  # Adicionar dimensão de batch
        target = y_test[i].unsqueeze(0)  # Adicionar dimensão de batch
        attributions, _ = ig.attribute(input_tensor, target=target, return_convergence_delta=True)
        all_importances.append(attributions.squeeze(0).mean(dim=1).cpu().numpy())  # Média por canal

    return np.array(all_importances)

# Cálculo das importâncias
all_importances = calcular_importancias_ig(model, X_test_tensor, y_test_tensor)

# Nomes dos canais
try:
    nomes_canais = raw.info['ch_names']
except NameError:
    nomes_canais = [
        'Fp1', 'Fp2', 'F3', 'F4', 'C3', 'C4', 'P3', 'P4', 'O1', 'O2',
        'T3', 'T4', 'T5', 'T6', 'Fz', 'Cz', 'Pz', 'Oz', 'FC1', 'FC2',
        'CP1', 'CP2', 'PO1', 'PO2', 'AFz', 'TP1', 'TP2', 'FT1', 'FT2',
        'FCz', 'CPz', 'POz'
    ]

# Função para calcular a média das importâncias por condição
def separar_por_condicao(importancias, y_test, num_canais):
    medias_importancias = {'HC': np.zeros(num_canais),
                           'OFF': np.zeros(num_canais),
                           'ON': np.zeros(num_canais)}

    contadores = {'HC': 0, 'OFF': 0, 'ON': 0}

    for i, classe in enumerate(y_test):
        if classe == 0:  # HC
            medias_importancias['HC'] += importancias[i]
            contadores['HC'] += 1
        elif classe == 1:  # OFF
            medias_importancias['OFF'] += importancias[i]
            contadores['OFF'] += 1
        elif classe == 2:  # ON
            medias_importancias['ON'] += importancias[i]
            contadores['ON'] += 1

    # Normalizar pelas contagens
    for key in medias_importancias:
        if contadores[key] > 0:
            medias_importancias[key] /= contadores[key]

    return medias_importancias

# Calcular as médias das importâncias por condição
medias_importancias = separar_por_condicao(all_importances, y_test_tensor.cpu().numpy(), num_canais=32)

# Gerar gráficos com os nomes reais dos canais
for condicao, importancias in medias_importancias.items():
    plt.figure(figsize=(12, 6))
    plt.bar(nomes_canais, importancias)
    plt.title(f"Importância dos Canais - Condição: {condicao}")
    plt.xlabel("Canais")
    plt.ylabel("Importância Média")
    plt.xticks(rotation=45)
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()



#### Mapas topográficos

In [None]:


# Criar o info com os nomes dos canais e localização
montage = mne.channels.make_standard_montage("standard_1020")  # Alterar conforme necessário
info = mne.create_info(ch_names=nomes_canais, sfreq=256, ch_types="eeg")  # Frequência de amostragem arbitrária
info.set_montage(montage)

# Função para gerar mapas topográficos
def gerar_mapas_topograficos(importancias_por_condicao, nomes_canais, info):
    for condicao, importancias in importancias_por_condicao.items():
        # Mapear importâncias para os canais
        evoked = mne.EvokedArray(np.expand_dims(importancias, axis=1), info, tmin=0)  # tmin define o tempo inicial

        # Plotar mapa topográfico
        fig, ax = plt.subplots(1, 1, figsize=(8, 6))
        mne.viz.plot_topomap(evoked.data[:, 0], evoked.info, axes=ax, cmap="viridis", show=False)
        ax.set_title(f"Mapa Topográfico - Condição: {condicao}")
        plt.show()

# Gerar mapas topográficos
gerar_mapas_topograficos(medias_importancias, nomes_canais, info)