# ECGPUWAVE Onsets/Offsets Detection
Notebook to detect clinically accurate P/QRS/T onsets, peaks, and offsets using WFDB's ecgpuwave tool.

In [1]:

# Install dependencies
!pip install wfdb
# Note: You also need to install WFDB tools (with ecgpuwave) separately on your system:
# Linux: sudo apt-get install wfdb-tools
# MacOS: brew install wfdb
# Windows: Download ecgpuwave from PhysioNet and add to PATH





[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:

import os
import subprocess
import wfdb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Path to your record (adjust as needed)
record_name = 'ptbxl_data/records100/00000/00001_lr'
annotator = 'wave'
out_annot = f'{record_name}.{annotator}'

# Run ecgpuwave
exit_code = subprocess.call(['ecgpuwave', '-r', record_name, '-a', annotator])
if exit_code != 0:
    print("⚠️ ecgpuwave failed or is not installed. Install wfdb-tools and retry.")
else:
    print("✅ ecgpuwave completed successfully")


FileNotFoundError: [WinError 2] The system cannot find the file specified

In [3]:

# Read annotations
ann = wfdb.rdann(record_name, annotator)
df_ann = pd.DataFrame({'sample': ann.sample, 'symbol': ann.symbol, 'num': ann.num})
df_ann.head()


FileNotFoundError: [Errno 2] No such file or directory: 'f:/MLdataset/ptbxl_data/records100/00000/00001_lr.wave'

In [None]:

# Map symbols to descriptive labels
sym_map = {'p':'P_peak','N':'R_peak','t':'T_peak',
           '(':{'0':'P_onset','1':'Q_onset','2':'T_onset'},
           ')':{'0':'P_offset','1':'S_offset','2':'T_offset'}}

entries = []
for samp, sym, num in zip(ann.sample, ann.symbol, ann.num):
    if sym in ['p','N','t']:
        label = sym_map[sym]
    elif sym in ['(',')']:
        label = sym_map[sym].get(str(num),'unknown')
    else:
        continue
    entries.append((samp,label))

df_fid = pd.DataFrame(entries, columns=['sample','label'])
df_fid.head(10)


In [None]:

# Load ECG signal
sig, _ = wfdb.rdsamp(record_name)
fs = 100  # Sampling frequency for low-res PTBXL
ecg = sig[:,1]  # Lead II

# Plot ECG with detected fiducials
plt.figure(figsize=(15,4))
plt.plot(ecg, color='black', label='ECG')

colors = {'P_onset':'cyan','P_offset':'blue','P_peak':'navy',
          'Q_onset':'lime','R_peak':'red','S_offset':'orange',
          'T_onset':'magenta','T_peak':'purple','T_offset':'pink'}

for label, group in df_fid.groupby('label'):
    if label in colors:
        plt.scatter(group['sample'], ecg[group['sample']], label=label, color=colors[label], s=30)

plt.legend(loc='upper left', ncol=3)
plt.title('ECGPUWAVE Detected P/QRS/T Onsets, Peaks, and Offsets')
plt.xlabel('Sample Index')
plt.ylabel('Amplitude')
plt.show()


In [None]:

# Extract arrays for computation
P_on = df_fid[df_fid['label']=='P_onset']['sample'].values
Q_on = df_fid[df_fid['label']=='Q_onset']['sample'].values
S_off = df_fid[df_fid['label']=='S_offset']['sample'].values
T_off = df_fid[df_fid['label']=='T_offset']['sample'].values

def safe_interval(arr1, arr2):
    return np.nanmean([(b-a)/fs*1000 for a,b in zip(arr1,arr2) if b>a]) if len(arr1) and len(arr2) else np.nan

PR = safe_interval(P_on, Q_on)
QRS = safe_interval(Q_on, S_off)
QT = safe_interval(Q_on, T_off)

print(f"PR interval: {PR:.2f} ms")
print(f"QRS duration: {QRS:.2f} ms")
print(f"QT interval: {QT:.2f} ms")
