In [1]:
import os
import struct
import numpy as np
import plotly.graph_objects as go
from scipy.signal import find_peaks

In [22]:
def parse_cor_ekg(data):
    pos = 0
    version = struct.unpack_from('B', data, pos)[0]
    pos += 1
    lead_number = struct.unpack_from('B', data, pos)[0]
    pos += 1
    # >H because it's big-endian
    frequency = struct.unpack_from('>H', data, pos)[0]
    pos += 2
    
    signal_values = []
    
    while pos < len(data):
        value = struct.unpack_from('>h', data, pos)[0]
        if value == 0xFFFF:
            break
        signal_values.append(-1 * value)
        pos += 2
    
    return {
        'version': version,
        'lead_number': lead_number,
        'frequency': frequency,
        'signal_values': signal_values
    }

In [23]:
def calculate_heart_rate(data, window_size=10):
    # this function is adjustable, hyperparameters are
    # window_size and (height, distance) in find_peaks function
    # if we had data labeled with correspoding heart rates, we could tune them
    signal_values = data["signal_values"]
    frequency = data["frequency"]

    smoothed_signal = np.convolve(signal_values, np.ones(window_size)/window_size, mode='valid')

    peaks, _ = find_peaks(smoothed_signal, height=np.mean(smoothed_signal) + np.std(smoothed_signal), distance=frequency/2)
    rr_intervals = np.diff(peaks) / frequency # in seconds
    
    average_rr_interval = np.mean(rr_intervals)
    heart_rate = 60 / average_rr_interval if average_rr_interval > 0 else 0

    return np.round(heart_rate, 1)


In [13]:
def make_ekg_plot(signal_values, frequency, smoothing_window=10):
    signal_values = np.array(signal_values)

    time = np.arange(len(signal_values)) / frequency
    signal_length = len(signal_values)

    
    smoothed_signal = np.convolve(signal_values, np.ones(smoothing_window)/smoothing_window, mode='valid')
    peaks, _ = find_peaks(smoothed_signal, height=np.mean(smoothed_signal) + np.std(smoothed_signal), distance=frequency/2)

    peak_times = peaks / frequency
    peak_values = signal_values[peaks]

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=time, y=smoothed_signal, mode='lines', name='EKG Signal'))
    fig.add_trace(go.Scatter(
        x=peak_times,
        y=peak_values,
        mode='markers',
        name='QRS',
        marker=dict(color='red', size=10)
    ))

    fig.update_layout(
        title='EKG Signal',
        xaxis_title='Time',
        yaxis_title='Amplitude',
        xaxis=dict(
            rangeslider=dict(visible=True),
            title='Time',
            range=[0, signal_length]
        ),
        yaxis=dict(
            title='Amplitude'
        ),
        #ßtemplate='plotly_dark',
        width=signal_length
    )

    fig.show()


In [24]:
# calculating the heart rate for each 

file_names = [item for item in os.listdir("EKG") if item != ".gitignore"]

for fn in file_names:
    with open("EKG/{}".format(fn), 'rb') as f:
        ekg_data = f.read()
    parsed_data = parse_cor_ekg(ekg_data)
    
    print(fn[:3], calculate_heart_rate(parsed_data, 10))


070 105.8
b68 108.0
e22 99.7
b31 108.1
7a7 104.0
399 107.8


In [27]:
# adhoc
fnam = "18d07f09-abc0-4686-af19-f60ffb431762-HdekXwu7En3Mb4ohy4.cor-ekg"
with open("EKG/{}".format(fnam), 'rb') as f:
    ah_ekg_data = f.read()
ah_parsed_data = parse_cor_ekg(ah_ekg_data)
    
print(calculate_heart_rate(ah_parsed_data, 10))

88.1


In [None]:
# the next cell is just for visual representation of the data

In [25]:
idx = 0
with open("EKG/{}".format(file_names[idx]), 'rb') as f:
    ekg_data_idx = f.read()
parsed_data_idx = parse_cor_ekg(ekg_data_idx)
signal_values_idx = parsed_data_idx["signal_values"]
frequency_idx = parsed_data_idx["frequency"]

make_ekg_plot(signal_values_idx, frequency_idx, 10)
