
# EDA — MIT-BIH Arrhythmia Database (Kaggle CSV + annotations)

**Proyecto:** *Clasificación explicable de arritmias cardíacas a partir de ECG transformados en espectrogramas mediante CNNs*  

**Objetivo del EDA:** Implementar un Análisis Exploratorio de Datos (EDA) completo sobre los registros MIT-BIH (canales MLII y V5) y sus anotaciones de latido/evento, para extraer **insights** que guíen el preprocesamiento y el modelado (segmentación en ventanas, normalización, manejo de clases, etc.).

In [1]:
# =====================
# Importación de librerías
# =====================
import os, re, glob, math, textwrap
from pathlib import Path

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from scipy import stats

# Estilo de gráficos
sns.set(style="whitegrid")
plt.rcParams["figure.figsize"] = (12, 5)

# Mostrar todas las columnas de pandas
pd.set_option("display.max_columns", None)

print("Versions -> pandas:", pd.__version__)


Versions -> pandas: 2.2.2



## Configuración y utilidades

- `DATA_DIR`: carpeta donde están los `.csv` y `annotations.txt` (formato Kaggle).  
- `FS`: frecuencia de muestreo.  
- Helpers para **cargar señales**, **cargar anotaciones** y **unificar formatos**.


In [None]:
# =====================
# Configuración
# =====================
# Ajusta esta ruta a tu carpeta local (por ejemplo: r"C:/Users/tu_usuario/Documents/Titulacion_UES/Notebooks/EDA-MIT-BIH-Arrhythmia-Database/kaggle_dataset")
DATA_DIR = Path(r"C:\Users\luisa\Documents\Titulacion_UEES\Notebooks\EDA-MIT-BIH-Arrhythmia-Database\kaggle_dataset")

# Frecuencia de muestreo típica de MIT-BIH
FS = 360  # Hz

# Intento de auto-descubrimiento si no existe la ruta por defecto
if not DATA_DIR.exists():
    # Busca recursivamente en el árbol actual alguna carpeta con csv+annotations
    candidates = []
    for p in Path(".").rglob("*"):
        try:
            if p.is_dir():
                has_csv = any(Path(p).glob("[0-9][0-9][0-9].csv"))
                has_ann = any(Path(p).glob("[0-9][0-9][0-9]annotations.txt"))
                if has_csv and has_ann:
                    candidates.append(p)
        except Exception:
            pass
    if candidates:
        DATA_DIR = candidates[0]
        print("DATA_DIR auto-detectado en:", DATA_DIR.resolve())
    else:
        print("⚠️ No se encontraron datos automáticamente. Ajusta DATA_DIR manualmente.")

DATA_DIR = DATA_DIR.resolve()
DATA_DIR

In [None]:
# =====================
# Funciones robustas de carga
# =====================
def _clean_signal_df(df):
    """Normaliza nombres de columnas y tipos para el CSV de señales."""
    # Limpiar comillas simples, espacios extras en nombres de columnas
    df.columns = [c.replace("'", "").strip() for c in df.columns]
    # Mapear 'sample #' (ignorando mayúsculas/minúsculas) a 'sample'
    rename_map = {}
    for c in df.columns:
        cc = c.lower().replace(" ", "").replace("#", "")
        if cc == "sample":
            rename_map[c] = "sample"
    if rename_map:
        df = df.rename(columns=rename_map)
    # Normalizar nombres de canales comunes
    for c in list(df.columns):
        clean = c.strip().upper().replace(" ", "")
        if clean in ("MLII", "ML2"):
            df = df.rename(columns={c: "MLII"})
        elif clean in ("V1","V2","V4","V5","V6"):
            df = df.rename(columns={c: clean})
    # Forzar numérico
    for c in df.columns:
        df[c] = pd.to_numeric(df[c], errors='coerce')
    # Quitar filas con NaN en columnas clave (al menos sample y al menos un canal)
    # Definir columnas de canal
    canal_cols = [col for col in df.columns if col in ("MLII","V1","V2","V4","V5","V6")]
    # dropna en sample y al menos un canal
    df = df.dropna(subset=["sample"] + canal_cols, how="all").reset_index(drop=True)
    # Asegurar tipo de sample
    df['sample'] = df['sample'].astype(int)
    # Crear columna de tiempo
    df['time_sec'] = df['sample'] / FS
    return df

def read_signal_csv(csv_path: Path) -> pd.DataFrame:
    """Lee un CSV de señales con encabezados tipo 'sample #', 'MLII', 'V1'."""
    # Intento normal
    df = pd.read_csv(csv_path)
    return _clean_signal_df(df)
    # Podrías agregar fallback si no reconoce columnas, pero usualmente con tu encabezado será suficiente.

def read_annotations_txt(ann_path: Path) -> pd.DataFrame:
    """Lee archivo de anotaciones con encabezado: Time   Sample #  Type  Sub Chan  Num Aux."""
    # Leer con read_csv separador por espacios múltiples, considerando encabezado
    # Primero limpiamos encabezado
    df = pd.read_csv(ann_path, sep=r"\s+", engine='python')
    # Si la columna “Sample #” está presente, renómbrala
    df_cols = df.columns.tolist()
    rename_ann = {}
    for c in df_cols:
        cc = c.lower().replace(" ", "").replace("#","")
        if cc == "sample":
            rename_ann[c] = "sample"
        elif cc == "time":
            rename_ann[c] = "Time"
        elif cc == "type":
            rename_ann[c] = "type"
        # las otras columnas Sub, Chan, Num, Aux pueden dejarse igual
    if rename_ann:
        df = df.rename(columns=rename_ann)
    # Asegurar que columnas necesarias existen
    required = ["sample", "type"]
    for r in required:
        if r not in df.columns:
            raise RuntimeError(f"Columna requerida '{r}' no encontrada en {ann_path}, columnas: {df.columns.tolist()}")
    # Convertir sample a entero
    df['sample'] = pd.to_numeric(df['sample'], errors='coerce')
    df = df.dropna(subset=['sample']).reset_index(drop=True)
    df['sample'] = df['sample'].astype(int)
    # Limpiar tipo
    df['type'] = df['type'].astype(str).str.strip()
    df = df[df['type'] != ""].reset_index(drop=True)
    # Tiempo: si columna Time existe y no vacía, convertimos; si no, la generamos
    if 'Time' in df.columns:
        # A veces Time es formato hh:mm:ss.sss o mm:ss.xxx o cosas así;
        # si no parsea bien, conviene usar sample/FS como respaldo
        # Intentemos parseo robusto: si Time parsea como número flotante, bueno, sino ignorar
        try:
            df['Time_num'] = pd.to_numeric(df['Time'], errors='coerce')
        except:
            df['Time_num'] = np.nan
        if df['Time_num'].isna().all():
            df['Time'] = df['sample'] / FS
        else:
            df['Time'] = df['Time_num']
        df = df.drop(columns=['Time_num'], errors='ignore')
    else:
        df['Time'] = df['sample'] / FS

    # Seleccionar columnas finales
    # Asegurar que al menos las columnas: sample, type, Sub, Chan, Num, Aux, Time
    for col in ['Sub','Chan','Num','Aux']:
        if col not in df.columns:
            df[col] = np.nan
    return df[['sample','type','Sub','Chan','Num','Aux','Time']].copy()


def load_record(record_id: str, base_dir: Path = DATA_DIR) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Carga señales y anotaciones para un `record_id` (p. ej., '100')."""
    csv_path = base_dir / f"{record_id}.csv"
    ann_path = base_dir / f"{record_id}annotations.txt"
    if not csv_path.exists():
        raise FileNotFoundError(f"No existe: {csv_path}")
    if not ann_path.exists():
        raise FileNotFoundError(f"No existe: {ann_path}")
    sig = read_signal_csv(csv_path)
    ann = read_annotations_txt(ann_path)
    # Merge: marca el tipo de latido en las muestras exactas anotadas
    sig = sig.copy()
    sig['record'] = str(record_id)
    ann = ann.copy()
    ann['record'] = str(record_id)
    sig = sig.merge(ann[['sample','type']], on='sample', how='left')
    return sig, ann


def list_record_ids(base_dir: Path = DATA_DIR) -> list[str]:
    """Lista IDs disponibles (a partir de archivos ###.csv)."""
    ids = []
    for p in base_dir.glob("[0-9][0-9][0-9].csv"):
        ids.append(p.stem)
    return sorted(ids)