-
Notifications
You must be signed in to change notification settings - Fork 0
/
heart_beat.py
118 lines (104 loc) · 3.72 KB
/
heart_beat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#######################################################################
#
# heart_beat.py
#
# Based upon code by Paul van Gent:
#
# https://github.com/paulvangentcom/heartrate_analysis_python
#
# Adapted for Bobbi by: Robin van Emden - robin@pavlov.tech - 2017
#
# Totem Bobbi: http://www.totemopenhealth.com/
#
#######################################################################
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import math
measures = {}
def get_data(filename):
dataset = pd.read_pickle(filename)
return dataset
def rolmean(dataset, hrw, fs):
mov_avg = dataset.hart.rolling(center=False, window=int(hrw*fs)).mean() #Calculate moving average
avg_hr = (np.mean(dataset.hart))
mov_avg = [avg_hr if math.isnan(x) else x for x in mov_avg]
mov_avg = [x*1.1 for x in mov_avg]
dataset['hart_rollingmean'] = mov_avg
def detect_peaks(dataset):
window = []
peaklist = []
listpos = 0
for datapoint in dataset.hart:
rollingmean = dataset.hart_rollingmean[listpos]
if (datapoint < rollingmean) and (len(window) < 1):
listpos += 1
elif (datapoint > rollingmean):
window.append(datapoint)
listpos += 1
else:
if(window):
maximum = max(window)
beatposition = listpos - len(window) + (window.index(maximum))
peaklist.append(beatposition)
window = []
listpos += 1
measures['peaklist'] = peaklist
measures['ybeat'] = [dataset.hart[x] for x in peaklist]
def calc_bpm():
RR_list = measures['RR_list']
measures['bpm'] = 60000 /( np.mean(RR_list))
def plotter(dataset, title):
peaklist = measures['peaklist']
ybeat = measures['ybeat']
plt.title(title)
plt.plot(dataset.hart, alpha=0.5, color='blue', label="raw signal")
plt.plot(dataset.hart_rollingmean, color ='green', label="moving average")
plt.scatter(peaklist, ybeat, color='red', label="average: %.1f BPM" %measures['bpm'])
plt.legend(loc=4, framealpha=0.6)
plt.show()
def calc_RR(dataset, fs):
peaklist = measures['peaklist']
RR_list = []
cnt = 0
while (cnt < (len(peaklist)-1)):
RR_interval = (peaklist[cnt+1] - peaklist[cnt])
ms_dist = ((RR_interval / fs) * 1000.0)
RR_list.append(ms_dist)
cnt += 1
RR_diff = []
RR_sqdiff = []
cnt = 0
while (cnt < (len(RR_list)-1)):
RR_diff.append(abs(RR_list[cnt] - RR_list[cnt+1]))
RR_sqdiff.append(math.pow(RR_list[cnt] - RR_list[cnt+1], 2))
cnt += 1
measures['RR_list'] = RR_list
measures['RR_diff'] = RR_diff
measures['RR_sqdiff'] = RR_sqdiff
def calc_ts_measures():
RR_list = measures['RR_list']
RR_diff = measures['RR_diff']
RR_sqdiff = measures['RR_sqdiff']
measures['bpm'] = 60000 / np.mean(RR_list)
measures['ibi'] = np.mean(RR_list)
measures['sdnn'] = np.std(RR_list)
measures['sdsd'] = np.std(RR_diff)
measures['rmssd'] = np.sqrt(np.mean(RR_sqdiff))
NN20 = [x for x in RR_diff if (x>20)]
NN50 = [x for x in RR_diff if (x>50)]
measures['nn20'] = NN20
measures['nn50'] = NN50
measures['pnn20'] = float(len(NN20)) / float(len(RR_diff))
measures['pnn50'] = float(len(NN50)) / float(len(RR_diff))
def process_advanced(dataset, hrw, fs):
rolmean(dataset, hrw, fs)
detect_peaks(dataset)
calc_RR(dataset, fs)
calc_ts_measures()
def process_basic_peak(dataset, hrw, fs): #Remember; hrw was the one-sided window size (we used 0.75) and fs was the sample rate (file is recorded at 500Hz)
rolmean(dataset, hrw, fs)
detect_peaks(dataset)
calc_RR(dataset, fs)
calc_bpm()
plotter(dataset, "My Heartbeat Plot")