In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider
from scipy.signal import medfilt, butter, filtfilt, find_peaks
import scipy.io

In [None]:
# params
fs = 250
window_len_sec = 10
plot_window_sec = 60

In [None]:
from google.colab import drive
drive.mount('/content/drive')
!ls /content/drive/MyDrive/fbmi/ASII/

Mounted at /content/drive
ASII_uloha1_stresove_EKG.ipynb	ecg.mat  EEG_MonikaPulcova.ipynb  Untitled0.ipynb
data_FHR.mat			eeg.mat  fetal_heart_rate.ipynb


In [None]:
mat_data = scipy.io.loadmat("/content/drive/MyDrive/fbmi/ASII/ecg.mat")
print("Klíče v souboru:", mat_data.keys())

signal = mat_data["ecg118e00"].squeeze()
signal

Klíče v souboru: dict_keys(['__header__', '__version__', '__globals__', 'ecg118e00'])


array([[ -5.955,  -5.635],
       [ -5.955,  -5.635],
       [ -5.955,  -5.635],
       ...,
       [  1.12 , -11.36 ],
       [  1.175, -11.4  ],
       [ -3.95 , -16.515]])

In [None]:
baseline = medfilt(signal, kernel_size=201)
signal_detrended = signal - baseline

  baseline = medfilt(signal, kernel_size=201)


In [None]:
def bandstop_filter(data, lowcut, highcut, fs, order=2):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='bandstop')
    # Apply filter to each column of the data separately
    y = np.zeros_like(data) # Initialize output array
    for i in range(data.shape[1]): # Loop through columns
        y[:,i] = filtfilt(b, a, data[:,i]) # Filter each column
    return y
signal_filtered = bandstop_filter(signal_detrended, 48, 52, fs)

In [None]:
min_distance = int(0.8 * fs)
peak_height = 0.5 * np.max(signal_filtered)
peaks, properties = find_peaks(signal_filtered, height=peak_height, distance=min_distance)


rr_intervals = np.diff(peaks) / fs

def compute_hrv(rr_ints):
    if len(rr_ints) < 2:
        return None
    hr = 60 / np.mean(rr_ints)
    sdnn = np.std(rr_ints) 
    rmssd = np.sqrt(np.mean(np.diff(rr_ints)**2))
    diff_rr = np.abs(np.diff(rr_ints))
    nn50 = np.sum(diff_rr > 0.05)
    pnn50 = (nn50 / len(diff_rr)) * 100 if len(diff_rr) > 0 else 0
    return {"HR": hr, "SDNN": sdnn, "RMSSD": rmssd, "NN50": nn50, "pNN50": pnn50}

num_samples_window = window_len_sec * fs
hrv_results = []
for start in range(0, len(signal_filtered), num_samples_window):
    end = start + num_samples_window
    window_peaks = peaks[(peaks >= start) & (peaks < end)]
    if len(window_peaks) > 1:
        rr_window = np.diff(window_peaks) / fs
        hrv = compute_hrv(rr_window)
        hrv_results.append((start/fs, hrv))
    else:
        hrv_results.append((start/fs, None))

ValueError: `x` must be a 1-D array

In [None]:
fig, ax = plt.subplots(figsize=(12, 6))
plt.subplots_adjust(bottom=0.25)
time = np.arange(len(signal_filtered)) / fs

def update_plot(start_time):
    start_idx = int(start_time * fs)
    end_idx = start_idx + int(plot_window_sec * fs)
    ax.clear()
    ax.plot(time[start_idx:end_idx], signal_filtered[start_idx:end_idx], label="Filtered ECG")

    window_peaks = peaks[(peaks >= start_idx) & (peaks < end_idx)]
    ax.plot(time[window_peaks], signal_filtered[window_peaks], "ro", label="R peaks")

    for window_start_sec, hrv in hrv_results:
        if start_time <= window_start_sec < start_time + plot_window_sec:
            if hrv is not None:
                pos = window_start_sec + window_len_sec/2
                txt = f"HR: {hrv['HR']:.1f} BPM\nSDNN: {hrv['SDNN']:.3f}s\nRMSSD: {hrv['RMSSD']:.3f}s\npNN50: {hrv['pNN50']:.1f}%"
                ax.text(pos, np.max(signal_filtered[start_idx:end_idx])*0.8, txt,
                        fontsize=8, bbox=dict(facecolor='yellow', alpha=0.3))

    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Amplitude")
    ax.set_title("60s window of ECG signal with detected R waves and HRV annotations")
    ax.legend()
    ax.set_xlim(time[start_idx], time[end_idx])
    plt.draw()

init_start = 0
update_plot(init_start)

ax_slider = plt.axes([0.15, 0.1, 0.7, 0.03])
slider = Slider(ax_slider, 'Start time (s)', 0, (len(signal_filtered)/fs - plot_window_sec),
                valinit=init_start, valstep=1)

def slider_update(val):
    update_plot(val)

slider.on_changed(slider_update)

plt.show()