In [3]:
!pip install wfdb

!wget -r -N -c -np https://physionet.org/files/mitdb/1.0.0/
# 利用wget完成数据下载

--2024-05-07 04:33:50--  https://physionet.org/files/mitdb/1.0.0/
Resolving physionet.org (physionet.org)... 18.13.52.205
Connecting to physionet.org (physionet.org)|18.13.52.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘physionet.org/files/mitdb/1.0.0/index.html’

physionet.org/files     [ <=>                ]  22.86K  --.-KB/s    in 0.02s   

Last-modified header missing -- time-stamps turned off.
2024-05-07 04:33:51 (1007 KB/s) - ‘physionet.org/files/mitdb/1.0.0/index.html’ saved [23410]

Loading robots.txt; please ignore errors.
--2024-05-07 04:33:51--  https://physionet.org/robots.txt
Reusing existing connection to physionet.org:443.
HTTP request sent, awaiting response... 200 OK

    The file is already fully retrieved; nothing to do.

--2024-05-07 04:33:51--  https://physionet.org/files/mitdb/1.0.0/mitdbdir/
Reusing existing connection to physionet.org:443.
HTTP request sent, awaiting response... 304 Not Modifie

In [None]:
# 导入库
import wfdb
import numpy as np
import matplotlib.pyplot as plt
from spt.plot_params import set_roman_plot_params
import pywt
import pandas as pd

In [None]:
set_roman_plot_params()
# plt.rcParams['font.family'] = 'serif'
# plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']

record = wfdb.rdrecord('./physionet.org/files/mitdb/1.0.0/109', # 文件所在路径
                       sampfrom=0, # 读取100这个记录的起点，从第0个点开始读
                       sampto=4000, # 读取记录的终点，到1000个点结束
                       physical=False, # 若为True则读取原始信号p_signal，如果为False则读取数字信号d_signal，默认为False
                       channels=[0]) # 读取那个通道，也可以用channel_names指定某个通道;如channel_names=['MLII']
# 小波去噪
def denoise(data):
    # 小波变换
    coeffs = pywt.wavedec(data=data, wavelet='db5', level=8)
    cD9, cD8, cD7, cD6, cD5, cD4, cD3, cD2, cD1 = coeffs

    # 阈值去噪
    threshold = (np.median(np.abs(cD1)) / 0.6745) * (np.sqrt(2 * np.log(len(cD1))))
    cD1.fill(0)
    cD2.fill(0)
    for i in range(1, len(coeffs) - 2):
        coeffs[i] = pywt.threshold(coeffs[i], threshold)

    # 小波反变换,获取去噪后的信号
    rdata = pywt.waverec(coeffs=coeffs, wavelet='db5')
    return rdata
# 转为数字信号
signal = record.d_signal[0:1000]
amplitude = (signal - np.min(signal))
amplitude = amplitude - np.mean(amplitude)
amplitude = amplitude / np.max(amplitude)
amplitude = pd.DataFrame(amplitude)
amplitude.to_csv('V_ecg.csv')  #未去噪的V类ECG数据
# plt.plot(amplitude)
# plt.show()

signal = denoise(signal)



In [None]:
# 获取心拍，并分类
record = wfdb.rdrecord('./physionet.org/files/mitdb/1.0.0/100', # 文件所在路径
                       sampfrom=0, # 读取100这个记录的起点，从第0个点开始读
                       sampto=4000, # 读取记录的终点，到1000个点结束
                       physical=False, # 若为True则读取原始信号p_signal，如果为False则读取数字信号d_signal，默认为False
                       channels=[0]) # 读取那个通道，也可以用channel_names指定某个通道;如channel_names=['MLII']
signal = record.d_signal[0:1000]
amplitude = (signal - np.min(signal))
amplitude = amplitude - np.mean(amplitude)
amplitude = amplitude / np.max(amplitude)
amplitude = pd.DataFrame(amplitude)
amplitude.to_csv('N_ecg.csv')  #未去噪的V类ECG数据


signal_ann = wfdb.rdann("./physionet.org/files/mitdb/1.0.0/100", "atr", sampfrom=0, sampto=1000)
# 将读取到的annatations的心拍绘制到心电图上

print(signal_ann.symbol)



['+', 'N', 'N', 'N', 'N']


In [None]:
# 找出F类心拍
url= './physionet.org/files/mitdb/1.0.0/208'
record = wfdb.rdrecord(url, # 文件所在路径
                       sampfrom=0, # 读取100这个记录的起点，从第0个点开始读
                       sampto=4000, # 读取记录的终点，到1000个点结束
                       physical=False, # 若为True则读取原始信号p_signal，如果为False则读取数字信号d_signal，默认为False
                       channels=[0]) # 读取那个通道，也可以用channel_names指定某个通道;如channel_names=['MLII']
signal = record.d_signal[0:4000]
amplitude = (signal - np.min(signal))
amplitude = amplitude - np.mean(amplitude)
amplitude = amplitude / np.max(amplitude)
amplitude = pd.DataFrame(amplitude)
amplitude.to_csv('F_ecg.csv')  #未去噪的V类ECG数据


signal_ann = wfdb.rdann(url, "atr", sampfrom=0, sampto=1000)
# 将读取到的annatations的心拍绘制到心电图上

print(signal_ann.symbol)

['+', 'F', 'V', 'N', 'F', 'V']


In [None]:
# 找出S类心拍
url= './physionet.org/files/mitdb/1.0.0/202'
record = wfdb.rdrecord(url, # 文件所在路径
                       sampfrom=0, # 读取100这个记录的起点，从第0个点开始读
                       sampto=38000, # 读取记录的终点，到1000个点结束
                       physical=False, # 若为True则读取原始信号p_signal，如果为False则读取数字信号d_signal，默认为False
                       channels=[0]) # 读取那个通道，也可以用channel_names指定某个通道;如channel_names=['MLII']
signal = record.d_signal[0:8000]
amplitude = (signal - np.min(signal))
amplitude = amplitude - np.mean(amplitude)
amplitude = amplitude / np.max(amplitude)
amplitude = pd.DataFrame(amplitude)
# amplitude.to_csv('S_ecg.csv')  #未去噪的V类ECG数据


signal_ann = wfdb.rdann(url, "atr", sampfrom=0)
# 将读取到的annatations的心拍绘制到心电图上
s_beats = np.where(signal_ann.symbol == 'S')[0]
print(s_beats)

[]


  s_beats = np.where(signal_ann.symbol == 'S')[0]


In [1]:
import os
import wfdb

# Change directory to the desired folder
os.chdir('./physionet.org/files/mitdb/1.0.0/')

# Initialize an empty list to store the S-type heartbeats
s_type_beats = []

# Loop through all files in the directory
for filename in os.listdir():
    # Check if the file ends with '.atr'
    if filename.endswith('.atr'):
        # Extract the filename without extension
        base_filename = os.path.splitext(filename)[0]

        # Read the annotation file
        annotation = wfdb.rdann(base_filename, 'atr')

        # Loop through each annotation
        for annotation_sample, annotation_symbol in zip(annotation.sample, annotation.symbol):
            # Check if the symbol is 'S'
            if annotation_symbol == 'S':
                # Add the sample number to the list
                s_type_beats.append(annotation_sample)

# Print the list of S-type heartbeats
print(s_type_beats)

ModuleNotFoundError: No module named 'wfdb'