This notebook assumes access to denoised, detrended signals and calculates the appropriate parameters for the signal.
This notebook assumes access to detrended, denoised signals and calculates various parameters (and uncertainties) from these signals.The parameters estimated within this notebook are split into three different types: time-domain, frequency-domain and non-linear:

Time domain:
 - mean RR intervals
 - std RR intervals 
 - mean amplitude
 - RMSSD of differences between successive RR intervals 
 - pNN50 (%) NN50 divided by total number of RR (NN50 is the number of sucessive RR intervals that differ by more than 50ms) 
 
 
Frequency Domain:
- absolute power of LF (0.04 - 0.15 Hz) band
- absolute power of HF (0.15 to 0.4Hz) band
- LF/HF ratio
- Total power (0-0.4 Hz)

Non-linear:
- Fractal dimension of dynamic attractor of signal
- std of poincare
- Shannon entropy
- recurrence rate
- detrended fluctuation analysis
- correlation dimension
- sample entropy


In [1]:
%run "Data_PreProcessing.ipynb" #allowing access to the filtered database with preprocessed signals

100%|████████████████████████████████████████████████████████████████████████████████| 549/549 [00:15<00:00, 35.76it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 196/196 [00:03<00:00, 54.17it/s]


In [18]:
from scipy.signal import find_peaks #for peak finding
from scipy.stats import skew, kurtosis #for signal measures
from pyhrv.hrv import hrv #for some automatic feature extraction
import biosppy #for rpeak calculation
import pyhrv.time_domain as td #for calculation of heart rate
from scipy.signal import welch #for frequency band
from scipy.interpolate import interp1d #for interpolation of signal

In [3]:
def parameter_averages(parameter, health_state):
    """
    calculates healthy and unhealthy means and std of parameter for easy comparisson
    returns np.array containing: healthy mean, healthy std, unhealthy mean, unhealhty std
    """
    encoded_health_state = [True if label == 'Unhealthy' else False for label in health_state]
    
    unhealthy_param = parameter[encoded_health_state]
    healthy_param = parameter[~np.array(encoded_health_state)]
    
    unhealthy_param_av = np.mean(unhealthy_param)
    unhealthy_param_std = np.std(unhealthy_param)
    
    healthy_param_av = np.mean(healthy_param)
    healthy_param_std = np.std(healthy_param)
    
    return np.array([healthy_param_av, healthy_param_std, unhealthy_param_av, unhealthy_param_std])

def print_averages(parameter, parameter_name):
    health_state = allowed_patients.get_diagnoses()
    
    means = parameter_averages(parameter, health_state)
    
    print(f"Unhealthy {parameter_name}: mean:{means[2]}, std: {means[3]}")
    print(f"Healthy {parameter_name}: mean:{means[0]}, std:{means[1]}")
    return None

In [4]:
#outliers in rr removed automatically in biosppy
def outliers_indices_z_score(data, threshold=2):
    mean = np.mean(data)
    std = np.std(data)
    z_scores = [(x - mean) / std for x in data]
    indices = np.argwhere([abs(z)<threshold for z in z_scores])
    return indices

def get_peaks(signal, remove_outliers=False):
    peaks, _ = find_peaks(signal, distance = 600)
    peak_to_peak = peaks[1:-1]
    
    if remove_outliers:
        amp = signal[peak_to_peak]
        amp_outlier_indices = outliers_indices_z_score(amp)
        peak_to_peaks = peak_to_peak[amp_outlier_indices]
        
    else:
        peak_to_peaks = peak_to_peak
        
    return peak_to_peaks
        

def get_rri(signal, remove_outliers=False):
    t, filtered_signal, rpeaks = biosppy.signals.ecg.ecg(signal, show=False)[:3]
    
    # calculates rr intervals
    rri = np.diff(rpeaks)

    #removal of outliers
    if remove_outliers:
        rri_outlier_indices = outliers_indices_z_score(rri).reshape(-1)
        rr_intervals = rri[rri_outlier_indices]
        
    else:
        rr_intervals = rri
        
    return rr_intervals

In [6]:
#pre-calculation of rpeaks to save time 

# Get R-peaks series using biosppy
r_peak_list_all_channels = []
for i, signal in enumerate(denoised_signals):
    r_peak_list = []
    for j in range(0, len(signal)):
        t, filtered_signal, rpeaks = biosppy.signals.ecg.ecg(signal[j], show=False)[:3]
        r_peak_list.append(rpeaks)
    r_peak_list_all_channels.append(r_peak_list)

## Time Domain

### RR signal analysis

In [7]:
def RR_analysis(rpeaks):
    peak_distances = np.diff(rpeaks)
    
    # stastitical analysis on rr intervals
    mean_RR = np.mean(peak_distances)
    std_RR = np.std(peak_distances)

    #RMSSD
    # computes differences between successive RR intervals for RMSSD
    diff_RR_intervals = np.diff(peak_distances)
    
    RMSSD_RR = np.sqrt(np.mean(diff_RR_intervals**2))
    
    #pNN50
    # the number of successive RR intervals that differ by more than 50 ms 
    NN50 = np.sum(np.abs(diff_RR_intervals) > 50)
    # divide by total number of RR intervals
    pNN50 = (NN50 / len(peak_distances)) * 100
    
    return mean_RR, std_RR, RMSSD_RR, pNN50


In [11]:
rr_means = np.zeros(shape = (no_channels, no_patients))
rr_stds = np.zeros(shape = (no_channels, no_patients))
rr_RMSSD = np.zeros(shape = (no_channels, no_patients))
rr_pNN50s = np.zeros(shape = (no_channels, no_patients))
hrs = np.zeros(shape=(no_channels, no_patients))

for j in range(0, no_channels):
    for i, signal in enumerate(denoised_signals):
        peaks = r_peak_list_all_channels[i][j]

        rr_mean, rr_std, rr_rmssd, rr_pNN50 = RR_analysis(peaks)
    
        rr_means[j][i] = rr_mean
        rr_stds[j][i] = rr_std
        rr_RMSSD[j][i] = rr_rmssd
        rr_pNN50s[j][i] = rr_pNN50
    
        nn_hr = td.hr_parameters(rpeaks=peaks)['hr_mean']
        hrs[j][i] = nn_hr

### Physiological measures

In [19]:
def get_moments(signal):
    mean = np.mean(signal)
    std = np.std(signal)
    skew_ecg = skew(signal)
    kurtosis_ecg = kurtosis(signal)
    return mean, std, skew_ecg, kurtosis_ecg

In [20]:
means = np.zeros(shape = (no_channels, no_patients))
stds = np.zeros(shape = (no_channels, no_patients))
skews = np.zeros(shape = (no_channels, no_patients))
kurtosiss = np.zeros(shape = (no_channels, no_patients))


for j in range(0, no_channels):
    for i, signal in enumerate(denoised_signals):

        mean, std, ecg_skew, ecg_kurtosis = get_moments(signal[j])
    
        means[j][i] = mean
        stds[j][i] = std
        skews[j][i] = ecg_skew
        kurtosiss[j][i] = ecg_kurtosis


## Frequency Domain

### Power Bands

In [22]:
def power_bands(signal):
    fs = 1000 #Hz
    fs_interpolate = 3 #Hz
    
    # define frequency bands
    lf_band = (0.04, 0.15)
    hf_band = (0.15, 0.40)
    
    peaks, _ = find_peaks(signal, distance=600)
    rr_intervals = np.diff(peaks)/fs # RR intervals in seconds, fs is the sampling frequency of the original signal
    
    # time points of the RR intervals
    rr_times = np.cumsum(rr_intervals)
    rr_times = np.insert(rr_times, 0, 0)  # add time zero at the beginning

    # Interpolation
    interpolated_time = np.arange(0, rr_times[-2], 1/fs_interpolate)  # Interpolated time vector
    interpolated_rr = interp1d(rr_times[:-1], rr_intervals, kind='cubic')(interpolated_time)

    #welch spectrum
    f, psd = welch(interpolated_rr, fs=fs_interpolate, nperseg=128)
    
    # Integrate the power spectral density over the frequency bands
    lf_power = np.trapz(psd[(f >= lf_band[0]) & (f <= lf_band[1])], f[(f >= lf_band[0]) & (f <= lf_band[1])])
    hf_power = np.trapz(psd[(f >= hf_band[0]) & (f <= hf_band[1])], f[(f >= hf_band[0]) & (f <= hf_band[1])])
    
    return lf_power, hf_power, lf_power/hf_power
    

In [23]:
lfs = np.zeros(shape = (no_channels, no_patients))
hfs = np.zeros(shape = (no_channels, no_patients))
ratios = np.zeros(shape = (no_channels, no_patients))
for j in range(0, no_channels):
    for i, signal in enumerate(denoised_signals):
        lf, hf, ratio = power_bands(signal[j])
        lfs[j][i] = lf
        hfs[j][i] = hf
        ratios[j][i] = ratio

## Non-Linear

### Poincare

In [24]:
def calculate_poincare_sd(sig, remove_outliers=False):
    #get rr intervals
    rr_intervals = get_rri(sig)
    
    if remove_outliers:
        rr_intervals = rr_intervals[outliers_indices_z_score(rr_intervals)]
        
    #separating into subsequent coordinates for Poincaré plot
    rr_n = rr_intervals[:-1]
    rr_n1 = rr_intervals[1:]
    
    #calculating SD1, perpendicular to y=x
    diff_rr = np.array(rr_n) - np.array(rr_n1)
    sd1 = np.sqrt(np.var(diff_rr/np.sqrt(2)))
    
    # calculating SD2, along y=x
    sum_rr = rr_n + rr_n1
    sd2 = np.sqrt(np.var(sum_rr/np.sqrt(2)))
    
    # calculating ratio
    sd_ratio = sd2/sd1
   
    #counting intervals outside SD1 and SD2
    count_outside_sd1 = np.sum(np.abs(diff_rr / np.sqrt(2)) > sd1)
    count_outside_sd2 = np.sum(np.abs(sum_rr / np.sqrt(2)) > sd2)

    out = count_outside_sd1 + count_outside_sd2
    return sd1, sd2, sd_ratio, count_outside_sd1, count_outside_sd2, out

In [26]:
sd1s = np.zeros(shape = (no_channels, no_patients))
sd2s = np.zeros(shape = (no_channels, no_patients))
sd_ratios = np.zeros(shape=(no_channels, no_patients))
out_sd1s = np.zeros(shape=(no_channels, no_patients))
out_sd2s = np.zeros(shape=(no_channels,no_patients))
n_out = np.zeros(shape = (no_channels, no_patients))
for j in range(0, no_channels):
    for i, signal in enumerate(denoised_signals):
        sd1, sd2, sd_ratio, out_sd1, out_sd2, out = calculate_poincare_sd(signal[j], remove_outliers=True)
        sd1s[j][i] = sd1
        sd2s[j][i] = sd2
        sd_ratios[j][i] = sd_ratio
        out_sd1s[j][i] = out_sd1
        out_sd2s[j][i] = out_sd2
        n_out[j][i] = out

### Shannon Entropy

In [27]:
def calculate_shannon_entropy(rr_intervals, num_bins=10):
    # discretize RR intervals into bins and calculate probabilities
    hist, bin_edges = np.histogram(rr_intervals, bins=num_bins, density=True)
    
    probabilities = hist / np.sum(hist)
    
    # Calculate Shannon Entropy using equation
    shannon_entropy = -np.sum(probabilities * np.log2(probabilities + 1e-12))  # adding a small value to avoid log(0)
    
    return shannon_entropy

In [28]:
shannon_ens = np.zeros(shape = (no_channels, no_patients))
for j in range(0, no_channels):
    for i, signal in enumerate(denoised_signals):
        shannon_en = calculate_shannon_entropy(signal[j])
        shannon_ens[j][i] = shannon_en

## Creating Parameter Dictionary

In [29]:

#create dictionary with parameters
params = {}

#time components
params['rr_mean'] = rr_means
params['rr_std'] = rr_stds
params['RMSSD'] = rr_RMSSD
params['pNN50'] = rr_pNN50s
params['mean'] = means
params['std'] = stds
params['skews'] = skews
params['kurtosis'] = kurtosiss
#frequency components
params['hf'] = hfs
params['lf'] = lfs
params['power_ratio'] = ratios
#nonlinear components
params['shannon_en'] = shannon_ens
params['sd_ratio'] = sd_ratios




In [None]:
print(params['sd_ratio'])