In [None]:
%pip install pyedflib antropy scikit-learn numpy scipy pandas google-api-python-client google-auth-httplib2 google-auth-oauthlib

In [1]:
import numpy as np
import os
import re
import time
import io
import gc
import pyedflib
import pandas as pd
from scipy.signal import welch
from scipy.stats import skew, kurtosis
import antropy
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.utils import resample
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import warnings

warnings.filterwarnings("ignore", category=RuntimeWarning)

Funções de Acesso ao Google Drive

In [2]:
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']

def get_drive_service():
    creds = None
    creds_folder = 'credentials'
    token_path = os.path.join(creds_folder, 'token.json')
    credentials_path = os.path.join(creds_folder, 'credentials.json')

    os.makedirs(creds_folder, exist_ok=True)

    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists(credentials_path):
                raise FileNotFoundError(
                    f"ERRO CRÍTICO: O arquivo 'credentials.json' não foi encontrado dentro da pasta '{creds_folder}'."
                )
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
            creds = flow.run_local_server(port=0)

        with open(token_path, 'w') as token:
            token.write(creds.to_json())

    try:
        service = build('drive', 'v3', credentials=creds)
        print("Serviço do Google Drive conectado com sucesso.")
        return service
    except Exception as e:
        print(f"Erro ao construir o serviço do Drive: {e}")
        return None


def find_folder_id(service, folder_name, parent_id='root'):
    query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and '{parent_id}' in parents"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    items = results.get('files', [])
    return items[0]['id'] if items else None


def find_folder_id_by_path(service, path_components):
    current_parent_id = 'root'
    for folder_name in path_components:
        query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and '{current_parent_id}' in parents"
        results = service.files().list(q=query, fields="files(id)").execute()
        items = results.get('files', [])
        if not items:
            print(f"Pasta '{folder_name}' não encontrada em '{current_parent_id}'.")
            return None
        current_parent_id = items[0]['id']
    print("Caminho do dataset encontrado no Drive!")
    return current_parent_id


def get_files_from_drive_folder(service, folder_id):
    query = f"'{folder_id}' in parents"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    return {file['name']: file['id'] for file in results.get('files', [])}


def download_file_locally(service, file_id, local_filename):
    request = service.files().get_media(fileId=file_id)
    with io.FileIO(local_filename, 'wb') as fh:
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            if status:
                print(f"Download {int(status.progress() * 100)}% concluído...", end="\r")
    print(f"Arquivo salvo em: {local_filename}")


Funções de Processamento de Dados

In [3]:
def parse_summary_file(file_path):
    seizure_info = {}; current_file = ""
    with open(file_path, 'r', errors='ignore') as f: lines = f.readlines()
    for i, line in enumerate(lines):
        line = line.strip()
        if line.startswith("File Name:"): 
            current_file = line.split(': ')[1]; seizure_info[current_file] = []
        elif line.startswith("Seizure Start Time:") and i + 1 < len(lines):
            start_time = int(re.search(r'(\d+)', line).group(1)); end_line = lines[i+1].strip()
            if end_line.startswith("Seizure End Time:"): 
                end_time = int(re.search(r'(\d+)', end_line).group(1))
                seizure_info[current_file].append((start_time, end_time))
    return seizure_info

def extract_single_feature_vector(eeg_window, fs=256):
    freqs, psd = welch(eeg_window, fs=fs, nperseg=len(eeg_window)); total_power = np.sum(psd)
    def get_band_power(f_low, f_high): return np.sum(psd[np.logical_and(freqs >= f_low, freqs <= f_high)])
    delta, theta, alpha, beta, gamma = get_band_power(0.5, 4), get_band_power(4, 8), get_band_power(8, 13), get_band_power(13, 30), get_band_power(30, 80)
    band_powers = [p / total_power if total_power > 0 else 0 for p in [delta, theta, alpha, beta, gamma]]
    ratios = [beta / alpha if alpha > 0 else 0, (delta + theta) / (alpha + beta) if (alpha + beta) > 0 else 0]
    entropies = [antropy.perm_entropy(eeg_window, normalize=True), antropy.spectral_entropy(eeg_window, sf=fs, method='welch', normalize=True), antropy.sample_entropy(eeg_window)]
    stats = [np.mean(np.abs(eeg_window)), np.std(eeg_window), skew(eeg_window), kurtosis(eeg_window)]
    features = [total_power] + band_powers + ratios + entropies + stats
    if len(features) < 46: features.extend([np.mean(features) if features else 0] * (46 - len(features)))
    return np.array(features[:46])

Classe do Modelo HDC

In [4]:
from sklearn.metrics.pairwise import cosine_similarity

class HDC:
    def __init__(self, dimensions, num_features, num_levels, num_classes=2, seed=42):
        np.random.seed(seed)
        self.D, self.num_features, self.num_levels, self.num_classes = dimensions, num_features, num_levels, num_classes
        self.level_vectors = np.random.choice([-1, 1], size=(num_levels, self.D))
        self.feature_vectors = np.random.choice([-1, 1], size=(num_features, self.D))
        self.class_prototypes = np.zeros((self.num_classes, self.D))

    def _quantize(self, data, num_levels):
        min_val, max_val = np.min(data), np.max(data)
        if max_val == min_val: return np.zeros_like(data, dtype=int)
        return np.round((data - min_val) / (max_val - min_val) * (num_levels - 1)).astype(int)

    def encode(self, x_data):
        num_samples, num_features = x_data.shape
        x_quantized = np.array([self._quantize(x_data[:, i], self.num_levels) for i in range(num_features)]).T
        encoded_data = np.zeros((num_samples, self.D))
        for i in range(num_samples):
            sample_hv = np.sum([self.feature_vectors[f] * self.level_vectors[x_quantized[i, f]] for f in range(num_features)], axis=0)
            encoded_data[i] = np.sign(sample_hv) if np.any(sample_hv) else np.zeros(self.D)
        return encoded_data

    def predict(self, x_encoded):
        similarities = cosine_similarity(x_encoded, self.class_prototypes)
        return np.argmax(similarities, axis=1)

    def train_standard(self, x_encoded, y_train):
        self.class_prototypes = np.array([np.sum(x_encoded[y_train == i], axis=0) for i in range(self.num_classes)])

    def train_multipass(self, x_encoded, y_train, epochs=10, lr=0.05, subtract_wrong=True, initial_training=True):
        if initial_training: self.train_standard(x_encoded, y_train)
        for _ in range(epochs):
            y_pred = self.predict(x_encoded)
            for i in range(len(y_train)):
                if y_pred[i] != y_train[i]:
                    self.class_prototypes[y_train[i]] += lr * x_encoded[i]
                    if subtract_wrong: self.class_prototypes[1-y_train[i]] -= lr * x_encoded[i]

    def train_multicentroid(self, x_encoded, y_train, threshold=0.25, reduce=True):
        prototypes, proto_labels = [], []
        for i in range(len(y_train)):
            sample_hv, correct_label = x_encoded[i], y_train[i]; best_sim, best_proto_idx = -1, -1
            if prototypes:
                similarities = cosine_similarity(sample_hv.reshape(1, -1), np.array(prototypes))[0]
                for j, label in enumerate(proto_labels):
                    if label == correct_label and similarities[j] > best_sim: 
                        best_sim, best_proto_idx = similarities[j], j
            if best_sim < threshold: 
                prototypes.append(sample_hv); proto_labels.append(correct_label)
            else: 
                prototypes[best_proto_idx] += sample_hv
        final_prototypes = np.zeros((self.num_classes, self.D))
        for label in range(self.num_classes):
            indices = [i for i, l in enumerate(proto_labels) if l == label]
            if indices: final_prototypes[label] = np.sum(np.array(prototypes)[indices], axis=0)
        self.class_prototypes = final_prototypes


Função de Pós-Processamento

In [5]:
def post_process_predictions(predictions, window_size=5, merge_gap=30, fs=0.5):
    smoothed_preds = np.copy(predictions)
    for i in range(len(predictions)):
        start, end = max(0, i - window_size // 2), min(len(predictions), i + window_size // 2 + 1)
        if np.mean(predictions[start:end]) < 0.5: smoothed_preds[i] = 0
    return smoothed_preds

**Parâmetros para a entrega da Semana 1**

In [15]:
PATIENT_ID = 'chb01'
DRIVE_PATH_COMPONENTS = ['TCC EPILEPSIA DATA', 'chb-mit-scalp-eeg-database-1.0.0']

MAX_GRAVACOES_POR_PACIENTE = 10
DIMENSIONS = 10000
NUM_LEVELS = 100
EPOCHS_MULTIPASS = 12          
LEARNING_RATE_MULTIPASS = 0.1
THRESHOLD_MULTICENTROID = 0.25

SMOOTHING_WINDOW_SIZE = 5      
MERGE_SEIZURES_THRESHOLD = 30  
MAX_CHANNELS = None              

start_time = time.time()
try:
    service = get_drive_service()
    main_folder_id = find_folder_id_by_path(service, DRIVE_PATH_COMPONENTS)
    patient_folder_id = find_folder_id(service, PATIENT_ID, parent_id=main_folder_id)
    drive_files = get_files_from_drive_folder(service, patient_folder_id)

    summary_filename = f"{PATIENT_ID}-summary.txt"
    local_summary_path = "./temp_summary.txt"
    download_file_locally(service, drive_files[summary_filename], local_summary_path)
    seizure_times = parse_summary_file(local_summary_path)
    os.remove(local_summary_path)

    edf_files = sorted([n for n in drive_files.keys() if n.endswith('.edf')])[:MAX_GRAVACOES_POR_PACIENTE]

    all_features, all_labels = [], []
    print(f"\n--- Processando {len(edf_files)} arquivos ---")
    for edf_file in edf_files:
        local_temp_path = f"./temp_{edf_file}"
        print(f"Processando: {edf_file}...")
        try:
            download_file_locally(service, drive_files[edf_file], local_temp_path)
            with pyedflib.EdfReader(local_temp_path) as f:
                fs = f.getSampleFrequency(0)

                if MAX_CHANNELS is None:
                    n_channels = f.signals_in_file
                else:
                    n_channels = min(int(MAX_CHANNELS), int(f.signals_in_file))

                signals = np.array([f.readSignal(c) for c in range(n_channels)])

                window_samples = int(fs * 2)
                step = max(1, window_samples // 2)

                total = signals.shape[1]
                if total < window_samples:
                    n_windows = 0
                else:
                    n_windows = 1 + (total - window_samples) // step

                for j in range(n_windows):
                    start = j * step
                    end = start + window_samples
                    window = signals[:, start:end]

                    is_seizure = any(
                        max(start/fs, s_start) < min(end/fs, s_end)
                        for s_start, s_end in seizure_times.get(edf_file, [])
                    )

                    feats = np.mean(
                        [extract_single_feature_vector(window[c, :], fs) for c in range(n_channels)],
                        axis=0
                    )
                    all_features.append(feats)
                    all_labels.append(1 if is_seizure else 0)
        finally:
            if os.path.exists(local_temp_path):
                os.remove(local_temp_path)

    gc.collect()

    X, y = np.array(all_features), np.array(all_labels)
    X_scaled = StandardScaler().fit_transform(X)

    X_seiz, X_non = X_scaled[y==1], X_scaled[y==0]
    n_non = min(len(X_non), len(X_seiz))
    X_non_bal = resample(X_non, replace=False, n_samples=n_non, random_state=42)

    X_bal = np.vstack((X_seiz, X_non_bal))
    y_bal = np.hstack((np.ones(len(X_seiz)), np.zeros(n_non)))

    X_train, X_test, y_train, y_test = train_test_split(
        X_bal, y_bal, test_size=0.3, random_state=42, stratify=y_bal
    )

    NUM_FEATURES = X_train.shape[1]
    hdc_encoder = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42)
    X_train_hd = hdc_encoder.encode(X_train)
    X_test_hd = hdc_encoder.encode(X_test)
    print("Codificação concluída.")

    strategies = ['Padrão', 'Multi-Pass', 'Multi-Centroid', 'MC+MP']
    results = {}

    for strat in strategies:
        print(f"\n--- TREINANDO E AVALIANDO ESTRATÉGIA: {strat} ---")
        hdc_model = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42)

        if strat == 'Padrão':
            hdc_model.train_standard(X_train_hd, y_train)
        elif strat == 'Multi-Pass':
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True)
        elif strat == 'Multi-Centroid':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
        elif strat == 'MC+MP':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True,
                                      initial_training=False)

        preds = hdc_model.predict(X_test_hd)
        results[strat] = {'Predições': preds}

except Exception as e:
    print(f"Erro: {e}")
finally:
    print(f"\nTempo total: {(time.time() - start_time)/60:.2f} min", flush=True)

if 'results' in locals() and results:
    print(f"\n\n--- RESULTADOS PACIENTE: {PATIENT_ID} ---")
    for strat, data in results.items():
        y_pred_smoothed = post_process_predictions(
            data['Predições'],
            window_size=SMOOTHING_WINDOW_SIZE,
            merge_gap=MERGE_SEIZURES_THRESHOLD,
            fs=0.5
        )
        cm = confusion_matrix(y_test, y_pred_smoothed, labels=[0, 1])
        print(f"\nEstratégia: {strat}")
        print(f"  F1: {f1_score(y_test, y_pred_smoothed, zero_division=0):.4f}, "
              f"Precisão: {precision_score(y_test, y_pred_smoothed, zero_division=0):.4f}, "
              f"Sensibilidade: {recall_score(y_test, y_pred_smoothed, zero_division=0):.4f}")
        print("  Matriz de Confusão:")
        print(f"    VN: {cm[0][0]} | FP: {cm[0][1]}")
        print(f"    FN: {cm[1][0]} | VP: {cm[1][1]}")


Serviço do Google Drive conectado com sucesso.
Caminho do dataset encontrado no Drive!
Arquivo salvo em: ./temp_summary.txt

--- Processando 10 arquivos ---
Processando: chb01_01.edf...
Arquivo salvo em: ./temp_chb01_01.edf
Processando: chb01_02.edf...
Arquivo salvo em: ./temp_chb01_02.edf
Processando: chb01_03.edf...
Arquivo salvo em: ./temp_chb01_03.edf
Processando: chb01_04.edf...
Arquivo salvo em: ./temp_chb01_04.edf
Processando: chb01_05.edf...
Arquivo salvo em: ./temp_chb01_05.edf
Processando: chb01_06.edf...
Arquivo salvo em: ./temp_chb01_06.edf
Processando: chb01_07.edf...
Arquivo salvo em: ./temp_chb01_07.edf
Processando: chb01_08.edf...
Arquivo salvo em: ./temp_chb01_08.edf
Processando: chb01_09.edf...
Arquivo salvo em: ./temp_chb01_09.edf
Processando: chb01_10.edf...
Arquivo salvo em: ./temp_chb01_10.edf
Codificação concluída.

--- TREINANDO E AVALIANDO ESTRATÉGIA: Padrão ---

--- TREINANDO E AVALIANDO ESTRATÉGIA: Multi-Pass ---
Erro: only integers, slices (`:`), ellipsis (`

38 minutos e 10 segundos

In [16]:
PATIENT_ID = 'chb01'
DRIVE_PATH_COMPONENTS = ['TCC EPILEPSIA DATA', 'chb-mit-scalp-eeg-database-1.0.0']

MAX_GRAVACOES_POR_PACIENTE = 10
DIMENSIONS = 10000
NUM_LEVELS = 100
EPOCHS_MULTIPASS = 20          
LEARNING_RATE_MULTIPASS = 0.1
THRESHOLD_MULTICENTROID = 0.25

SMOOTHING_WINDOW_SIZE = 7      
MERGE_SEIZURES_THRESHOLD = 30  
MAX_CHANNELS = 16              

start_time = time.time()
try:
    service = get_drive_service()
    main_folder_id = find_folder_id_by_path(service, DRIVE_PATH_COMPONENTS)
    patient_folder_id = find_folder_id(service, PATIENT_ID, parent_id=main_folder_id)
    drive_files = get_files_from_drive_folder(service, patient_folder_id)

    summary_filename = f"{PATIENT_ID}-summary.txt"
    local_summary_path = "./temp_summary.txt"
    download_file_locally(service, drive_files[summary_filename], local_summary_path)
    seizure_times = parse_summary_file(local_summary_path)
    os.remove(local_summary_path)

    edf_files = sorted([n for n in drive_files.keys() if n.endswith('.edf')])[:MAX_GRAVACOES_POR_PACIENTE]

    all_features, all_labels = [], []
    print(f"\n--- Processando {len(edf_files)} arquivos ---")
    for edf_file in edf_files:
        local_temp_path = f"./temp_{edf_file}"
        print(f"Processando: {edf_file}...")
        try:
            download_file_locally(service, drive_files[edf_file], local_temp_path)
            with pyedflib.EdfReader(local_temp_path) as f:
                fs = f.getSampleFrequency(0)

                if MAX_CHANNELS is None:
                    n_channels = f.signals_in_file
                else:
                    n_channels = min(int(MAX_CHANNELS), int(f.signals_in_file))

                signals = np.array([f.readSignal(c) for c in range(n_channels)])

                window_samples = int(fs * 2)
                step = max(1, window_samples // 2)

                total = signals.shape[1]
                if total < window_samples:
                    n_windows = 0
                else:
                    n_windows = 1 + (total - window_samples) // step

                for j in range(n_windows):
                    start = j * step
                    end = start + window_samples
                    window = signals[:, start:end]

                    is_seizure = any(
                        max(start/fs, s_start) < min(end/fs, s_end)
                        for s_start, s_end in seizure_times.get(edf_file, [])
                    )

                    feats = np.mean(
                        [extract_single_feature_vector(window[c, :], fs) for c in range(n_channels)],
                        axis=0
                    )
                    all_features.append(feats)
                    all_labels.append(1 if is_seizure else 0)
        finally:
            if os.path.exists(local_temp_path):
                os.remove(local_temp_path)

    gc.collect()

    X, y = np.array(all_features), np.array(all_labels)
    X_scaled = StandardScaler().fit_transform(X)

    X_seiz, X_non = X_scaled[y==1], X_scaled[y==0]
    n_non = min(len(X_non), len(X_seiz))
    X_non_bal = resample(X_non, replace=False, n_samples=n_non, random_state=42)

    X_bal = np.vstack((X_seiz, X_non_bal))
    y_bal = np.hstack((np.ones(len(X_seiz)), np.zeros(n_non)))

    X_train, X_test, y_train, y_test = train_test_split(
        X_bal, y_bal, test_size=0.3, random_state=42, stratify=y_bal
    )

    NUM_FEATURES = X_train.shape[1]
    hdc_encoder = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42)
    X_train_hd = hdc_encoder.encode(X_train)
    X_test_hd = hdc_encoder.encode(X_test)
    print("Codificação concluída.")

    strategies = ['Padrão', 'Multi-Pass', 'Multi-Centroid', 'MC+MP']
    results = {}

    for strat in strategies:
        print(f"\n--- TREINANDO E AVALIANDO ESTRATÉGIA: {strat} ---")
        hdc_model = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42)

        if strat == 'Padrão':
            hdc_model.train_standard(X_train_hd, y_train)
        elif strat == 'Multi-Pass':
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True)
        elif strat == 'Multi-Centroid':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
        elif strat == 'MC+MP':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True,
                                      initial_training=False)

        preds = hdc_model.predict(X_test_hd)
        results[strat] = {'Predições': preds}

except Exception as e:
    print(f"Erro: {e}")
finally:
    print(f"\nTempo total: {(time.time() - start_time)/60:.2f} min", flush=True)

if 'results' in locals() and results:
    print(f"\n\n--- RESULTADOS PACIENTE: {PATIENT_ID} ---")
    for strat, data in results.items():
        y_pred_smoothed = post_process_predictions(
            data['Predições'],
            window_size=SMOOTHING_WINDOW_SIZE,
            merge_gap=MERGE_SEIZURES_THRESHOLD,
            fs=0.5
        )
        cm = confusion_matrix(y_test, y_pred_smoothed, labels=[0, 1])
        print(f"\nEstratégia: {strat}")
        print(f"  F1: {f1_score(y_test, y_pred_smoothed, zero_division=0):.4f}, "
              f"Precisão: {precision_score(y_test, y_pred_smoothed, zero_division=0):.4f}, "
              f"Sensibilidade: {recall_score(y_test, y_pred_smoothed, zero_division=0):.4f}")
        print("  Matriz de Confusão:")
        print(f"    VN: {cm[0][0]} | FP: {cm[0][1]}")
        print(f"    FN: {cm[1][0]} | VP: {cm[1][1]}")

Serviço do Google Drive conectado com sucesso.
Caminho do dataset encontrado no Drive!
Arquivo salvo em: ./temp_summary.txt

--- Processando 10 arquivos ---
Processando: chb01_01.edf...
Arquivo salvo em: ./temp_chb01_01.edf
Processando: chb01_02.edf...
Arquivo salvo em: ./temp_chb01_02.edf
Processando: chb01_03.edf...
Arquivo salvo em: ./temp_chb01_03.edf
Processando: chb01_04.edf...
Arquivo salvo em: ./temp_chb01_04.edf
Processando: chb01_05.edf...
Arquivo salvo em: ./temp_chb01_05.edf
Processando: chb01_06.edf...
Arquivo salvo em: ./temp_chb01_06.edf
Processando: chb01_07.edf...
Arquivo salvo em: ./temp_chb01_07.edf
Processando: chb01_08.edf...
Arquivo salvo em: ./temp_chb01_08.edf
Processando: chb01_09.edf...
Arquivo salvo em: ./temp_chb01_09.edf
Processando: chb01_10.edf...
Arquivo salvo em: ./temp_chb01_10.edf
Codificação concluída.

--- TREINANDO E AVALIANDO ESTRATÉGIA: Padrão ---

--- TREINANDO E AVALIANDO ESTRATÉGIA: Multi-Pass ---
Erro: only integers, slices (`:`), ellipsis (`

33 minutos e 5 segundos

In [6]:
PATIENT_ID = 'chb01'
DRIVE_PATH_COMPONENTS = ['TCC EPILEPSIA DATA', 'chb-mit-scalp-eeg-database-1.0.0']

MAX_GRAVACOES_POR_PACIENTE = 10
DIMENSIONS = 10000
NUM_LEVELS = 100
EPOCHS_MULTIPASS = 12          
LEARNING_RATE_MULTIPASS = 0.1
THRESHOLD_MULTICENTROID = 0.25

SMOOTHING_WINDOW_SIZE = 5      
MERGE_SEIZURES_THRESHOLD = 30  
MAX_CHANNELS = None              

start_time = time.time()
results = {}

try:
    service = get_drive_service()
    main_folder_id = find_folder_id_by_path(service, DRIVE_PATH_COMPONENTS)
    patient_folder_id = find_folder_id(service, PATIENT_ID, parent_id=main_folder_id)
    drive_files = get_files_from_drive_folder(service, patient_folder_id)

    summary_filename = f"{PATIENT_ID}-summary.txt"
    local_summary_path = "./temp_summary.txt"
    download_file_locally(service, drive_files[summary_filename], local_summary_path)
    seizure_times = parse_summary_file(local_summary_path)
    os.remove(local_summary_path)

    edf_files = sorted([n for n in drive_files.keys() if n.endswith('.edf')])[:MAX_GRAVACOES_POR_PACIENTE]

    all_features, all_labels = [], []
    print(f"\n--- Processando {len(edf_files)} arquivos ---")
    for edf_file in edf_files:
        local_temp_path = f"./temp_{edf_file}"
        print(f"Processando: {edf_file}...")
        try:
            download_file_locally(service, drive_files[edf_file], local_temp_path)
            with pyedflib.EdfReader(local_temp_path) as f:
                fs = f.getSampleFrequency(0)

                if MAX_CHANNELS is None:
                    n_channels = f.signals_in_file
                else:
                    n_channels = min(int(MAX_CHANNELS), int(f.signals_in_file))

                signals = np.array([f.readSignal(c) for c in range(n_channels)])

                window_samples = int(fs * 2)
                step = max(1, window_samples // 2)

                total = signals.shape[1]
                if total < window_samples:
                    n_windows = 0
                else:
                    n_windows = 1 + (total - window_samples) // step

                for j in range(n_windows):
                    start = j * step
                    end = start + window_samples
                    window = signals[:, start:end]

                    is_seizure = any(
                        max(start/fs, s_start) < min(end/fs, s_end)
                        for s_start, s_end in seizure_times.get(edf_file, [])
                    )

                    feats = np.mean(
                        [extract_single_feature_vector(window[c, :], fs) for c in range(n_channels)],
                        axis=0
                    )
                    all_features.append(feats)
                    all_labels.append(1 if is_seizure else 0)
        finally:
            if os.path.exists(local_temp_path):
                os.remove(local_temp_path)

    gc.collect()

    X, y = np.array(all_features), np.array(all_labels)
    X_scaled = StandardScaler().fit_transform(X)

    X_seiz, X_non = X_scaled[y==1], X_scaled[y==0]
    n_non = min(len(X_non), len(X_seiz))
    X_non_bal = resample(X_non, replace=False, n_samples=n_non, random_state=42)

    X_bal = np.vstack((X_seiz, X_non_bal))
    y_bal = np.hstack((np.ones(len(X_seiz)), np.zeros(n_non)))

    X_train, X_test, y_train, y_test = train_test_split(
        X_bal, y_bal, test_size=0.3, random_state=42, stratify=y_bal
    )

    NUM_FEATURES = X_train.shape[1]
    hdc_encoder = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42) 
    X_train_hd = hdc_encoder.encode(X_train)
    X_test_hd = hdc_encoder.encode(X_test)
    print("Codificação concluída.")

    strategies = ['Padrão', 'Multi-Pass', 'Multi-Centroid', 'MC+MP']

    for strat in strategies:
        hdc_model = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42)  
        if strat == 'Padrão':
            hdc_model.train_standard(X_train_hd, y_train)
        elif strat == 'Multi-Pass':
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True)
        elif strat == 'Multi-Centroid':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
        elif strat == 'MC+MP':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True,
                                      initial_training=False)

        preds = hdc_model.predict(X_test_hd)
        results[strat] = {'Predições': preds}

except Exception as e:
    print(f"Erro: {e}")
finally:
    print(f"\nTempo total: {(time.time() - start_time)/60:.2f} min", flush=True)

if 'results' in locals() and results:
    all_metrics = []
    for strat, data in results.items():
        y_pred_smoothed = post_process_predictions(
            data['Predições'],
            window_size=SMOOTHING_WINDOW_SIZE,
            merge_gap=MERGE_SEIZURES_THRESHOLD,
            fs=0.5
        )
        cm = confusion_matrix(y_test, y_pred_smoothed, labels=[0, 1])

        f1 = f1_score(y_test, y_pred_smoothed, zero_division=0)
        prec = precision_score(y_test, y_pred_smoothed, zero_division=0)
        rec = recall_score(y_test, y_pred_smoothed, zero_division=0)

        all_metrics.append({
            "Estratégia": strat,
            "F1": f1,
            "Precisão": prec,
            "Sensibilidade": rec,
            "VN": cm[0][0],
            "FP": cm[0][1],
            "FN": cm[1][0],
            "VP": cm[1][1]
        })

    df_results = pd.DataFrame(all_metrics)
    print("\n=== RESULTADOS COMPARATIVOS ===")
    display(df_results)

Serviço do Google Drive conectado com sucesso.
Caminho do dataset encontrado no Drive!
Arquivo salvo em: ./temp_summary.txt

--- Processando 10 arquivos ---
Processando: chb01_01.edf...
Arquivo salvo em: ./temp_chb01_01.edf
Processando: chb01_02.edf...
Arquivo salvo em: ./temp_chb01_02.edf
Processando: chb01_03.edf...
Arquivo salvo em: ./temp_chb01_03.edf
Processando: chb01_04.edf...
Arquivo salvo em: ./temp_chb01_04.edf
Processando: chb01_05.edf...
Arquivo salvo em: ./temp_chb01_05.edf
Processando: chb01_06.edf...
Arquivo salvo em: ./temp_chb01_06.edf
Processando: chb01_07.edf...
Arquivo salvo em: ./temp_chb01_07.edf
Processando: chb01_08.edf...
Arquivo salvo em: ./temp_chb01_08.edf
Processando: chb01_09.edf...
Arquivo salvo em: ./temp_chb01_09.edf
Processando: chb01_10.edf...
Arquivo salvo em: ./temp_chb01_10.edf
Codificação concluída.
Erro: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices

Tempo total: 41.27 min

=

Unnamed: 0,Estratégia,F1,Precisão,Sensibilidade,VN,FP,FN,VP
0,Padrão,0.842105,0.941176,0.761905,20,1,5,16


In [6]:
PATIENT_ID = 'chb01'
DRIVE_PATH_COMPONENTS = ['TCC EPILEPSIA DATA', 'chb-mit-scalp-eeg-database-1.0.0']

MAX_GRAVACOES_POR_PACIENTE = 10
DIMENSIONS = 10000
NUM_LEVELS = 100
EPOCHS_MULTIPASS = 20          
LEARNING_RATE_MULTIPASS = 0.1
THRESHOLD_MULTICENTROID = 0.25

SMOOTHING_WINDOW_SIZE = 7      
MERGE_SEIZURES_THRESHOLD = 30  
MAX_CHANNELS = 16              

start_time = time.time()
results = {}

try:
    service = get_drive_service()
    main_folder_id = find_folder_id_by_path(service, DRIVE_PATH_COMPONENTS)
    patient_folder_id = find_folder_id(service, PATIENT_ID, parent_id=main_folder_id)
    drive_files = get_files_from_drive_folder(service, patient_folder_id)

    summary_filename = f"{PATIENT_ID}-summary.txt"
    local_summary_path = "./temp_summary.txt"
    download_file_locally(service, drive_files[summary_filename], local_summary_path)
    seizure_times = parse_summary_file(local_summary_path)
    os.remove(local_summary_path)

    edf_files = sorted([n for n in drive_files.keys() if n.endswith('.edf')])[:MAX_GRAVACOES_POR_PACIENTE]

    all_features, all_labels = [], []
    print(f"\n--- Processando {len(edf_files)} arquivos ---")
    for edf_file in edf_files:
        local_temp_path = f"./temp_{edf_file}"
        print(f"Processando: {edf_file}...")
        try:
            download_file_locally(service, drive_files[edf_file], local_temp_path)
            with pyedflib.EdfReader(local_temp_path) as f:
                fs = f.getSampleFrequency(0)

                if MAX_CHANNELS is None:
                    n_channels = f.signals_in_file
                else:
                    n_channels = min(int(MAX_CHANNELS), int(f.signals_in_file))

                signals = np.array([f.readSignal(c) for c in range(n_channels)])

                window_samples = int(fs * 2)
                step = max(1, window_samples // 2)

                total = signals.shape[1]
                if total < window_samples:
                    n_windows = 0
                else:
                    n_windows = 1 + (total - window_samples) // step

                for j in range(n_windows):
                    start = j * step
                    end = start + window_samples
                    window = signals[:, start:end]

                    is_seizure = any(
                        max(start/fs, s_start) < min(end/fs, s_end)
                        for s_start, s_end in seizure_times.get(edf_file, [])
                    )

                    feats = np.mean(
                        [extract_single_feature_vector(window[c, :], fs) for c in range(n_channels)],
                        axis=0
                    )
                    all_features.append(feats)
                    all_labels.append(1 if is_seizure else 0)
        finally:
            if os.path.exists(local_temp_path):
                os.remove(local_temp_path)

    gc.collect()

    X, y = np.array(all_features), np.array(all_labels)
    X_scaled = StandardScaler().fit_transform(X)

    X_seiz, X_non = X_scaled[y==1], X_scaled[y==0]
    n_non = min(len(X_non), len(X_seiz))
    X_non_bal = resample(X_non, replace=False, n_samples=n_non, random_state=42)

    X_bal = np.vstack((X_seiz, X_non_bal))
    y_bal = np.hstack((np.ones(len(X_seiz)), np.zeros(n_non)))

    X_train, X_test, y_train, y_test = train_test_split(
        X_bal, y_bal, test_size=0.3, random_state=42, stratify=y_bal
    )

    NUM_FEATURES = X_train.shape[1]
    hdc_encoder = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42) 
    X_train_hd = hdc_encoder.encode(X_train)
    X_test_hd = hdc_encoder.encode(X_test)
    print("Codificação concluída.")

    strategies = ['Padrão', 'Multi-Pass', 'Multi-Centroid', 'MC+MP']

    for strat in strategies:
        hdc_model = HDC(DIMENSIONS, NUM_FEATURES, NUM_LEVELS, seed=42)  
        if strat == 'Padrão':
            hdc_model.train_standard(X_train_hd, y_train)
        elif strat == 'Multi-Pass':
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True)
        elif strat == 'Multi-Centroid':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
        elif strat == 'MC+MP':
            hdc_model.train_multicentroid(X_train_hd, y_train,
                                          threshold=THRESHOLD_MULTICENTROID,
                                          reduce=True)
            hdc_model.train_multipass(X_train_hd, y_train,
                                      epochs=EPOCHS_MULTIPASS,
                                      lr=LEARNING_RATE_MULTIPASS,
                                      subtract_wrong=True,
                                      initial_training=False)

        preds = hdc_model.predict(X_test_hd)
        results[strat] = {'Predições': preds}

except Exception as e:
    print(f"Erro: {e}")
finally:
    print(f"\nTempo total: {(time.time() - start_time)/60:.2f} min", flush=True)

if 'results' in locals() and results:
    all_metrics = []
    for strat, data in results.items():
        y_pred_smoothed = post_process_predictions(
            data['Predições'],
            window_size=SMOOTHING_WINDOW_SIZE,
            merge_gap=MERGE_SEIZURES_THRESHOLD,
            fs=0.5
        )
        cm = confusion_matrix(y_test, y_pred_smoothed, labels=[0, 1])

        f1 = f1_score(y_test, y_pred_smoothed, zero_division=0)
        prec = precision_score(y_test, y_pred_smoothed, zero_division=0)
        rec = recall_score(y_test, y_pred_smoothed, zero_division=0)

        all_metrics.append({
            "Estratégia": strat,
            "F1": f1,
            "Precisão": prec,
            "Sensibilidade": rec,
            "VN": cm[0][0],
            "FP": cm[0][1],
            "FN": cm[1][0],
            "VP": cm[1][1]
        })

    df_results = pd.DataFrame(all_metrics)
    print("\n=== RESULTADOS COMPARATIVOS ===")
    display(df_results)

Serviço do Google Drive conectado com sucesso.
Caminho do dataset encontrado no Drive!
Arquivo salvo em: ./temp_summary.txt

--- Processando 10 arquivos ---
Processando: chb01_01.edf...
Arquivo salvo em: ./temp_chb01_01.edf
Processando: chb01_02.edf...
Arquivo salvo em: ./temp_chb01_02.edf
Processando: chb01_03.edf...
Arquivo salvo em: ./temp_chb01_03.edf
Processando: chb01_04.edf...
Arquivo salvo em: ./temp_chb01_04.edf
Processando: chb01_05.edf...
Arquivo salvo em: ./temp_chb01_05.edf
Processando: chb01_06.edf...
Arquivo salvo em: ./temp_chb01_06.edf
Processando: chb01_07.edf...
Arquivo salvo em: ./temp_chb01_07.edf
Processando: chb01_08.edf...
Arquivo salvo em: ./temp_chb01_08.edf
Processando: chb01_09.edf...
Arquivo salvo em: ./temp_chb01_09.edf
Processando: chb01_10.edf...
Arquivo salvo em: ./temp_chb01_10.edf
Codificação concluída.
Erro: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices

Tempo total: 27.35 min

=

Unnamed: 0,Estratégia,F1,Precisão,Sensibilidade,VN,FP,FN,VP
0,Padrão,0.722222,0.866667,0.619048,19,2,8,13
