In [1]:
import os
import time
import datetime  # <1>
import pickle

import numpy as np
import matplotlib.pyplot as plt
import random
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from tqdm import tqdm_notebook
from sklearn.preprocessing import MinMaxScaler

class Person:
    def __init__(self,Time, RMR,Age,Height,Weight,HRmax,HRrest,Gender,BMI,Temp,Humidity,Rf,HR,v,EPOC,pre,k,b):
        self.Time=Time
        self.RMR=RMR
        self.Age=Age
        self.Height=Height
        self.Weight=Weight
        self.HRmax=HRmax
        self.HRrest=HRrest
        self.Gender=Gender
        self.BMI=BMI
        self.Temp=Temp
        self.Humidity=Humidity
        self.Rf=Rf
        self.HR=HR
        self.v=v
        self.EPOC=EPOC
        self.pre=pre
        self.k=k
        self.b=b

import pywt

# 小波滤噪
def wavelet_denoising(data):
    # 小波函数取db4
    db4 = pywt.Wavelet('db4')
    # 分解
    coeffs = pywt.wavedec(data, db4)
    # 高频系数置零
    coeffs[len(coeffs)-1] *= 0
    coeffs[len(coeffs)-2] *= 0
    # 重构
    meta = pywt.waverec(coeffs, db4)
    if len(data)>=15 and int(len(data)/2)*2!=len(data):
        meta=np.delete(meta,-1)
    return meta

def smoothing(x,seq):
    y=np.zeros(len(x))
    for i in range(len(x)):
        if i<seq-1:
            y[i]=x[i]
        else:
            y[i]=sum(x[i-seq+1:i+1])/seq
    return y

def pre_deno_smoth(self,seq_ave):
    break_point1=0
    break_point2=0
    for i in range(1,len(self.v)):
        if self.v[i]>0 and self.v[i-1]==0:
            break_point1=i
        if self.v[i]==0 and self.v[i-1]>0:
            break_point2=i 
    x=self.HR
    x0=wavelet_denoising(x[0:break_point1])
    x1=wavelet_denoising(x[break_point1:break_point2])
    x2=wavelet_denoising(x[break_point2:])

    x0=smoothing(x0,seq_ave)
    x1=smoothing(x1,seq_ave)
    x2=smoothing(x2,seq_ave)
    self.HR=np.append(np.append(x0,x1),x2)
    
    x=self.Rf
    x0=wavelet_denoising(x[0:break_point1])
    x1=wavelet_denoising(x[break_point1:break_point2])
    x2=wavelet_denoising(x[break_point2:])
    x0=smoothing(x0,seq_ave)
    x1=smoothing(x1,seq_ave)
    x2=smoothing(x2,seq_ave)
    self.Rf=np.append(np.append(x0,x1),x2)
    
    x=self.EPOC
    x0=wavelet_denoising(x[0:break_point1])
    x1=wavelet_denoising(x[break_point1:break_point2])
    x2=wavelet_denoising(x[break_point2:])
    x0=smoothing(x0,seq_ave)
    x1=smoothing(x1,seq_ave)
    x2=smoothing(x2,seq_ave)
    self.EPOC=np.append(np.append(x0,x1),x2)
    
    
    

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.act1 = nn.Tanh()
        self.pool1 = nn.MaxPool2d((3,3))
        self.conv2 = nn.Conv2d(16, 8, kernel_size=3, padding=1)
        self.act2 = nn.Tanh()
        self.pool2 = nn.MaxPool2d((1,3))
        self.fc1 = nn.Linear(8 * 1 * 24, 32)
        self.act3 = nn.Tanh()
        self.fc2 = nn.Linear(32, 1)

    def forward(self, x):
        out = self.pool1(self.act1(self.conv1(x)))
        out = self.pool2(self.act2(self.conv2(out)))
        out = out.view(-1, 8 * 1 * 24) # <1>
        out = self.act3(self.fc1(out))
        out = self.fc2(out)
        return out

class CNN_Solve:
    def __init__(self,Data_path,n_epochs,batch_size):
        self.n_epochs=n_epochs
        self.batch_size=batch_size
        self.Data_path=Data_path
        self.save_path='CNN2d.pt'
    def form_data(self):
        tx = torch.load(self.Data_path)
        self.seq=int((len(tx[0])-11)/3)
        tx=tx.double()
        data,label=tx.split([tx.shape[1]-1,1],dim=1)
        self.mm_data=MinMaxScaler()
        self.mm_label=MinMaxScaler()
        data=self.mm_data.fit_transform(data)
        label=self.mm_label.fit_transform(label)
        data=torch.tensor(data)
        label=torch.tensor(label)
        data_train,data_test=data.split([int(tx.shape[0]*0.8),tx.shape[0]-int(tx.shape[0]*0.8)],dim=0)
        self.label_train,self.label_test=label.split([int(tx.shape[0]*0.8),tx.shape[0]-int(tx.shape[0]*0.8)],dim=0)
        
        self.data_train=data_train
        self.data_test=data_test
        
        self.data_train_dynamic=torch.zeros(len(data_train),1,3,self.seq)
        for i in range(0,len(data_train)):
            self.data_train_dynamic[i]=torch.reshape(data_train[i][10:10+self.seq*3],(3,-1))

        self.data_test_dynamic=torch.zeros(len(data_test),1,3,self.seq)
        for i in range(0,len(data_test)):
            self.data_test_dynamic[i]=torch.reshape(data_test[i][10:10+self.seq*3],(3,-1))
        self.Train_Data =[]
        for i in range(0,len(self.label_train)):
            self.Train_Data.append((self.data_train_dynamic[i],self.label_train[i]))
            
    def training_loop(self, optimizer, model, loss_fn, train_loader):
        for epoch in range(1, self.n_epochs + 1):  # <2>
            loss_train = 0.0
            for data_train_dynamic, labels in train_loader:  # <3>
                data_train_dynamic = data_train_dynamic.to(torch.float32)
                labels = labels.to(torch.float32)

                outputs = model(data_train_dynamic)  # <4>

                loss = loss_fn(outputs, labels)  # <5>

                optimizer.zero_grad()  # <6>

                loss.backward()  # <7>

                optimizer.step()  # <8>

                loss_train += loss.item()  # <9>

            if epoch == 1 or epoch % 10 == 0:
                print('{} Epoch {}, Training loss {}'.format(
                    datetime.datetime.now(), epoch,
                    loss_train / len(train_loader)))  # <10>
    def run(self):
        train_loader = torch.utils.data.DataLoader(self.Train_Data, batch_size=self.batch_size, #64
                                           shuffle=True)  # <1>

        model = Net()  #  <2>
        optimizer = optim.SGD(model.parameters(), lr=1e-2)  #  <3>
        #loss_fn = nn.CrossEntropyLoss()  #  <4>
        loss_fn = nn.MSELoss()
        self.training_loop(   #200
            optimizer = optimizer,
            model = model,
            loss_fn = loss_fn,
            train_loader = train_loader,
        )
        torch.save(model.state_dict(),self.save_path)
    def plot(self,length=3000):
        model = Net()
        model.load_state_dict(torch.load('CNN2d.pt'))
        model = model.type(torch.FloatTensor)
#         model = model.cuda()
        out=model(self.data_test_dynamic)
        out=self.mm_label.inverse_transform(out.detach().numpy())
        label_plot=self.mm_label.inverse_transform(self.label_test)
        plt.figure(figsize=(50,20))
        length=3000
        plt.plot(out[0:length])
        plt.plot(label_plot[0:length])
        plt.tick_params(axis='both',which='major',labelsize=30)
    def form_data_for_DNN(self):
        model = Net()
        model.load_state_dict(torch.load('CNN2d.pt'))
        output_train=model(self.data_train_dynamic)
        output_test=model(self.data_test_dynamic)
        cv_train=torch.zeros(len(self.data_train))
        for i in range(0,len(self.data_train)):
            if self.data_train[i][-1]>0:
                cv_train[i]=1
        cv_test=torch.zeros(len(self.data_test))
        for i in range(0,len(self.data_test)):
            if self.data_test[i][-1]>0:
                cv_test[i]=1
        data_train_const=torch.zeros(len(self.data_train),12)
        for i in range(0,len(self.data_train)):
            if cv_train[i]==1:
                data_train_const[i]=torch.cat((self.data_train[i][0:10],self.data_train[i][-1].reshape(1),output_train[i]))
        data_test_const=torch.zeros(len(self.data_test),12)
        for i in range(0,len(self.data_test)):
            if cv_test[i]==1:
                data_test_const[i]=torch.cat((self.data_test[i][0:10],self.data_test[i][-1].reshape(1),output_test[i]))
        data_train_const=data_train_const[cv_train==1]
        data_test_const=data_test_const[cv_test==1]
        label_train_const=torch.zeros(len(self.label_train),1)
        label_test_const=torch.zeros(len(self.label_test),1)
        for i in range(0,len(self.label_train)):
            label_train_const[i]=self.label_train[i]-output_train[i]
        for i in range(0,len(self.label_test)):
            label_test_const[i]=self.label_test[i]-output_test[i]
        label_train_const=label_train_const[cv_train==1]
        label_test_const=label_test_const[cv_test==1]
        
        torch.save(data_train_const,"data_train_const_"+str(self.seq)+".pt")
        torch.save(label_train_const,"label_train_const_"+str(self.seq)+".pt")
        torch.save(cv_train,"cv_train"+str(self.seq)+".pt")
        torch.save(cv_test,"cv_test"+str(self.seq)+".pt")

        del self.data_train
        del self.data_train_dynamic
        del self.Train_Data
        
        

class DNN_Solve:
    def __init__(self,seq,learning_rate,n_epochs,batch_size):
        self.seq=seq
        self.learning_rate=learning_rate
        self.n_epochs=n_epochs
        self.batch_size=batch_size
    def run(self):
        data_train_const = torch.load("data_train_const_"+str(self.seq)+".pt")
        label_train_const = torch.load("label_train_const_"+str(self.seq)+".pt")
        index=random.shuffle([i for i in range(len(data_train_const))])
        r_data_train_const=data_train_const[index]
        r_label_train_const=label_train_const[index]
        Train_Data_const =[]
        for i in range(0,len(label_train_const)):
            Train_Data_const.append((r_data_train_const[0][i],r_label_train_const[0][i]))
            

        train_loader = torch.utils.data.DataLoader(Train_Data_const, batch_size=self.batch_size,
                                                   shuffle=True)
        torch.set_default_tensor_type(torch.DoubleTensor)
        len_input=len(data_train_const[0])
        model_DNN = nn.Sequential(
                    nn.Linear(len_input, 6),
                    nn.Tanh(),
                    nn.Linear(6,1))



        optimizer = optim.SGD(model_DNN.parameters(), lr=self.learning_rate)

        loss_fn = nn.MSELoss()

        n_epochs = self.n_epochs

        for epoch in range(n_epochs):
            for data, label in train_loader:
                data=data.double()
                label=label.double()
                outputs = model_DNN(data)
                loss = loss_fn(outputs, label)
                optimizer.zero_grad()
                loss.backward(retain_graph=True)
                optimizer.step()

            print("Epoch: %d, Loss: %f" % (epoch, float(loss)))
        torch.save(model_DNN, 'DNNplus.pt')

class Problem_Class_CNN_DNN:
    def __init__(self,seq=216,seq_ave=10,CNN_n_epochs=200,DNN_n_epochs=100,DNN_learning_rate=1e-4,batch_size=64):
        self.seq=seq
        self.seq_ave=seq_ave
        self.CNN_n_epochs=CNN_n_epochs
        self.DNN_n_epochs=DNN_n_epochs
        self.DNN_learning_rate=DNN_learning_rate
        self.batch_size=batch_size
        self.path="Tensor_"+str(self.seq)+"_"+str(self.seq_ave)+".pt"
        self.class_list=[]
    def pre_processing(self):
        c=0
        data_dir = "./Class/"
        for file in tqdm_notebook(os.listdir(data_dir)): 
            try:
                with open(f'./Class/'+'p'+str(c)+'.pickle','rb') as f:
                    locals()['p'+str(c)] = pickle.load(f)
                    pre_deno_smoth(locals()['p'+str(c)],self.seq_ave)
                    self.class_list.append(locals()['p'+str(c)])
                c+=1
            except:
                pass
        self.number_of_person=c
    def form_data(self):
        x=np.zeros((1,10+self.seq*3+1))
        for c in  tqdm_notebook(self.class_list):
            length=len(c.Rf)
            for i in range(0,length-self.seq+1):
                xx=np.array([c.RMR,c.Age,c.Height,c.Weight,c.HRmax,c.HRrest,c.Gender,c.BMI,c.Temp,c.Humidity])
                xx=np.append(xx,c.Rf[i:i+self.seq].reshape(1,-1))
                xx=np.append(xx,c.HR[i:i+self.seq].reshape(1,-1))
                xx=np.append(xx,c.v[i:i+self.seq].reshape(1,-1))
                xx=np.append(xx,c.EPOC[i+self.seq-1:i+self.seq].reshape(1,-1))
                xx=xx.reshape(1,-1)
                x=np.append(x,xx,axis=0)
            #break   #  节省时间测试加
        x=np.delete(x,0,0)   
        x=x.astype(float) 
        tx=torch.tensor(x)
        torch.save(tx,self.path)
    def run_CNN(self):
        CNN=CNN_Solve(self.path,self.CNN_n_epochs,self.batch_size)
        CNN.form_data()
        CNN.run()
        CNN.form_data_for_DNN()
        self.CNN=CNN
    def run_DNN(self):
        DNN=DNN_Solve(self.seq,self.DNN_learning_rate,self.DNN_n_epochs,self.batch_size)
        DNN.run()
    def Conclusion(self):
        CNN = Net()
        CNN.load_state_dict(torch.load('CNN2d.pt'))
        DNN = torch.load('DNNplus.pt')
        tx = torch.load(self.path)
        data,label=tx.split([tx.shape[1]-1,1],dim=1)
        mm_data=MinMaxScaler()
        mm_label=MinMaxScaler()
        data=mm_data.fit_transform(data)
        label=mm_label.fit_transform(label)
        data=torch.tensor(data)
        label=torch.tensor(label)

        data_train,data_test=data.split([int(tx.shape[0]*0.8),tx.shape[0]-int(tx.shape[0]*0.8)],dim=0)
        label_train,label_test=label.split([int(tx.shape[0]*0.8),tx.shape[0]-int(tx.shape[0]*0.8)],dim=0)

        data_test_dynamic=torch.zeros(len(data_test),1,3,self.seq)
        for i in range(0,len(data_test)):
            data_test_dynamic[i]=torch.reshape(data_test[i][10:10+self.seq*3],(3,-1))

        output_test=CNN(data_test_dynamic)

        data_test_const=torch.zeros(len(data_test),12)
        for i in range(0,len(data_test)):
            data_test_const[i]=torch.cat((data_test[i][0:10],data_train[i][-1].reshape(1),output_test[i]))

        out_e=DNN(data_test_const.double())

        for i in range(0,len(data_test_const)):
            if data_test[i][-1]>0:
                out_e[i]=DNN(data_test_const[i].double())
            else:
                out_e[i]=0

        self.out_final=out_e+output_test

        self.out_final = mm_label.inverse_transform(self.out_final.detach().numpy())
        self.label_final=mm_label.inverse_transform(label_test)
        
        self.out_final=torch.tensor(self.out_final)
        self.label_final=torch.tensor(self.label_final)
        
        for i in range(0,len(self.out_final)):
            self.out_final[i]+=0.2*data_test[i][-1]*1000/60*data_test[i][3]+3.5*data_test[i][3]
            self.label_final[i]+=0.2*data_test[i][-1]*1000/60*data_test[i][3]+3.5*data_test[i][3]
    def plot(self,length=9000):
        plt.figure(figsize=(150,20))
        plt.plot(self.out_final[0:length])
        plt.plot(self.label_final[0:length])
        plt.tick_params(axis='both',which='major',labelsize=30)
    def count(self,alpha=0.1):
        c=0
        for i in range(0,len(self.out_final)):
            if self.out_final[i]>=self.label_final[i]*(1-alpha) and self.out_final[i]<=self.label_final[i]*(1+alpha):
                c=c+1
        print(c/len(self.out_final))
