In [1]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go

In [2]:
df = pd.read_csv("data_ecg.csv", delim_whitespace=True)
time_index = df[df.columns[0]]
data = df[df.columns[1]]
n_data = len(df.index)
data = np.array(data - 1.25)
fs = 1000
sampling_period = 1 / fs

In [3]:
fig = go.Figure(data=go.Scatter(x=time_index, y=data, mode="lines"))
fig.update_layout(
    title="Raw Signal",
    xaxis_title="Time (ms)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [4]:
def moving_average(signal, window):
    n_data = len(signal)
    mav_1 = np.zeros(n_data * 2)
    mav_2 = np.zeros(n_data * 2)

    for i in range(window):
        signal[-i] = signal[0]

    for i in range(n_data):
        count = 0
        for j in range(window):
            count += signal[i - j]
        mav_1[i] = count / window

    for i in range(n_data, n_data + window):
        mav_1[i] = mav_1[n_data]

    for i in range(n_data, 0, -1):
        count = 0
        for j in range(window):
            count += mav_1[i + j]
        mav_2[i] = count / window

    mav_2 = np.array(mav_2[:n_data])
    return mav_2

In [5]:
mav_result = moving_average(data, 18)

fig = go.Figure()
fig.add_trace(go.Scatter(x=time_index, y=data, mode="lines", name="Original Signal"))
fig.add_trace(go.Scatter(x=time_index, y=mav_result, mode="lines", name="After MAV"))
fig.update_layout(
    title="Moving Average",
    xaxis_title="Time (s)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [None]:
def dft(signal):
    n_data = len(signal)
    # initial array for DFT
    X_real = np.zeros(n_data * 2)
    X_imaj = np.zeros(n_data * 2)
    MagDFT = np.zeros(n_data * 2)

    # DFT
    for k in range(n_data):
        for n in range(n_data):
            X_real[k] += (signal[n]) * np.cos(2 * np.pi * k * n / n_data)
            X_imaj[k] += (signal[n]) * np.sin(2 * np.pi * k * n / n_data)

    for k in range(n_data):
        MagDFT[k] = np.sqrt(np.square(X_real[k]) + np.square(X_imaj[k]))

    return MagDFT

In [21]:
raw_data_dft = dft(data)
n = np.arange(0, n_data, 1, dtype=int)
k = np.arange(0, n_data, 1, dtype=int)

fig = go.Figure(data=go.Scatter(x=k * fs / n_data, y=raw_data_dft[k], mode="lines"))
fig.update_layout(
    title="DFT Original Signal",
    xaxis_title="Frequency (Hz)",
    yaxis_title="Magnitude",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [6]:
p_waves = []
qrs_complex = []
t_waves = []
for i in range(n_data):
    if i > 1370 and i <= 1500:
        p_waves.append(mav_result[i])
    else:
        p_waves.append(0)
    if i > 1500 and i <= 1660:
        qrs_complex.append(mav_result[i])
    else:
        qrs_complex.append(0)
    if i > 1660 and i <= 1950:
        t_waves.append(mav_result[i])
    else:
        t_waves.append(0)

fig = go.Figure()
fig.add_trace(go.Scatter(x=time_index, y=p_waves, mode="lines", name="P Waves"))
fig.add_trace(go.Scatter(x=time_index, y=qrs_complex, mode="lines", name="QRS Complex"))
fig.add_trace(go.Scatter(x=time_index, y=t_waves, mode="lines", name="T Waves"))
fig.update_layout(
    title="Segmentation",
    xaxis_title="Time (ms)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [9]:
dft_p = dft(p_waves)
dft_qrs = dft(qrs_complex)
dft_t = dft(t_waves)

n = np.arange(0, n_data, 1, dtype=int)
k = np.arange(0, n_data, 1, dtype=int)

In [23]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=k * fs / n_data, y=dft_p, mode="lines", name="DFT P Waves"))
fig.add_trace(
    go.Scatter(x=k * fs / n_data, y=dft_qrs, mode="lines", name="DFT QRS Complex")
)
fig.add_trace(go.Scatter(x=k * fs / n_data, y=dft_t, mode="lines", name="DFT T Waves"))
fig.update_layout(
    title="Segmentation DFT",
    xaxis_title="Freq (Hz)",
    yaxis_title="Magnitude",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [7]:
def butterworth_lpf(signal, freq_cutoff, orde):
    omega_c = 2 * np.pi * freq_cutoff
    sampling_period_squared = np.square(sampling_period)
    omega_c_squared = np.square(omega_c)
    y = np.zeros(len(signal))

    if orde == 1:
        for n in range(len(signal)):
            if n == 0:
                y[n] = omega_c * signal[n] / ((2 / sampling_period) + omega_c)
            else:
                y[n] = (
                    ((2 / sampling_period) - omega_c) * y[n - 1]
                    + omega_c * signal[n]
                    + omega_c * signal[n - 1]
                ) / ((2 / sampling_period) + omega_c)
    elif orde == 2:
        for n in range(len(signal)):
            if n < 2:
                y[n] = (omega_c_squared * signal[n]) / (
                    (4 / sampling_period_squared)
                    + (2 * np.sqrt(2) * omega_c / sampling_period)
                    + omega_c_squared
                )
            else:
                y[n] = (
                    ((8 / sampling_period_squared) - 2 * omega_c_squared) * y[n - 1]
                    - (
                        (4 / sampling_period_squared)
                        - (2 * np.sqrt(2) * omega_c / sampling_period)
                        + omega_c_squared
                    )
                    * y[n - 2]
                    + omega_c_squared * signal[n]
                    + 2 * omega_c_squared * signal[n - 1]
                    + omega_c_squared * signal[n - 2]
                ) / (
                    (4 / sampling_period_squared)
                    + (2 * np.sqrt(2) * omega_c / sampling_period)
                    + omega_c_squared
                )
    return y

In [8]:
lpf_result = butterworth_lpf(mav_result, 20, 2)

fig = go.Figure()
fig.add_trace(go.Scatter(x=time_index, y=mav_result, mode="lines", name="Before LPF"))
fig.add_trace(go.Scatter(x=time_index, y=lpf_result, mode="lines", name="After LPF"))
fig.update_layout(
    title="Low Pass Filtering",
    xaxis_title="Time (ms)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [None]:
def butterworth_hpf(signal, freq_cutoff, orde):
    omega_c = 2 * np.pi * freq_cutoff
    sampling_period_squared = np.square(sampling_period)
    omega_c_squared = np.square(omega_c)
    y = np.zeros(len(signal))

    if orde == 1:
        for n in range(len(signal)):
            if n == 0:
                y[n] = ((2 / sampling_period) * signal[n]) / (
                    (2 / sampling_period) + omega_c
                )
            else:
                y[n] = (
                    ((2 / sampling_period) * signal[n])
                    - ((2 / sampling_period) * signal[n - 1])
                    - ((omega_c - (2 / sampling_period)) * y[n - 1])
                ) / ((2 / sampling_period) + omega_c)
    elif orde == 2:
        for n in range(len(signal)):
            if n < 2:
                y[n] = (
                    (4 / sampling_period_squared)
                    * signal[n]
                    / (
                        omega_c_squared
                        + (2 * np.sqrt(2) * omega_c / sampling_period)
                        + (4 / sampling_period_squared)
                    )
                )
            else:
                y[n] = (
                    (4 / sampling_period_squared) * signal[n]
                    - (8 / sampling_period_squared) * signal[n - 1]
                    + (4 / sampling_period_squared) * signal[n - 2]
                    - (2 * omega_c - (8 / sampling_period_squared)) * y[n - 1]
                    - (
                        omega_c_squared
                        - (2 * np.sqrt(2) * omega_c / sampling_period)
                        + (4 / sampling_period_squared)
                    )
                    * y[n - 2]
                ) / (
                    omega_c_squared
                    + 2 * np.sqrt(2) * omega_c / sampling_period
                    + (4 / sampling_period_squared)
                )
    return y

In [10]:
hpf_result = butterworth_hpf(lpf_result, 4, 2)

fig = go.Figure()
fig.add_trace(go.Scatter(x=time_index, y=lpf_result, mode="lines", name="Before HPF"))
fig.add_trace(go.Scatter(x=time_index, y=hpf_result, mode="lines", name="After HPF"))
fig.update_layout(
    title="High Pass Filtering",
    xaxis_title="Time (ms)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [15]:
filtered_signal_dft = dft(hpf_result)

n = np.arange(0, n_data, 1, dtype=int)
k = np.arange(0, n_data, 1, dtype=int)

fig = go.Figure(
    data=go.Scatter(x=k * fs / n_data, y=filtered_signal_dft[k], mode="lines")
)
fig.update_layout(
    title="DFT Filtered Signal",
    xaxis_title="Time (ms)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [None]:
# SQUARING
squared_signal = []
for i in range(len(hpf_result)):
    squared_signal.append(np.square(hpf_result[i]))

mav_squared = moving_average(squared_signal, 25)


fig = go.Figure()
fig.add_trace(go.Scatter(x=time_index, y=squared_signal, mode="lines", name="Squared"))
fig.add_trace(go.Scatter(x=time_index, y=mav_squared, mode="lines", name="MAV"))
fig.update_layout(
    title="Squaring and MAV",
    xaxis_title="Time (ms)",
    yaxis_title="Amplitude (mV)",
    width=1000,
    height=500,
    xaxis=dict(showline=True, showgrid=True),
    yaxis=dict(showline=True, showgrid=True),
)
fig.show()

In [12]:
# THRESHOLDING
print(max(mav_squared))

threshold_result = []

for i in range(len(mav_squared)):

    if mav_squared[i] > (0.1 * max(mav_squared)):

        threshold_result.append(np.max(mav_result))

    else:

        threshold_result.append(0)


fig = go.Figure()

fig.add_trace(go.Scatter(x=time_index, y=mav_result, mode="lines", name="Signal"))

fig.add_trace(

    go.Scatter(x=time_index, y=threshold_result, mode="lines", name="Threshold")
)

fig.update_layout(

    title="Thresholding Process",

    xaxis_title="Time (ms)",

    yaxis_title="Amplitude (mV)",

    width=800,

    height=500,

    xaxis=dict(showline=True, showgrid=True),

    yaxis=dict(showline=True, showgrid=True),
)

fig.show()

0.6235567203384274


In [18]:
peaks = 0
for i in range(len(threshold_result)):
    if threshold_result[i] > threshold_result[i - 1]:
        peaks += 1

time = n_data / fs
heart_rate = int(60 * peaks / time)
print(f"Heart Rate = {heart_rate} bpm")

Heart Rate = 84 bpm
