In [68]:
import numpy as np
from scipy import signal
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
import os
import pandas as pd
from tqdm import tqdm
import adi

In [69]:
# Font used to render Chinese text in plots
font_path = 'C:\\Windows\\Fonts\\simsun.ttc'  # path to the SimSun font file
font_prop = FontProperties(fname=font_path)

# Configuration parameters
SAVE_NPY = False  # whether to save the resampled waveforms as .npy files
PLOT_TYPES = ['break', 'break', 'break']  # plot mode per signal, in the order [PPG, D1, D2]; 'break' skips plotting (the plot functions also check for 'save' and 'show')
DATA_TYPE = 'Patient'  # dataset type: 'Patient' or 'Normal'

In [70]:
# Resolve the output directories for the selected dataset type
_SIGNAL_DIRS = {
    'Patient': 'F:\\Python\\PPG\\All data\\Patient_Signal',
    'Normal': 'F:\\Python\\PPG\\All data\\Normal_Signal',
}
if DATA_TYPE not in _SIGNAL_DIRS:
    # Fail here with a clear message instead of a NameError on DATA_DIR
    # further down when DATA_TYPE is misspelled.
    raise ValueError(
        f'Unknown DATA_TYPE {DATA_TYPE!r}; expected one of {sorted(_SIGNAL_DIRS)}'
    )
DATA_DIR = _SIGNAL_DIRS[DATA_TYPE]
PLOT_DIR = f'{DATA_DIR}\\graph'
BAD_PLOT_DIR = 'F:\\Python\\PPG\\All data\\Bad_Graph'

# Paths of the CSV files that receive the extracted features
PPG_CSV = f'{DATA_DIR}\\PPG.csv'
D2_CSV = f'{DATA_DIR}\\D2.csv'


In [71]:
# Remove a CSV file if it is present
def delete_csv(file_path):
    """Delete the file at ``file_path`` and print whether it was removed or missing."""
    if not os.path.exists(file_path):
        print(f'{file_path} 不存在')
        return
    os.remove(file_path)
    print(f'{file_path} 已刪除')

# Start from empty CSV outputs: write_to_csv opens these files in append mode,
# so leftovers from a previous run would otherwise be kept.
delete_csv(PPG_CSV)
delete_csv(D2_CSV)

F:\Python\PPG\All data\Patient_Signal\PPG.csv 已刪除
F:\Python\PPG\All data\Patient_Signal\D2.csv 已刪除


In [72]:
# Butterworth band-pass filter
def apply_butter_filter(left_data, right_data, lowcut=0.5, highcut=9, fs=1000, order=4):
    """Band-pass filter the left-hand and right-hand signals with one Butterworth design.

    The cut-offs ``lowcut``/``highcut`` are divided by half of ``fs`` before being
    passed to ``signal.butter``; the resulting second-order sections are applied
    with ``signal.sosfiltfilt``.

    Returns:
        tuple: ``(filtered_left, filtered_right)``.
    """
    half_rate = fs * 0.5
    band = [lowcut / half_rate, highcut / half_rate]
    sections = signal.butter(order, band, btype='bandpass', output='sos')
    left_out, right_out = (
        signal.sosfiltfilt(sections, trace) for trace in (left_data, right_data)
    )
    return left_out, right_out

In [73]:
# Compute successive derivatives of a signal
def compute_derivatives(data, level=3):
    """Return the signal followed by its scaled derivatives up to ``level``.

    Each derivative is ``np.gradient`` of the previous (unscaled) one. The
    returned derivatives are multiplied by fixed factors (50, 5000, 100000 for
    the first, second and third derivative) so that they are easier to analyse.

    Args:
        data: 1-D signal.
        level: highest derivative order to compute, from 0 to 3. The default
            of 3 yields ``[origin, d1, d2, d3]``.

    Returns:
        list: ``level + 1`` entries; the first is ``data`` itself.

    Raises:
        ValueError: if ``level`` is outside 0..3.
    """
    scales = (1, 50, 5000, 100000)
    if not 0 <= level < len(scales):
        raise ValueError(f'level must be between 0 and {len(scales) - 1}, got {level}')

    results = [data]
    current = data
    for order in range(1, level + 1):
        # Differentiate the unscaled series; scaling is applied only to the output.
        current = np.gradient(current)
        results.append(current * scales[order])
    return results


In [74]:
# Collect data file paths
def find_files(path):
    """Walk ``path`` recursively and return every file whose name does not end in '.txt'.

    Backslashes in the joined paths are replaced with forward slashes.
    """
    return [
        os.path.join(folder, filename).replace('\\', '/')
        for folder, _, filenames in os.walk(path)
        for filename in filenames
        if not filename.endswith('.txt')
    ]

In [75]:
# Parse subject information from a file path
def parse_file_info(file_path):
    """Extract ``[name, hand, state]`` from a '/'-separated file path.

    * name  - the third-from-last path component.
    * hand  - 'Right' when the file name contains 'R', otherwise 'Left'.
    * state - '0' when the fourth-from-last component is '易堵', otherwise '1'.
    """
    segments = file_path.split('/')
    subject = segments[-3]

    if segments[-4] == '易堵':
        state = '0'
    else:
        state = '1'

    hand = 'Left'
    if 'R' in segments[-1]:
        hand = 'Right'

    return [subject, hand, state]

In [76]:
# Plot the PPG waveform
def plot_ppg_signal(waveform, peaks, name, cycle_idx, hand):
    """Plot one PPG cycle with markers at the sample indices in ``peaks``.

    Does nothing when ``PLOT_TYPES[0]`` is 'break'; saves a JPG into
    ``PLOT_DIR`` when it is 'save'; shows the figure when it is 'show'.

    Args:
        waveform: 1-D numpy array to draw.
        peaks: integer sample indices to mark on the curve.
        name: subject name used in the title and file name.
        cycle_idx: zero-based cycle index (displayed one-based).
        hand: hand label, used in the title and file name.
    """
    if PLOT_TYPES[0] == 'break':
        return
    # Mark the points at their own sample indices so they sit on the curve,
    # which plt.plot(waveform) draws against x = 0..n-1.
    marker_idx = np.asarray(peaks, dtype=int)
    plt.figure(figsize=(5, 3))
    plt.plot(waveform, label='PPG')
    plt.plot(marker_idx, waveform[marker_idx], 'r.', label='峰值')
    plt.title(f'{name} PPG訊號 {cycle_idx + 1} {hand}', fontproperties=font_prop)
    # The legend label is Chinese, so it needs the same font as the title.
    plt.legend(prop=font_prop)
    plt.grid(True)
    if PLOT_TYPES[0] == 'save':
        # Include the hand so the left and right plots of a cycle do not overwrite each other.
        plt.savefig(f'{PLOT_DIR}\\{name}, {cycle_idx + 1}th {hand}.jpg')
    elif PLOT_TYPES[0] == 'show':
        plt.show()
    plt.close()

In [77]:
# Compute PPG features
def compute_ppg_features(waveform, feature_points, name, cycle_idx, hand):
    """Compute eight features from a PPG waveform.

    Args:
        waveform: 1-D numpy array.
        feature_points: index sequence with at least 12 entries; entries 5
            and 11 are used for the averaged amplitude ``b2``.
        name, cycle_idx, hand: forwarded to ``plot_ppg_signal``.

    Returns:
        tuple: ``(1, features)`` on success, where ``features`` is
        ``[b1, b2, b3, b5, b6, b7, b10, delta_t]``; ``(0, None)`` when fewer
        than two peaks or fewer than two valleys are found.
    """
    peaks, peak_props = find_peaks(waveform, height=0.3, distance=500)
    if len(peaks) < 2:
        return 0, None
    b1 = peaks[1]  # index of the second peak
    b10 = peak_props['peak_heights'][1]  # height of the second peak
    b2 = (waveform[feature_points[5]] + waveform[feature_points[11]]) / 2  # mean amplitude at two feature points
    valleys, _ = find_peaks(-waveform, height=-0.3, distance=500)
    if len(valleys) < 2:
        return 0, None
    b3 = valleys[1] - valleys[0]  # distance between the first two valleys
    # np.trapz is deprecated since NumPy 2.0 in favour of np.trapezoid; use
    # whichever the installed NumPy provides.
    integrate = getattr(np, 'trapezoid', None) or np.trapz
    b5 = integrate(np.abs(waveform[valleys[0]:valleys[1] + 1]), dx=1)  # area between the first two valleys
    b6 = peaks[1] - peaks[0]  # distance between the first two peaks
    b7 = valleys[-1] - peaks[-1]  # distance from the last peak to the last valley
    # NOTE(review): b2 is an amplitude while b1 is a sample index, so this
    # difference mixes units; the original comment called it a height
    # difference. Confirm the intended operands before relying on it.
    delta_t = b2 - b1
    features = np.array([b1, b2, b3, b5, b6, b7, b10, delta_t])
    # The plot marks the supplied feature points, not the peaks found above.
    plot_ppg_signal(waveform, feature_points, name, cycle_idx, hand)
    return 1, features

In [78]:
# Plot the first derivative (D1)
def plot_d1_signal(waveform, feature_points, name, cycle_idx, hand):
    """Plot a first-derivative waveform with markers at ``feature_points``.

    Does nothing when ``PLOT_TYPES[1]`` is 'break'; saves a JPG into
    ``PLOT_DIR`` when it is 'save'; shows the figure when it is 'show'.

    Args:
        waveform: 1-D numpy array to draw.
        feature_points: integer sample indices to mark on the curve.
        name: subject name used in the title and file name.
        cycle_idx: zero-based cycle index (displayed one-based).
        hand: hand label, used in the title and file name.
    """
    if PLOT_TYPES[1] == 'break':
        return
    # Mark the points at their own sample indices so they sit on the curve,
    # which plt.plot(waveform) draws against x = 0..n-1.
    marker_idx = np.asarray(feature_points, dtype=int)
    plt.figure()
    plt.plot(waveform, label='D1')
    plt.plot(marker_idx, waveform[marker_idx], '*', label='峰值')
    plt.title(f'{name}, {cycle_idx + 1}th {hand}', fontproperties=font_prop)
    # The legend label is Chinese, so it needs the same font as the title.
    plt.legend(prop=font_prop)
    plt.grid()
    if PLOT_TYPES[1] == 'save':
        # Include the hand so the left and right plots of a cycle do not overwrite each other.
        plt.savefig(f'{PLOT_DIR}\\{name}, {cycle_idx + 1}th {hand} d1.jpg')
    elif PLOT_TYPES[1] == 'show':
        plt.show()
    plt.close()

In [79]:
# Compute first-derivative (D1) features
def compute_d1_features(waveform, name, cycle_idx, hand):
    """Derive two features from the peaks of a first-derivative waveform.

    Returns:
        tuple: ``(1, [b4, b8])`` where ``b4`` is the spacing between the first
        two peaks and ``b8`` is the height of the second peak, or ``(0, None)``
        when fewer than two peaks are found.
    """
    peak_idx, peak_props = find_peaks(waveform, height=0, distance=800)
    if len(peak_idx) < 2:
        return 0, None

    spacing = peak_idx[1] - peak_idx[0]  # b4: distance between the first two peaks
    second_height = peak_props['peak_heights'][1]  # b8: height of the second peak
    features = np.array([spacing, second_height])
    plot_d1_signal(waveform, peak_idx, name, cycle_idx, hand)
    return 1, features

In [80]:
# Plot the D2 figure
def plot_d2_signal(original, derivative, name, cycle_idx, feature_points, hand, save_dir):
    """Plot the original signal and a derivative with markers at ``feature_points``.

    Does nothing when ``PLOT_TYPES[2]`` is 'break'; shows the figure when it is
    'show'; for any other value the figure is saved as a JPG into ``save_dir``.

    Args:
        original: 1-D signal drawn as the reference curve.
        derivative: 1-D numpy array drawn on top and sampled at the markers.
        name: subject name used in the title and file name.
        cycle_idx: zero-based cycle index (displayed one-based).
        feature_points: sample indices to mark; may be empty.
        hand: hand label, used in the title and file name.
        save_dir: directory that receives the saved figure.
    """
    if PLOT_TYPES[2] == 'break':
        return
    # Use the indices themselves as x positions so markers sit on the curve,
    # and force an integer dtype so an empty array is still a valid index.
    marker_idx = np.asarray(feature_points, dtype=int)
    plt.figure()
    plt.plot(original, label='原始訊號')
    plt.plot(derivative, label='二階導數')
    plt.plot(marker_idx, derivative[marker_idx], '*', label='特徵點')
    plt.title(f'{name}, {cycle_idx + 1}th {hand}', fontproperties=font_prop)
    # The legend labels are Chinese, so they need the same font as the title.
    plt.legend(prop=font_prop)
    plt.grid()
    if PLOT_TYPES[2] == 'show':
        plt.show()
    else:
        plt.savefig(f'{save_dir}\\{name}, {cycle_idx + 1}th {hand} d2.jpg')
    plt.close()

In [81]:
# Compute D2 features
def compute_d2_features(waveform, derivative, name, cycle_idx, hand):
    """Locate twelve feature points and compute five amplitude ratios.

    Feature points are sign changes of ``derivative[2]`` (kept at least 30
    samples apart). For each of the first two peaks of ``derivative[1]``
    (height >= 0.6, distance 900) the nearest sign change and the five that
    follow it are taken, giving 12 points. The ratios are computed from
    ``derivative[1]`` sampled at the last six points.

    Args:
        waveform: resampled signal, used only for plotting.
        derivative: sequence whose entries 1 and 2 are numpy arrays.
        name, cycle_idx, hand: forwarded to ``plot_d2_signal``.

    Returns:
        tuple: ``(quality, feature_points, ratios)``.
        ``(1, points, ratios)`` on success; ``(0, None, None)`` when the cycle
        is rejected (a plot is sent to ``BAD_PLOT_DIR``); ``(0, points, None)``
        when ``PLOT_TYPES[2]`` is 'show', in which case the cycle is only displayed.
    """
    sign_changes = np.where(np.diff(np.sign(derivative[2])))[0]
    min_distance = 30
    filtered_crossings = []
    last_idx = -min_distance
    for idx in sign_changes:
        # Keep a crossing only if it is far enough from the last one kept.
        if idx - last_idx >= min_distance:
            filtered_crossings.append(idx)
            last_idx = idx
    # Explicit integer dtype keeps an empty result usable as an index array.
    zero_crossings = np.array(filtered_crossings, dtype=np.intp)

    a_points = find_peaks(derivative[1], height=0.6, distance=900)[0]

    # Without any crossing np.argmin would raise on an empty array, so treat
    # that case as a bad cycle, just like having fewer than two peaks.
    if len(zero_crossings) == 0 or len(a_points) < 2:
        plot_d2_signal(waveform, derivative[1], name, cycle_idx, zero_crossings, hand, BAD_PLOT_DIR)
        return 0, None, None

    closest_indices = [np.argmin(np.abs(zero_crossings - a)) for a in a_points]

    feature_points = np.concatenate([
        zero_crossings[closest_indices[0]:closest_indices[0]+6],
        zero_crossings[closest_indices[1]:closest_indices[1]+6]
    ])

    # Reject cycles without 12 points, or where the gap between the two groups
    # is smaller than 30% of the index of the second group's first point.
    if len(feature_points) != 12 or feature_points[6] - feature_points[5] < feature_points[6] * 0.3:
        plot_d2_signal(waveform, derivative[1], name, cycle_idx, feature_points, hand, BAD_PLOT_DIR)
        return 0, None, None

    if PLOT_TYPES[2] == 'show':
        plot_d2_signal(waveform, derivative[1], name, cycle_idx, feature_points, hand, PLOT_DIR)
        return 0, feature_points, None

    points_y = derivative[1][feature_points[6:12]]
    ratios = [
        points_y[1] / points_y[0],  # Ratio_BA
        points_y[2] / points_y[0],  # Ratio_CA
        points_y[3] / points_y[0],  # Ratio_DA
        (points_y[1] - points_y[3] - points_y[2] - points_y[4]) / points_y[0],  # Ratio_BDCE_A
        # NOTE(review): labelled CDB, but the subtracted term is points_y[4]
        # rather than points_y[1] - confirm which one is intended.
        (points_y[2] + points_y[3] - points_y[4]) / points_y[0]  # Ratio_CDB_A
    ]
    return 1, feature_points, ratios

In [82]:
# Append one feature row to a CSV file
def write_to_csv(features, file_path, cycle_idx, hand, info):
    """Append a single row to ``file_path`` (no header, no index).

    The row is ``info[0]``, the one-based cycle number, ``info[1]``, ``hand``,
    ``info[2]``, followed by every value of ``features`` (which must provide
    ``tolist()``). The file is opened in append mode with 'utf-8-sig' encoding.
    """
    leading_columns = [info[0], cycle_idx + 1, info[1], hand, info[2]]
    row = pd.DataFrame([leading_columns + features.tolist()])
    with open(file_path, 'a', newline='', encoding='utf-8-sig') as handle:
        row.to_csv(handle, index=False, header=False)

In [83]:
# Process one cycle and store its features
def process_waveform(waveform, cycle_idx, hand, info, target_length=2000):
    """Extract features from one signal cycle and append them to the CSV files.

    The cycle and its derivatives are resampled to ``target_length`` samples.
    D2 features are computed first; if they are rejected the function returns
    (saving the arrays under 'bad signal' when ``SAVE_NPY`` is set). Otherwise
    D1 and PPG features are computed and, when both succeed, the feature points
    go to ``D2_CSV`` and the combined features to ``PPG_CSV``.
    """
    raw_derivatives = compute_derivatives(waveform)
    resized_wave = signal.resample(waveform, target_length)
    resized_derivatives = [
        signal.resample(series, target_length) for series in raw_derivatives[1:]
    ]
    derivatives = [resized_wave, *resized_derivatives]

    def dump_npy(folder):
        # Store the resampled wave and its first two derivatives as .npy files.
        arrays = [resized_wave, *resized_derivatives[:2]]
        for suffix, arr in zip(['', ' d1', ' d2'], arrays):
            np.save(f'{folder}\\{info[0]}, {cycle_idx + 1}th {hand}{suffix}.npy', arr)

    d2_quality, d2_points, d2_features = compute_d2_features(
        resized_wave, derivatives, info[0], cycle_idx, hand
    )
    if d2_quality != 1:
        if SAVE_NPY:
            dump_npy(f'{DATA_DIR}\\bad signal')
        return

    d1_quality, d1_features = compute_d1_features(derivatives[1], info[0], cycle_idx, hand)
    ppg_quality, ppg_features = compute_ppg_features(resized_wave, d2_points, info[0], cycle_idx, hand)

    if not (d1_quality == 1 and ppg_quality == 1):
        return

    all_features = np.hstack((ppg_features, d1_features, d2_features))
    write_to_csv(d2_points, D2_CSV, cycle_idx, hand, info)
    write_to_csv(all_features, PPG_CSV, cycle_idx, hand, info)

    if SAVE_NPY:
        dump_npy(f'{DATA_DIR}')

In [84]:
# Main program
def main():
    """Read every recording, cut it into cycles, and extract features per hand.

    For each file found under the dataset folder the right-hand and left-hand
    channels are read, band-pass filtered, trimmed to samples 20000..300000 and
    multiplied by 10. Valleys split each signal into segments spanning two
    valley intervals; segments that pass the quality check are handed to
    ``process_waveform`` for the left and the right hand.
    """
    channel1_id = 2  # right-hand channel
    channel2_id = 4  # left-hand channel
    record_id = 1    # record ID

    # Pick the input folder by dataset type (any value other than 'Patient' uses the second folder)
    data_path = 'F:\\病患資料' if DATA_TYPE == 'Patient' else 'F:\\正常人Data'
    file_paths = find_files(data_path)
    print(f'找到資料筆數: {len(file_paths)}')

    for path in tqdm(file_paths, desc='處理檔案'):
        data = adi.read_file(path)
        # Channel ids are one-based, the channels list is zero-based
        right_data = data.channels[channel1_id - 1].get_data(record_id)
        left_data = data.channels[channel2_id - 1].get_data(record_id)

        # Apply the band-pass filter, then trim and scale both signals
        filtered_left, filtered_right = apply_butter_filter(left_data, right_data)
        left_wave = filtered_left[20000:300000] * 10
        right_wave = filtered_right[20000:300000] * 10

        # Find valleys as peaks of the negated signal
        left_valleys, _ = find_peaks(-left_wave, height=0, distance=150)
        right_valleys, _ = find_peaks(-right_wave, height=0, distance=150)
        # Leave room for the i + 2 lookup below on the shorter valley list
        min_cycles = min(len(left_valleys), len(right_valleys)) - 2

        info = parse_file_info(path)
        for i in range(0, min_cycles, 2):
            # Each segment spans two valley intervals; left and right are paired by valley index
            left_cycle = left_wave[left_valleys[i]:left_valleys[i + 2]]
            right_cycle = right_wave[right_valleys[i]:right_valleys[i + 2]]

            # Quality check: both hands need at least 1100 samples and exactly two peaks
            left_peaks, left_heights = find_peaks(left_cycle, height=0, distance=500)
            right_peaks, right_heights = find_peaks(right_cycle, height=0, distance=500)
            if (len(left_cycle) < 1100 or len(left_peaks) != 2 or 
                len(right_cycle) < 1100 or len(right_peaks) != 2):
                continue

            # Normalise amplitude: scale the segment so a first peak below 0.5 becomes 0.5.
            # The in-place multiply also updates the corresponding samples of left_wave / right_wave.
            left_heights = left_heights['peak_heights']
            right_heights = right_heights['peak_heights']
            if left_heights[0] < 0.5:
                left_cycle *= 0.5 / left_heights[0]
            if right_heights[0] < 0.5:
                right_cycle *= 0.5 / right_heights[0]

            # Process the left-hand and right-hand segments
            process_waveform(left_cycle, i, 'Left', info)
            process_waveform(right_cycle, i, 'Right', info)

# Run the pipeline when this code is executed as the main module
if __name__ == '__main__':
    main()

找到資料筆數: 64


處理檔案: 100%|██████████| 64/64 [00:12<00:00,  5.32it/s]
