# 3.1 Preparación del entorno

In [1]:
# Instalar bibliotecas necesarias
import wfdb
import scipy.io as sio
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import loadmat
import os
import pandas as pd
from scipy.signal import butter, filtfilt
from biosppy.signals import ecg
from biosppy.signals import ppg
from biosppy.plotting import plot_ecg
from biosppy.plotting import plot_ppg

from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier



In [2]:
#Función para limpiar carpetas
def limpiar_carpeta(carpeta):
    for file in os.listdir(carpeta):
        os.remove(carpeta+'/'+file)

limpiar_carpeta('data/cleaned')

FileNotFoundError: [WinError 3] El sistema no puede encontrar la ruta especificada: 'data/cleaned'

In [None]:
# Funciones de obtención de datos

def get_name_files(path,type):
    files = []
    for file in os.listdir(path):
        if file.endswith(type):
            files.append(file[:-4])
    return files

def read_file_hea(filename, path):
    # lee archivo .hea
    record = wfdb.rdheader(path+filename)
    return record

def read_file_mat(file, path):
    # carga archivo .mat (tiene las señales)
    mat_data = loadmat(path + file +'.mat')
    signals = mat_data['val']
    return signals

def read_file_flat(file,path):
    mat_data = loadmat(path + file +'.mat')
    if 'flatareas' in mat_data:
        flatareas = mat_data['flatareas']
    else:
        flatareas = np.array([])
    return flatareas

def read_file_zigzag(file,path):
    mat_data = loadmat(path + file +'.mat')
    if 'zigzagareas' in mat_data:
        zigzagareas = mat_data['zigzagareas']
    else:
        zigzagareas = np.array([])
    return zigzagareas

# 3.2 Análisis exploratorio de datos

In [None]:

old_path='data/training/' #ruta de los archivos preliminares
#toma los nombres de los archivos .hea
nameFiles = get_name_files(old_path,'.hea')
#revisa el primer archivo .hea
heaRecord1 = read_file_hea(nameFiles[0], old_path)
print(heaRecord1.__dict__)

In [None]:
all_unique_sig_names = set()
for file in nameFiles:
    record = read_file_hea(file, old_path)
    all_unique_sig_names.update(record.sig_name)

print(f"Nombres de canal presentes: {all_unique_sig_names}")
print(f"Total de canales: {len(all_unique_sig_names)}")

In [None]:
# contar cuantos datos .mat hay en data/training
i=0
for file in os.listdir('data/training/'):
    if file.endswith('.mat'):
        i += 1
print('cantidad de datos:', i)

## 3.2.1 Funciones de visualización
Se desarrollaron funciones de visualización para observar las caracteristicas de los datos de manera gráfica, junto con una recopilación en '.csv' de todos los archivos '.hea' de cada archivo para observar sus características disponible aquí [ExploraciónHea](data/exploraciónHea.csv)

Entre las funciones más útiles se encuentra 'graf_signals', con ella se exportó una gráfica de cada caso en formato '.jpg'. Los gráficos pueden encontrarse aquí [Gráficos de señal preliminar](data/imgs)

In [None]:

def grafica_ecg(record, signals, path_save,picos_r=None): #subfunción de graf_signals
  
    plt.figure(figsize=(12, 6))
    for i in range(len(record.sig_name)):
        plt.subplot(len(record.sig_name), 1, i+1)
        plt.plot(signals[i, :], label=f'Señal: {record.sig_name[i]}')
        if picos_r is not None:
            plt.plot(picos_r, signals[i, picos_r], 'ro', markersize=1)
            #Imprime los picos_r
            print(f'Picos R señal {record.sig_name[i]}: {picos_r}')

        plt.xlabel('Muestras')
        plt.ylabel(record.units[i])
        plt.legend()

    plt.suptitle(record.comments[0] + ' - ' + record.comments[1])
  
    plt.savefig(path_save + record.record_name + '.jpg')
    plt.close()

#función para graficar una cantidad de señales/casos de un directorio de archivos .mat
def graf_signals(files, path, cant, path_save):
    for fileName in files[:cant]:
        print('Graficando señales de: ', fileName)
        record = read_file_hea(fileName, 'data/training/')
        signals, flat_areas, zigzag_areas = read_file_mat(fileName, path) 
        #picos_r = sio.loadmat(path +file+'.mat')['picos_r']
        grafica_ecg(record,signals, path_save)

# Grafica todos los canales de una señal/caso con sus areas planas y zigzag
def grafica_preview(record, signals, oldSignal, path, flat_areas=[], zigzag_areas=[]):
    plt.figure(figsize=(12, 6))
    for i in range(signals.shape[0]):
        plt.subplot(signals.shape[0], 1, i + 1)
        plt.plot(signals[i, :], label=f'Señal limpia: {record.sig_name[i]}', color='blue', alpha=0.5)
        plt.plot(oldSignal[i, :], label=f'Señal original: {record.sig_name[i]}', color='red', alpha=0.3)
        # Resaltar visualmente los segmentos planos si existen

        if  flat_areas.size > 0:
            for flat in flat_areas[0,i]:
                plt.axvspan(flat[0], flat[1], color='red', alpha=0.5)

        # Resaltar visualmente los segmentos zigzag si existen
        if zigzag_areas.size > 0:
            for zigzag in zigzag_areas[0,i]:
                plt.axvspan(zigzag[0], zigzag[1], color='yellow', alpha=0.5)
        
        plt.xlabel('Muestras')
        plt.ylabel(record.units[i])
        plt.legend()

    plt.suptitle(record.comments[0] + ' - ' + record.comments[1])
    plt.show()

#Grafica sólo canales ya procesados, mostrando sus picosR en caso de ecg, picos en caso de ppg y latidos cardiacos y complejo QRS
def grafica_preview_bios(record, signal):
    try:
        ecg_type = ['I', 'II', 'III', 'V']
        pleth_type = ['PLETH']
        for i in range(len(record.sig_name)):
            channel = signal[i, :]
            if record.sig_name[i] in ecg_type:
                ecg.ecg(signal=channel, sampling_rate=250, show=True)
            elif record.sig_name[i] in pleth_type:
                ppg.ppg(signal=channel, sampling_rate=250, show=True)
    except Exception as e:
        print(f'Señal {record.record_name} no válida para esta gráfica: {e}')


In [None]:
# Visualizador general: Grafica toda la información útil de una señal
def visualizador_señales_en_file(nameFile_, path, old_path):
    nameFile = nameFiles.index(nameFile_) #nombre de la muestra
    record = read_file_hea(nameFiles[nameFile], old_path) #buscar . hea siempre será en old_path
    signalPreview = read_file_mat(nameFiles[nameFile], path)
    oldSignal = read_file_mat(nameFiles[nameFile], old_path)
    flat_areas = read_file_flat(nameFiles[nameFile], path)
    zigzag_areas = read_file_zigzag(nameFiles[nameFile], path)

    grafica_preview(record, signalPreview, oldSignal, path, flat_areas, zigzag_areas)
    grafica_preview_bios(record, signalPreview) #sólo para señales correctas

def visualizador_señales_limpias(record, signals):
    ecg_type = ['I', 'II', 'III', 'V']
    pleth_type = ['PLETH']
    print(record.comments[0] + ' - ' + record.comments[1])
    for i in range(len(record.sig_name)):
        print(record.sig_name[i])
        channel = signals[i, :]
        if record.sig_name[i] in ecg_type:
            a=ecg.ecg(signal=channel, sampling_rate=250, show=True)
        elif record.sig_name[i] in pleth_type:
            ppg.ppg(signal=channel, sampling_rate=250, show=True)


## 3.2.2 Limpieza de los datos

Gracias a las funciones de visualización se pueden observar caracteristicas importantes de la señal.

En primer lugar se observó estas no tenían una longitud estándar, muchas poseían ruido y no se identificaba a simple vista el complejo QRS.
Además muchos canales poseían líneas planas y en zigzag, lo que dificulta aún mas la detección de características.

Debido a esto se determina el siguiente plan de acción:
- Detección de lineas planas y zig_zag.
- Desarrollo de un filtro butterworth de paso bajo, con un cutoff=1 y order=2. [2]
- Clasificar datos válidos para entrenamiento y testing.

Se detallan a continuación funciones importantes:

**mark_flat_lines** : Marca líneas planas en la señal donde la diferencia entre valores consecutivos es menor que un umbral.

Parameteros:
- signal: La señal a analizar (ECG o PPG).
- fs: Frecuencia de muestreo en Hz.
- min_duration: Duración mínima en segundos para que se considere una línea plana.
- flat_value: El valor que se utilizará para marcar las líneas planas en la señal.
- threshold: Umbral de diferencia entre muestras consecutivas para considerar una línea plana.

Returns:
- marked_signal: La señal con las líneas planas marcadas.
- flat_areas: Lista de tuplas (inicio, fin) que indican las áreas planas detectadas.

**mark_zigzag_lines** :  Identifica segmentos en zigzag en una señal.
    
Parámetros:
- signal (numpy array): La señal a analizar.
- fs (int): Frecuencia de muestreo en Hz. Por defecto es 250 Hz.
- min_duration (float): Duración mínima en segundos de un zigzag. Por defecto es 0.2 segundos.

Retorna:
zigzag_areas (list): Lista de tuplas (inicio, fin) de los segmentos en zigzag.

Tanto 'min_duration' como 'threshold' fueron ajustados para detectar correctamente señales sin actividad necesaria, o de nula actividad.

In [None]:

def mark_flat_lines(signal, fs=250, min_duration=0.3, flat_value=-9999, threshold=2):
    # Número de muestras consecutivas que definen una línea plana
    min_samples = int(min_duration * fs)

    # Diferencias absolutas entre muestras consecutivas
    diff_signal = np.abs(np.diff(signal))

    # Encuentra dónde la diferencia es menor que el umbral
    flat_segments = (diff_signal < threshold).astype(int)

    # Identifica segmentos planos continuos de al menos min_samples de duración
    flat_areas = []
    current_length = 0
    start_index = None

    for i in range(len(flat_segments)):
        if flat_segments[i] == 1:  # Si es un segmento plano
            if current_length == 0:
                start_index = i
            current_length += 1
        else:
            if current_length >= min_samples:  # Si cumple con la duración mínima
                flat_areas.append((start_index, start_index + current_length))
            current_length = 0

    # Considerar el último segmento si termina en una zona plana
    if current_length >= min_samples:
        flat_areas.append((start_index, start_index + current_length))

    # Marca los segmentos planos en la señal
    marked_signal = np.copy(signal)
    for start, end in flat_areas:
        marked_signal[start:end] = flat_value  # Marcar con un valor específico como -9999
    
    return marked_signal, flat_areas

def mark_zigzag_lines(signal, fs=250, min_duration=0.3):
    # Número de muestras consecutivas que definen un zigzag
    min_samples = int(min_duration * fs)
    
    # Calcula las diferencias entre muestras consecutivas (pendiente)
    diff_signal = np.diff(signal)
    
    # Determina si la pendiente es positiva o negativa
    sign_changes = np.sign(diff_signal)
    
    # Identifica los cambios de signo consecutivos (zigzag)
    zigzag_segments = (sign_changes[:-1] * sign_changes[1:] == -1).astype(int)

    # Identifica segmentos zigzag continuos de al menos min_samples de duración
    zigzag_areas = []
    current_length = 0
    start_index = None
    
    for i in range(len(zigzag_segments)):
        if zigzag_segments[i] == 1:  # Cambio de pendiente
            if current_length == 0:
                start_index = i
            current_length += 1
        else:
            if current_length >= min_samples:
                zigzag_areas.append((start_index, start_index + current_length + 1))
            current_length = 0

    # Último segmento
    if current_length >= min_samples:
        zigzag_areas.append((start_index, start_index + current_length + 1))
    
    return zigzag_areas

# filtro butterworth
def butter_bandpass_filter(data, cutoff=1, fs=250, order=2):
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b, a, data)
    return y


### 3.2.2.1 Selección de datos válidos 

Para la clasificación de EBC, ETC y ASY, las señales
de ECG y PPG estaban presentes en todos los casos. [2]

En el caso de VFB y VTA, debería ser limitada
únicamente a las señales de ECG, ya que los datos de PPG eran muy
irregulares y no ayudaban a realizar ninguna predicción razonable. [2]

Con esta información, se determina que almenos un canal derivado de ECG debería tener una señal 'limpia', es decir, libre de líneas planas y zig-zag para poder extraer sus características QRS, latidos cardiacos y picos R. Con esta condición, ese caso en particular se considera válido. 

Con respecto a los canales 'PLETH' se determina más adelante si califican para considerar sus caracterísiticas, ya que depende del tipo de arritmia su utilización. 

Además de esto se crea directorio para los datos válidos ('data/cleaned')


In [None]:

new_path = 'data/cleaned/'

if not os.path.exists(new_path):
    os.makedirs(new_path)
    
#se separan las señales calificadas y las no calificadas
for fileName in nameFiles:
    path = old_path
    record = read_file_hea(fileName, path)
    signals = read_file_mat(fileName, path)

    # Recorta y deja sólo los últimos 10 segundos de señal
    signals = signals[:, -record.fs*20:]
    dirtySignals = signals

    # Se aplica filtro sólo a las señales ECG ('I', 'II', 'III' y 'V')
    ecg_type = ['I', 'II', 'III', 'V']
    flat_areas_all = []
    zigzag_areas_all = []

    my_ecg = []
    strike = 0

    try:
        for i in range(len(record.sig_name)):
            if record.sig_name[i] in ecg_type and record.sig_name[i] not in my_ecg:
                my_ecg.append(record.sig_name[i])
                
        for i in range(len(record.sig_name)):

            if record.sig_name[i] in ecg_type:
                
                flat_signal, flat_areas0 = mark_flat_lines(signals[i, :], fs=record.fs)
                zigzag_areas0 = mark_zigzag_lines(signals[i, :], fs=record.fs)

            if len(flat_areas0) != 0 or len(zigzag_areas0) != 0:
                strike = strike + 1

        if strike >= len(my_ecg):
            raise Exception('Señal no califica para ser del training, no contiene ecg limpias'.format(fileName))

        else:
            sio.savemat(new_path + fileName + '.mat', {
            'val': signals,
            })

            print('se guardó la señal ', fileName, ' en el directorio cleaned')

    except Exception as e:
        print(f'Señal corrupta {fileName}: {e}')
        print(f'Señal no califica para ser del training {fileName} se descarta todo su archivo')

        
        
# se elimino 'v846s' de forma manual ya que saltó el filtro de líneas planas-zigzag y posee anomalías
os.remove('data/cleaned/v846s.mat')



In [None]:
# Se revisa cuántas señales se guardaron en el directorio cleaned
i=0
for file in os.listdir('data/cleaned/'):
    if file.endswith('.mat'):
        i += 1
print('cantidad de datos válidos para training:', i)

Una vez se tienen los datos válidos se corroboran sus características, cabe destacar que aún no están limpios

In [None]:
# graficar una señal limpia
fileName = 'b388s'
path = 'data/cleaned/'
record = read_file_hea(fileName, 'data/training/')
signals = read_file_mat(fileName, path)
flat_areas = read_file_flat(fileName, path)
zigzag_areas = read_file_zigzag(fileName, path)
grafica_preview(record, signals, signals, path, flat_areas, zigzag_areas)


In [None]:
visualizador_señales_limpias(read_file_hea(fileName, 'data/training/'), read_file_mat(fileName, path))

### 3.2.2.2 Preparación de los datos calificados


**Limpieza de los datos válidos** 

Los canales considerados para el análisis, sólo corresponden a derivaciones ECG y PPG, por lo que son los únicos que se van a analizar.

Se utiliza la librería [Biosspy](https://biosppy.readthedocs.io/en/stable/) para la limpieza, normalización de los datos y extracción de características, junto con un filtro de paso de banda. Posterior a esto se extraen las caracteristicas de cada caso y se guardan los canales ECG y PPG en un vector de canales

In [None]:
from scipy.signal import savgol_filter

# retorna los datos necesarios y limpios de cada caso
def biosppy_cases_data(new_path, old_path, nameFiles):

    data_cases = []
    
    for fileName in nameFiles:

        signals = read_file_mat(fileName, new_path)
        record = read_file_hea(fileName, old_path)

        # Se aplica filtro sólo a las señales ECG ('I', 'II', 'III' y 'V') y PLETH
        ecg_type = ['I', 'II', 'III', 'V']
        pleth_type = ['PLETH']
        
        allSignals = []
        
        channel_type = []
        anomaly_type= record.comments[0]
        alarm_veracity = record.comments[1]

        min_heart_rate_pleth = 0
        max_heart_rate_pleth = 0
        min_heart_rate_ecg = 0
        max_heart_rate_ecg = 0

        for i in range(len(record.sig_name)): 
            ecg_if = (record.sig_name[i] in ecg_type)
            pleth_if = (record.sig_name[i] in pleth_type)
            if ecg_if or pleth_if:
                
                if ecg_if:
                    try:
                        channel = signals[i, :]
                        butterFilter = butter_bandpass_filter(channel, cutoff=1, fs=250, order=2)
                        channel = channel - butterFilter
                        
                        channel = ecg.ecg(signal=channel, sampling_rate=250, show=False)
                        channel_type = record.sig_name[i]

                        heart_rate_ecg = channel['heart_rate']

                        if heart_rate_ecg.any() and min_heart_rate_ecg == 0:
                            min_heart_rate_ecg = min(heart_rate_ecg)
                        if heart_rate_ecg.any() and max_heart_rate_ecg == 0:
                            max_heart_rate_ecg = max(heart_rate_ecg)
                            
                        allSignals.append([channel, channel_type])
                    except Exception as e:
                        print(f'canal {record.sig_name[i]} en señal {fileName} no válido para biosppy: {e}')
                    
                elif pleth_if:
                    
                    try:

                        channel = signals[i, :]
                        channel = ppg.ppg(signal=channel, sampling_rate=250, show=False)
                        
                        filtered_channel = channel['filtered']
                        channel_type = record.sig_name[i]

                        peaks = channel['peaks']
                        heart_rate_pleth = channel['heart_rate']

                        if heart_rate_pleth.any():
                            min_heart_rate_pleth = min(heart_rate_pleth)
                        if heart_rate_pleth.any():
                            max_heart_rate_pleth= max(heart_rate_pleth)

                        allSignals.append([channel, channel_type])

                    except Exception as e:
                        print(f'canal {record.sig_name[i]} en señal {fileName} no válido para biosppy: {e}')
        data_cases.append({'fileName:': fileName, 'channels': allSignals,'alarm_veracity': alarm_veracity, 'anomaly_type': anomaly_type, 'max_heart_rate_ecg': max_heart_rate_ecg, 'min_heart_rate_ecg': min_heart_rate_ecg, 'max_heart_rate_pleth': max_heart_rate_pleth, 'min_heart_rate_pleth': min_heart_rate_pleth})

    return data_cases


In [None]:

new_path = 'data/cleaned/'
nameFiles = get_name_files(new_path,'.mat')


all_unique_comments_anomaly = set()
for file in nameFiles:
    record = read_file_hea(file, old_path)
    all_unique_comments_anomaly.update(record.comments[0].split())

print(f"Tipos de arritmia presentes: {all_unique_comments_anomaly}")
print(f"Total: {len(all_unique_comments_anomaly)}")

        

# 3.3 Desarrollo de algoritmos


Se puede observar que al limpiar la señal, se descartan las señales PLETH y ECG que, o no poseen suficientes pulsos para detectar el ritmo cardiaco o poseen anomalías en su señal que no permiten extraer sus características incluso con los filtros aplicados.
Luego se convierten las señales válidas en un DataFrame de la librería [Pandas](https://pandas.pydata.org) 



In [None]:

data_cases = biosppy_cases_data(new_path, old_path, nameFiles)
data_cases = pd.DataFrame(data_cases)


In [None]:
data_cases

Se establece una función extra para visualizar las señales limpias con todas sus características importantes de un sólo parámetro

In [None]:
def visualizador_df_signals(channel):
    ecg_type = ['I', 'II', 'III', 'V']
    pleth_type = ['PLETH']
    print(channel['alarm_veracity'] + ' - ' + channel['anomaly_type'])
    if channel['channel_type'] in ecg_type:
        ecg.ecg(signal=channel['channel'], sampling_rate=250, show=True)
    elif channel['channel_type'] in pleth_type:
        ppg.ppg(signal=channel['channel'], sampling_rate=250, show=True)
