<a href="https://colab.research.google.com/github/mateollorente/Producto/blob/master/PTB_XL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 1. Descargar el archivo ZIP directamente desde PhysioNet
# Este comando descarga la versión 1.0.3 del dataset.
!wget -N https://physionet.org/static/published-projects/ptb-xl/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3.zip

# 2. Descomprimir el archivo ZIP
# El comando 'unzip -q' descomprime silenciosamente (sin listar todos los archivos).
# Los archivos se extraerán en la carpeta actual donde ejecutes el notebook.
!unzip -q ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3.zip

# 3. (Opcional) Verificar que los archivos existen
# Listar el contenido para confirmar que se descomprimió correctamente.
# Deberías ver las carpetas 'records100', 'records500' y los archivos .csv
!ls

--2025-10-27 02:16:06--  https://physionet.org/static/published-projects/ptb-xl/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3.zip
Resolving physionet.org (physionet.org)... 18.18.42.54
Connecting to physionet.org (physionet.org)|18.18.42.54|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://physionet.org/content/ptb-xl/get-zip/1.0.3/ [following]
--2025-10-27 02:16:07--  https://physionet.org/content/ptb-xl/get-zip/1.0.3/
Reusing existing connection to physionet.org:443.
HTTP request sent, awaiting response... 200 OK
Length: 1839504686 (1.7G) [application/zip]
Saving to: ‘ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3.zip’

                  p   2%[                    ]  39.19M   413KB/s    eta 74m 47s^C
[ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.3.zip]
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a mult

In [None]:
import wfdb
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import to_categorical
import ast # Para procesar el diccionario de scp_codes

# --- 1. Definir Constantes y Cargar Metadatos ---
SAMPLING_RATE = 100
DATA_PATH = './' # Ruta base donde están los archivos descomprimidos
RECORDS_PATH = f'{DATA_PATH}records{SAMPLING_RATE}/' # Ruta a los archivos WFDB

# Cargar metadatos
metadata_df = pd.read_csv(f'{DATA_PATH}ptbxl_database.csv', index_col='ecg_id')
scp_df = pd.read_csv(f'{DATA_PATH}scp_statements.csv', index_col=0)

# Filtrar solo códigos SCP de diagnóstico para las superclases
scp_df = scp_df[scp_df.diagnostic == 1]

# --- 2. Función para Mapear Códigos SCP a Superclases ---
def aggregate_diagnostic(scp_codes_dict):
    """
    Mapea los códigos SCP de un ECG a su superclase diagnóstica principal.
    Prioriza NORM si está presente, de lo contrario, toma la primera superclase encontrada.
    Devuelve NaN si no hay códigos de diagnóstico mapeables.
    """
    # Mapeo de superclases (ajustar si es necesario)
    superclass_map = {
        'NORM': 'NORM', # Normal ECG
        'MI': 'MI',     # Myocardial Infarction
        'STTC': 'STTC', # ST/T Change
        'CD': 'CD',     # Conduction Disturbance
        'HYP': 'HYP'    # Hypertrophy
    }

    # Añadir columna de superclase al DataFrame scp_df si no existe
    # (Esto puede requerir un mapeo manual o usar las columnas existentes)
    # Por simplicidad, asumimos que scp_df tiene una columna 'diagnostic_class'
    # que contiene NORM, MI, STTC, CD, HYP. Si no, necesitas crearla.
    # Ejemplo: scp_df['diagnostic_class'] = scp_df['diagnostic_subclass'].map(subclass_to_superclass_dict)

    if not isinstance(scp_codes_dict, dict):
      return np.nan # No es un diccionario válido

    # Buscar si 'NORM' está presente con alta probabilidad
    if 'NORM' in scp_codes_dict and scp_codes_dict['NORM'] >= 50:
         return 'NORM'

    # Si no es NORM, buscar la primera superclase diagnóstica presente
    for code, likelihood in scp_codes_dict.items():
        if code in scp_df.index and likelihood >= 50: # Considerar solo si la probabilidad es alta
            # Asume que scp_df tiene la columna 'diagnostic_class'
            # Necesitas asegurarte de que esta columna exista y esté poblada
            # con NORM, MI, STTC, CD, HYP
            if 'diagnostic_class' in scp_df.columns:
                super_class = scp_df.loc[code, 'diagnostic_class']
                if super_class in superclass_map:
                    return super_class
            else:
                 # Si no existe 'diagnostic_class', mapear manualmente aquí o retornar NaN
                 # print(f"Advertencia: Columna 'diagnostic_class' no encontrada en scp_df.")
                 # return np.nan # O intentar un mapeo manual si conoces los códigos
                 pass # Evitar que falle si la columna no existe

    return np.nan # Si no se encuentra ninguna superclase válida

# --- 3. Aplicar Mapeo de Etiquetas y Dividir Datos ---

# Convertir la columna 'scp_codes' de string a diccionario
metadata_df['scp_codes'] = metadata_df.scp_codes.apply(lambda x: ast.literal_eval(x))

# Aplicar la función para obtener la etiqueta de superclase
metadata_df['diagnostic_superclass'] = metadata_df.scp_codes.apply(aggregate_diagnostic)

# Eliminar filas donde no se pudo asignar una superclase
metadata_df = metadata_df.dropna(subset=['diagnostic_superclass'])

# Crear un mapeo numérico para las clases
unique_classes = metadata_df.diagnostic_superclass.unique()
class_map = {label: i for i, label in enumerate(unique_classes)}
metadata_df['label_id'] = metadata_df.diagnostic_superclass.map(class_map)
num_classes = len(unique_classes)

print(f"Mapeo de Clases: {class_map}")

# Dividir según los folds recomendados
train_df = metadata_df[metadata_df.strat_fold <= 8]
val_df = metadata_df[metadata_df.strat_fold == 9]
test_df = metadata_df[metadata_df.strat_fold == 10]

print(f"Tamaños -> Entrenamiento: {len(train_df)}, Validación: {len(val_df)}, Prueba: {len(test_df)}")

# --- 4. Función para Cargar y Normalizar Señales ---

def load_and_preprocess_data(df, records_path, sampling_rate):
    """Carga señales, normaliza y convierte etiquetas."""

    # Cargar señales (elige filename_lr para 100Hz)
    signals = [wfdb.rdsamp(records_path + f)[0] for f in df.filename_lr]
    X = np.array(signals)

    # Normalizar cada señal (lead por lead)
    # (Muestras, Longitud, Derivaciones) -> (Muestras, Longitud_Normalizada, Derivaciones)
    scaler = StandardScaler()
    X_scaled = np.zeros_like(X, dtype=np.float32) # Usar float32 para Keras
    for i in range(X.shape[0]):
         # Normaliza cada derivación (lead) independientemente
         X_scaled[i] = scaler.fit_transform(X[i])

    # Obtener etiquetas numéricas y convertir a one-hot
    y = df.label_id.values
    y_cat = to_categorical(y, num_classes=num_classes)

    return X_scaled, y_cat

# --- 5. Ejecutar Carga y Preprocesamiento ---
print("Cargando y preprocesando datos de entrenamiento...")
X_train, y_train = load_and_preprocess_data(train_df, RECORDS_PATH, SAMPLING_RATE)
print("Cargando y preprocesando datos de validación...")
X_val, y_val = load_and_preprocess_data(val_df, RECORDS_PATH, SAMPLING_RATE)
print("Cargando y preprocesando datos de prueba...")
X_test, y_test = load_and_preprocess_data(test_df, RECORDS_PATH, SAMPLING_RATE)

print("\n--- Formas Finales ---")
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, GlobalAveragePooling1D, Dense, Dropout, BatchNormalization

def build_conv1d_model(input_shape, num_classes):
    model = Sequential([
        Conv1D(filters=64, kernel_size=10, activation='relu', input_shape=input_shape),
        BatchNormalization(),
        MaxPooling1D(pool_size=3),

        Conv1D(filters=128, kernel_size=10, activation='relu'),
        BatchNormalization(),
        MaxPooling1D(pool_size=3),

        Conv1D(filters=128, kernel_size=10, activation='relu'),
        BatchNormalization(),
        GlobalAveragePooling1D(), # Reduce la dimensionalidad antes de Dense

        Dense(64, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax') # Softmax para multiclase
    ])

    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# --- Uso ---
# input_shape = (X_train.shape[1], X_train.shape[2]) # (longitud_secuencia, num_derivaciones)
# num_classes = y_train.shape[1]
# model = build_conv1d_model(input_shape, num_classes)
# model.summary()