In [80]:
import json
import numpy as np
import pandas as pd
import neurokit2 as nk
import plotly.graph_objects as go

from arrhytmia import Arrhytmia
from mi import MI
from tools import Tools

# Build the shared helper instances once, in the order Tools -> MI -> Arrhytmia
tool, mi, arr = Tools(), MI(), Arrhytmia()

In [98]:
# Alternative recordings, kept for quick switching:
# filename = "ekg_data_converted.json"
# filename = "ekg.json"
# filename = "ekg-6lead.json"
filename = "ekg_2.json"
with open(filename, "r", encoding="utf-8") as file:
    data = json.load(file)

# lead_names[k - 1] is the lead recorded on channel k (channel numbers are 1-based).
lead_names = ["I", "V1", "V2", "V4", "V6", "II"]

# Order the records by channel number so that record_data[i] lines up with
# lead_names[i]; later cells pair the two lists purely by position.
records_by_channel = sorted(data, key=lambda record: record["channel"])
record_data = []

print("Save channel:")
for record in records_by_channel:
    # Pull the channel into a local first: nesting double quotes inside a
    # double-quoted f-string is a SyntaxError before Python 3.12.
    channel = record["channel"]
    print(f"{channel} as Lead {lead_names[channel - 1]}")
    record_data.append(record["data"])



Save channel:
1 as Lead I
2 as Lead V1
3 as Lead V2
4 as Lead V4
5 as Lead V6
6 as Lead II


In [99]:
# Report how many samples each recorded lead contains
for lead_pos in range(len(record_data)):
    lead_label = lead_names[lead_pos]
    sample_count = len(record_data[lead_pos])
    print(f"Lead: {lead_label}, Samples: {sample_count}")

Lead: I, Samples: 8000
Lead: V1, Samples: 95
Lead: V2, Samples: 5771
Lead: V4, Samples: 8000
Lead: V6, Samples: 8000
Lead: II, Samples: 8000


In [100]:
def normalize_signals(record_data, sampling_rate=320, min_duration_s=10):
    """
    Bring every ECG channel to one common length and stack them in a 2-D array.

    The common (reference) length is the larger of
      * the length of the shortest channel, and
      * ``min_duration_s`` seconds worth of samples (``min_duration_s * sampling_rate``).
    Channels at least that long are truncated to the reference length; shorter
    channels are zero-padded at the end.

    Args:
        record_data (list of list/array): ECG recordings, one 1-D sequence per channel
        sampling_rate (int or float): sampling frequency in Hz (default=320)
        min_duration_s (int or float): lower bound, in seconds, for the reference
            length (default=10)

    Returns:
        np.ndarray: float array with shape (n_channels, n_samples)

    Raises:
        ValueError: if record_data contains no channels
    """
    n_channels = len(record_data)
    if n_channels == 0:
        raise ValueError("record_data must contain at least one channel")

    # 1. Shortest channel length
    shortest = min(len(rec) for rec in record_data)

    # 2. Reference length. The window is forced to an int so that a float
    #    sampling rate (e.g. 320.0) still yields a valid slice bound / array size.
    window = int(round(min_duration_s * sampling_rate))
    reference_length = max(shortest, window)

    # 3. Start from an all-zero matrix and copy each (possibly truncated) channel
    #    into the front of its row; the untouched tail is the zero padding.
    normalized = np.zeros((n_channels, reference_length), dtype=float)
    for row, rec in enumerate(record_data):
        kept = np.asarray(rec[:reference_length], dtype=float)
        normalized[row, :kept.size] = kept

    return normalized


# Equalise the channel lengths, then flip to (samples, channels), the layout NeuroKit expects
signals = normalize_signals(record_data, sampling_rate=320).T
print(signals.shape)


(3200, 6)


In [101]:
# convert the record data into numpy.ndarray type
# record_signals = np.array(record_data, dtype=float)

# transpose the matrix to match with neurokit library
# signals = record_signals.T

In [108]:
# Acquisition settings used by the analysis cells
sampling_rate = 320  # Hz
n_leads = len(record_data)  # one entry per recorded channel (alternative: signals.shape[1])

# Placeholder patient metadata: a one-row frame, built column by column
patient_data = pd.DataFrame({
    'patient_id': [112],
    'age': [27],
    'sex': ["Male"],
    'recording_date': ['2025-08-13 09:17:34'],
})


### Arrhythmia Analysis

In [109]:
# Run the NeuroKit processing pipeline on the first recorded channel (raw, un-normalized samples)
signal = record_data[0]
ecg_signals, info = nk.ecg_process(signal, sampling_rate=sampling_rate)

# Build the interval table; the patient's sex is forwarded to calculate_interval
patient_sex = patient_data["sex"].iloc[0]
interval = arr.calculate_interval(ecg_signals, info, sampling_rate, patient_sex)
# NOTE(review): the saved output reports negative QRS/QT/QTc durations - verify calculate_interval.
interval

Unnamed: 0,Parameter,Value,Unit,Status
0,Average Heart Rate (HR),83.03,bpm,Normal
1,PR Interval,196.98,ms,Normal
2,RR Interval,730.52,ms,Normal
3,QRS Duration,-1879.17,ms,Normal
4,QT Interval,-1676.35,ms,Abnormal
5,QTc (Corrected QT),-1972.83,ms,Normal


In [110]:
# Classify the rhythm of `signal` (set to record_data[0] in the previous cell);
# the saved output for this recording is the label 'Normal Sinus'.
rythm_type = arr.rhythm_analysis(signal, sampling_rate)
rythm_type

'Normal Sinus'

In [111]:
# Use lead I (first column of `signals`) as the example trace
ecg_signal = signals[:, 0]

ecg_cleaned = nk.ecg_clean(ecg_signal, sampling_rate=sampling_rate)
_, rpeaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate)

# The delineation result is not used by the plot in this cell.
# NOTE(review): kept in case other cells read `waves` - confirm before removing.
_, waves = nk.ecg_delineate(ecg_cleaned, rpeaks, sampling_rate=sampling_rate, method="dwt")

# Time axis in seconds
time = np.arange(len(ecg_cleaned)) / sampling_rate

# NOTE(review): these lists are initialised here but never filled in this cell;
# kept in case other cells append to them - confirm before removing.
baseline_points = []
duration = []
qrs_amplitude = []

# Plot ECG
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=ecg_cleaned, mode='lines', name='ECG Signal'))

# Mark every R peak with ONE trace instead of one trace per peak: the figure
# stays light, the legend has a single "R Peaks" entry, and indexing errors are
# no longer silently swallowed by a blanket try/except.
r_peak_idx = np.asarray(rpeaks["ECG_R_Peaks"], dtype=int)
if r_peak_idx.size:  # no peaks detected -> no marker trace, as before
    fig.add_trace(go.Scatter(
        x=time[r_peak_idx], y=ecg_cleaned[r_peak_idx],
        mode='markers+text',
        marker=dict(color='pink', size=7),
        text=["R"] * r_peak_idx.size,
        textposition="top center",
        name="R Peaks"
    ))

# Final layout
fig.update_layout(
    title="ECG signal",
    xaxis_title="Time (s)",
    yaxis_title="Amplitude (mV)",
    template="plotly_white"
)

fig.show()

In [112]:
# Per-lead MI analysis on the length-normalized signal matrix, followed by grouping
# of the per-lead results into named regions (a list of (region, [leads]) tuples).
# NOTE(review): the saved output logs a delineation failure and df_status marks V1 as
# "invalid"; V1 had only 95 samples before zero-padding, which is the likely cause - confirm.
df_status, lead_plot_data = mi.analyze_all_leads(patient_data, signals, lead_names, sampling_rate)

contiguous_results = mi.identify_contiguous_regions(df_status)
contiguous_results

[Error] ECG delineation failed: integer division or modulo by zero



Too few peaks detected to compute the rate. Returning empty vector.



[('Anterior', ['V2', 'V4']),
 ('Anterolateral', ['V4', 'V6']),
 ('Lateral', ['I', 'V6'])]

In [113]:
# Display the per-lead status table returned by mi.analyze_all_leads
df_status

Unnamed: 0,lead,status,duration,r/s ratio
0,I,depression_and_elevation_and_prominent_r,"[68, 50, 45, 49, 50, 50, 49, 51, 49]",0.88091
1,V1,invalid,,
2,V2,depression_and_elevation_and_prominent_r,"[90, 46, 58, 47, 94, 31, 19, 23, 25]",1.440777
3,V4,depression_and_elevation_and_prominent_r,"[19, 24, 18, 18, 39, 37, 36, 37]",0.70516
4,V6,depression_and_elevation_and_prominent_r,"[36, 48, 38, 28, 29, 50, 45, 43, 31, 52, 34, 33]",0.330016
5,II,depression_and_elevation_and_prominent_r,"[46, 41, 60, 57, 55, 75, 66]",6.424728
