# Bibliotecas


In [None]:
from scipy.signal import convolve
from scipy.signal import hilbert
from scipy.signal import find_peaks
from scipy.signal import correlate
from scipy.signal import filtfilt
from statistics import median_high
from statistics import median_low
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import statistics
import numpy as np
import scipy.io
import math
import time
import os
from sklearn.preprocessing import Normalizer

# Importando Dados

In [None]:
# Absolute path of the folder holding the training recordings.
path = os.path.join(os.getcwd(), 'data_training')

# File names (not full paths) of every MATLAB file found in that folder,
# in the order returned by os.listdir.
mat_files = [file for file in os.listdir('data_training') if file.endswith(".mat")]

In [None]:
# Load every recording: each .mat file stores its samples under the 'val'
# key, and only the first row of that matrix is kept.
ECG_data = []
for file in mat_files:
    # os.path.join inserts the separator of the running platform, so no
    # manual '\\' vs '/' selection is needed.
    current_path = os.path.join(path, file)
    mat_data = scipy.io.loadmat(current_path)
    ECG_data.append(mat_data['val'][0])

# Filtro de Linha de Base

In [None]:
def baseline_filter(data, f_samp, n):
    """Remove the slow baseline component from ``data``.

    The signal is decomposed ``n`` levels deep with ``decomposition_signal``,
    the coarse approximation is rebuilt to the original length with
    ``recomposition_signal`` and that approximation is subtracted from the
    input.

    Args:
        data: 1-D signal (array-like supporting element-wise subtraction).
        f_samp: number of samples used to pad/trim each level (the callers in
            this file pass the sampling frequency).
        n: number of decomposition levels.

    Returns:
        ``data`` minus the reconstructed approximation.
    """
    # Decomposition (_D) and reconstruction (_R) taps for the high-pass (hi)
    # and low-pass (lo) branches.
    # NOTE(review): these look like the 8-tap Daubechies (db4) coefficients
    # rounded to four decimals - confirm.
    lo_D = [-0.0106, 0.0329, 0.0308, -0.1870, -0.0280, 0.6309, 0.7148, 0.2304]
    hi_D = [-0.2304, 0.7148, -0.6309, -0.0280, 0.1870, 0.0308, -0.0329, -0.0106]
    lo_R = [0.2304, 0.7148, 0.6309, -0.0280, -0.1870, 0.0308, 0.0329, -0.0106]
    hi_R = [-0.0106, -0.0329, 0.0308, 0.1870, -0.0280, -0.6309, 0.7148, -0.2304]

    approximation, detail, level_approximations = decomposition_signal(lo_D, hi_D, data, f_samp, n)
    baseline = recomposition_signal(level_approximations, lo_R, hi_R, approximation, detail, f_samp, n)

    return data - baseline

def decomposition_signal(lo_D, hi_D, cA, f_samp, n):
    """Decompose a signal ``n`` levels deep with a low/high-pass filter pair.

    At every level the current approximation is filtered forwards and
    backwards (``filtfilt``, so no phase shift) with both filters, scaled by
    0.5, extended with ``f_samp`` copies of its first and last value, and then
    down-sampled by keeping the even-indexed samples.

    Args:
        lo_D: low-pass decomposition taps.
        hi_D: high-pass decomposition taps.
        cA: signal to decompose.
        f_samp: number of samples added at each end of every level (int).
        n: number of levels; must be at least 1, otherwise the detail output
            is never assigned.

    Returns:
        Tuple ``(cA, cD, A_list)``: the last approximation and detail
        coefficients (Python lists) and the list of padded, not yet
        down-sampled approximations of every level.
    """
    padded_approximations = []
    for _ in range(n):
        # Both branches are computed from the same input of this level.
        low = 0.5 * filtfilt(lo_D, 1, cA, padlen=0)
        high = 0.5 * filtfilt(hi_D, 1, cA, padlen=0)

        # Edge extension: repeat the first/last sample f_samp times.
        low = np.concatenate((np.full(f_samp, low[0]), low, np.full(f_samp, low[-1])), axis=None)
        high = np.concatenate((np.full(f_samp, high[0]), high, np.full(f_samp, high[-1])), axis=None)
        padded_approximations.append(low)

        # Down-sampling: drop every odd-indexed sample.
        cA = list(low[::2])
        cD = list(high[::2])

    return cA, cD, padded_approximations

def recomposition_signal(A_list, lo_R, hi_R, cA, cD, f_samp, n):
    """Rebuild the approximation branch of an ``n``-level decomposition.

    Each level up-samples the current approximation by inserting a zero after
    every sample, drops one trailing sample when that makes it longer than the
    padded approximation stored for the level, filters it forwards and
    backwards with ``lo_R`` and removes ``f_samp`` samples from both ends.

    Args:
        A_list: padded approximations per level, as returned by
            ``decomposition_signal`` (only their lengths are used).
        lo_R: low-pass reconstruction taps.
        hi_R: high-pass reconstruction taps. Accepted for interface
            compatibility; not used because the detail branch is discarded.
        cA: coarsest approximation coefficients.
        cD: detail coefficients. Accepted for interface compatibility; not
            used (the previous code built a zero-stuffed copy on every level
            and never read it).
        f_samp: number of samples trimmed from each end after filtering.
        n: number of levels to rebuild.

    Returns:
        The approximation after ``n`` reconstruction levels (``cA`` itself,
        unchanged, when ``n`` is 0).
    """
    for level in range(n):
        # Up-sample by 2: original samples at even positions, zeros between.
        upsampled = np.zeros(2 * len(cA))
        upsampled[::2] = cA

        # Levels are rebuilt from the coarsest one back to the first.
        if len(upsampled) > len(A_list[n - 1 - level]):
            upsampled = upsampled[:-1]

        # Zero-phase filtering, then removal of the edge extension that was
        # added during decomposition.
        cA = filtfilt(lo_R, 1, upsampled, padlen=0)
        cA = cA[f_samp:len(cA) - f_samp]

    return cA

# Obtendo métrica ACL

In [None]:
def generate_mexican_hat_filter(scale = 2):
    """Sample a Mexican-hat shaped kernel on the half-open grid [-5, 5).

    The grid step is ``1 / scale``, so the kernel has ``10 * scale`` taps and
    the end point 5 is NOT part of it. The previous code called
    ``np.append(X, 5)`` but discarded the result, so the end point was never
    included; that statement is removed instead of "fixed" because the
    trimming done in ``get_superenergy_signal`` only lines up with a kernel
    of this length.

    Args:
        scale: number of kernel samples per unit of the x axis.

    Returns:
        List with the kernel values
        ``2.1741 * (1 / sqrt(2*pi)) * (1 - x**2) * exp(-x**2 / 2)``.
        NOTE(review): the origin of the 2.1741 gain is not documented in this
        file.
    """
    X = np.arange(-5, 5, 1 / scale)
    Y = 2.1741 * (1 / math.sqrt(2 * math.pi) * (1 - X**2) * np.exp(-X**2 / 2))

    return list(Y)

In [None]:
def wavelet_transform(channel_data, num):
    """Return the detail signal of the ``num``-th level of a filter-bank transform.

    Each level convolves the current signal with the high-pass taps ``gn``
    (detail) and with the low-pass taps ``hn`` (smoothed input of the next
    level). After every convolution ``len(taps) // 2`` samples are removed
    from both ends. Before moving to the next level a zero is inserted after
    every tap of both filters, so no down-sampling of the signal is needed.

    Args:
        channel_data: 1-D signal.
        num: number of levels; must be at least 1, otherwise there is no
            detail signal to return.

    Returns:
        numpy array with the detail signal of the last level. With the
        initial 2-tap/4-tap filters the output is ``num`` samples shorter
        than the input for ``num`` up to 3.
    """
    gn = [-2, 2]
    hn = [1/8, 3/8, 3/8, 1/8]

    for _ in range(num):
        # Detail of this level, trimmed with a single slice instead of
        # repeated np.delete calls (each of which copied the whole array).
        detail = convolve(channel_data, gn)
        trim = len(gn) // 2
        wavelet_transformed_data = detail[trim:len(detail) - trim]

        # Smoothed signal that feeds the next level.
        smoothed = convolve(channel_data, hn)
        trim = len(hn) // 2
        channel_data = smoothed[trim:len(smoothed) - trim]

        # Dilate both filters: one zero after every tap.
        gn = [tap for coefficient in gn for tap in (coefficient, 0)]
        hn = [tap for coefficient in hn for tap in (coefficient, 0)]

    return wavelet_transformed_data

In [None]:
def get_acl(wavelet_transformed_data, floating_window_size):
    """Compute the ACL metric (area times curve length) over a sliding window.

    For every start index ``k`` the window holds the samples
    ``k .. k + floating_window_size - 2`` of the filtered signal. The metric
    is the sum of absolute values in the window (area) multiplied by the sum
    of ``sqrt(1 + difference**2)`` over consecutive samples (curve length).

    Args:
        wavelet_transformed_data: numpy array with the filtered signal.
        floating_window_size: window size parameter ``L``.

    Returns:
        numpy array with the same length as the input; the last
        ``floating_window_size`` positions are left at zero.
    """
    sample_count = len(wavelet_transformed_data)
    acl = np.zeros(sample_count)

    for start in range(sample_count - floating_window_size):
        stop = start + floating_window_size - 1
        y_k = wavelet_transformed_data[start:stop]

        area_k = sum(abs(y_k))
        curve_k = sum(
            math.sqrt(1 + (current - previous)**2)
            for previous, current in zip(y_k[:-1], y_k[1:])
        )

        acl[start] = area_k*curve_k

    return acl

# Extração de parâmetros

## Complexo QRS

In [None]:
def get_superenergy_signal(data, f_samp, scale = 2):
    """Build the envelope signal used to locate QRS candidates.

    Steps: convolve ``data`` with the Mexican-hat kernel, trim the
    convolution tails, differentiate, take the magnitude of the analytic
    signal (Hilbert transform) and finally zero the first and last
    ``round(0.15 * f_samp)`` samples.

    Args:
        data: 1-D signal.
        f_samp: sampling frequency, used to size the zeroed borders.
        scale: kernel scale forwarded to ``generate_mexican_hat_filter``.

    Returns:
        numpy array with ``len(data)`` samples.
    """
    kernel = generate_mexican_hat_filter(scale)
    border = round(0.15 * f_samp)

    smoothed = convolve(data, kernel)

    # Drop the convolution tails so that, after the difference below, the
    # signal has exactly len(data) samples again.
    gap = int(np.round(len(kernel) / 2)) - 1
    smoothed = np.copy(smoothed[gap : (len(smoothed) - gap)])

    envelope_amplitude = np.abs(hilbert(np.diff(smoothed)))

    # The addition fails loudly if the lengths do not match.
    result_signal = np.zeros(len(data)) + envelope_amplitude

    # Keep only the samples that are at least `border` away from both ends.
    last_kept = len(result_signal) - (border + 1)
    return np.array([
        value if border <= index <= last_kept else 0
        for index, value in enumerate(result_signal)
    ])

In [None]:
def find_signal_peaks(data, f_samp):
    """Find peaks whose count is plausible for a heart beat.

    The signal is normalised by its largest absolute value and
    ``scipy.signal.find_peaks`` is run with a minimum distance of
    ``0.3 * f_samp`` samples. The height threshold is raised by 0.1 while
    there are more than ``ceil(3.5 * duration)`` peaks and lowered by 0.01
    while there are fewer than ``floor(0.7 * duration)``, ``duration`` being
    the signal length in seconds.

    Compared with the previous implementation the search always terminates:
    * it stops when the threshold is already below every sample and there are
      still too few peaks (lowering it further cannot add peaks),
    * it is capped at a fixed number of iterations in case the count keeps
      jumping over the accepted range,
    * errors raised by ``find_peaks`` are no longer swallowed (they used to
      restart the loop forever),
    and a result is also returned when the initial sentinel count (100) is
    already inside the accepted range, which used to leave ``peaks``
    undefined.

    Args:
        data: 1-D signal (array-like); must not be empty.
        f_samp: sampling frequency in samples per second.

    Returns:
        numpy array with the indices of the selected peaks. When no threshold
        satisfies the limits, the peaks of the last attempt are returned
        (possibly an empty array).

    Raises:
        ValueError: if ``data`` is empty or ``find_peaks`` rejects its
            arguments.
    """
    data = np.asarray(data, dtype=float)
    amplitude = np.max(np.abs(data))
    if amplitude > 0:
        # An all-zero signal is left untouched instead of being turned into NaN.
        data = data / amplitude
    sample_time = len(data) / f_samp

    minimum_beats = math.floor(0.7 * sample_time)
    maximum_beats = math.ceil(3.5 * sample_time)
    lowest_value = data.min()
    max_iterations = 1000

    # Sentinel count kept from the original algorithm: for short segments it
    # makes the first attempt use a threshold of 0.1.
    current_number_of_beats = 100
    threshold = 0
    peaks = np.array([], dtype=int)
    for _ in range(max_iterations):
        if current_number_of_beats > maximum_beats:
            threshold = threshold + 0.1
        elif current_number_of_beats < minimum_beats:
            threshold = threshold - 0.01

        peaks = find_peaks(data, height = threshold, distance = 0.3 * f_samp)[0]
        current_number_of_beats = len(peaks)

        if minimum_beats <= current_number_of_beats <= maximum_beats:
            break
        if current_number_of_beats < minimum_beats and threshold < lowest_value:
            # Every local maximum already passes the height test.
            break

    return peaks

In [None]:
def get_qrs_peak(data_gap, data, data_gap_start, data_gap_end, data_start, data_end, f_samp, scale = 2):
    """Locate the R peaks of one segment.

    The envelope is computed on the extended segment ``data_gap`` and then
    cut back to the span of ``data``. Every envelope peak is refined to the
    position of the largest absolute value of ``data`` within
    ``round(0.10 * f_samp)`` samples on each side of it.

    Args:
        data_gap: segment including the extra margin on both sides.
        data: segment without margin; returned indices refer to it.
        data_gap_start, data_gap_end: bounds of ``data_gap`` in the recording.
        data_start, data_end: bounds of ``data`` in the recording.
        f_samp: sampling frequency.
        scale: kernel scale forwarded to ``get_superenergy_signal``.

    Returns:
        List with one index into ``data`` per detected beat.
    """
    superenergy_signal = get_superenergy_signal(data_gap, f_samp, scale)

    # Remove the leading/trailing margin so envelope indices match `data`.
    leading_margin = int(data_start - data_gap_start)
    trailing_margin = int(data_gap_end - data_end)
    superenergy_signal = superenergy_signal[leading_margin:len(superenergy_signal) - trailing_margin]

    half_window = round(0.10*f_samp)

    qrs_peaks = []
    for candidate in find_signal_peaks(superenergy_signal, f_samp):
        # Clamp the window at the start of the segment.
        first = max(candidate - half_window, 0)
        search_area = abs(data[first : candidate + half_window])
        qrs_peaks.append(first + np.argmax(search_area))

    return qrs_peaks

In [None]:
def get_qrs_interval(qrs_peaks, acl, f_samp, delay):
    """Find the onset and offset of every QRS complex on the ACL curve.

    Starting at each peak the ACL curve is scanned backwards (onset) and
    forwards (offset) until a sample is found that
    * is below 1.1 times both of its neighbours,
    * is at least ``0.06 * f_samp`` samples away from the peak (for the
      onset the distance is measured after adding ``delay``), and
    * is below 70 % of the ACL maximum in a window of ``round(0.12 * f_samp)``
      samples on each side of the peak.

    The backward scan now stops at the first sample and reports onset 0, the
    same guard ``get_p_interval`` uses; before, the index ran into negative
    values and silently compared against the END of ``acl``. If the forward
    scan reaches the end of ``acl``, the position of the smallest ACL value
    seen on the way is used.

    Args:
        qrs_peaks: peak indices into ``acl``.
        acl: ACL curve (numpy array).
        f_samp: sampling frequency.
        delay: offset added to every onset.

    Returns:
        Tuple ``(qrs_on, qrs_off)`` of lists, one entry per peak.
    """
    qrs_on = []
    qrs_off = []
    half_window = round(0.12*f_samp)
    minimum_distance = 0.06*f_samp

    for peak in qrs_peaks:
        if peak - half_window >= 1:
            window = acl[(peak - half_window) : (peak + half_window)]
        else:
            window = acl[0 : (peak + half_window)]
        # Loop invariant for both scans of this beat.
        amplitude_limit = 0.7*max(window)

        # Backward scan: QRS onset.
        iter_sample_on = peak
        while True:
            if (acl[iter_sample_on] < 1.1*acl[iter_sample_on - 1]
                    and acl[iter_sample_on] < 1.1*acl[iter_sample_on + 1]
                    and peak - (iter_sample_on + delay) >= minimum_distance
                    and acl[iter_sample_on] < amplitude_limit):
                break
            iter_sample_on = iter_sample_on - 1

            if iter_sample_on <= 0:
                # Start of the curve reached: report onset 0.
                iter_sample_on = - delay
                break
        qrs_on.append(iter_sample_on + delay)

        # Forward scan: QRS offset.
        iter_sample_end = peak
        min_value = acl[iter_sample_end]
        min_position = iter_sample_end
        while True:
            if (acl[iter_sample_end] < 1.1*acl[iter_sample_end - 1]
                    and acl[iter_sample_end] < 1.1*acl[iter_sample_end + 1]
                    and iter_sample_end - peak >= minimum_distance
                    and acl[iter_sample_end] < amplitude_limit):
                break

            if acl[iter_sample_end] <= min_value:
                min_value = acl[iter_sample_end]
                min_position = iter_sample_end
            iter_sample_end = iter_sample_end + 1

            if iter_sample_end >= len(acl) - 1:
                # End of the curve reached: fall back to the lowest point seen.
                iter_sample_end = min_position
                break
        qrs_off.append(iter_sample_end)

    return qrs_on, qrs_off

In [None]:
def get_qrs_amplitudes(qrs_peaks, qrs_on, data):
    """Return the height of every R peak above its QRS onset.

    Args:
        qrs_peaks: peak indices into ``data``.
        qrs_on: onset indices into ``data``, paired with ``qrs_peaks``.
        data: signal samples.

    Returns:
        List of ``data[peak] - data[onset]`` rounded to four decimals.
    """
    qrs_amplitudes = []
    for peak, start in zip(qrs_peaks, qrs_on):
        height = data[peak] - data[start]
        qrs_amplitudes.append(float(f"{height:.4f}"))

    return qrs_amplitudes

In [None]:
def get_rr_interval(qrs_peaks, f_samp):
    """Return the time between consecutive R peaks.

    Args:
        qrs_peaks: peak indices in samples, in chronological order.
        f_samp: sampling frequency in samples per second.

    Returns:
        List with ``len(qrs_peaks) - 1`` intervals in seconds (empty for
        fewer than two peaks).
    """
    return [
        (current - previous) / f_samp
        for previous, current in zip(qrs_peaks[:-1], qrs_peaks[1:])
    ]

## Onda T

In [None]:
def search_t_interval(median_qrs_on, median_qrs_off, delay):
    """Build the windows in which the T wave of every beat is searched.

    The window of beat ``i`` starts at the offset of QRS ``i`` and ends
    halfway between that offset and ``onset of QRS i+1 - delay``.

    Args:
        median_qrs_on: QRS onsets (non-empty).
        median_qrs_off: QRS offsets (non-empty).
        delay: number of samples subtracted from the next onset.

    Returns:
        Tuple ``(search_t_on, search_t_off)`` with one entry per pair of
        consecutive beats.
    """
    # Starts: every QRS offset except the last one.
    search_t_on = list(median_qrs_off)
    del search_t_on[-1]

    # Ends: derived from every QRS onset except the first one.
    search_t_off = list(median_qrs_on)
    del search_t_off[0]

    for index, next_qrs_on in enumerate(search_t_off):
        start = search_t_on[index]
        search_t_off[index] = int(start + ((next_qrs_on - delay) - start) / 2)

    return search_t_on, search_t_off

In [None]:
def get_t_peak(data, acl, t_on, t_off, delay):
    """Locate the T peak inside every search window.

    A candidate is first taken at the maximum of ``acl`` inside
    ``[t_on[i], t_off[i])``. It is then refined on ``data`` within ``delay``
    samples on each side of the candidate: to the maximum of ``data`` when
    the candidate sample is non-negative, otherwise to the minimum (inverted
    wave).

    Args:
        data: signal samples used for the refinement.
        acl: ACL curve (numpy array) used for the coarse search.
        t_on: window start indices.
        t_off: window end indices (exclusive), paired with ``t_on``.
        delay: half-width of the refinement window in samples.

    Returns:
        List of peak indices; windows without a usable maximum are skipped.
    """
    candidate_peaks = []
    for start, stop in zip(t_on, t_off):
        search_area = acl[start:stop]
        try:
            position = np.where(search_area == np.amax(search_area))[0][0]
        except (ValueError, IndexError):
            # ValueError: empty window. IndexError: no sample equals the
            # maximum (e.g. the window holds NaN).
            continue
        candidate_peaks.append(position + start)

    signal = np.copy(data)
    t_peaks = []
    for candidate in candidate_peaks:
        # Clamp the refinement window at the start of the signal.
        first = max(candidate - delay, 0)
        search_area = signal[first : candidate + delay]
        locate = np.argmax if signal[candidate] >= 0 else np.argmin
        t_peaks.append(first + locate(search_area))

    return t_peaks

In [None]:
def get_t_interval(t_peaks, qrs_off, acl, f_samp, delay):
    """Find the onset and offset of every T wave on the ACL curve.

    From each peak the curve is scanned backwards (onset) and forwards
    (offset) until a sample is found that is below 1.1 times both
    neighbours, at least ``0.1 * f_samp`` samples away from the peak (for the
    onset the distance is measured after adding ``delay``) and below 70 % of
    the ACL maximum within ``round(0.12 * f_samp)`` samples of the peak.

    The backward scan never goes past ``qrs_off`` of the same beat. If the
    forward scan reaches the end of ``acl`` the position of the smallest ACL
    value seen on the way is used. Unlike the QRS and P variants, ``delay``
    is not added to the reported onset.

    Args:
        t_peaks: T peak indices into ``acl``.
        qrs_off: QRS offsets, indexed like ``t_peaks``.
        acl: ACL curve (numpy array).
        f_samp: sampling frequency.
        delay: offset used in the onset distance test.

    Returns:
        Tuple ``(t_on, t_off)`` of lists, one entry per peak.
    """
    t_on = []
    t_off = []
    half_window = round(0.12*f_samp)

    for index, peak in enumerate(t_peaks):
        lower = peak - half_window
        if lower >= 0:
            window = acl[lower : (peak + half_window)]
        else:
            window = acl[0 : (peak + half_window)]

        # Backward scan: T onset, bounded by the end of the QRS complex.
        cursor = peak
        while cursor > qrs_off[index]:
            if (acl[cursor] < 1.1*acl[cursor - 1]
                    and acl[cursor] < 1.1*acl[cursor + 1]
                    and peak - (cursor + delay) >= 0.1*f_samp
                    and acl[cursor] < 0.7*max(window)):
                break
            cursor = cursor - 1
        t_on.append(cursor)

        # Forward scan: T offset.
        cursor = peak
        lowest_value = acl[cursor]
        lowest_position = cursor
        while True:
            if (acl[cursor] < 1.1*acl[cursor - 1]
                    and acl[cursor] < 1.1*acl[cursor + 1]
                    and cursor - peak >= 0.1*f_samp
                    and acl[cursor] < 0.7*max(window)):
                break

            if acl[cursor] <= lowest_value:
                lowest_value = acl[cursor]
                lowest_position = cursor
            cursor = cursor + 1

            if cursor >= len(acl) - 1:
                # End of the curve: fall back to the lowest point seen.
                cursor = lowest_position
                break
        t_off.append(cursor)

    return t_on, t_off

In [None]:
def get_t_amplitudes(t_peaks, median_t_off, data):
    """Return the height of every T peak above its T offset.

    Args:
        t_peaks: peak indices into ``data``.
        median_t_off: offset indices into ``data``, paired with ``t_peaks``.
        data: signal samples.

    Returns:
        List of ``data[peak] - data[offset]`` rounded to four decimals.
    """
    return [
        float(f"{data[peak] - data[end]:.4f}")
        for peak, end in zip(t_peaks, median_t_off)
    ]

## Onda P

In [None]:
def search_p_interval(median_qrs_on, median_qrs_off, delay):
    """Build the windows in which the P wave before every beat is searched.

    The window between beats ``i`` and ``i+1`` ends at
    ``onset of QRS i+1 - delay`` and starts halfway between the offset of
    QRS ``i`` and that end.

    Args:
        median_qrs_on: QRS onsets (non-empty).
        median_qrs_off: QRS offsets (non-empty).
        delay: number of samples subtracted from the next onset.

    Returns:
        Tuple ``(search_p_on, search_p_off)`` with one entry per pair of
        consecutive beats.
    """
    # Starts are derived from every QRS offset except the last one.
    search_p_on = list(median_qrs_off)
    del search_p_on[-1]

    # Ends are derived from every QRS onset except the first one.
    search_p_off = list(median_qrs_on)
    del search_p_off[0]

    for index, next_qrs_on in enumerate(search_p_off):
        end = next_qrs_on - delay
        previous_qrs_off = search_p_on[index]
        search_p_off[index] = end
        search_p_on[index] = int(previous_qrs_off + (end - previous_qrs_off) / 2)

    return search_p_on, search_p_off

In [None]:
def get_p_peak(data, acl, p_on, p_off, delay):
    """Locate the P peak inside every search window.

    A candidate is first taken at the maximum of ``acl`` inside
    ``[p_on[i], p_off[i])``. It is then refined on ``data`` within ``delay``
    samples on each side of the candidate: to the maximum of ``data`` when
    the candidate sample is non-negative, otherwise to the minimum (inverted
    wave).

    Args:
        data: signal samples used for the refinement.
        acl: ACL curve (numpy array) used for the coarse search.
        p_on: window start indices.
        p_off: window end indices (exclusive), paired with ``p_on``.
        delay: half-width of the refinement window in samples.

    Returns:
        List of peak indices; windows without a usable maximum are skipped.
    """
    candidate_peaks = []
    for start, stop in zip(p_on, p_off):
        search_area = acl[start:stop]
        try:
            position = np.where(search_area == np.amax(search_area))[0][0]
        except (ValueError, IndexError):
            # ValueError: empty window. IndexError: no sample equals the
            # maximum (e.g. the window holds NaN).
            continue
        candidate_peaks.append(position + start)

    signal = np.copy(data)
    p_peaks = []
    for candidate in candidate_peaks:
        # Clamp the refinement window at the start of the signal.
        first = max(candidate - delay, 0)
        search_area = signal[first : candidate + delay]
        locate = np.argmax if signal[candidate] >= 0 else np.argmin
        p_peaks.append(first + locate(search_area))

    return p_peaks

In [None]:
def get_p_interval(p_peaks, qrs_on, acl, f_samp, delay):
    """Find the onset and offset of every P wave on the ACL curve.

    From each peak the curve is scanned backwards (onset) and forwards
    (offset) until a sample is found that is below 1.2 times both
    neighbours, at least ``0.04 * f_samp`` samples away from the peak (for
    the onset the distance is measured after adding ``delay``) and below 70 %
    of the ACL maximum within ``round(0.12 * f_samp)`` samples of the peak.

    The backward scan stops at the first sample of the curve and then reports
    onset 0. The forward scan never goes past ``qrs_on[i + 1]``; if it
    reaches the end of ``acl`` the position of the smallest ACL value seen on
    the way is used.

    Args:
        p_peaks: P peak indices into ``acl``.
        qrs_on: QRS onsets; entry ``i + 1`` bounds the offset of P wave ``i``.
        acl: ACL curve (numpy array).
        f_samp: sampling frequency.
        delay: offset added to every onset.

    Returns:
        Tuple ``(p_on, p_off)`` of lists, one entry per peak.
    """
    p_on = []
    p_off = []
    half_window = round(0.12*f_samp)

    for index, peak in enumerate(p_peaks):
        lower = peak - half_window
        if lower >= 0:
            window = acl[lower : (peak + half_window)]
        else:
            window = acl[0 : (peak + half_window)]

        # Backward scan: P onset.
        cursor = peak
        while True:
            if (acl[cursor] < 1.2*acl[cursor - 1]
                    and acl[cursor] < 1.2*acl[cursor + 1]
                    and peak - (cursor + delay) >= 0.04*f_samp
                    and acl[cursor] < 0.7*max(window)):
                break
            cursor = cursor - 1

            if cursor <= 0:
                # Start of the curve: the appended onset becomes 0.
                cursor = - delay
                break
        p_on.append(cursor + delay)

        # Forward scan: P offset, bounded by the onset of the next QRS.
        cursor = peak
        lowest_value = acl[cursor]
        lowest_position = cursor
        while cursor < qrs_on[index + 1]:
            if (acl[cursor] < 1.2*acl[cursor - 1]
                    and acl[cursor] < 1.2*acl[cursor + 1]
                    and cursor - peak >= 0.04*f_samp
                    and acl[cursor] < 0.7*max(window)):
                break

            if acl[cursor] <= lowest_value:
                lowest_value = acl[cursor]
                lowest_position = cursor
            cursor = cursor + 1

            if cursor >= len(acl) - 1:
                # End of the curve: fall back to the lowest point seen.
                cursor = lowest_position
                break
        p_off.append(cursor)

    return p_on, p_off

In [None]:
def get_p_amplitudes(p_peaks, median_qrs_on, data):
    """Return the height of every P peak above the onset of the following QRS.

    The first entry of ``median_qrs_on`` is skipped, so P peak ``i`` is paired
    with QRS onset ``i + 1``.

    Args:
        p_peaks: peak indices into ``data``.
        median_qrs_on: QRS onset indices into ``data``.
        data: signal samples.

    Returns:
        List of ``data[peak] - data[onset]`` rounded to four decimals.
    """
    following_qrs_on = median_qrs_on[1:]

    p_amplitudes = []
    for peak, start in zip(p_peaks, following_qrs_on):
        height = data[peak] - data[start]
        p_amplitudes.append(float(f"{height:.4f}"))

    return p_amplitudes

## Parâmetros

In [None]:
def get_rr_interval(qrs_peaks, f_samp):
    """Return the time in seconds between consecutive R peaks.

    This definition has the same name and behaviour as the one in the QRS
    section and replaces it when the file is executed top to bottom.

    Args:
        qrs_peaks: peak indices in samples, in chronological order.
        f_samp: sampling frequency in samples per second.

    Returns:
        List with ``len(qrs_peaks) - 1`` intervals (empty for fewer than two
        peaks).
    """
    return [
        (qrs_peaks[position] - qrs_peaks[position - 1]) / f_samp
        for position in range(1, len(qrs_peaks))
    ]

In [None]:
def get_pr_interval(median_p_on, median_qrs_on, f_samp):
    """Return the PR interval of every P wave.

    P onset ``i`` is paired with QRS onset ``i + 1``.

    Args:
        median_p_on: P wave onsets in samples.
        median_qrs_on: QRS onsets in samples; needs at least one more entry
            than ``median_p_on``.
        f_samp: sampling frequency in samples per second.

    Returns:
        List of ``(qrs_on[i + 1] - p_on[i]) / f_samp``, one per P wave.
    """
    return [
        (median_qrs_on[position + 1] - median_p_on[position]) / f_samp
        for position in range(len(median_p_on))
    ]

In [None]:
def get_qt_interval(median_qrs_on, median_t_off, f_samp):
    """Return the QT interval of every beat that has a T offset.

    T offset ``i`` is paired with QRS onset ``i``.

    Args:
        median_qrs_on: QRS onsets in samples; needs at least as many entries
            as ``median_t_off``.
        median_t_off: T wave offsets in samples.
        f_samp: sampling frequency in samples per second.

    Returns:
        List of ``(t_off[i] - qrs_on[i]) / f_samp``, one per T offset.
    """
    return [
        (t_end - median_qrs_on[position]) / f_samp
        for position, t_end in enumerate(median_t_off)
    ]

In [None]:
def get_qtc_interval(qt_interval, beat_interval_mean):
    """Return the QT intervals divided by the square root of the mean beat interval.

    Args:
        qt_interval: iterable of QT intervals.
        beat_interval_mean: mean beat (RR) interval.

    Returns:
        List with one value per input interval; ``None`` where the value
        cannot be computed (for example a missing interval or a missing
        mean).
    """
    # The divisor does not depend on the interval, so it is computed once.
    try:
        divisor = np.sqrt(beat_interval_mean)
    except (TypeError, AttributeError, ValueError):
        # Dividing by None below turns every entry into None.
        divisor = None

    qtc_interval = []
    for interval in qt_interval:
        try:
            qtc_interval.append(interval / divisor)
        except (TypeError, ArithmeticError):
            qtc_interval.append(None)

    return qtc_interval

In [None]:
def get_st_deviation(median_qrs_on, median_qrs_off, data):
    """Return the signal difference between each QRS offset and the next QRS onset.

    Args:
        median_qrs_on: QRS onset indices into ``data``.
        median_qrs_off: QRS offset indices into ``data``.
        data: signal samples.

    Returns:
        List of ``data[qrs_off[i]] - data[qrs_on[i + 1]]`` rounded to four
        decimals.
    """
    st_on = median_qrs_off[:-1]
    st_off = median_qrs_on[1:]

    st_deviation = []
    for on, off in zip(st_on, st_off):
        st_deviation.append(float(f"{data[on] - data[off]:.4f}"))

    return st_deviation

In [None]:
def get_pr_segment(median_p_off, median_qrs_on, f_samp):
    """Return the PR segment (end of P wave to start of the next QRS) per beat.

    P offset ``i`` is paired with QRS onset ``i + 1``, the same pairing
    ``get_pr_interval`` uses, because P wave ``i`` lies between QRS ``i`` and
    QRS ``i + 1``. The previous implementation returned
    ``p_off[i] - qrs_on[i]``, i.e. the time from the PREVIOUS QRS onset to
    the end of the P wave, which is not the PR segment.

    Args:
        median_p_off: P wave offsets in samples.
        median_qrs_on: QRS onsets in samples.
        f_samp: sampling frequency in samples per second.

    Returns:
        List of ``(qrs_on[i + 1] - p_off[i]) / f_samp``; P offsets without a
        following QRS onset are ignored.
    """
    return [
        (next_qrs_on - p_end) / f_samp
        for p_end, next_qrs_on in zip(median_p_off, median_qrs_on[1:])
    ]

In [None]:
def get_heart_rate(rr_interval):
    """Convert RR intervals in seconds to heart rates in beats per minute.

    Args:
        rr_interval: iterable of RR intervals in seconds.

    Returns:
        List of ``60 / interval``, one per input interval.
    """
    return [60/interval for interval in rr_interval]

# Função Principal

In [None]:
def _mean_and_stdev(values):
    """Return ``(mean, sample standard deviation)`` rounded to 4 decimals, ``None`` where undefined."""
    # Best effort, as before: too few values or unusable values give None.
    # ``Exception`` (instead of a bare except) keeps KeyboardInterrupt and
    # SystemExit working.
    try:
        mean = round(statistics.mean(values), 4)
    except Exception:
        mean = None
    try:
        stdev = round(statistics.stdev(values), 4)
    except Exception:
        stdev = None
    return mean, stdev


def _unit_normalize(values):
    """Pass ``values`` through sklearn's ``Normalizer`` as a one-feature column and flatten the result."""
    # NOTE(review): Normalizer rescales every ROW to unit norm; with a single
    # feature per row each non-zero value becomes +1 or -1. The behaviour is
    # kept unchanged here - confirm whether a column-wise scaler was meant.
    column = np.array(values).reshape(-1, 1)
    return np.concatenate(Normalizer().fit_transform(column), axis=0)


def get_ecg_info(data, f_samp):
    """Extract beat-level features and their summary statistics from one recording.

    The recording is baseline-filtered and processed in segments of
    ``10 * f_samp`` samples. For every segment the R peaks, the QRS/T/P
    boundaries and the corresponding amplitudes are detected; positions are
    stored relative to the whole recording. Afterwards the RR/PR/QT
    intervals, the PR segment and the ST deviation are derived, and mean and
    sample standard deviation of every feature are added.

    Fixes relative to the previous version:
    * the ACL curve is computed on the segment extended by 0.5 s on each side
      and is now shifted back to the segment's own index frame before use
      (peaks are segment-relative, so from the second segment on the curve
      was read 0.5 s too early);
    * QRS and T amplitudes are read from a signal in that same frame (the T
      amplitudes used to index the whole recording with segment-relative
      positions);
    * ``get_p_interval`` receives the QRS onsets (it was given the offsets)
      and ``get_p_amplitudes`` receives the P peaks and QRS onsets (it was
      given the T peaks and P offsets);
    * the segment end is clamped explicitly (``len(data-1)`` and a dead
      assignment used to obscure this).

    NOTE(review): the interval helpers pair entry ``i`` of one list with
    entry ``i`` or ``i + 1`` of another over the concatenation of all
    segments; every segment contributes one P/T wave fewer than QRS
    complexes, so the pairing drifts after the first segment - confirm
    whether per-segment intervals were intended.

    Args:
        data: raw samples of one lead (numpy array).
        f_samp: sampling frequency in samples per second.

    Returns:
        Dict with the per-beat lists, the filtered signal under
        ``'filtered_data'`` and ``mean_*`` / ``standard_deviation_*`` entries
        (``None`` when a statistic is undefined).
    """
    ecg_extracted_data = {
        'ecg_duration': 0,
        'number_of_beats': 0,
        'qrs_amplitude': [],
        't_amplitude': [],
        'p_amplitude': [],
        'qrs_peaks': [],
        'qrs_on': [],
        'qrs_off': [],
        't_peaks': [],
        't_on': [],
        't_off': [],
        'p_peaks': [],
        'p_on': [],
        'p_off': [],
        'rr_interval': [],
        'pr_interval': [],
        'qt_interval': [],
        'pr_segment': [],
        'st_deviation': [],
    }

    # Baseline removal with an 8-level decomposition.
    data = baseline_filter(data, f_samp, 8)
    ecg_extracted_data['filtered_data'] = data

    floating_window_size = round(0.04 * f_samp)
    delay = floating_window_size
    gap_length = int(0.5*f_samp)

    for segment in range(0, math.ceil(len(data)/(10*f_samp))):
        # 10 second segment, clamped to the end of the recording.
        start_segment = int(segment*(10*f_samp))
        end_segment = min(int((segment+1)*(10*f_samp)), len(data))
        data_segment = data[start_segment:end_segment]

        # Same segment extended by 0.5 seconds on both sides.
        start_gap_segment = max(start_segment - gap_length, 0)
        end_gap_segment = min(int((segment+1)*(10*f_samp)) + gap_length, len(data))
        data_gap_segment = data[start_gap_segment:end_gap_segment]

        # Number of extra samples in front of the segment, and the extended
        # signal re-indexed so that position 0 is the first segment sample.
        lead = start_segment - start_gap_segment
        aligned_data = data_gap_segment[lead:]

        # Offset that turns segment-relative positions into recording positions.
        offset = segment*10*f_samp

        ecg_extracted_data['ecg_duration'] += round((len(data_segment)/f_samp), 4)

        qrs_peaks = get_qrs_peak(data_gap_segment, data_segment, start_gap_segment, end_gap_segment, start_segment, end_segment, f_samp)
        ecg_extracted_data['qrs_peaks'].extend([peak + offset for peak in qrs_peaks])
        ecg_extracted_data['number_of_beats'] += len(qrs_peaks)

        # ACL curve, shifted to the segment's index frame.
        wavelet_transformed_data = wavelet_transform(data_gap_segment, 3)
        acl = get_acl(wavelet_transformed_data, floating_window_size)[lead:]

        # QRS boundaries and amplitudes.
        qrs_on, qrs_off = get_qrs_interval(qrs_peaks, acl, f_samp, delay)
        ecg_extracted_data['qrs_on'].extend([on + offset for on in qrs_on])
        ecg_extracted_data['qrs_off'].extend([off + offset for off in qrs_off])

        qrs_amplitudes = get_qrs_amplitudes(qrs_peaks, qrs_on, aligned_data)
        ecg_extracted_data['qrs_amplitude'].extend(qrs_amplitudes)

        # T wave: search windows, peaks, boundaries and amplitudes.
        search_t_on, search_t_off = search_t_interval(qrs_on, qrs_off, delay)

        t_peaks = get_t_peak(data_segment, acl, search_t_on, search_t_off, delay)
        ecg_extracted_data['t_peaks'].extend([peak + offset for peak in t_peaks])

        t_on, t_off = get_t_interval(t_peaks, qrs_off, acl, f_samp, delay)
        ecg_extracted_data['t_on'].extend([on + offset for on in t_on])
        ecg_extracted_data['t_off'].extend([off + offset for off in t_off])

        t_amplitudes = get_t_amplitudes(t_peaks, t_off, aligned_data)
        ecg_extracted_data['t_amplitude'].extend(t_amplitudes)

        # P wave: search windows, peaks, boundaries and amplitudes.
        search_p_on, search_p_off = search_p_interval(qrs_on, qrs_off, delay)

        p_peaks = get_p_peak(data_segment, acl, search_p_on, search_p_off, delay)
        ecg_extracted_data['p_peaks'].extend([peak + offset for peak in p_peaks])

        p_on, p_off = get_p_interval(p_peaks, qrs_on, acl, f_samp, delay)
        ecg_extracted_data['p_on'].extend([on + offset for on in p_on])
        ecg_extracted_data['p_off'].extend([off + offset for off in p_off])

        p_amplitudes = get_p_amplitudes(p_peaks, qrs_on, data_segment)
        ecg_extracted_data['p_amplitude'].extend(p_amplitudes)

    # RR interval
    rr_interval = get_rr_interval(ecg_extracted_data['qrs_peaks'], f_samp)
    ecg_extracted_data['rr_interval'].extend(rr_interval)

    # PR interval
    pr_interval = get_pr_interval(ecg_extracted_data['p_on'], ecg_extracted_data['qrs_on'], f_samp)
    ecg_extracted_data['pr_interval'].extend(pr_interval)

    # QT interval
    qt_interval = get_qt_interval(ecg_extracted_data['qrs_on'], ecg_extracted_data['t_off'], f_samp)
    ecg_extracted_data['qt_interval'].extend(qt_interval)

    # PR segment
    pr_segment = get_pr_segment(ecg_extracted_data['p_off'], ecg_extracted_data['qrs_on'], f_samp)
    ecg_extracted_data['pr_segment'].extend(pr_segment)

    # ST deviation
    st_deviation = get_st_deviation(ecg_extracted_data['qrs_on'], ecg_extracted_data['qrs_off'], data)
    ecg_extracted_data['st_deviation'].extend(st_deviation)

    # Summary statistics. The order of this list fixes the order of the
    # resulting dictionary keys (and therefore of the exported columns).
    # Amplitude-like features go through the Normalizer first; a failure
    # there still aborts the extraction, as before.
    summarized_features = [
        ('qrs_amplitude', _unit_normalize(ecg_extracted_data['qrs_amplitude'])),
        ('t_amplitude', _unit_normalize(ecg_extracted_data['t_amplitude'])),
        ('p_amplitude', _unit_normalize(ecg_extracted_data['p_amplitude'])),
        ('rr_interval', ecg_extracted_data['rr_interval']),
        ('pr_interval', ecg_extracted_data['pr_interval']),
        ('qt_interval', ecg_extracted_data['qt_interval']),
        ('pr_segment', ecg_extracted_data['pr_segment']),
        ('st_deviation', _unit_normalize(ecg_extracted_data['st_deviation'])),
        ('heart_rate', get_heart_rate(rr_interval)),
    ]
    for feature, values in summarized_features:
        mean, stdev = _mean_and_stdev(values)
        ecg_extracted_data[f'mean_{feature}'] = mean
        ecg_extracted_data[f'standard_deviation_{feature}'] = stdev

    return ecg_extracted_data

In [None]:
def show_ecg(data, qrs_peaks, qrs_on, qrs_off, sample):
    """Plot a signal with its R peaks and QRS boundaries marked in red.

    Args:
        data: signal samples (plotted as a line).
        qrs_peaks: indices of the R peaks.
        qrs_on: indices of the QRS onsets.
        qrs_off: indices of the QRS offsets.
        sample: zero-based sample number, shown one-based in the title.
    """
    fig = px.line(data)

    # One marker trace per point set, added in this order.
    marker_sets = (
        ('R Peaks', qrs_peaks),
        ('QRS On', qrs_on),
        ('QRS Off', qrs_off),
    )
    traces = [
        go.Scatter(
            x = positions,
            y = [data[position] for position in positions],
            name = label,
            mode = 'markers',
            marker = dict(color = 'red')
        )
        for label, positions in marker_sets
    ]
    for trace in traces:
        fig.add_traces(trace)

    fig.update_layout(
        title = f'Amostra {sample + 1}',
        xaxis_title = 'Índice',
        yaxis_title = 'Amplitude'
    )

    fig.show()

In [None]:
def create_dict(start_dict):
    """Create the accumulator for all samples.

    Args:
        start_dict: dict whose keys become columns of the accumulator.

    Returns:
        New dict with an empty list under ``'normal'`` (first key) and under
        every key of ``start_dict``.
    """
    return {'normal': [], **{key: [] for key in start_dict}}

In [None]:
def append_sample_to_dict(all_samples_dict, ecg_extracted_data):
    """Append the features of one sample to the per-feature lists.

    A feature that is not yet a key of ``all_samples_dict`` gets a new list
    that is first filled with ``None`` for every sample appended before, so
    that it lines up with the existing columns. The previous fallback looked
    up the key ``'ecg_duration_1'``, which is never created, and therefore
    raised ``KeyError`` instead of adding the column.

    Args:
        all_samples_dict: dict of lists, one list per feature; modified in
            place.
        ecg_extracted_data: dict with the features of the new sample.

    Returns:
        ``all_samples_dict`` (the same object).
    """
    # Number of samples stored so far, measured before anything is appended.
    previous_samples = max((len(column) for column in all_samples_dict.values()), default=0)

    for key, value in ecg_extracted_data.items():
        if key not in all_samples_dict:
            all_samples_dict[key] = [None] * previous_samples
        all_samples_dict[key].append(value)

    return all_samples_dict

In [None]:
# Names of the recordings listed as normal. Every line of the file is
# expected to look like '<folder>/<record>'; only the record part is kept.
# Blank lines (for example a trailing newline at the end of the file) are
# skipped instead of raising IndexError.
with open('RECORDS-normal.txt') as normal_records:
    all_normal_samples = [
        record.split('/')[1].strip()
        for record in normal_records
        if record.strip()
    ]

# Execução e Transformação em CSV

In [None]:
# Extract the features of every recording and collect them per feature.
# A recording that fails is reported and skipped.
all_samples_dict = {}
for iter_sample, current_data in enumerate(ECG_data):
    print(f'Extraindo os dados da amostra {iter_sample + 1}')
    # Record name: file name up to the first dot.
    current_sample = mat_files[iter_sample].split('.')[0]

    try:
        start_time = time.time()
        # 300 is the sampling frequency passed for every recording.
        ecg_extracted_data = get_ecg_info(current_data, 300)

        # The accumulator is created from the keys of the first successful sample.
        if not all_samples_dict:
            all_samples_dict = create_dict(ecg_extracted_data)

        # Label: 1 when the record is listed as normal, 0 otherwise.
        ecg_extracted_data['normal'] = 1 if current_sample in all_normal_samples else 0

        all_samples_dict = append_sample_to_dict(all_samples_dict, ecg_extracted_data)

        end_time = time.time()
        print(f'Dados da amostra {iter_sample + 1} extraídos com sucesso.\nTempo decorrido: {end_time - start_time}\n')

    except Exception as error:
        print(f'Não foi possível extrair os dados da amostra {iter_sample + 1}.\n')
        print('Erro: ', error)

In [None]:
# Per-beat lists and the filtered signal are dropped; only the scalar
# columns are written to the CSV file.
per_beat_columns = [
    'qrs_amplitude', 'qrs_peaks', 'qrs_on', 'qrs_off', 'rr_interval',
    'filtered_data', 't_peaks', 't_on', 't_off', 'p_peaks', 'p_on', 'p_off',
    'pr_interval', 'qt_interval', 'pr_segment', 'st_deviation',
    't_amplitude', 'p_amplitude',
]
all_samples_dataframe = pd.DataFrame.from_dict(all_samples_dict).drop(columns=per_beat_columns)
all_samples_dataframe.to_csv('sample_data.csv')