# Import


Libraries

In [None]:
!pip install wfdb tqdm hrv-analysis
!pip install -Iv neurokit2==0.1.0

In [None]:
import neurokit2 as nk
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import hrvanalysis as hrvana
import wfdb
import keras
import scipy
import os
import math
from sklearn.preprocessing import MinMaxScaler
from wfdb import plot
from tqdm import tqdm

Mount GDrive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


List of filenames

In [None]:
datafolder = r"/content/drive/MyDrive/dbs/ltafdb/"
files = os.listdir(datafolder)
filenames = list(set([datafolder + f.split('.')[0] for f in files]))
filenames.sort()
filenames.remove(datafolder + '30')
filenames.remove(datafolder + '45')
filenames.remove(datafolder + '75') # оставим 1 запись для тестирования модели

In [None]:
filenames[-1]

# Functions

In [None]:
EXCLUDED_COLUMNS = ['HRV_ULF', 'HRV_VLF', 'HRV_CSI_Modified', 'HRV_S'] # exclude 4 signs
INCLUDED_COLUMNS = ['HRV_pNN50', 'HRV_CVI', 'HRV_HTI', 'HRV_CVNN', 'HRV_ApEn', 'HRV_SampEn', 
                    'HRV_SD1SD2', 'HRV_LFHF', 'HRV_IALS', 'HRV_PAS', 'HRV_PI', 'HRV_AI'] # get 12 signs in total

Funcs for slicing record into 5 min length samples

In [None]:
def slice_record_to_5min_intervals(filename, intervals_max_count = 180):
    """ Читает запись и делит её на последовательные пятиминутные отрезки,
        их максимальное количество = intervals_max_count
        Args:
            filename: string, полный путь до записи без расширения
            intervals_max_count: int, максимальное количество интервалов, 
                на которые делить запись. Если она короче - вернет меньше интервалов
        Returns:
            list of lists: внутренние списки - списки сэмплов, входящих в интервалы
    """
    print("Now slicing record with path : " + filename)
    annotation = wfdb.rdann(filename,"atr")
    fs = annotation.fs
    sample = annotation.sample
    record = wfdb.rdrecord(filename)    
    signal = record.__dict__["p_signal"][sample[0]:sample[-1], 0]
    
    # return slice_signal_to_5min_intervals(signal, fs)
    return slice_intersected_5min_intervals(signal, fs)

In [None]:
def slice_signal_to_5min_intervals(signal, sampling_freq, intervals_max_count = 180):
    """ Делит сигнал на последовательные пятиминутные отрезки,
        их максимальное количество = intervals_max_count
        Args: 
            signal: list-like, сигнал для деления
            sampling_freq: int, частота дискретизации(Гц)
            intervals_max_count: int,  максимальное количество интервалов, 
                на которые делить запись. Если она короче - вернет меньше интервалов
        Returns:
            list of lists: внутренние списки - списки сэмплов, входящих в интервалы
            максимальная длина внешнего списка = intervals_max_count
    """
    interval_length = 5 * 60 * sampling_freq
    signal_slices = []
    
    for i in range(0, len(signal), interval_length):
        if (len(signal_slices) >= intervals_max_count):
            break
        signal_slices.append(signal[i:i + interval_length]) 

    return signal_slices

In [None]:
def slice_intersected_5min_intervals(signal, sampling_freq, intervals_max_count = 250, shift_minutes = 1):
    """ Делит сигнал на пересекающиеся 5-минутные отрезки 
        Args: 
            signal: list-like, сигнал
            sampling_freq: int, частота дискретизации(Гц)
            intervals_max_count: int, максимальное количество интервалов на возвращение,
                может вернуть меньше указанного значения
            shift_minutes: int > 0, шаг, примеры:
                если 1, то возвратит интервалы 0-5, 1-6, 2-7 и т.д.
                если 2, то интервалы 0-5, 2-7, 4-9 и т.д.
        Returns:
            list of lists - внутренние списки - сэмплы на 5 минут,
            максимальное len() внешнего списка = intervals_max_count
    """
    interval_length = 5 * 60 * sampling_freq
    shift = shift_minutes * 60 * sampling_freq
    signal_slices = []
    
    for i in range(0, len(signal), shift):
        if (len(signal_slices) >= intervals_max_count):
            break
        current_slice = signal[i:i + interval_length]
        signal_slices.append(current_slice)

    return signal_slices

Funcs for getting peaks and HRV array

In [None]:
# https://stackoverflow.com/questions/6518811/interpolate-nan-values-in-a-numpy-array
def interpolate_nans(original):
    """ Линейно интерполирует NaN и бесконечные значения в коллекции
        Args:
            original: array-like, коллекция, в кот-ой нужно интерполировать значения
        Returns:
            тот же объект, но все NaN и бесконечные значения заменены на 
            линейно интерполированные значения
    """
    nans, x = ~np.isfinite(original), lambda z: z.nonzero()[0]
    original[nans]= np.interp(x(nans), x(~nans), original[~nans])
    
    return original

In [None]:
def wave_to_peaks(signal, fs):
    """ Декоратор (Wrapper function) neurokit's 'ecg_findpeaks' и 'ecg_clean'.
        Очищает сигнал и возвращает сэмплы, на которых найдены R-пики,
        как в документации neurokit по функциям.
        Args: 
            signal: list-like, сигнал
            fs: int, частота дискретизации(Гц)
        Returns:
            dict, доступ к пикам по ключу 'ECG_R_Peaks'
    """
    signal = hrvana.interpolate_nan_values(rr_intervals=signal,interpolation_method="linear")
    cleaned = nk.ecg_clean(signal, sampling_rate = fs)
    peaks = nk.ecg_findpeaks(cleaned, sampling_rate=fs, show=False)['ECG_R_Peaks']
    
    return peaks

In [None]:
def rpeaks_to_hrv(rri, fs, hrv_indices):
    """ Возвращает HRV значения из R-пиков
        Args:
            rri: dict, доступ к пикам по ключу 'ECG_R_Peaks'
            fs: int, частота дискретизации (Гц)
        Returns:
            1-мерный массив показателей
    """
    clean_rri = rri*(1000/fs)
    clean_rri = hrvana.remove_ectopic_beats(rr_intervals=clean_rri, method="malik")
    clean_rri = hrvana.interpolate_nan_values(rr_intervals=clean_rri,interpolation_method="linear")

    clean_rri = np.array(clean_rri)
    clean_rri = clean_rri[~np.isnan(clean_rri)]
    
    peaks_unec = np.zeros(len(clean_rri)+1)
    cv = 0

    for count, value in enumerate(clean_rri):
        cv += value
        peaks_unec[count+1] = cv

    peaks_unec *= (128.0/1000.0)   
    rpeaks = {'ECG_R_Peaks':peaks_unec}
    hrvdat = nk.hrv(rpeaks, sampling_rate=128, show=False)
    hrvdat = hrvdat.drop(columns = hrv_indices)
    
    return np.ravel(hrvdat.to_numpy())

In [None]:
def get_specified_hrv(rri, fs, hrv_indices):
    """ Возвращает указанные показатели HRV
        Args:
            rri: dict, доступ к пикам по ключу 'ECG_R_Peaks'
            fs: int, частота дискретизации (Гц)
            hrv_indices: list, список HRV-показателей
        Returns:
            1-мерный numpy массив значений указанных показателей
    """
    clean_rri = rri*(1000/fs)
    clean_rri = hrvana.remove_ectopic_beats(rr_intervals=clean_rri, method="malik")
    clean_rri = hrvana.interpolate_nan_values(rr_intervals=clean_rri,interpolation_method="linear")

    clean_rri = np.array(clean_rri)
    clean_rri = clean_rri[~np.isnan(clean_rri)]
    
    peaks_unec = np.zeros(len(clean_rri)+1)
    cv = 0

    for count, value in enumerate(clean_rri):
        cv += value
        peaks_unec[count+1] = cv

    peaks_unec *= (128.0/1000.0)   
    rpeaks = {'ECG_R_Peaks':peaks_unec}
    hrv_full = nk.hrv(rpeaks, sampling_rate=128, show=False)
    target_hrvs = hrv_full[hrv_indices]
    
    return np.ravel(target_hrvs.to_numpy())

Funcs for normalizing the HRV values

In [None]:
def flatten_1stlevel(list_of_lists_of_lists):
    """ >>> flatten_1stlevel([[[1, 2], [3, 4]],[[5, 6], [7, 8]],[[9, 0]]])
        [[1, 2], [3, 4], [5, 6], [7, 8], [9, 0]]
        >>> flatten_1stlevel([[1, 2], [[3,4], [[5, 6], 7]], []])
        [1, 2, [3, 4], [[5, 6], 7]]
        >>> flatten_1stlevel([[1, 2], [3, 4]])
        [1, 2, 3, 4]
        >>> flatten_1stlevel([1, 2, 3])
        [1, 2, 3]
    """
    flat_1stlevel = []
    
    for list_of_lists in list_of_lists_of_lists:
        try:
            flat_1stlevel.extend(list_of_lists)
        except TypeError:
            flat_1stlevel.append(list_of_lists)

    return flat_1stlevel

In [None]:
def standardize_hrv_values(hrv_arrays, return_scaler=False):
    """ Нормализует значения показателей с помощью MinMaxScaler
        Args:
            hrv_arrays: list of arrays, список массивов показателей по пациенту, структура:
            [[[1, 2], [3, 4]],[[5, 6], [7, 8]],[[9, 0], [10, 11]]]
            return_scaler: bool, true - возвращает обученный MinMaxScaler
        Returns:
            (всегда) list of numpy arrays - стандартизированных значений,
            if return_scaler - также возвратит MinMaxScaler

    """
    scaler = MinMaxScaler()
    person_hrvs_count = len(hrv_arrays) # количество пациентов
    all_hrvs = flatten_1stlevel(hrv_arrays)
    scaler = scaler.fit(all_hrvs)
    std_hrvs = scaler.transform(all_hrvs)
    
    if not return_scaler:
        return np.split(std_hrvs, person_hrvs_count)
    
    return np.split(std_hrvs, person_hrvs_count), scaler

# Create a dataset without EXCLUDED_COLUMNS (in total: 48 signs)

In [None]:
dataset = []

for filename in tqdm(filenames):
    annotation = wfdb.rdann(filename,"atr")
    fs = annotation.fs
    interv = slice_record_to_5min_intervals(filename)
    hrvs = [] 

    for five_min in interv:
        rpeaks = wave_to_peaks(five_min, fs)
        rri = np.diff(rpeaks)
        hrv = rpeaks_to_hrv(rri, fs, EXCLUDED_COLUMNS)
        hrvs.append(hrv)
    
    hrvs = [hrvs]
    dataset.append(hrvs)

# Create a dataset with INCLUDED_COLUMNS (in total: 12 signs)

In [None]:
dataset = []

for filename in tqdm(filenames):
    annotation = wfdb.rdann(filename,"atr")
    fs = annotation.fs
    interv = slice_record_to_5min_intervals(filename)
    hrvs = [] 

    for five_min in interv:
        rpeaks = wave_to_peaks(five_min, fs)
        rri = np.diff(rpeaks)
        hrv = get_specified_hrv(rri, fs, INCLUDED_COLUMNS)
        hrvs.append(hrv)
    
    hrvs = [hrvs]
    dataset.append(hrvs)

# Normalize the HRV values

In [None]:
hrv_by_person = flatten_1stlevel(dataset)
normalized_hrv = standardize_hrv_values(hrv_by_person)
normalized_arr = np.array(normalized_hrv)
print(normalized_arr.shape, '\n', normalized_arr[0])

# Save a dataset

In [None]:
np.save('', normalized_arr)

# Testing

Testing of "slice_signal_to_5min_intervals" func

In [None]:
dummy_signal = [i for i in range(0, 100000)]
dummy_fs = 3
dummy_slices = slice_signal_to_5min_intervals(dummy_signal, dummy_fs)

for s in dummy_slices:
    print(s[:3],'...',s[-3:])

Testing of "create_dataset_by_person" func

In [None]:
dummy_data = np.arange(0, 800).reshape(10, 8, 10)
dummy_ds = create_dataset_by_person(dummy_data[0])

for i in dummy_ds.as_numpy_iterator():
    print(i)

for x, y in create_dataset_by_person(test_arrs[0]):
    print('x = ', x.numpy())
    print('y = ', y.numpy())

Testing of "standardize_hrv_values" func

In [None]:
import random


people = 5
arrs_count = 16
arr_length = 5
sample_pop = range(100)
test_arrs = []

for ps in range(people):
    this_person = []
    
    for ar in range(arrs_count):
        this_person.append(random.sample(sample_pop, arr_length))
    test_arrs.append(this_person)
  
print(test_arrs[0])
std_dummy_data = standardize_hrv_values(test_arrs)
print(std_dummy_data[0])