In [1]:
import numpy as np

# 读取文件数据的函数
def read_peaks(file_path):
    with open(file_path, 'r') as file:
        peaks = [int(line.strip()) for line in file.readlines()]
    return np.array(peaks)

# 提取特征的函数
def extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks):
    features = {}

    # Calculate distances
    features['R–P distance'] = np.abs(r_peaks - p_peaks)
    features['R–T distance'] = np.abs(r_peaks - t_peaks)
    features['R–Q distance'] = np.abs(r_peaks - q_peaks)
    features['R–S distance'] = np.abs(r_peaks - s_peaks)
    
    # Calculate widths
    features['P width'] = np.diff(p_peaks)
    features['T width'] = np.diff(t_peaks)
    
    features['S–T distance'] = np.abs(s_peaks - t_peaks)
    features['P–Q distance'] = np.abs(p_peaks - q_peaks)
    features['P–T distance'] = np.abs(p_peaks - t_peaks)
    
    return features

# 读取各个波峰位置文件
r_peaks = read_peaks('peak_r_zzy.txt')
p_peaks = read_peaks('peak_p_zzy.txt')
q_peaks = read_peaks('peak_q_zzy.txt')
s_peaks = read_peaks('peak_s_zzy.txt')
t_peaks = read_peaks('peak_t_zzy.txt')

# 提取特征
features = extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks)

# 打印每个特征的12组值
for key, value in features.items():
    print(f'{key}: {value}')


R–P distance: [28 31 31 36 31 28 31 30 34 33 16]
R–T distance: [36 39 38 38 40 38 34 38 37 37 40]
R–Q distance: [12 11 11 11  9 11 11 11 11 12 13]
R–S distance: [6 6 6 6 7 6 6 6 6 6 6]
P width: [145 138 129 143 142 135 134 138 146 161]
T width: [151 137 134 140 137 134 137 141 145 147]
S–T distance: [30 33 32 32 33 32 28 32 31 31 34]
P–Q distance: [16 20 20 25 22 17 20 19 23 21  3]
P–T distance: [64 70 69 74 71 66 65 68 71 70 56]


In [24]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# 读取文件数据的函数
def read_peaks(file_path):
    with open(file_path, 'r') as file:
        peaks = [int(line.strip()) for line in file.readlines()]
    return np.array(peaks)

# 提取特征的函数（不取平均值）
def extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks):
    features = {}

    # Calculate distances and widths (keeping all values)
    features['R_P'] = np.abs(r_peaks - p_peaks)
    features['R_T'] = np.abs(r_peaks - t_peaks)
    features['R_Q'] = np.abs(r_peaks - q_peaks)
    features['R_S'] = np.abs(r_peaks - s_peaks)
    features['P_width'] = np.diff(p_peaks) if len(p_peaks) > 1 else np.array([0])
    features['T_width'] = np.diff(t_peaks) if len(t_peaks) > 1 else np.array([0])
    features['S_T'] = np.abs(s_peaks - t_peaks)
    features['P_Q'] = np.abs(p_peaks - q_peaks)
    features['P_T'] = np.abs(p_peaks - t_peaks)
    
    return features

# 读取和提取每个人的ECG特征
def load_person_data(person):
    r_peaks = read_peaks(f'peak_r_{person}.txt')
    p_peaks = read_peaks(f'peak_p_{person}.txt')
    q_peaks = read_peaks(f'peak_q_{person}.txt')
    s_peaks = read_peaks(f'peak_s_{person}.txt')
    t_peaks = read_peaks(f'peak_t_{person}.txt')

    features = extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks)
    return features

# 定义人的ID和对应的标签
persons = ['hyf', 'ykw', 'zzy']
labels = {person: i for i, person in enumerate(persons)}

# 加载所有人的数据
data = []

for person in persons:
    features = load_person_data(person)
    max_len = max(len(feature) for feature in features.values())
    for key in features:
        features[key] = np.pad(features[key], (0, max_len - len(features[key])), 'constant')
    df = pd.DataFrame(features)
    df['label'] = labels[person]
    data.append(df)

# 合并所有人的数据
df = pd.concat(data, ignore_index=True)

# 打印特征数据
print("所有特征数据:")
print(df)

# 按标签分组，并移除每组的最后一行
df = df.groupby('label').apply(lambda x: x.iloc[:-1]).reset_index(drop=True)

# 打印处理后的特征数据
print("\n处理后的特征数据:")
print(df)

# 分离特征和标签
X = df.drop('label', axis=1)
y = df['label']

# 将数据分成训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
print("\n训练集特征 (X_train):")
print(X_train)
print("\n训练集标签 (y_train):")
print(y_train)
print("\n测试集特征 (X_test):")
print(X_test)
print("\n测试集标签 (y_test):")
print(y_test)

# 创建KNN分类器，设置k值
k = 2                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
knn = KNeighborsClassifier(n_neighbors=k)

# 训练模型
knn.fit(X_train, y_train)
print("\n模型训练完成。")

# 在测试集上进行预测
y_pred = knn.predict(X_test)
print("\n测试集预测结果 (y_pred):")
print(y_pred)

# 计算模型准确率
accuracy = accuracy_score(y_test, y_pred)
print(f'\n模型准确率: {accuracy * 100:.2f}%')


所有特征数据:
    R_P  R_T  R_Q  R_S  P_width  T_width  S_T  P_Q  P_T  label
0    16   45    9    5      141      142   40    7   61      0
1    16   46   10    5      128      144   41    6   62      0
2    30   48   10    5      160      146   43   20   78      0
3    16   48   12    5      133      140   43    4   64      0
4    25   46    7    5      143      146   41   18   71      0
5    27   47    7    5      159      146   42   20   74      0
6    15   46    8    6      134      149   40    7   61      0
7    28   48    9    6      148      147   42   19   76      0
8    29   46   11    5      155      151   41   18   75      0
9    23   48   11    5      137      141   43   12   71      0
10   29   46   10    5        0        0   41   19   75      0
11   27   48    9    7      173      177   41   18   75      1
12   28   51   10    6      185      181   45   18   79      1
13   26   49    9    7      183      187   42   17   75      1
14   28   51    7    7      181      176   44  

  df = df.groupby('label').apply(lambda x: x.iloc[:-1]).reset_index(drop=True)


In [64]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# 读取文件数据的函数
def read_peaks(file_path):
    with open(file_path, 'r') as file:
        peaks = [int(line.strip()) for line in file.readlines()]
    return np.array(peaks)

# 提取特征的函数（不取平均值）
def extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks):
    features = {}
    features['R_P'] = np.abs(r_peaks - p_peaks)
    features['R_T'] = np.abs(r_peaks - t_peaks)
    features['R_Q'] = np.abs(r_peaks - q_peaks)
    features['R_S'] = np.abs(r_peaks - s_peaks)
    features['P_width'] = np.diff(p_peaks) if len(p_peaks) > 1 else np.array([0])
    features['T_width'] = np.diff(t_peaks) if len(t_peaks) > 1 else np.array([0])
    features['S_T'] = np.abs(s_peaks - t_peaks)
    features['P_Q'] = np.abs(p_peaks - q_peaks)
    features['P_T'] = np.abs(p_peaks - t_peaks)
    return features

# 移除异常值的函数
def remove_outliers(df, columns, threshold=2.5):
    for col in columns:
        mean = df[col].mean()
        std = df[col].std()
        print(f"Processing column {col}: mean={mean}, std={std}")  # Debug output
        outliers = df[(df[col] < mean - threshold * std) | (df[col] > mean + threshold * std)]
        num_outliers = len(outliers)
        print(f"Outliers in {col} ({num_outliers}):\n{outliers}")  # Debug output for outliers
        df = df[(df[col] >= mean - threshold * std) & (df[col] <= mean + threshold * std)]
    return df

# 读取和提取每个人的ECG特征
def load_person_data(person):
    r_peaks = read_peaks(f'peak_r_{person}.txt')
    p_peaks = read_peaks(f'peak_p_{person}.txt')
    q_peaks = read_peaks(f'peak_q_{person}.txt')
    s_peaks = read_peaks(f'peak_s_{person}.txt')
    t_peaks = read_peaks(f'peak_t_{person}.txt')
    features = extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks)
    return features

# 定义人的ID和对应的标签
persons = ['hyf', 'ykw', 'zzy']
labels = {person: i for i, person in enumerate(persons)}

# 加载所有人的数据
data = []

for person in persons:
    features = load_person_data(person)
    max_len = max(len(feature) for feature in features.values())
    for key in features:
        features[key] = np.pad(features[key], (0, max_len - len(features[key])), 'constant')
    df = pd.DataFrame(features)
    df['label'] = labels[person]
    data.append(df)

# 合并所有人的数据
df = pd.concat(data, ignore_index=True)

# 打印特征数据
print("所有特征数据:")
print(df)

# 按标签分组，并移除每组的最后一行
df = df.groupby('label').apply(lambda x: x.iloc[:-1]).reset_index(drop=True)

# 打印处理后的特征数据
print("\n处理后的特征数据:")
print(df)

# 去除异常值
columns = ['R_P', 'R_T', 'R_Q', 'R_S', 'P_width', 'T_width', 'S_T', 'P_Q', 'P_T']
df = remove_outliers(df, columns)

# 打印去除异常值后的特征数据
print("\n去除异常值后的特征数据:")
print(df)

# 分离特征和标签
X = df.drop('label', axis=1)
y = df['label']

# 将数据分成训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
print("\n训练集特征 (X_train):")
print(X_train)
print("\n训练集标签 (y_train):")
print(y_train)
print("\n测试集特征 (X_test):")
print(X_test)
print("\n测试集标签 (y_test):")
print(y_test)

# 创建KNN分类器，设置k值
k = 3
knn = KNeighborsClassifier(n_neighbors=k)

# 训练模型
knn.fit(X_train, y_train)
print("\n模型训练完成。")

# 在测试集上进行预测
y_pred = knn.predict(X_test)
print("\n测试集预测结果 (y_pred):")
print(y_pred)

# 计算模型准确率
accuracy = accuracy_score(y_test, y_pred)
print(f'\n模型准确率: {accuracy * 100:.2f}%')

# 加载测试数据并进行预测
def load_test_data():
    r_peaks = read_peaks('peak_r_test.txt')
    p_peaks = read_peaks('peak_p_test.txt')
    q_peaks = read_peaks('peak_q_test.txt')
    s_peaks = read_peaks('peak_s_test.txt')
    t_peaks = read_peaks('peak_t_test.txt')

    features = extract_ecg_features(r_peaks, p_peaks, q_peaks, s_peaks, t_peaks)
    max_len = max(len(feature) for feature in features.values())
    for key in features:
        features[key] = np.pad(features[key], (0, max_len - len(features[key])), 'constant')
    df = pd.DataFrame(features)
    
    return df

# 加载测试数据
test_df = load_test_data()

# 打印测试数据之前
print("\n去除异常值前的测试数据:")
print(test_df)

# 去除测试数据中的异常值
test_df = remove_outliers(test_df, columns)

test_df = test_df.iloc[:-1]

print("\n去除异常值后的测试数据:")
print(test_df)

# 使用模型进行预测
test_pred = knn.predict(test_df)

# 映射标签到人员
test_df['predicted_label'] = test_pred
test_df['predicted_person'] = test_df['predicted_label'].apply(lambda x: list(labels.keys())[list(labels.values()).index(x)])

# 打印每一条测试数据的预测结果
print("\n测试数据的分类结果:")
print(test_df)


所有特征数据:
    R_P  R_T  R_Q  R_S  P_width  T_width  S_T  P_Q  P_T  label
0    16   45    9    5      141      142   40    7   61      0
1    16   46   10    5      128      144   41    6   62      0
2    30   48   10    5      160      146   43   20   78      0
3    16   48   12    5      133      140   43    4   64      0
4    25   46    7    5      143      146   41   18   71      0
5    27   47    7    5      159      146   42   20   74      0
6    15   46    8    6      134      149   40    7   61      0
7    28   48    9    6      148      147   42   19   76      0
8    29   46   11    5      155      151   41   18   75      0
9    23   48   11    5      137      141   43   12   71      0
10   29   46   10    5        0        0   41   19   75      0
11   27   48    9    7      173      177   41   18   75      1
12   28   51   10    6      185      181   45   18   79      1
13   26   49    9    7      183      187   42   17   75      1
14   28   51    7    7      181      176   44  

  df = df.groupby('label').apply(lambda x: x.iloc[:-1]).reset_index(drop=True)


In [None]:
test_df = test_df.iloc[:-1]