In [1]:
import pandas as pd
import glob
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from joblib import Parallel, delayed
import multiprocessing
import neurokit2 as nk 

DATA_FOLDER = 'data'
SAMPLES_PER_CLASS = 1970 
MAX_WORKERS = -1
PRE_DISPATCH = '2*n_jobs'
SAMPLING_RATE = 1000
LEADS = ['I','II','III','aVR','aVL','aVF','V1','V2','V3','V4','V5','V6']

CLASS_MAP = {
    'normal': 0,
    'arritmia': 1,
    'block': 2,
    'fibrilation': 3
}

In [2]:
def toFeature(signal: pd.core.frame.DataFrame, time = False):
    time_features = ["HRV_MeanNN", "HRV_SDNN", "HRV_RMSSD", "HRV_pNN50"]
    F = []
    for lead in LEADS:
        clean = nk.ecg_clean(signal[lead], sampling_rate=SAMPLING_RATE)
        _, rpeaks = nk.ecg_peaks(clean, sampling_rate=SAMPLING_RATE)
        rpeak_indices = rpeaks['ECG_R_Peaks']
        # Pasar de largo si no hay suficientes R-peaks
        if np.sum(rpeak_indices) < 2:
            F += [np.nan, np.nan, np.nan, np.nan]
            continue
        try:
            _, waves_peak = nk.ecg_delineate(clean, rpeaks, sampling_rate=SAMPLING_RATE, method="peak")
            mean_r = np.mean([clean[i] if not np.isnan(i) else 0 for i in rpeaks['ECG_R_Peaks']]) if np.any(rpeaks['ECG_R_Peaks']) else np.nan
            mean_p = np.mean([clean[i] if not np.isnan(i) else 0 for i in waves_peak['ECG_P_Peaks']]) if 'ECG_P_Peaks' in waves_peak else np.nan
            mean_q = np.mean([clean[i] if not np.isnan(i) else 0 for i in waves_peak['ECG_Q_Peaks']]) if 'ECG_Q_Peaks' in waves_peak else np.nan
            mean_s = np.mean([clean[i] if not np.isnan(i) else 0 for i in waves_peak['ECG_S_Peaks']]) if 'ECG_S_Peaks' in waves_peak else np.nan
        except Exception:
            mean_r = mean_p = mean_q = mean_s = np.nan
        F += [mean_r, mean_p, mean_q, mean_s]
    # Features temporales con lead II:
    clean2 = nk.ecg_clean(signal["II"], sampling_rate=SAMPLING_RATE)
    _, rpeaks = nk.ecg_peaks(clean2, sampling_rate=SAMPLING_RATE)
    valid_rpeaks = [r for r in rpeaks['ECG_R_Peaks'] if not np.isnan(r)]
    if len(valid_rpeaks) >= 2: # Seguir de largo si no hay R-peaks suficientes
        if time:
            t = nk.hrv_time(rpeaks, sampling_rate=SAMPLING_RATE)
            F.extend(t[time_features].values.flatten().tolist())
    else:
        nan_count = 0
        if time: nan_count += len(time_features)
        F.extend([np.nan] * nan_count)
    return np.array(F)

def _process_file(item):
    path, label = item
    try:
        df = pd.read_parquet(path, engine='fastparquet')
        if not set(LEADS).issubset(df.columns): return None
        df_leads = df[LEADS].apply(pd.to_numeric, errors='coerce').fillna(0).astype(np.float32)
        feat = toFeature(df_leads, time=True) 
        
        return feat, label
    except Exception:
        return None


In [3]:

file_items = []
for folder, label in CLASS_MAP.items():
    folder_path = os.path.join(DATA_FOLDER, folder)
    paths = glob.glob(os.path.join(folder_path, '*.parquet.gzip'))
    if not paths: 
        print(f"Warning: No data for {folder}, skipping")
        continue 

    if len(paths) >= SAMPLES_PER_CLASS:
        sampled = random.sample(paths, SAMPLES_PER_CLASS)
    else:
        sampled = random.choices(paths, k=SAMPLES_PER_CLASS)
    
    file_items.extend([(p, label) for p in sampled])

random.shuffle(file_items)
cpu_count = multiprocessing.cpu_count()
workers = max(1, cpu_count - 1)
results = Parallel(n_jobs=workers, backend='loky', verbose=1)(
    delayed(_process_file)(item) for item in file_items
)
valid_results = [r for r in results if r is not None]
X_raw = np.vstack([r[0] for r in valid_results])
y_all = np.array([r[1] for r in valid_results])

imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit_transform(X_raw)

X_train, X_test, y_train, y_test = train_test_split(
    X_imputed, y_all, test_size=0.2, random_state=42, stratify=y_all
)


[Parallel(n_jobs=7)]: Using backend LokyBackend with 7 concurrent workers.
[Parallel(n_jobs=7)]: Done  36 tasks      | elapsed:   17.3s
[Parallel(n_jobs=7)]: Done 186 tasks      | elapsed:  1.1min
[Parallel(n_jobs=7)]: Done 436 tasks      | elapsed:  2.4min
[Parallel(n_jobs=7)]: Done 786 tasks      | elapsed:  4.1min
[Parallel(n_jobs=7)]: Done 1236 tasks      | elapsed:  6.4min
[Parallel(n_jobs=7)]: Done 1786 tasks      | elapsed:  9.4min
[Parallel(n_jobs=7)]: Done 2436 tasks      | elapsed: 12.8min
[Parallel(n_jobs=7)]: Done 3186 tasks      | elapsed: 16.9min
[Parallel(n_jobs=7)]: Done 4036 tasks      | elapsed: 21.4min
[Parallel(n_jobs=7)]: Done 4986 tasks      | elapsed: 26.4min
[Parallel(n_jobs=7)]: Done 6036 tasks      | elapsed: 32.0min
[Parallel(n_jobs=7)]: Done 7186 tasks      | elapsed: 37.6min
[Parallel(n_jobs=7)]: Done 7880 out of 7880 | elapsed: 41.2min finished


In [4]:

print("\n--- Estrategia 1: Multiclase Directa ---")
rf_flat = RandomForestClassifier(n_estimators=1000, n_jobs=-1)
rf_flat.fit(X_train, y_train)

y_pred_flat = rf_flat.predict(X_test)
acc_flat = accuracy_score(y_test, y_pred_flat)
print(f"Accuracy (Plano): {acc_flat:.4f}")


--- Estrategia 1: Multiclase Directa ---
Accuracy (Plano): 0.8319


In [5]:
print("\n--- Estrategia 2: Jerárquica (Segmentada) ---")

y_train_binary = (y_train != 0).astype(int) 

rf_binary = RandomForestClassifier(n_estimators=1000, class_weight='balanced', n_jobs=-1)
rf_binary.fit(X_train, y_train_binary)

mask_anormal_train = y_train != 0
X_train_sub = X_train[mask_anormal_train]
y_train_sub = y_train[mask_anormal_train]

rf_subclass = RandomForestClassifier(n_estimators=1000, class_weight='balanced', n_jobs=-1)
rf_subclass.fit(X_train_sub, y_train_sub)

y_pred_binary_test = rf_binary.predict(X_test)
y_pred_hierarchical = np.zeros_like(y_pred_binary_test)
y_pred_hierarchical[y_pred_binary_test == 0] = 0 
mask_pred_anormal = y_pred_binary_test == 1

if np.any(mask_pred_anormal):
    subclass_predictions = rf_subclass.predict(X_test[mask_pred_anormal])
    y_pred_hierarchical[mask_pred_anormal] = subclass_predictions

acc_hier = accuracy_score(y_test, y_pred_hierarchical)
print(f"Accuracy (Jerárquico): {acc_hier:.4f}")


--- Estrategia 2: Jerárquica (Segmentada) ---
Accuracy (Jerárquico): 0.8096


In [6]:
print(f"Estrategia 1 (Plano):      {acc_flat:.4f}")
print(f"Estrategia 2 (Jerárquico): {acc_hier:.4f}")

print("\nDetalle Estrategia 1:")
print(classification_report(y_test, y_pred_flat, target_names=list(CLASS_MAP.keys())))

print("\nDetalle Estrategia 2:")
print(classification_report(y_test, y_pred_hierarchical, target_names=list(CLASS_MAP.keys())))

print("\nMatriz de Confusión (Jerárquica):")
print(confusion_matrix(y_test, y_pred_hierarchical))

Estrategia 1 (Plano):      0.8319
Estrategia 2 (Jerárquico): 0.8096

Detalle Estrategia 1:
              precision    recall  f1-score   support

      normal       0.87      0.86      0.86       394
    arritmia       0.87      0.86      0.86       394
       block       0.76      0.79      0.77       394
 fibrilation       0.84      0.82      0.83       394

    accuracy                           0.83      1576
   macro avg       0.83      0.83      0.83      1576
weighted avg       0.83      0.83      0.83      1576


Detalle Estrategia 2:
              precision    recall  f1-score   support

      normal       0.88      0.75      0.81       394
    arritmia       0.83      0.86      0.84       394
       block       0.71      0.82      0.76       394
 fibrilation       0.83      0.82      0.83       394

    accuracy                           0.81      1576
   macro avg       0.82      0.81      0.81      1576
weighted avg       0.82      0.81      0.81      1576


Matriz de Confu