In [131]:
import redis
def get_raw_data():
    rds = redis.StrictRedis(host='docker.vm', port=6379, db=14)
    for key in rds.scan_iter("data_raw:*"):
        [Accelerations,timespan]=rds.lrange(key, 0, -1)
        
        Accelerations=[int(v) for v in Accelerations.decode('ascii').split(",")]
        timespan=[int(v) for v in timespan.decode('ascii').split(",")]
        print(len(Accelerations),len(timespan))
        
        assert len(Accelerations) == len(timespan)*3, f"data corrupt with key {key}"
        
        all_accelerometer,all_timespan=chop_data_to_slice(Accelerations,timespan)
        for a_chunk,ts_chunk in zip(all_accelerometer,all_timespan):
            yield a_chunk,ts_chunk
    return (None,None)

In [74]:
def span_2_time(ts):
    if len(ts)<2:
        return None    
    t=[0]*len(ts)
    for i in range(1,len(t)):
        t[i]=t[i-1] + ts[i]
    return t
def time_2_span(t):
    if len(t)==0:
        return t
    span=[0]*len(t)
    for i in range(0,len(t)-1):
        span[i+1]=t[i+1]-t[i]
    return span

In [106]:
#1. 把加速度数据切割成不同的段
#2. 时间重整化，t0时刻设置为0
#chop into data slice
def chop_data_to_slice(a_,ts_):
    assert len(a_) == 3*len(ts_),f"corrupt data slice {i}"
    all_accelerometer,all_timespan=[],[]
    while len(ts_)>0:
        for i in range(1,len(ts_)):
            if ts_[i]>365*86400*1000:
                all_accelerometer.append(a_[0:i*3])
                all_timespan.append(ts_[:i]) #按原始采样定义，每个ts是和之前一次采样的时间差
                # to the rest
                a_,ts_=a_[i*3:],ts_[i:]
                break
            elif i==len(ts_)-1:
                all_accelerometer.append(a_)
                all_timespan.append(ts_)
                a_,ts_=[],[]
    lens=[len(v) for v in all_timespan]
    print(f"😊 data successfully chopped into {len(all_accelerometer)} pieces ,average length {sum(lens)/len(lens)}, min {min(lens)}, max {max(lens)}")
    return all_accelerometer,all_timespan
all_accelerometer,all_timespan=chop_data_to_slice(Accelerations.copy(),timespan.copy())


😊 data successfully chopped into 164 pieces ,average length 1781.8902439024391, min 1306, max 1802


In [4]:
import math
import matplotlib.pyplot as plt
import numpy as np


# 保留有效的心跳峰值，两个信号间距要求是大致等宽的
def reserve_valid_heart_beat_peak(data,time):
    valid_time=[time[i] for i,v in enumerate(data) if v!=0]
    nonzero_index=[i for i,v in enumerate(data) if v!=0]
    time_span=time_2_span(valid_time)
    data_reserved=[0]*len(data)
    for i in range(1,len(time_span)-1):
        assert time_span[i]!=0,"time span should not be zero"
        ratio=time_span[i+1]/time_span[i]
        if max(ratio,1.0/ratio) <1.15:
            ind=nonzero_index[i-1]
            data_reserved[ind]=data[ind]
            ind=nonzero_index[i]
            data_reserved[ind]=data[ind]
            ind=nonzero_index[i+1]
            data_reserved[ind]=data[ind]
    return data_reserved
            
    

In [5]:
def add_believable_data(bestPeaks,data,tm):
    # remove unbelievable
    peak_index=[i for i,v in enumerate(bestPeaks) if v!=0]
    peak_time=[tm[i] for i,v in enumerate(bestPeaks) if v!=0]
    peak_timespan=time_2_span(peak_time)
    average_timespan=sum(peak_timespan[1:])/len(peak_timespan[1:])
    modified=False
    maxValue=max(data)
    #如果中间插入一点合理，那就插入一点
    for i in range(1,len(peak_timespan)):
        ratio=peak_timespan[i]/average_timespan
        if ratio> 1.83 and ratio <2.25:
            ib,ie=peak_index[i-1],peak_index[i]
            inset_at=round((ib+ie)/2)
            maxNeibor=inset_at-1 if abs(bestPeaks[inset_at-1])>abs(bestPeaks[inset_at+1]) else inset_at+1
            inset_at=maxNeibor if abs(bestPeaks[maxNeibor])>abs(bestPeaks[inset_at]) else inset_at
            #print("add",tm[ib],tm[ie],tm[inset_at])
            bestPeaks[inset_at] =maxValue*2
            modified=True
    return modified

In [6]:
def only_believable_timespan(bestPeaks,data,tm):
    # remove unbelievable
    peak_index=[i for i,v in enumerate(bestPeaks) if v!=0]
    peak_time=[tm[i] for i,v in enumerate(bestPeaks) if v!=0]
    peak_timespan=time_2_span(peak_time)
    
    #分析评价心跳间隔。中间没有检测出心跳，大段跳过的，忽略作为时间段
    timespan_believed=[v for v in peak_timespan if v<1538 and v>0] 
    if len(timespan_believed)<4:
        return None
    average_timespan=sum(timespan_believed) / len(timespan_believed)
    
    #把可信的心跳时间的重新画出来
    saved_peaks=[0]*len(bestPeaks)
    
    #左右两段间距都不对的点被移除
    for i,ts in enumerate(peak_timespan):
        if ts>1538 or ts==0:
            continue
            
        ratio=ts/average_timespan
        if max(ratio,1.0/ratio) <1.6:
            ind1,ind2=peak_index[i-1],peak_index[i]
            saved_peaks[ind1]=bestPeaks[ind1]
            saved_peaks[ind2]=bestPeaks[ind2]
    #print(f"bestPeaks {[i for i,v in enumerate(bestPeaks) if v!=0]} len {len(peak_index)} average_timespan {average_timespan}")
    #print (f" saved_peaks: {[tm[i] for i,v in enumerate(saved_peaks) if v!=0]} left {len([i for i,v in enumerate(saved_peaks) if v!=0])}")
    return saved_peaks

In [90]:
for i in range(7,-1,-1):
    print(i)

7
6
5
4
3
2
1
0


In [91]:
class SearchPeriodState:
    
    def __init__(self,t0,periodic):        
        self.t0=t0
        self.periodic=periodic
        self.shot_span=periodic*0.19
        #当前搜索的位置的中心位置  
        self.shotted = [t0]
        self.set_tb_te(t0)
    def set_tb_te(self,time):
        self.tb=time-self.shot_span
        self.te=time+self.shot_span
    def tcur(self):
        return (self.tb+self.te)/2      
        
    def set_search_left(self,lastT=None):
        if lastT==None:
            #关于匹配点的位置，接收和现实有关的修正
            lastT= self.shotted[-1] if abs( self.shotted[-1]-self.tcur()) < self.shot_span else self.tcur() 
            
            #信念接受一半，事实证据接受一半
            #lastT= (self.shotted[-1]+self.tcur() )/2 if abs( self.shotted[-1]-self.tcur()) < self.shot_span else self.tcur() 
        self.set_tb_te(lastT-self.periodic)
        
    def set_search_right(self,lastT=None):
        if lastT==None:
            #关于匹配点的位置，接收和现实有关的修正
            lastT= self.shotted[-1] if abs( self.shotted[-1]-self.tcur()) < self.shot_span else self.tcur() 
            
            #信念接受一半，事实证据接受一半，因为作为现实的点可能存在偏差
            #lastT= (self.shotted[-1]+self.tcur() )/2 if abs( self.shotted[-1]-self.tcur()) < self.shot_span else self.tcur() 
            
        self.set_tb_te(lastT+self.periodic)
        
    def ShotHandle(self,time):
        if time in self.shotted: return
        #self.shot不会为0
        if not self.shot(self.shotted[-1]):
            self.shotted.append(time) 
        #替换为更准确的数据点
        elif abs(self.tcur()-time)<abs(self.tcur()-self.shotted[-1]):
            self.shotted[-1]=time
            
    def left_to_t0(self):
        return self.tcur()<self.t0
    def right_to_t0(self):
        return self.tcur()>self.t0  
    def Enter(self,time):
        #位于左侧，向左检测情形
        enter_left=self.left_to_t0() and time<=self.te
        #位于右侧，向右检测情形
        enter_right=self.right_to_t0() and time>=self.tb
        return enter_left or enter_right
    
    def Leave(self,time):
        leave_left=self.left_to_t0() and time<=self.tb
        leave_right=self.right_to_t0() and time>=self.te
        return leave_left or leave_right
    def LeaveHandle(self):
        if self.left_to_t0(): 
            self.set_search_left()
        else :
            self.set_search_right()
    
    def shot(self,time):
        return  time>=self.tb and time<=self.te
        
def get_duty_cycle(peak_time,index,periodic):  
    time0=peak_time[index]
    state=SearchPeriodState(peak_time[index],periodic)
    state.set_search_left(time0)
    #沿着时间减小方向搜索是否存在心跳信号
    for ind in range(index-1,-1,-1):        
        time=peak_time[ind]
        
        if not state.Enter(time):
            continue
        while state.Leave(time):
            state.LeaveHandle()
        if state.shot(time):
            state.ShotHandle(time)
            
    state.set_search_right(time0)
    
    for ind in range(index+1,len(peak_time)):        
        time=peak_time[ind]
        if not state.Enter(time):
            continue
        while state.Leave(time):
            state.LeaveHandle()
        if state.shot(time):
            state.ShotHandle(time)
    return state 
            
#从原始信号中选出最能符合最多平滑周期约束的信号
def best_periodical(bestPeaks,tm):
    peak_index=[i for i,v in enumerate(bestPeaks) if v!=0]
    peak_time=[tm[i] for i,v in enumerate(bestPeaks) if v!=0]
    t2i=dict(zip(peak_time,peak_index))
    time_span=time_2_span(peak_time)
    
    #假设任意两个峰值信号，作为心跳信号。分析这个假设下出现的心跳周期数
    duty_cycles_states=[get_duty_cycle(peak_time,i-1,ts) for i,ts in enumerate(time_span)]
    duty_cycles=[ len(s.shotted)/len(peak_time) if len(peak_time) >0 else 0 for s in duty_cycles_states]
    
    #找出最大的心跳周期
    maxind=np.argmax(duty_cycles)
    #print(f"duty_cycles len{len(duty_cycles)} maxind {maxind} max timespan {time_span[maxind]} duty_cycle {duty_cycles[maxind]} t0 {duty_cycles_states[maxind].t0}")
    
    #把数据写回到时间序列
    result=[0]*len(bestPeaks)
    result[t2i[duty_cycles_states[maxind].t0]]=100
    for t in duty_cycles_states[maxind].shotted:
        result[t2i[t]]=bestPeaks[t2i[t]]
    return result
    

In [92]:
def pickout_peak(data,time,peak_time_span):
    value=[0]*len(data)
    for i,v in enumerate(data):
        t0 = time[i]
        for j in range(i-1,0,-1):
            if t0-time[j] > peak_time_span:
                break
            if data[j]>=v:
                v=0
                break
        if v==0:continue
        for j in range(i+1,len(data)):
            if time[j]-t0 > peak_time_span:
                break
            if data[j]>v: #这里不取=号，等大点保留前一个
                v=0
                break        
        value[i]=v
    return value

In [135]:
import json
# 60*1000/220=272,272,60*1000/39=1538,1538  
scan_iteration_foreach_window,time_window_min,time_window_max = 5,(60*1000/220)*0.9,(60*1000/39)*0.9
# x^scan_iteration_foreach_window=time_window_max/time_window_min,ig. 1538/272
factor=math.exp(math.log(time_window_max/time_window_min)/(scan_iteration_foreach_window-1))
time_windows=[round(time_window_min*math.pow(factor,i)) for i in range(scan_iteration_foreach_window)]
def labeling_heart_beat(accelerations,time_span):  
    startTime=time_span[0]
    if rds_to.exists(str(startTime)): return True
    
    time=span_2_time(time_span)
    
    x=[ v for i,v in enumerate(accelerations) if i%3==0 ] #axis is x or y or z
    y=[ v for i,v in enumerate(accelerations) if i%3==1 ] #axis is x or y or z
    z=[ v for i,v in enumerate(accelerations) if i%3==2 ] #axis is x or y or z 
    assert len(accelerations)%3==0 and len(x)== len(y) and len(y)==len(z),"data corrupt! check pls"
    assert time!=None and len(time)==len(x),f"time corrupt! check pls {time_span} accelerations {accelerations}"
    a=[math.sqrt(v**2+y[i]**2+z[i]**2) for i,v in enumerate(x)  ]
    xy=[math.sqrt(v**2+y[i]**2) for i,v in enumerate(x)  ]
    yz=[math.sqrt(v**2+z[i]**2) for i,v in enumerate(y)  ]
    zx=[math.sqrt(v**2+z[i]**2) for i,v in enumerate(x)  ]
    
    BestPeakNum=-1
    bestPeaks=[]
    Periodical=[]
    bestAxis=[]
    channelI=-1
    bestSpans=-1
    add=True
    
    for span in time_windows: 
        for channel,axis in enumerate([x,y,z,a]): #axis is x or y or z
            if channel!=2 :continue
            absaxis=[abs(v) for v in axis]
            peaks_candidate=pickout_peak(absaxis,time,span)
            peaks_periodical=best_periodical(peaks_candidate,time)
            
            peakNum=len([v for v in peaks_periodical if v!=0])
            if peakNum<10:                break
            
#             #插值补充点
            believable_added=add_believable_data(peaks_periodical,axis,time)
            
            #print(f"spanspan {span} channel {channel} peakNum4 {peakNum}")
            
            
            if peakNum>=BestPeakNum:
                #print(f"span {span} channel {channel} peakNum4 {peakNum}")
                add=believable_added
                bestPeaks=peaks_candidate.copy()
                BestPeakNum=peakNum
                Periodical=peaks_periodical
                channelI=channel
                bestAxis=axis
                bestSpans=span
    
    if BestPeakNum>0:
        label= [1 if v!=0 else 0 for v in Periodical]
        rds_to.set(str(startTime), json.dumps({"accelero":accelerations,"timespan":time,"label":label}))
        # print(f"saved to redis ,key:{startTime}",{BestPeakNum})
        # plt.figure(partI,figsize=(30, 6))
        # plt.xlabel(f"data {startTime} ,channel ${channelI}"+f",bestSpans ${bestSpans}"+f",BestPeakNum{BestPeakNum} add {add} ")
        # plt.plot(time,bestAxis)
        # plt.plot(time,bestPeaks)
        # plt.plot(time,Periodical)
        return True
    return False
    
    
rds_to = redis.StrictRedis(host='docker.vm', port=6379, db=15)
data_source=get_raw_data() 
while True:
    accelerations,time_span= next(data_source)
    #print(len(accelerations))
    if accelerations!=None and time_span!=None:        
        success=labeling_heart_beat(accelerations,time_span)

    
   

978384 326128
😊 data successfully chopped into 183 pieces ,average length 1782.120218579235, min 1306, max 1802


StopIteration: (None, None)