In [6]:
import keras.backend as K
import keras
import scipy.io as scio
import numpy as np
import os
import re
import _ctypes
import sys
import pywt
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, find_peaks, peak_prominences
from itertools import chain
import math
from scipy.signal import medfilt
import scipy.io as io
from scipy import signal
from scipy.interpolate import interp1d #从 SciPy 导入插值函数
from scipy.signal import butter, filtfilt, find_peaks, peak_prominences
from scipy import integrate
from sklearn.preprocessing import normalize, StandardScaler, MinMaxScaler
from scipy.signal import argrelmax, argrelmin, firwin, convolve

In [7]:
'''巴特沃斯滤波'''
def butter_bandpass_1(lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs
    
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    
    return b, a
        
def apply_filtering_1(signal):
    # sampling freq
    #采样频率
    fs = 45
    b, a = butter_bandpass_1(0.5, 8, fs, order=4)
    
    return filtfilt(b, a, signal)

def systolic_peaks_2(signal):
    ''' Returns list of found systolic peaks in whole signal. Identified as maxima. Required distance between peaks set to 22'''
    '''返回在整个信号中找到的收缩峰的列表。确定为最大值。峰值之间的所需距离设置为22'''
    return find_peaks(signal, distance=20)[0]

#'''谷值--起点''''
def tfn_points_2(signal):
    ''' Returns list of tfn points (beats boundaries) as minimums of signal with distance min 25 between eachother'''
    '''返回tfn点（拍边界）列表，作为信号的最小值，彼此之间的距离最小为25'''
    #在这里，我使用还原信号，得到高于0的峰值
    # here I use reverted signal and get peaks above 0 
    return find_peaks(signal*(-1), height=0, distance=20)[0]

def beat_segmentation(signal):
    
    ''' Returns list of beats from signal and list of corresponding systolic peak index'''
    '''返回信号中的搏动列表和相应收缩峰指数列表'''
    
    systolics = systolic_peaks_2(signal)
    tfns = tfn_points_2(signal)
    
    #beats:节拍;systolic
    beats, systolic = [], []
    
    for i in range(len(tfns)-1):
        start = tfns[i]
        end = tfns[i+1]
        segment = np.arange(start, end)
        l = [f in systolics for f in segment]
        
        # if there is only one systolic peak between minima its a beat
        #如果最小值之间只有一个收缩峰，那就是一次跳动
        if list(map(bool, l)).count(True) == 1: 
            # apply normalization, reshaping is required
            #应用规范化，需要重塑
            bshape = signal[segment].shape
            normalized_beat = normalize(signal[segment].reshape(1, -1))
            beats.append(normalized_beat.reshape(bshape))
            systolic.append(np.where(l)[0][0])
    return beats, systolic
#第一个低谷
def dicrotic_notch(beat, systolic):
    '''Returns index of detected dicrotic notch in a beat. If not found returns 0'''
    '''返回在拍中检测到的第一个低谷的索引。如果未找到，则返回0'''

    derviative = np.diff(beat[systolic:])
    point = find_peaks(derviative)[0]
    corrected = 0
    
    if len(point) > 0:
        corrected =  systolic + point[-1]
        
    return corrected
#第二个峰值
def diastolic_peak(beat, systolic):
    '''Returns index of detected diastolic peak in a beat. If not found returns 0'''
    '''返回检测到的搏动中舒张峰值的指数。如果未找到，则返回0'''
   
    derviative = np.diff(np.diff(beat[systolic:]))
    point = find_peaks(derviative*(-1))[0]
    corrected = 0
    
    if len(point) > 0:
        corrected = systolic + point[0]+1
        if abs(beat[corrected]) >= abs(0.2*beat[corrected - 1]):
            return corrected
        else: return 0
        
    return corrected
#第一个低谷
def dicrotic_notch(beat, systolic):
    '''Returns index of detected dicrotic notch in a beat. If not found returns 0'''
    '''返回在拍中检测到的第一个低谷的索引。如果未找到，则返回0'''
    
    derviative = np.diff(beat[systolic:])
    point = find_peaks(derviative)[0]
    corrected = 0
    
    if len(point) > 0:
        corrected =  systolic + point[0]-1
        
    return corrected
def peaks_detection(beats, systolics):
    '''Returns created dataframe with beat values and critical points indices'''
    '''返回创建的带有拍值和临界点索引的数据帧'''
    
    dicrotics = []
    diastolics = []
    
    for b, s in zip(beats, systolics):
        tnn = dicrotic_notch(b,s)
        tdn = diastolic_peak(b,s)
        #第一个低谷
        dicrotics.append(tnn)
        #第二个峰值
        diastolics.append(tdn)
    
    result = np.array([beats, systolics, dicrotics, diastolics], dtype=object)
    #print(result)
    #print(result[2])
    #print(result[3])
    #去掉那些没有发现双循环和舒张药的
    # remove those where dicrotics and diastolics weren't found
    
    result = result[..., result[2] > 0]
    #print("低:",np.array(result).shape)
    result = result[..., result[3] > 0]
    #print("高:",np.array(result).shape)
    #输出形状为（4，nb），其中nb是拍数
    # output shape is (4, nb) where nb is number of beats
    return result.T


def extract_PPG(i):
    dataAnxiety = []
    # 可视化点检测
    fil = apply_filtering_1(i)
    systolics = systolic_peaks_2(fil)
    tfns = tfn_points_2(fil)
    beats, systolics= beat_segmentation(fil)
    #print("b:",np.array(beats).shape)
    #print("s:",np.array(systolics).shape)
    beats_features = peaks_detection(beats, systolics)
    new_list2 = []
    number = 0
    #print(len(beats_features))
    for beat, systolic, dicrotic, diastolic in beats_features:
        buhege = []
        if dicrotic < diastolic:
            new_list1 = []
            for wave in list(beat):
                new_wave = (wave - np.min(list(beat))) / (np.max(list(beat)) - np.min(list(beat)))
                new_list1.append(new_wave)
            x = np.linspace(0,len(new_list1),len(new_list1))
            xvals = np.linspace(0,len(new_list1),80)
            yinterp = np.interp(xvals,x,new_list1)
            new_list2.append(list(yinterp))
        else:
            buhege.append(beat)
    #print(np.array(new_list2).shape)
#     print(len(new_list2))
    if len(new_list2) > 40:
        dataAnxiety = dataAnxiety + new_list2[0:40]
    else:
        sum_=new_list2
        for i in range(int(40//len(new_list2)+1)):
            sum_ = sum_+new_list2
        dataAnxiety = dataAnxiety + sum_[0:40]
    return np.array(dataAnxiety).reshape(1,-1,40)

def sampling_freq(signal, time=90):
    # print(len(signal))
    return int(len(signal) / time)

def butter_bandpass(lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs

    low = lowcut / nyq
    # high = highcut / nyq
    ##防止high取到1，因为范围时（0，1）0和1都不可取到
    high = min(highcut, 0.999) / nyq
    b, a = butter(order, [low, high], btype='band')

    return b, a

def apply_filtering(signal):
    # sampling freq
    # 采样频率
    fs = sampling_freq(signal)
    print(fs)
    b, a = butter_bandpass(0.5, 10, fs, order=4)

    return filtfilt(b, a, signal)


def systolic_peaks_1(signal):
    return find_peaks(signal, distance=18)[0]

def tfn_points_1(signal):
    # 在这里，我使用还原信号，并获得高于0的峰值
    return find_peaks(signal * (-1), height=0, distance=25)[0]

###提取脉率变异性
def PRV(PPG):
    PRV = []
    # print(data.shape)
    # print('PPGlen',len(PPG))
    # 可视化点检测
    new_list2 = []
    fil = apply_filtering(PPG)
    systolics = systolic_peaks_1(fil)
    tfns = tfn_points_1(fil)

    PP_list1 = []
    PP_list2 = []
    cnt = 0
    while (cnt < (len(systolics) - 1)):
        PP_interval = (systolics[cnt + 1] - systolics[cnt])  # 以样本数计算节拍之间的距离
        PP_list1.append(PP_interval)  # 附加到列表
        cnt += 1
    cnt = 1
    rate = len(PPG) / 90
    c1 = rate * 0.5
    while (cnt < len(PP_list1) - 1):
        if PP_list1[cnt] > rate or PP_list1[cnt] < c1:
            PP_list1[cnt] = (PP_list1[cnt - 1] + PP_list1[cnt + 1]) / 2
        cnt = cnt + 1
    num = len(PPG) / 40
    new_lst1 = np.divide(PP_list1, num)
    if len(new_lst1) < 80:
        # 如果长度不够80，用均值填充
        # 计算 new_lst1 的均值
        mean_value = np.mean(new_lst1)
        # 创建一个迭代器，它首先产生 new_lst1 的所有元素，然后是 mean_value 的重复
        padded_iter = chain(new_lst1, [mean_value] * (80 - len(new_lst1)))
        # 将迭代器转换为列表
        new_lst2 = list(padded_iter)
    else:
        new_lst2 = new_lst1[0:80]
    #     print(len(new_lst2))
    #     print(new_lst2)
    PRV.append(new_lst2)
    return np.array(PRV).reshape(1,-1,1)

def PupilWaveLoadAndProcess(txtpath):
    emo_index = ['PeaceHeartRate', 'HappyHeartRate', 'SadHeartRate', 'AnxietyHeartRate', 'StressHeartRate']
    data=[]
    txtPath = txtpath
    calm_list = []
    happy_list = []
    sad_list = []
    anxiety_list = []
    stress_list = []

    for emo in emo_index:
        #read the txt file to extract the pupil diameter
        path = os.path.join(txtPath,"{0}.txt".format(emo))
        with open(path, "r") as f:
            StrLines = f.readlines()
        PupilDiameter = []
        for line in StrLines:
            try:
                Diameter = re.findall("ppg:(.*?),", line)
                PupilDiameter.append(float(Diameter[0]))
            except Exception:
                Diameter = re.findall('D:(.*?);', line)
                if Diameter == []:
                    Diameter = re.findall('D=(.*?);', line)

                PupilDiameter.append(float(Diameter[0]))
        #Deal with the defalut data (fill with the former data of the series)
        if emo == 'HappyHeartRate':
            happy_list = PupilDiameter
        elif emo == 'PeaceHeartRate':
            calm_list = PupilDiameter
        elif emo == 'SadHeartRate':
            sad_list = PupilDiameter
        elif emo == 'AnxietyHeartRate':
            anxiety_list = PupilDiameter
        else:
            stress_list = PupilDiameter

#     peace_avg = np.average(calm_list[-1])
#     joyful_differiential = normalization(happy_list - peace_avg)
#     sad_differiential = normalization(sad_list - peace_avg)
#     anxiety_differiential = normalization(anxiety_list - peace_avg)
#     stress_differiential = normalization(stress_list - peace_avg)

#     data1 = np.array([joyful_differiential, sad_differiential],dtype=np.float)
#     data1 = data1.reshape(1,2048,2)

#     data2 = np.array([joyful_differiential, sad_differiential, anxiety_differiential, stress_differiential],dtype=np.float)
#     data2 = data2.reshape(1,2048,4)
    calm_list = np.array(calm_list,dtype=np.float)
    happy_list = np.array(happy_list,dtype=np.float)
    sad_list = np.array(sad_list,dtype=np.float)
    anxiety_list = np.array(anxiety_list,dtype=np.float)
    stress_list =np.array(stress_list,dtype=np.float)
    
    return calm_list,happy_list,sad_list,anxiety_list,stress_list

In [8]:
path_=r'E:\数据集\北工大第二批202109-202204年校内\原始数据'

In [9]:
folders=os.listdir(path_)[:93]
folders.remove('DL023')
folders.remove('DL039')
folders.remove('DL036')
folders.reverse()
len(folders)

In [10]:
heartPRV=[]
heartPPG=[]
heartLabel=[]
for folder in folders:
    path_f=os.path.join(path_,folder)
    path_p=os.path.join(path_f,"heartRateData")
    print(path_p)
    #ori_data=PupilWaveLoadAndProcess(path_p)
    for i,data in enumerate(PupilWaveLoadAndProcess(path_p)):
        data_PRV=PRV(data)
        data_PPG=extract_PPG(data)
        heartPRV.append(data_PRV)
        heartPPG.append(data_PPG)
        heartLabel.append(i)
    print(np.array(heartPPG).shape)

E:\数据集\北工大第二批202109-202204年校内\原始数据\S039\heartRateData


FileNotFoundError: [Errno 2] No such file or directory: 'E:\\数据集\\北工大第二批202109-202204年校内\\原始数据\\S039\\heartRateData\\PeaceHeartRate.txt'

In [None]:
heartPRV_=np.array(heartPRV).reshape(-1,80,1)

In [None]:
print(heartPRV_.shape)

In [None]:
heartPPG_=np.array(heartPPG).reshape(-1,80,40)
heartlabel_=np.array(heartLabel)
print(heartPRV_.shape)
print(heartPPG_.shape)
print(heartlabel_.shape)

In [None]:
np.save('heartPRV.npy',heartPRV_)
np.save('heartPPG.npy',heartPPG_)
np.save('heartLabel.npy',heartlabel_)