In [None]:
# Mount Google Drive at /content/drive for this Colab runtime.
from google.colab import drive
# NOTE(review): `_mount` is an underscore-prefixed (private) helper; the documented
# public entry point is `drive.mount` — confirm the private call is intentional.
drive._mount('/content/drive')

Mounted at /content/drive


In [None]:
# !pip install tensorflow-gpu

# load the required library
from random import shuffle
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import os
from google.colab import drive
# NOTE(review): the first cell already imports `drive` and mounts /content/drive;
# this second mount looks redundant — confirm before removing.
drive._mount('/content/drive')

# Which deep learning model to use: 'ANN', 'RNN' or 'BIRNN'.
model_choice = 'ANN'

# Model parameters.
timesteps = 60            # number of samples in each training segment
sampling_frequency = 40   # Hz; not read by any of the visible cells
hidden_layer_units = 10   # also embedded in the output directory names below

# Fail early on a typo instead of leaving the two paths below undefined,
# which would only surface later as a NameError in the training cell.
if model_choice not in ('ANN', 'RNN', 'BIRNN'):
  raise ValueError("model_choice must be 'ANN', 'RNN' or 'BIRNN', got %r" % (model_choice,))

# TensorBoard log directory and, inside it, the checkpoint directory.
# The names encode the model type and its hyper-parameters so runs do not overwrite each other.
FILEWRITER_PATH = '파일경로/ppg_dae_' + model_choice + str(timesteps) + 'timesteps_' + str(hidden_layer_units) + 'notes_tensorboard'
CHECKPOINT_PATH = FILEWRITER_PATH + '/checkpoints'

# CHECKPOINT_PATH lives under FILEWRITER_PATH, so this one call creates both;
# exist_ok=True makes re-running the cell a no-op.
os.makedirs(CHECKPOINT_PATH, exist_ok=True)


In [None]:
# -- deep learning definition --- #

def ANN():
  """Build a fully connected autoencoder.

  Reads the notebook-level settings `timesteps` (input/output width) and
  `hidden_layer_units` (width of the encoding layer).

  Returns:
    An uncompiled `tf.keras.Model` with input shape (timesteps,) and a
    sigmoid output layer of `timesteps` units.
  """
  # The tensor factory is `tf.keras.Input` (capital I).
  model_inputs = tf.keras.Input(shape=(timesteps, ))
  # The input shape is already fixed by `model_inputs`, so the layer does not repeat it.
  encoding_layer = tf.keras.layers.Dense(units=hidden_layer_units, activation='sigmoid')(model_inputs)
  out_layer = tf.keras.layers.Dense(units=timesteps, activation='sigmoid')(encoding_layer)

  model = tf.keras.Model(inputs=model_inputs, outputs=out_layer)

  return model

def RNN():
  """Build an LSTM autoencoder that encodes the whole sequence into one vector.

  Reads the notebook-level settings `timesteps` and `hidden_layer_units`.

  Returns:
    An uncompiled `tf.keras.Model` with input shape (timesteps, 1) and a
    sigmoid output layer of `timesteps` units.
  """
  # Samples are fed one value per timestep in time order, hence the extra
  # trailing feature dimension of size 1.
  model_inputs = tf.keras.Input(shape=(timesteps, 1))
  # return_sequences is left at its default (False): the LSTM emits only its
  # final state rather than one output per timestep.
  # Sigmoid is used because the targets are scaled to the 0..1 range.
  encoding_layer = tf.keras.layers.LSTM(units=hidden_layer_units, activation='sigmoid')(model_inputs)
  out_layer = tf.keras.layers.Dense(units=timesteps, activation='sigmoid')(encoding_layer)

  model = tf.keras.Model(inputs=model_inputs, outputs=out_layer)

  return model

def BIRNN():
  """Build a bidirectional-LSTM autoencoder (many-to-one encoder).

  Reads the notebook-level settings `timesteps` and `hidden_layer_units`.

  Returns:
    An uncompiled `tf.keras.Model` with input shape (timesteps, 1) and a
    sigmoid output layer of `timesteps` units.
  """
  model_inputs = tf.keras.Input(shape=(timesteps, 1))
  # The wrapper runs the LSTM forwards and backwards over the sequence.
  encoding_layer = tf.keras.layers.Bidirectional(
      tf.keras.layers.LSTM(units=hidden_layer_units, activation='sigmoid'))(model_inputs)
  out_layer = tf.keras.layers.Dense(units=timesteps, activation='sigmoid')(encoding_layer)

  model = tf.keras.Model(inputs=model_inputs, outputs=out_layer)

  return model

# many-to-many model
def BIRNN_MM():
  """Build a many-to-many bidirectional-LSTM autoencoder.

  Unlike `BIRNN`, the recurrent layer emits an output at every timestep and a
  single-unit dense layer maps each of those outputs to one value.

  Reads the notebook-level settings `timesteps` and `hidden_layer_units`.

  Returns:
    An uncompiled `tf.keras.Model` with input shape (timesteps, 1) whose
    output keeps the time axis (one sigmoid value per timestep).
  """
  model_inputs = tf.keras.Input(shape=(timesteps, 1))
  # `return_sequences=True` is an argument of the LSTM itself, not of the
  # Bidirectional wrapper. With it, the layer yields the hidden activations
  # at every timestep, so the encoding keeps a time axis and a feature axis.
  encoding_layer = tf.keras.layers.Bidirectional(
      tf.keras.layers.LSTM(units=hidden_layer_units, activation='sigmoid', return_sequences=True))(model_inputs)
  # The hidden activations of each timestep feed a single output unit, hence units=1.
  out_layer = tf.keras.layers.Dense(units=1, activation='sigmoid')(encoding_layer)

  model = tf.keras.Model(inputs=model_inputs, outputs=out_layer)

  return model



In [None]:
# -- load data & pre-processing -- #

# Set this to the path of the PPG data file; it is passed to signal_extraction() below.
ppg_data_path = '/content/drive/MyDrive/filepath'

def _minmax_scale(segment):
  """Shift and scale a 1-D segment into the 0..1 range; a constant segment maps to all zeros."""
  shifted = segment - np.min(segment)
  peak = np.max(shifted)
  if peak == 0:
    # A flat segment would otherwise divide 0 by 0 and fill the dataset with NaN.
    return shifted
  return shifted / peak


def signal_extraction(data_path):
  """Load PPG recordings and cut them into normalised `timesteps`-long segments.

  Each row of the file is split into consecutive, non-overlapping windows of
  `timesteps` samples (the notebook-level setting). Every window is min-max
  scaled, and three noise-augmented variants are derived from it: Gaussian
  noise, Gaussian noise + linear drift, and Gaussian noise + drift + a
  saturated stretch. The variants of the last window are plotted for inspection.

  Rows are shuffled, then the first 80% of rows feed the training set and the
  rest the validation set. At present the clean segment is stored as both
  input and target; swap in one of the noisy variants at the marked lines to
  train on noisy input.

  Args:
    data_path: path of a text file readable by `np.loadtxt`, one recording per row.

  Returns:
    Tuple `(train_input, train_output, val_input, val_output)` of numpy
    arrays holding one segment per row.

  Raises:
    ValueError: if the file holds no rows or rows shorter than `timesteps`.
  """
  # ndmin=2 keeps a single-row file two-dimensional so row/column indexing still works.
  ppg_data = np.loadtxt(data_path, ndmin=2)
  num_rows, row_length = ppg_data.shape

  # Start offsets of every full window in a row. The "+ 1" keeps the final
  # window when the row length is an exact multiple of `timesteps`.
  segment_starts = range(0, row_length - timesteps + 1, timesteps)
  if num_rows == 0 or len(segment_starts) == 0:
    raise ValueError('no %i-sample segment can be cut from data of shape %i by %i'
                     % (timesteps, num_rows, row_length))

  print('the shape of the original dataset is %i by %i' % (num_rows, row_length))
  print('the shape of the %i timestep dataset is %i by %i' % (timesteps, len(segment_starts) * num_rows, timesteps))

  # Shuffle the rows in place. `random.shuffle` must not be used on a 2-D
  # array: its element swap works on row views and ends up duplicating rows.
  np.random.shuffle(ppg_data)

  # lists that collect the training and validation segments
  train_input_data_list = []
  train_output_data_list = []
  val_input_data_list = []
  val_output_data_list = []

  # loop through the dataset and perform segment extraction / noise addition on each row
  for datanum in range(num_rows):
    for index in segment_starts:

      # -- original data, normalized -- #
      clean = _minmax_scale(ppg_data[datanum, index: index + timesteps])

      # -- gaussian noise data -- #
      gaussian_noise = np.random.randn(len(clean)) * 1 / 3
      noisy = _minmax_scale(clean + gaussian_noise)

      # -- gaussian + low freq noise data -- #
      # A random linear ramp across the window imitates slow baseline drift.
      slope = np.random.randn()
      drifted = _minmax_scale(noisy + slope / len(noisy) * np.arange(len(noisy)))

      # -- gaussian + low freq + saturation data -- #
      # Clamp a random stretch to 0 or 1. Plain ints are used as slice bounds
      # (1-element arrays are not valid indices in current NumPy).
      saturated = drifted.copy()
      location1 = np.random.randint(timesteps)
      location2 = location1 + np.random.randint(timesteps - location1)
      if np.random.randn() > 0:
        saturated[location1: location2] = 0.0
      else:
        saturated[location1: location2] = 1.0

      # put 80% of the rows into the training lists and 20% into the validation lists
      if datanum < 0.8 * num_rows:
        train_input_data_list.append(clean) # change this to the noise-augmented data
        train_output_data_list.append(clean)
      else:
        val_input_data_list.append(clean) # change this to the noise-augmented data
        val_output_data_list.append(clean)

  # plot the last row, its last window, and that window's three noisy variants
  plt.figure()
  plt.plot(ppg_data[datanum])
  plt.plot(ppg_data[datanum, index: index + timesteps])
  plt.show()
  for variant in (noisy, drifted, saturated):
    plt.figure()
    plt.plot(clean)
    plt.plot(variant)
    plt.show()

  # convert the dataset lists into arrays
  train_input_data_list = np.asarray(train_input_data_list)
  train_output_data_list = np.asarray(train_output_data_list)
  val_input_data_list = np.asarray(val_input_data_list)
  val_output_data_list = np.asarray(val_output_data_list)

  return train_input_data_list, train_output_data_list, val_input_data_list, val_output_data_list

# Build the training / validation arrays from the PPG file.
(train_input_data_list,
 train_answer_data_list,
 val_input_data_list,
 val_answer_data_list) = signal_extraction(ppg_data_path)

# The recurrent models take Keras LSTM input, so their inputs get a trailing
# axis of size 1: (number of segments, timesteps, 1).
if model_choice in ('RNN', 'BIRNN'):
  train_input_data_list = np.reshape(train_input_data_list, (len(train_input_data_list), timesteps, 1))
  val_input_data_list = np.reshape(val_input_data_list, (len(val_input_data_list), timesteps, 1))

# Report the shapes of the training inputs and targets, one per line.
print(np.shape(train_input_data_list), np.shape(train_answer_data_list), sep='\n')



In [None]:
# -- deep learning model training & validation -- #
# Build the model selected in the configuration cell.
if model_choice == 'ANN':
  DAE = ANN()
elif model_choice == 'RNN':
  DAE = RNN()
elif model_choice == 'BIRNN':
  DAE = BIRNN()
else:
  # Without this branch an unknown choice would surface as a NameError on DAE below.
  raise ValueError("model_choice must be 'ANN', 'RNN' or 'BIRNN', got %r" % (model_choice,))

# compile the model with optimizer and loss function
# opt = tf.keras.optimizers.Adam(learning_rate =1)
DAE.compile(optimizer='adam', loss='mse')

# Keep only the best weights seen during training. The target is a file inside
# CHECKPOINT_PATH (which is a directory); the `.weights.h5` suffix is the form
# current Keras requires for weight-only checkpoints. The deprecated `period`
# argument is dropped: saving once per epoch is already the default.
weights_path = os.path.join(CHECKPOINT_PATH, 'best.weights.h5')
cp_callback = tf.keras.callbacks.ModelCheckpoint(weights_path, verbose=1, save_best_only=True, save_weights_only=True)

# train the model
DAE.fit(train_input_data_list, train_answer_data_list, epochs=10000, batch_size=40000, shuffle=True,
        validation_data=(val_input_data_list, val_answer_data_list),
        verbose=2, callbacks=[cp_callback])

# restore the best weights written by the checkpoint callback
DAE.load_weights(weights_path)

# generate predictions from the validation input data
prediction = DAE.predict(val_input_data_list)

# flatten the validation input, the prediction and the answer for saving purposes
prediction = np.reshape(prediction, [-1])
val_input_data_list = np.reshape(val_input_data_list, [-1])
val_answer_data_list = np.reshape(val_answer_data_list, [-1])

# save the first 10% of the flattened validation results for further analyses
# (columns: input, prediction, answer)
name = '/content/drive...' + model_choice + '_' + str(timesteps) + 'timesteps_' + str(hidden_layer_units) + 'notes_prediction_results.txt'
# `with` closes the file even if a write fails part-way through.
with open(name, 'w') as results_file:
  for result_num in range(len(prediction) // 10):
    results_file.write("%f %f %f \n" % (val_input_data_list[result_num], prediction[result_num], val_answer_data_list[result_num]))

# overlay input, prediction and answer for the first validation segment
plt.figure()
plt.plot(val_input_data_list[0: timesteps])
plt.plot(prediction[0: timesteps])
plt.plot(val_answer_data_list[0: timesteps])
plt.show()
