# LIBRERIAS

Exportamos las librerías que vamos a emplear

In [1]:
import os
import scipy
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.io import loadmat
from scipy.interpolate import splev, splrep
import scipy.io as sio
from scipy.signal import resample
from scipy.signal import kaiserord, firwin, lfilter

# BASE DE DATOS

In [None]:
def ST_T_delineation(ecg_signal, fiducial_points, fs):
"""
Esta función delinea el complejo ST-T usando intervalos de 300 ms de longitud.
El inicio del ST-T se elige a una distancia variable del punto QRS que
depende de la longitud del intervalo RR anterior.

Parámetros:
    ecg_signal (array): Señal de ECG.
    fiducial_points (array): Anotaciones de la onda R.
    fs (float): Frecuencia de muestreo.

Retornos:
    ST_T_complexes (array): Matriz de complejos ST-T.
    ST_T_onset (array): Matriz de anotaciones de valores de inicio de ST-T.
"""

    RR_interval = np.diff(fiducial_points) / fs  # Intervalo RR en segundos
    qi = 40 + (1.3 * np.sqrt(RR_interval * 1e3))  # ST-T onset from R in milliseconds

    if qi.ndim == 1:
        qi = np.column_stack((qi[0], qi[:-1]))
    else:
        qi = np.column_stack((qi[:, 0], qi[:, :-1]))
    s = np.array(qi * 1e-3 * fs)
    fiducial_points = fiducial_points.transpose()


    ST_T_onset = fiducial_points + np.ceil(s) + 1  # ST-T onset in samples

# para que no sea una lista con matrices con el mismo numero repetido
    ST_T_onset = [matriz[0] for matriz in ST_T_onset]

    ST_T_interval = np.ceil(300e-3 * fs)  # ST-T interval length in number of samples

    ST_T_complexes = np.zeros((len(ST_T_onset)-1, int(ST_T_interval)))
    last_beat = ST_T_onset[-1]

    #La variable onset representa el inicio de cada segmento y la variable interval representa la duración de cada segmento
    number_of_ST_T = len(ST_T_onset)

    for k in range(number_of_ST_T):
        onset = int(np.ceil(ST_T_onset[k]))
        interval = onset + ST_T_interval

        if interval <= len(ecg_signal):
            ST_T_complexes[k, :] = ecg_signal[int(onset):int(interval)]

    return ST_T_complexes, ST_T_onset

In [None]:
# Definimos la ruta donde se encuentran las señales
ruta_carpeta='/content/Data_TWA_Physionet_ControlECG/'
# Abre el archivo en modo lectura
contenido_carpeta = os.listdir(ruta_carpeta)
base=[]
# Imprime el contenido de la carpeta para comprobar los archivos
print("Contenido de la carpeta:")
for elemento in contenido_carpeta:
    base.append(elemento)
print(len(base))


# Creamos un diccionario para almacenar los datos
data_dict = {}
for archivo in base:
    ruta_archivo = os.path.join(ruta_carpeta, archivo)
    print(ruta_archivo)
    # Verificar si es un archivo .mat
    if os.path.isfile(ruta_archivo):
        if archivo.endswith('.mat'):
            # Cargar archivo .mat
            signal = loadmat(ruta_archivo)

            #Definimos 
            Fs = signal['Fs']
            ecg = signal['ecg']
            qrs = signal['qrs']
            QRS_ann = signal['qrs_ann']

            
            #Remuestreo 
            duration_original = len(ecg[0,:]) / Fs
            qrs = np.array(qrs) / Fs
            Fs=250
            ecg = resample(ecg[0,:], int(duration_original * Fs)).reshape(1, -1)
            qrs= (qrs * Fs).astype(int)


            # Asociar los datos al archivo correspondiente en el diccionario.
            data_dict[archivo] = {
                'ecg': ecg,
                'ST_T_onset': ST_T_onset,
                'qrs': qrs,
                'qrs_ann': QRS_ann
            }


# AÑADIR ALTERNANCIA

### FUNCIONES DE LA ALTERNANCIA

In [None]:
def include_TWA(ecg, ST_T_onset, Alternan_wave, TWA_pattern, Type_of_inclusion):
"""
El propósito de esta función es la inclusión de una onda alternante en una señal de ECG.

Parámetros:
    ecg (numpy array): Array de enteros correspondiente a la señal de ECG.
    ST_T_onset (numpy array): Matriz de anotaciones de valores de inicio de ST-T.
    Alternan_wave (numpy array): Forma de onda para realizar TWA.
    TWA_pattern (numpy array): Array que describe los intervalos donde se incluyen las TWA.
    Type_of_inclusion (str): Si este string es 'alternate', la onda alternante se incluye en cada latido alterno con signo opuesto.
                             De lo contrario, la onda alternante se incluye solo cada dos latidos.

Retornos:
    ecg_with_TWA (numpy array): ECG con TWA.
    Alt_waves (numpy array): Matriz cuyas filas consisten en las ondas alternantes del ecg_with_TWA. Posteriormente se usa para calcular el ANR.
"""

    # Definimos la longitud de la onda de alternancia
    L_AltWav = len(Alternan_wave)

    # Creamos una copia de la señal de ECG
    ecg_with_TWA = ecg.copy()

    # Creamos una matriz de Alt_wave
    Alt_waves = np.zeros((len(ST_T_onset), L_AltWav))

    if Type_of_inclusion.lower() == 'alternate':
        # Incluir una onda alternante en cada otro latido con signo opuesto
        for k, onset in enumerate(ST_T_onset):
            if onset + L_AltWav <= (ecg.shape[1]):
                Alt_waves[k, :] = TWA_pattern[k] * Alternan_wave * (-1) ** k

                ecg_with_TWA[0, onset:onset + L_AltWav] += Alt_waves[k, :]



    else:
        # Incluimos la onda de alternancia cada dos latidos 
        for k, onset in enumerate(ST_T_onset[1::2]):
            if onset + L_AltWav <= (ecg.shape[1]):
                Alt_waves[k, :] = TWA_pattern[2 * k] * Alternan_wave
                ecg_with_TWA[0, onset:onset + L_AltWav] += Alt_waves[k, :]
    return ecg_with_TWA, Alt_waves


In [None]:
def alternant_wave_inclusion(ecg, TWA_pattern, type_of_inclusion, ST_T_onset, V_Alt_wave, Fs):
    
"""
Esta función introduce una onda alternante según el patrón definido por 'TWA_pattern' en la señal 'ecg'.

Parámetros de entrada:
    - ecg: anotaciones de la onda R.
    - TWA_pattern: vector de patrón TWA.
    - type_of_inclusion: manera en que se incluye TWA; puede ser 'alternate' o no.
    - ST_T_onset: inicio de los complejos ST-T del ECG.
    - V_Alt_wave: voltaje alternante.
    - Fs: frecuencia de muestreo.

Parámetros de salida:
    - ecg_with_TWA: señal de ECG con alternancia.
    - Alt_waves: matriz cuyas filas consisten en la onda alternante de ecg_with_TWA.
    - Alternan_wave: forma de onda para realizar TWA.
    - Onset_Valt: inicio de la inclusión de TWA, considerando el efecto de Jitter.
"""

    # Cargamos la onda alternante que añadiremos
    mat_data = scipy.io.loadmat('AltWav_250hz.mat')

    AltWav = np.random.randint(1, 16)

    # Access the desired key
    Alternan_wave = mat_data['AltWav_250hz'][:, AltWav - 1]

    if type_of_inclusion.lower() == 'alternate':
        alpha = V_Alt_wave / (np.max(np.abs(2 * Alternan_wave)) * 1e3)
    else:
        alpha = V_Alt_wave / (np.max(np.abs(Alternan_wave)) * 1e3)

    jitter_std = int(0.02 * Fs)  # Desviación estándar del efecto de Jitter de 20 ms (0.02*Fs samples)

    Onset_Valt = (ST_T_onset + np.round(np.random.randn(len(ST_T_onset)) * jitter_std)).astype(int)


    ecg_with_TWA, Alt_waves = include_TWA(ecg, Onset_Valt, alpha * Alternan_wave, TWA_pattern, type_of_inclusion)

    V0 = np.max(np.abs(alpha * Alternan_wave)) * 1e3  # Amplitud de la onda de alternancia en microvoltios
    if type_of_inclusion.lower() == 'alternate':
        V0 = 2 * V0

    return ecg_with_TWA, Alt_waves, Alternan_wave, alpha, Onset_Valt

### INTRODUCCIÓN ALTERNANCIA

In [None]:
signals_with_alternans = {}

# Calculamos la mitad del conjunto de datos 
half_dataset_size = len(data_dict) //2

# Obtenemos las claves correspondientes a la mitad del dicionario del conjunto de datos
half_keys = list(data_dict.keys())[:half_dataset_size]

# Establecemos los parámetros
type_of_inclusion = 'alternate'
V_Alt_wave = 70 # muV ----->  Especificamos la amplitud del voltaje de la alternancia que queramos introducir. En este TFG hemos usado 70,35 y 20
Fs = 250 # Hz

for key in half_keys:
    signal = data_dict[key]['ecg']

    # Obtenemos los 'TWA_pattern' y los 'ST_T_onset' correspondientes..
    TWA_pattern = np.ones(qrs.shape[1])
    TWA_pattern = np.ones(data_dict[key]['qrs'].shape[1])
    ST_T_onset = data_dict[key]['ST_T_onset']
    QRS_ann = data_dict[key]['qrs_ann']
    qrs = data_dict[key]['qrs']
    # Genera la nueva señal con alternancia usando la función 'alternant_wave_inclusion'.
    ecg_with_TWA, alt_waves, Alternan_wave, alpha, Onset_Valt = alternant_wave_inclusion(signal, TWA_pattern, type_of_inclusion, ST_T_onset, V_Alt_wave, Fs)
    # Convertimos la lista a NumPy array
    st_t_onset_array = np.array(ST_T_onset)
    # Agregamos la nueva señal al diccionario de señales
    signals_with_alternans [key] = {
                'ecg': ecg_with_TWA,
                'ST_T_onset': st_t_onset_array,
                'qrs': qrs,
                'qrs_ann': QRS_ann,
                'label': 1
            }

### UNIÓN DICCIONARIOS

In [None]:
half_dataset_size = len(data_dict) //2
half_keys = list(data_dict.keys())[half_dataset_size:]
signals_no_TWA = {key: data_dict[key] for key in half_keys}
merged_dict = {**signals_with_alternans, **signals_no_TWA}


# PREPROCESADO

### FUNCIONES

#### 1. Eliminación del desplazamiento de la línea base

In [None]:
def bw_elimination(orig_ecg, fiducial_points, fs):
"""
Esta función produce una estimación del baseline wander utilizando ajuste polinomial mediante splines cúbicos.

Parámetros:
    orig_ecg (array): Muestras del ECG original.
    fiducial_points (array): Puntos fiduciales.
    fs (float): Frecuencia de muestreo.

Retornos:
    bw_estimate (array): Estimación del baseline wander.
"""


    tPQ = 80e-3  # Tomamos una muestra representativa de la línea isoeléctrica 80 ms antes del punto fiducial.
    nPQ = int(tPQ * fs)
    numb_fid_points = len(fiducial_points)

    if fiducial_points[0].any() - nPQ <= 0:
        the_knots = np.array(np.zeros(numb_fid_points+1))
        the_knots[0] = 1
        the_knots[1:numb_fid_points] = fiducial_points[1:numb_fid_points] - nPQ
        the_knots[numb_fid_points] = len(orig_ecg)
        the_knots=the_knots-1
        the_knots=the_knots.astype(int)
        c =np.concatenate((the_knots[1],the_knots[1:numb_fid_points],the_knots[numb_fid_points-1]),axis=None)

        bw_estimate = splev(np.arange(len(orig_ecg)), splrep(the_knots,orig_ecg[c]), ext=0)

    else:
        the_knots = np.array(np.zeros(numb_fid_points + 2))
        the_knots[1:numb_fid_points] = fiducial_points - nPQ
        the_knots[0] = 1
        the_knots[numb_fid_points] = len(orig_ecg)
        the_knots=the_knots-1
        the_knots=the_knots.astype(int)
        c =np.concatenate((the_knots[1],the_knots[1:numb_fid_points],the_knots[numb_fid_points-1]),axis=None)
        bw_estimate = splev(np.arange(len(orig_ecg)), splrep(the_knots,c), ext=0)


    return bw_estimate

#### 2. Segmentación en complejos ST-T

In [None]:
def ST_T_delineation(ecg_signal, fiducial_points, fs):
"""
Esta función delinea el complejo ST-T usando intervalos de 300 ms de longitud.
El inicio del ST-T se elige a una distancia variable del punto QRS que
depende de la longitud del intervalo RR anterior.

Parámetros:
    ecg_signal (array): Señal de ECG.
    fiducial_points (array): Anotaciones de la onda R.
    fs (float): Frecuencia de muestreo.

Retornos:
    ST_T_complexes (array): Matriz de complejos ST-T.
    ST_T_onset (array): Matriz de anotaciones de valores de inicio de ST-T.
"""

    RR_interval = np.diff(fiducial_points) / fs  # Intervalo RR en segundos
    qi = 40 + (1.3 * np.sqrt(RR_interval * 1e3))  # ST-T onset desde R en milisegundos

    if qi.ndim == 1:
        qi = np.column_stack((qi[0], qi[:-1]))
    else:
        qi = np.column_stack((qi[:, 0], qi[:, :-1]))
    s = np.array(qi * 1e-3 * fs)
    fiducial_points = fiducial_points.transpose()


    ST_T_onset = fiducial_points + np.ceil(s) + 1  # ST-T onset en muestras

# para que no sea una lista con matrices con el mismo numero repetido
    ST_T_onset = [matriz[0] for matriz in ST_T_onset]

    ST_T_interval = np.ceil(300e-3 * fs)  # Longitud del intervalo ST-T en número de muestras

    ST_T_complexes = np.zeros((len(ST_T_onset)-1, int(ST_T_interval)))
    last_beat = ST_T_onset[-1]

    #La variable onset representa el inicio de cada segmento y la variable interval representa la duración de cada segmento
    number_of_ST_T = len(ST_T_onset)

    for k in range(number_of_ST_T):
        onset = int(np.ceil(ST_T_onset[k]))
        interval = onset + ST_T_interval

        if interval <= len(ecg_signal):
            ST_T_complexes[k, :] = ecg_signal[int(onset):int(interval)]

    return ST_T_complexes, ST_T_onset

#### 3. Enventanado

In [None]:
def heartbeat_windowing(ST_T_complexes, ST_T_onset, WindSize, WindStep):
"""
Esta función realiza el enventanado para rastrear alternancia en un ECG.

Parámetros:
    ST_T_complexes (array): Matriz 2D de los segmentos ST-T de la señal.
    ST_T_onset (array): Inicio de los segmentos ST-T.
    WindSize (int): Longitud de la ventana en latidos.
    WindStep (int): Tamaño del paso del proceso deslizante.

Retornos:
    ST_T_segments (array): Tensor con los complejos ST-T enventanados.
    ST_T_onset_by_segments (array): Inicio ST-T del tensor.
    ST_T_dim (tuple): Dimensiones de la matriz de entrada de complejos ST-T.
"""

    Overlap = WindSize - WindStep
    ST_T_dim = np.shape(ST_T_complexes)

    NumbSegments = int((ST_T_dim[0] - Overlap) / WindStep)


    # Ajustamos el número de segmentos si es necesario.
    if NumbSegments == np.ceil((ST_T_dim[0] - Overlap) / WindStep):
        NumbSegments = NumbSegments - 1


    ST_T_segments = np.zeros((WindSize, ST_T_dim[1], NumbSegments))  # Tensor para almacenar complejos ST-T enventanados
    ST_T_onset_by_segments = np.zeros((NumbSegments, WindSize))  # ST-T onset del tensor


    index1 = 0
    index2 = WindSize

    # Enventanando
    for k in range(NumbSegments):
        ST_T_segments[:,:,k] = ST_T_complexes[index1:index2,:]

        ST_T_onset_by_segments[k,:] = ST_T_onset[index1:index2]
        index1 = index1 + WindStep
        index2 = index2 + WindStep

    return ST_T_segments, ST_T_onset_by_segments, ST_T_dim

#### 4. Alineación de los ST-T

In [2]:

#funcion para redondear
def np_round(a, decimals=0):
    return np.round(a * 10 ** decimals) / 10 ** decimals


def ST_T_alignment(ST_T_complexes, ST_T_onset, ecg_signal, fs):

    nbeats, nsamples = ST_T_complexes.shape
    ST_T_complexes_align = np.zeros(ST_T_complexes.shape)
    # Media del template
    proto = np.median(ST_T_complexes, axis=0)
    # Posible variación temporal desde la posición inicial de cada complejo ST_T
    w =(0.03 * fs)  # muestras
    w = np_round(w)
    w= int(w)

    for k in range(nbeats):
        ini = int(ST_T_onset[k])
        fin = int(ini + nsamples - 1)

        ecg_segment = ecg_signal[ini - w:fin + w]
        cross_corr = np.zeros(2 * w)

        for m in range(2 * w):
            temp = ecg_segment[m:-(2 * w) + m] 
            # Ajustar la dimensión de temp
            if temp.shape[0] < proto.shape[0]:
                temp = np.append(temp, temp[-1])
            cc = np.corrcoef(proto, temp)
            cross_corr[m] = cc[1, 0]

        posMax = np.argmax(cross_corr)
        ST_T_complexes_align[k, :] = ecg_segment[posMax:posMax + nsamples]    

    return ST_T_complexes_align

#### 5. Sustracción de fondo

In [None]:
def background_subtraction(ST_T_ComplexTensor, WindSize):
"""
Esta función realiza la substracción de fondo de complejos ST-T consecutivos.

Parámetros:
    ST_T_ComplexTensor (array): Tensor con complejos ST-T enventanados. (array 3D)
    WindSize (int): Tamaño de la ventana deslizante.

Retornos:
    ST_T_TensorBacGrElim (array): Tensor de complejos ST-T enventanados después de la substracción de fondo.
"""

    ST_T_TensorBacGrElim = np.zeros_like(ST_T_ComplexTensor)

    X = ST_T_ComplexTensor.copy()
    A_pattern = X[0::2, :]
    A_pattern_dis = np.zeros_like(A_pattern)
    A_pattern_dis[0:-1, :] = A_pattern[1:, :]
    A_pattern_dis[-1, :] = A_pattern[WindSize // 2 - 1, :]
    B_pattern = X[1::2, :]
    X[0::2, :] = X[0::2, :] - B_pattern
    X[1::2, :] = X[1::2, :] - A_pattern_dis
    ST_T_TensorBacGrElim = X

    return ST_T_TensorBacGrElim

#### 6. Filtro Paso Bajo

In [None]:
def filtering_ST_T_segments(ST_T_ComplexTensor, Fs):
"""
Esta función filtra los complejos ST-T para eliminar el ruido fuera de la banda de TWA (0.3-15 Hz).

Parámetros:
    ST_T_ComplexTensor (array): Tensor con complejos ST-T enventanados.
    Fs (float): Frecuencia de muestreo de trabajo.

Retornos:
    ST_T_TensorFiltered (array): Tensor de complejos ST-T enventanados después del filtrado.
"""

    # Diseñando el filtro
    nyquist = 0.5 * Fs
    passband_low = 0.3
    passband_high = 15.0
    ripple_db = 0.01
    ripple = 80.0

    # obtener numtaps y beta usanod kaiserord
    numtaps, beta = kaiserord(ripple, (passband_high - passband_low) / nyquist)

    # Calcular las frecuencias de corte normalizadas.
    cutoff_low = passband_low / nyquist
    cutoff_high = passband_high / nyquist

    # Calculando los coeficientes del filtro usando firwin
    b = firwin(numtaps, [cutoff_low, cutoff_high], window=('kaiser', beta), pass_zero=False, scale=False)
    delay = int((numtaps + 1) / 2)
    b_wide = b

    ST_T_TensorFiltered = np.zeros_like(ST_T_ComplexTensor)  # Inicializar el tensor filtrado

    # Iterar sobre segmentos, ventanas y muestras ST-T
    for k1 in range(ST_T_ComplexTensor.shape[2]):
        for k2 in range(ST_T_ComplexTensor.shape[0]):
            # Aplicar filtrado a cada ventana
            filtered_wide = lfilter(b_wide, 1, np.concatenate((ST_T_ComplexTensor[k2, :, k1], np.zeros(delay))))
            ST_T_TensorFiltered[k2, :, k1] = filtered_wide[delay:ST_T_ComplexTensor.shape[1] + delay]

    return ST_T_TensorFiltered

### PREPROCESAMIENTO

In [None]:
#Creamos un diccionario para almacenar todas las señales filtradas
all_filtered_FINAL = {}

for key in merged_dict:
    signal = merged_dict[key]['ecg']
    qrs = merged_dict[key]['qrs']
    


    ecg = signal
  # 1. Eliminación del desplazamiento de la línea base
    bw_estimate_clean = bw_elimination(ecg[0,::], qrs[0,::], Fs)
    clean_ecg_noBW = ecg[0,:] - bw_estimate_clean + np.mean(bw_estimate_clean)

  # 2. Segmentación en complejos ST-T 
    ST_T_complexes, ST_T_onset = ST_T_delineation(clean_ecg_noBW, qrs, Fs)


  # 3. Enventanado
    WindSize = 32 # analisis en bloques de 32 latidos 
    WindStep = 8 # cada ventana consta de 8 latidos nuevos 
    ST_T_segments, ST_T_onset_by_segments, _ = heartbeat_windowing(ST_T_complexes, ST_T_onset, WindSize, WindStep)


  # 4. Alineación de los segmentos ST-T

    WindSize = 32
    NumbSegments = ST_T_segments.shape[2]

  # Paso 1: Crear una matriz de ceros para almacenar los segmentos ST-T alineados
    ST_T_segments_aligned = np.zeros((WindSize, ST_T_segments.shape[1], NumbSegments))
  # Paso 2: Iterar sobre cada segmento
    for k in range(NumbSegments):
      # Paso 3: Alineación de los segmentos ST-T usando la función de alineamiento 
        aligned_segment = ST_T_alignment(ST_T_segments[:,::,k], ST_T_onset_by_segments[k,:], clean_ecg_noBW, Fs)

      # Paso 4: Guardamos los segmentos alineados 
        ST_T_segments_aligned[:,::,k] = aligned_segment


  # 5. sustración de fondo 

    WindSize = 32
    NumbSegments = ST_T_segments_aligned.shape[2]

    ST_T_dif = np.zeros_like(ST_T_segments_aligned)
    for k in range(NumbSegments-1):
        ST_T_TensorBacGrElim = background_subtraction(ST_T_segments_aligned[:,:,k], WindSize)

        ST_T_dif[:,:,k] = ST_T_TensorBacGrElim


  # 6. Filtro paso Bajo, la información está por debajo de 15 Hz.
    fs = 250
    ST_T_dif_filtered = filtering_ST_T_segments(ST_T_dif, fs)
    segmentos = {}
    for k in range(NumbSegments):
        segmentos[k] = ST_T_dif_filtered[:,:,k]
        all_filtered_FINAL[key] = segmentos

# ALMACENAMIENTO 

Guardamos el diccionario que está comprendido por múltiples arrays de NumPy en un archivo comprimido .npz

In [None]:
np.savez('/content/ecg_data.npz', **all_filtered_FINAL)