데이터 로드 및 준비

In [None]:
# 1. Install the wfdb package (IPython shell escape; runs only inside a notebook).
!pip install wfdb

# 2. Download the data (only needed once): record 'sel100' of the 'qtdb'
#    database; the later cells read it back from the local 'qtdb/' directory.
import wfdb
wfdb.dl_database('qtdb', 'qtdb', records=['sel100'])


# Read the QTDB sample record and its 'q1c' annotation file from the local
# download directory (only the path/name needs changing for another record).
annotation = wfdb.rdann('qtdb/sel100', 'q1c')
record = wfdb.rdrecord('qtdb/sel100')

# Keep only the first signal channel.
ecg_raw = record.p_signal[:, 0]

# Annotation positions (sample indices) and their symbol codes.
samples, symbols = annotation.sample, annotation.symbol


import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# Wrap the first-channel signal in a one-column DataFrame and preview the
# first 10 rows (the trailing expression is what the notebook displays).
signal_columns = {'ecg_signal': ecg_raw}
df = pd.DataFrame(data=signal_columns)
df.head(n=10)

데이터 시각화

In [None]:
# Re-read the record so this cell does not depend on earlier state.
record = wfdb.rdrecord('qtdb/sel100')

# Signal of the first channel (entire recording).
ecg_raw = record.p_signal[:, 0]

# Single wide axis showing the whole trace.
fig, ax = plt.subplots(figsize=(15, 3))
ax.plot(ecg_raw, color='tab:blue')
ax.set_title("Original ECG Signal ('sel100')")
ax.set_xlabel("Sample Index")
ax.set_ylabel("Voltage (mV)")
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Re-read the record and take its first channel.
record = wfdb.rdrecord('qtdb/sel100')
ecg_raw = record.p_signal[:, 0]

# Zoom in on the first 2000 points only.
view_len = 2000
ecg_head = ecg_raw[:view_len]

fig, ax = plt.subplots(figsize=(15, 3))
ax.plot(ecg_head, color='tab:blue')
ax.set(
    title=f"Original ECG Signal ('sel100') [First {view_len} points]",
    xlabel="Sample Index",
    ylabel="Voltage (mV)",
)
ax.grid(True)
fig.tight_layout()
plt.show()

구간 기반 라벨링 (P wave: 1, QRS complex: 2, background: 0)

In [None]:
# Z-score the signal: subtract its mean, then divide by its standard deviation.
ecg_raw = (ecg_raw - ecg_raw.mean()) / ecg_raw.std()

In [None]:
# Map annotation symbols to integer class labels; every sample outside an
# annotated wave keeps the background label 0.
# NOTE(review): in the WFDB annotation codes 'p' marks a P-wave peak and 'N'
# a normal beat (QRS complex), so label 1 is the P wave rather than a Q wave.
label_map = {'p': 1, 'N': 2}
y = np.zeros(len(ecg_raw), dtype=int)
current_wave = None
onset = None

for idx, (sample_idx, sym) in enumerate(zip(samples, symbols)):
    if sym == '(':
        # A new onset always starts a fresh wave. Its type is the symbol that
        # follows the '(' when that symbol is one we label, otherwise None.
        # Resetting here keeps a wave type from leaking over from an earlier
        # '(' that never received its closing ')'.
        onset = sample_idx
        next_sym = symbols[idx + 1] if idx + 1 < len(symbols) else None
        current_wave = next_sym if next_sym in label_map else None
    elif sym == ')':
        offset = sample_idx
        if current_wave is not None and onset is not None:
            # Paint the whole span, including the offset sample itself.
            y[onset:offset + 1] = label_map[current_wave]
        # A closing ')' ends the current wave whether or not it was labelled.
        onset = None
        current_wave = None


라벨 구간만 자르기

In [None]:
# Re-read the 'q1c' annotation: sample positions and symbol codes.
annotation = wfdb.rdann('qtdb/sel100', 'q1c')
samples, symbols = annotation.sample, annotation.symbol

# Re-read the ECG signal (first channel).
# Note: this rebinds ecg_raw to the freshly loaded signal, replacing any
# standardized array previously stored under the same name.
record = wfdb.rdrecord('qtdb/sel100')
ecg_raw = record.p_signal[:, 0]


In [None]:
# Positions of all samples that carry a non-background label.
idx_labeled = np.flatnonzero(y != 0)

# Inclusive bounds of the labelled stretch.
start_idx = idx_labeled.min()
end_idx = idx_labeled.max()

# Crop both the signal and the labels to that stretch.
labeled_span = slice(start_idx, end_idx + 1)
ecg_labeled = ecg_raw[labeled_span]
y_labeled = y[labeled_span]


In [None]:
# Plot the cropped signal with its labels drawn as a step trace underneath.
# ecg_labeled is cut from the ecg_raw that was re-read from the record just
# before the crop, i.e. the unscaled signal, so the legend and axis describe
# it as a voltage rather than as a standardized quantity.
plt.figure(figsize=(15, 3))
plt.plot(ecg_labeled, label='ECG', color='blue')
# Labels are halved and shifted to the signal minimum so that they sit at the
# bottom of the plot instead of covering the waveform.
plt.plot(y_labeled / 2 + np.min(ecg_labeled), label='Label (scaled)', color='red', alpha=0.6, linewidth=2)
plt.title('Labeled ECG Region with Wave Segmentation')
plt.xlabel('Sample Index (cropped)')
plt.ylabel('Voltage (mV)')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Sampling frequency in Hz (250 Hz is the PhysioNet QTDB default).
fs = 250

# Span covered by the non-zero labels (classes 1 and 2), bounds inclusive.
idx_labeled = np.flatnonzero(y != 0)
start_idx, end_idx = idx_labeled.min(), idx_labeled.max()
length = end_idx - start_idx + 1

# Report the span both in samples and in seconds.
report_lines = (
    f"Label starts at: {start_idx} (sample index)",
    f"Label ends at:   {end_idx} (sample index)",
    f"Label region length: {length} samples",
    f"Label region duration: {length / fs:.2f} seconds",
    f"Label starts at: {start_idx / fs:.2f} sec, ends at: {end_idx / fs:.2f} sec",
)
for report_line in report_lines:
    print(report_line)

In [None]:
zoom_start = 152000    # first sample index of the region of interest (move as desired)
zoom_len = 600         # number of samples to display

# Absolute sample indices, signal slice and label trace for the zoom window.
zoom_idx = np.arange(zoom_start, zoom_start + zoom_len)
ecg_detail = ecg_raw[zoom_start:zoom_start + zoom_len]
# Labels are halved and shifted down to the global signal minimum.
label_detail = y[zoom_start:zoom_start + zoom_len] / 2 + np.min(ecg_raw)

fig, ax = plt.subplots(figsize=(15, 4))
ax.plot(zoom_idx, ecg_detail, label='ECG', color='blue')
ax.plot(zoom_idx, label_detail, label='Label (scaled)', color='red', alpha=0.7, linewidth=2)
ax.set_title('ECG Signal and Wave Region Label (Detail View)')
ax.set_xlabel('Sample Index')
ax.set_ylabel('Voltage (mV)')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Print the first 500 labels of the cropped region.
label_preview = y_labeled[:500]
print(label_preview)

윈도우 슬라이싱 & 특징 추출

In [None]:
win_size = 4   # samples per window
step = 2       # hop between consecutive window starts

X_feat = []
y_win = []

# The last full window starts at len(ecg_labeled) - win_size, so the range
# stop must be one past that index; without the +1 the final window is
# silently dropped.
for start in range(0, len(ecg_labeled) - win_size + 1, step):
    window = ecg_labeled[start:start + win_size]
    # Each window is labelled with the class of its centre sample.
    center_label = y_labeled[start + win_size // 2]
    # Five summary statistics per window: mean, std, max, min, peak-to-peak.
    features = [
        np.mean(window),
        np.std(window),
        np.max(window),
        np.min(window),
        np.ptp(window),
    ]
    X_feat.append(features)
    y_win.append(center_label)

# Convert to arrays for scikit-learn: one row per window.
X_feat = np.array(X_feat)
y_win = np.array(y_win)

In [None]:
# Print the first 100 window labels to see whether they repeat in runs like
# the per-sample labels do.
window_label_preview = y_win[:100]
print(window_label_preview)

데이터 분할

In [None]:
from sklearn.model_selection import train_test_split

# 70/30 split with a fixed seed, stratified on the window labels.
split_parts = train_test_split(
    X_feat,
    y_win,
    stratify=y_win,
    random_state=42,
    test_size=0.3,
)
X_train, X_test, y_train, y_test = split_parts

KNN 데이터 분류/평가

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, confusion_matrix

# Fit a 3-nearest-neighbour classifier on the training windows and predict
# the held-out ones.
knn = KNeighborsClassifier(n_neighbors=3).fit(X_train, y_train)
y_pred = knn.predict(X_test)

# Confusion matrix first, then the per-class report (undefined metrics are
# reported as 0).
knn_confusion = confusion_matrix(y_test, y_pred)
knn_report = classification_report(y_test, y_pred, zero_division=0)
print(knn_confusion)
print(knn_report)

실험2 파형 경계점 자동 검출

In [None]:
# y_labeled holds the interval-based per-sample labels of the cropped region
# (0 = background, 1 and 2 = the two annotated wave classes).

# Transition points: every position whose label differs from the previous
# sample, stored as (index, previous label, new label).
transitions = [
    (pos, before, after)
    for pos, (before, after) in enumerate(zip(y_labeled[:-1], y_labeled[1:]), start=1)
    if before != after
]

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# 1) Extract only the stretch that carries labels (bounds inclusive).
idx_labeled = np.where(y != 0)[0]
start_idx = np.min(idx_labeled)
end_idx = np.max(idx_labeled)
ecg_show = ecg_raw[start_idx:end_idx+1]
y_show = y[start_idx:end_idx+1]

# 2) Transition points: an onset is a 0 -> class step and an offset is a
#    class -> 0 step, as indices into the cropped arrays.
onsets_1 = [i for i in range(1, len(y_show)) if y_show[i-1]==0 and y_show[i]==1]
offsets_1 = [i for i in range(1, len(y_show)) if y_show[i-1]==1 and y_show[i]==0]
onsets_2 = [i for i in range(1, len(y_show)) if y_show[i-1]==0 and y_show[i]==2]
offsets_2 = [i for i in range(1, len(y_show)) if y_show[i-1]==2 and y_show[i]==0]

# 3) Plot the signal, the scaled label trace and the boundary markers.
#    Class 1 is built from WFDB 'p' annotations, which mark the P wave, so
#    the legend names it as such (class 2, from 'N', is the QRS complex).
plt.figure(figsize=(15,4))
plt.plot(ecg_show, label='ECG', color='blue')
plt.plot(y_show/2 + np.min(ecg_show), label='Label (scaled)', color='gray', alpha=0.6, linewidth=2)
plt.scatter(onsets_1, ecg_show[onsets_1], label='P wave onset', color='red', marker='o')
plt.scatter(offsets_1, ecg_show[offsets_1], label='P wave offset', color='red', marker='x')
plt.scatter(onsets_2, ecg_show[onsets_2], label='QRS onset', color='green', marker='o')
plt.scatter(offsets_2, ecg_show[offsets_2], label='QRS offset', color='green', marker='x')
plt.title('ECG with Detected Wave Boundaries (Onset/Offset)')
plt.xlabel('Sample Index')
plt.ylabel('Voltage (mV)')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Example: choose the region to inspect, in indices of the cropped ecg_show.
zoom_start = 750    # start index (adjust to the position of interest)
zoom_len = 400      # requested number of samples to show

# Clamp the window to the data that actually exists, so that a request running
# past the end of ecg_show shortens the view instead of failing later with an
# index error or an x/y length mismatch.
n_available = len(ecg_show)
zoom_start = min(max(zoom_start, 0), n_available)
zoom_end = min(zoom_start + zoom_len, n_available)
if zoom_end <= zoom_start:
    raise ValueError(
        f"zoom window is empty: start={zoom_start}, length={zoom_len}, data length={n_available}"
    )

# x axis in indices of the cropped signal.
x_axis = np.arange(zoom_start, zoom_end)

# Signal and labels inside the zoom window.
ecg_zoom = ecg_show[zoom_start:zoom_end]
y_zoom = y_show[zoom_start:zoom_end]

# Transition points inside the window, as indices relative to the window.
# The actual slice length is used because clamping may have shortened it.
n_zoom = len(y_zoom)
onsets_1_zoom = [i for i in range(1, n_zoom) if y_zoom[i-1]==0 and y_zoom[i]==1]
offsets_1_zoom = [i for i in range(1, n_zoom) if y_zoom[i-1]==1 and y_zoom[i]==0]
onsets_2_zoom = [i for i in range(1, n_zoom) if y_zoom[i-1]==0 and y_zoom[i]==2]
offsets_2_zoom = [i for i in range(1, n_zoom) if y_zoom[i-1]==2 and y_zoom[i]==0]

# Class 1 is built from WFDB 'p' annotations, which mark the P wave, so the
# legend names it as such (class 2, from 'N', is the QRS complex).
plt.figure(figsize=(15, 4))
plt.plot(x_axis, ecg_zoom, label='ECG', color='blue')
plt.plot(x_axis, y_zoom/2 + np.min(ecg_zoom), label='Label (scaled)', color='gray', alpha=0.6, linewidth=2)
plt.scatter(x_axis[onsets_1_zoom], ecg_zoom[onsets_1_zoom], label='P wave onset', color='red', marker='o')
plt.scatter(x_axis[offsets_1_zoom], ecg_zoom[offsets_1_zoom], label='P wave offset', color='red', marker='x')
plt.scatter(x_axis[onsets_2_zoom], ecg_zoom[onsets_2_zoom], label='QRS onset', color='green', marker='o')
plt.scatter(x_axis[offsets_2_zoom], ecg_zoom[offsets_2_zoom], label='QRS offset', color='green', marker='x')
plt.title('Zoomed ECG with Detected Wave Boundaries')
plt.xlabel('Sample Index (zoomed)')
plt.ylabel('Voltage (mV)')
plt.legend()
plt.grid(True)
plt.show()

실험2 훈련 및 테스트

In [None]:
from scipy.stats import mode

win_size = 30   # samples per window
step = 10       # hop between consecutive window starts

X_feat = []
y_win = []

# The last full window starts at len(ecg_labeled) - win_size, so the range
# stop must be one past that index; without the +1 the final window is
# silently dropped.
for start in range(0, len(ecg_labeled) - win_size + 1, step):
    window = ecg_labeled[start:start + win_size]
    window_labels = y_labeled[start:start + win_size]
    # Label the window with its most frequent class; keepdims=True makes the
    # result an array so that it can always be read as .mode[0].
    label = mode(window_labels, keepdims=True).mode[0]
    # Five summary statistics per window: mean, std, max, min, peak-to-peak.
    features = [
        np.mean(window),
        np.std(window),
        np.max(window),
        np.min(window),
        np.ptp(window),
    ]
    X_feat.append(features)
    y_win.append(label)

# Convert to arrays for scikit-learn: one row per window.
X_feat = np.array(X_feat)
y_win = np.array(y_win)

In [None]:
# 2. Train/test split: 70/30, fixed seed, stratified on the window labels.
split_parts = train_test_split(
    X_feat,
    y_win,
    stratify=y_win,
    random_state=42,
    test_size=0.3,
)
X_train, X_test, y_train, y_test = split_parts


모델 평가

In [None]:
# Train a 3-nearest-neighbour model, predict the test windows, and evaluate.
knn = KNeighborsClassifier(n_neighbors=3).fit(X_train, y_train)
y_pred = knn.predict(X_test)

# Confusion matrix first, then the per-class report (undefined metrics are
# reported as 0).
knn_confusion = confusion_matrix(y_test, y_pred)
knn_report = classification_report(y_test, y_pred, zero_division=0)
print(knn_confusion)
print(knn_report)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

# 100-tree random forest with a fixed seed; y_pred is overwritten with its
# predictions on the test windows.
rf = RandomForestClassifier(random_state=42, n_estimators=100).fit(X_train, y_train)
y_pred = rf.predict(X_test)

# Confusion matrix first, then the per-class report (undefined metrics are
# reported as 0).
rf_outputs = (
    confusion_matrix(y_test, y_pred),
    classification_report(y_test, y_pred, zero_division=0),
)
for rf_output in rf_outputs:
    print(rf_output)

In [None]:
# Ground-truth boundary points: positions in y_test where a 0 is directly
# followed by a 1.
# NOTE(review): y_test comes from train_test_split, which shuffles by default,
# so neighbouring entries are presumably not neighbouring windows in time —
# confirm that these "onsets" mean what is intended.
gt_onset = [
    pos
    for pos, (before, after) in enumerate(zip(y_test[:-1], y_test[1:]), start=1)
    if before == 0 and after == 1
]
# Predicted boundary points, found the same way in y_pred.
pred_onset = [
    pos
    for pos, (before, after) in enumerate(zip(y_pred[:-1], y_pred[1:]), start=1)
    if before == 0 and after == 1
]

print("GT onsets:", gt_onset)
print("Pred onsets:", pred_onset)

# Tolerance-based evaluation: a ground-truth point counts as a hit when some
# prediction lies within `tol` positions of it (10 by default).
def count_matched(gt, pred, tol=10):
    """Count ground-truth points that have a prediction within ``tol``.

    Args:
        gt: Ground-truth positions.
        pred: Predicted positions.
        tol: Largest absolute difference that still counts as a match.

    Returns:
        A ``(matched, total)`` tuple, where ``total`` is ``len(gt)``. One
        prediction may satisfy several ground-truth points.
    """
    hits = sum(1 for g in gt if any(abs(p - g) <= tol for p in pred))
    return hits, len(gt)

# Share of ground-truth onsets that have a predicted onset within the default
# tolerance. Class 1 is built from WFDB 'p' annotations, i.e. the P wave.
matched, total = count_matched(gt_onset, pred_onset)
if total:
    print(f"P-wave onset detection rate (within 10 samples): {matched}/{total} = {matched/total:.2f}")
else:
    # No ground-truth onsets: the ratio is undefined, so report that instead
    # of dividing by zero.
    print("P-wave onset detection rate (within 10 samples): no ground-truth onsets found")


정답 vs 예측 시각화

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

# Confusion matrix of the current predictions on the test set, drawn as a
# heat map with integer cell counts.
# When the notebook is run top to bottom, y_pred was last assigned from the
# random-forest model, so the title names that model rather than KNN.
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues, values_format='d')
plt.title("Confusion Matrix (Random Forest Test Set)")
plt.show()

경계점 검출/정확도 평가

In [None]:
# Example: detection performance for class-1 onsets (0 -> 1 steps).
# NOTE(review): y_test comes from train_test_split, which shuffles by default,
# so neighbouring entries are presumably not neighbouring windows in time —
# confirm that these "onsets" mean what is intended.
true_seq = np.asarray(y_test)
pred_seq = np.asarray(y_pred)
# A step is a 0 at position i-1 followed by a 1 at position i; +1 converts the
# index of the leading 0 into the index of the 1.
gt_onset = (np.flatnonzero((true_seq[:-1] == 0) & (true_seq[1:] == 1)) + 1).tolist()
pred_onset = (np.flatnonzero((pred_seq[:-1] == 0) & (pred_seq[1:] == 1)) + 1).tolist()

def count_matched(gt, pred, tol=10):
    """Return ``(matched, total)`` for ground-truth points versus predictions.

    A ground-truth position is matched when at least one predicted position
    differs from it by no more than ``tol``. ``total`` is ``len(gt)``.
    """
    total = len(gt)
    hits = 0
    for target in gt:
        for candidate in pred:
            if abs(candidate - target) <= tol:
                # One nearby prediction is enough for this target.
                hits += 1
                break
    return hits, total

# Share of ground-truth onsets that have a predicted onset within the default
# tolerance. Class 1 is built from WFDB 'p' annotations, i.e. the P wave.
matched, total = count_matched(gt_onset, pred_onset)
if total:
    print(f"P-wave onset detection rate (within 10 samples): {matched}/{total} = {matched/total:.2f}")
else:
    # No ground-truth onsets: the ratio is undefined, so report that instead
    # of dividing by zero.
    print("P-wave onset detection rate (within 10 samples): no ground-truth onsets found")


In [None]:
# Overlay the true and predicted label sequences for the first n_show test
# windows.
n_show = 150
plt.figure(figsize=(15, 3))
plt.plot(y_test[:n_show], label='True label', linewidth=2, color='red')
plt.plot(y_pred[:n_show], label='Predicted', linewidth=2, color='blue', alpha=0.6)
plt.legend()
# The title is derived from n_show so that it stays correct when the number
# of displayed windows is changed.
plt.title(f'True vs Predicted Label Sequence (First {n_show} Windows)')
plt.xlabel('Window Index')
plt.ylabel('Class Label')
plt.grid(True)
plt.show()
