# 3D convolutional neural networks for remote pulse rate measurement and mapping from facial video by Frédéric Bousefsaf 


### MIT License Copyright (c) 2019 Frédéric Bousefsaf

In [83]:
#importing packages
import os
import cv2
import scipy.io
import numpy as np
import pandas as pd
from PIL import Image
import tensorflow as tf
from numpy import asarray
import matplotlib.pyplot as plt
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.models import model_from_json
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.keras.layers import Input, Dense
from tensorflow.python.keras.callbacks import ModelCheckpoint
from tensorflow.python.keras.layers import ZeroPadding3D, Dense, Activation,Conv3D,MaxPooling3D,AveragePooling3D,Flatten,Dropout

In [84]:
# --- Notebook configuration -------------------------------------------------
# Dataset roots. Later cells append str(heart_rate_label) to these paths and
# read the images stored in the resulting sub-folder.
Training_Dataset_Folder = "/content/drive/MyDrive/Method1-3DCNN/Training_data/"
# NOTE(review): identical to the training folder, so the evaluation cell runs
# on the training images -- confirm whether a separate test folder was intended.
Testing_Dataset_Folder = "/content/drive/MyDrive/Method1-3DCNN/Training_data/"

# Shape of one network input: BATCH_SIZE stacked copies of an image that is
# IMAGE_HEIGHT x IMAGE_WIDTH pixels with IMAGE_CHANNELS channel(s).
BATCH_SIZE = 60
IMAGE_WIDTH = IMAGE_HEIGHT = 25
IMAGE_CHANNELS = 1

# Switches read by the model-definition and training cells.
CONTINUE_TRAINING, SAVE_ALL_MODELS = True, False

# Two independent, initially empty history lists.
# NOTE(review): the training cell appends to `train_loss` / `train_acc`, not to
# these names -- confirm which pair is meant to be used.
training_loss, training_accuracy = [], []

In [None]:
#Load the list of used heart rate labels for Training and Testing
#total of 76 heart rate labels used for training and testing this model
#heart rates for training and testing must be same

# hr_labels.csv has no header row; it contains the list of heart rate labels.
hr_labels = pd.read_csv('/content/drive/MyDrive/Method1-3DCNN/hr_labels.csv', sep=',', header=None)

# Convert the frame to a flat numpy array with one entry per label.
# (The reshape raises if the CSV has more than one column.)
hr_labels = hr_labels.to_numpy()
hr_labels = np.reshape(hr_labels, [len(hr_labels)])

# Class indices 0..N-1, one per heart rate label. The original cell referenced
# the undefined names `label` and `labels` here, which raised NameError on a
# fresh kernel.
labels = np.arange(len(hr_labels))

#assigning category tag to heart rate labels (one-hot row per class index)
labels_cat = np_utils.to_categorical(labels)

print(hr_labels)

# Highest class index; the model's output layer is built with NB_CLASSES + 1 units.
NB_CLASSES = len(labels) - 1

In [87]:
# Count the total number of entries in all per-label folders of each dataset.
# Every directory entry is counted, matching how the loading cells size their
# loops (they also use len(os.listdir(...))).
# The generator expressions keep the loop variable local, so the builtins
# `dir` and `list` are no longer overwritten in the notebook's namespace.
training_images_counter = sum(
    len(os.listdir(Training_Dataset_Folder + str(hr_label)))
    for hr_label in hr_labels
)
testing_images_counter = sum(
    len(os.listdir(Testing_Dataset_Folder + str(hr_label)))
    for hr_label in hr_labels
)
        


In [None]:
# Pre-allocate the input / target arrays from the image counts computed by the
# counting cell. This must happen BEFORE the counters are reset: the original
# cell zeroed them first, which allocated arrays with 0 rows and made the first
# write below fail with IndexError.
x_train = np.zeros([training_images_counter, BATCH_SIZE, IMAGE_HEIGHT, IMAGE_WIDTH])
y_train = np.zeros([training_images_counter, len(labels)])
x_test = np.zeros([testing_images_counter, BATCH_SIZE, IMAGE_HEIGHT, IMAGE_WIDTH])
y_test = np.zeros([testing_images_counter, len(labels)])

# From here on the counters are reused as write cursors into the arrays
# (the test-loading cell uses testing_images_counter the same way).
# Because of that reuse, re-run the counting cell before re-running this one.
training_images_counter = 0
testing_images_counter = 0

#Grab all the images from training dataset directory
for label_idx, hr_label in enumerate(hr_labels):
    label_dir = Training_Dataset_Folder + str(hr_label)
    n_images = len(os.listdir(label_dir))  # number of entries in this heart rate folder
    # Images are expected to be named 1.jpg, 2.jpg, ..., n_images.jpg.
    for image_no in range(1, n_images + 1):
        image_path = label_dir + '/' + str(image_no) + '.jpg'
        # The context manager closes the file once the pixels are copied out.
        with Image.open(image_path) as image_file:
            image = asarray(image_file)
        # Repeat the 2-D image along a new leading axis so the sample has
        # BATCH_SIZE identical frames (was a hard-coded 60).
        # assumes single-channel (H, W) images -- TODO confirm
        x_train[training_images_counter] = np.tile(image, [BATCH_SIZE, 1, 1])
        y_train[training_images_counter] = labels_cat[label_idx]
        training_images_counter += 1
    

In [None]:
#Grab all the images from testing dataset directory

# Restart the write cursor so re-running this cell does not index past the end
# of x_test.
testing_images_counter = 0

for label_idx, hr_label in enumerate(hr_labels):
    label_dir = Testing_Dataset_Folder + str(hr_label)
    n_images = len(os.listdir(label_dir))  # number of entries in this heart rate folder
    # Images are expected to be named 1.jpg, 2.jpg, ..., n_images.jpg.
    for image_no in range(1, n_images + 1):
        image_path = label_dir + '/' + str(image_no) + '.jpg'
        # The context manager closes the file once the pixels are copied out.
        with Image.open(image_path) as image_file:
            image = asarray(image_file)
        # Repeat the 2-D image along a new leading axis so the sample has
        # BATCH_SIZE identical frames (was a hard-coded 60).
        # assumes single-channel (H, W) images -- TODO confirm
        x_test[testing_images_counter] = np.tile(image, [BATCH_SIZE, 1, 1])
        y_test[testing_images_counter] = labels_cat[label_idx]
        testing_images_counter += 1

# Append a trailing channel axis of size 1, giving the 5-D layout
# (samples, BATCH_SIZE, IMAGE_HEIGHT, IMAGE_WIDTH, 1) that Conv3D is fed.
# reshape (rather than adding a new axis) keeps this cell safe to re-run.
x_train = np.reshape(x_train, [x_train.shape[0], BATCH_SIZE, IMAGE_HEIGHT, IMAGE_WIDTH, 1])
x_test = np.reshape(x_test, [x_test.shape[0], BATCH_SIZE, IMAGE_HEIGHT, IMAGE_WIDTH, 1])

In [75]:
#Defining  Model for training

# NOTE(review): the flag reads inverted -- CONTINUE_TRAINING == True builds a
# brand-new model, while False restores a saved one. Behaviour is kept as-is;
# confirm the intended meaning before flipping it.
if CONTINUE_TRAINING:
    init_batch_nb = 0

    # Start with an empty history. The training cell appends to these lists,
    # and previously they only existed when a saved model was restored, so a
    # fresh run failed with NameError.
    train_loss = []
    train_acc = []

    model = Sequential()
    # One 3D convolution over (frames, height, width); the kernel spans almost
    # the whole input along the frame axis (58 of BATCH_SIZE = 60 frames).
    model.add(Conv3D(filters=32, kernel_size=(58, 20, 20),
                     input_shape=(BATCH_SIZE, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)))
    model.add(MaxPooling3D(pool_size=(2, 2, 2)))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))

    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.2))
    # One softmax output per heart rate label (NB_CLASSES is len(labels) - 1).
    model.add(Dense(NB_CLASSES + 1, activation='softmax'))

else:
    #load model
    with open('/content/drive/My Drive/Model1-3dcnn/model/model_conv3D.json') as json_file:
        model = model_from_json(json_file.read())
    # The '%04d' is intentionally left unformatted: the training cell saves its
    # "best" weights under this exact literal filename.
    model.load_weights('/content/drive/My Drive/Model1-3dcnn/model/weights_conv3D_%04d.h5')

    #load statistics
    # Read the file the training cell actually writes (one "loss<TAB>acc" line
    # per step); the original read 'loss.txt', which nothing in this notebook
    # produces. ndmin=2 keeps a one-line file two-dimensional so the column
    # slicing below still works.
    stats = np.loadtxt('/content/drive/My Drive/Model1-3dcnn/model/statistics_loss_acc.txt', ndmin=2)
    init_batch_nb = stats.shape[0]
    train_loss = stats[:, 0].tolist()
    train_acc = stats[:, 1].tolist()

#Show Model summary and compile the model
model.summary()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])


Model: "sequential_6"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv3d_6 (Conv3D)            (None, 3, 6, 6, 32)       742432    
_________________________________________________________________
max_pooling3d_6 (MaxPooling3 (None, 1, 3, 3, 32)       0         
_________________________________________________________________
activation_6 (Activation)    (None, 1, 3, 3, 32)       0         
_________________________________________________________________
dropout_12 (Dropout)         (None, 1, 3, 3, 32)       0         
_________________________________________________________________
flatten_6 (Flatten)          (None, 288)               0         
_________________________________________________________________
dense_12 (Dense)             (None, 512)               147968    
_________________________________________________________________
dropout_13 (Dropout)         (None, 512)              

In [None]:
#Training the Model
EPOCHS = 100

# `total_batch_num` was not defined anywhere in the notebook (NameError on a
# fresh kernel). Each train_on_batch call below is given the whole training
# set, so one step per epoch is one pass over the data.
# NOTE(review): placeholder value -- raise it to take several full-batch
# gradient steps per epoch.
total_batch_num = 1

MODEL_DIR = '/content/drive/My Drive/Model1-3dcnn/model'
STATS_FILE = MODEL_DIR + '/statistics_loss_acc.txt'
ARCHITECTURE_FILE = MODEL_DIR + '/model_conv3D.json'
# In "best only" mode the pattern is used unformatted on purpose: the
# model-definition cell restores weights from this exact literal filename.
WEIGHTS_PATTERN = MODEL_DIR + '/weights_conv3D_%04d.h5'

os.makedirs(MODEL_DIR, exist_ok=True)

# save model architecture (once per run; it does not change during training)
with open(ARCHITECTURE_FILE, 'w') as architecture_file:
    architecture_file.write(model.to_json())

# Best accuracy recorded so far; -inf when there is no history yet, so the very
# first step always counts as an improvement.
best_acc = max(train_acc, default=float('-inf'))

print('Start Training ...................')
for i in range(EPOCHS):
    print("EPOCH:" + str(i+1))
    for batch_nb in range(total_batch_num):
        # With metrics=['accuracy'] compiled, the result is [loss, accuracy].
        history = model.train_on_batch(x_train, y_train)
        loss, acc = history[0], history[1]
        train_loss.append(loss)
        train_acc.append(acc)

        # Index of this step over the whole recorded history (all epochs, plus
        # any restored history). The per-epoch batch_nb restarts at 0 every
        # epoch, which used to truncate the statistics file, overwrite the
        # per-step weight files, and report stale loss/accuracy values.
        global_step = len(train_loss) - 1

        if SAVE_ALL_MODELS:
            # B. Save the model every iteration
            model.save_weights(WEIGHTS_PATTERN % global_step, overwrite=True)    # save (trained) weights
        elif acc > best_acc:
            # A. Save the model only if the accuracy is greater than before
            model.save_weights(WEIGHTS_PATTERN, overwrite=True)   # save (trained) weights
            print('A new model has been saved!\n')
        best_acc = max(best_acc, acc)

        print('training: ' + str(batch_nb + 1) + '/' + str(total_batch_num) + ' done')
        print('training: loss=' + str(loss) + ' acc=' + str(acc))
        print("EPOCH:" + str(i+1) + "/" + str(EPOCHS))

        # save learning state informations: start a new file only when there is
        # no earlier history, otherwise append one line per step.
        with open(STATS_FILE, 'w' if global_step == 0 else 'a') as stats_file:
            stats_file.write(str(loss) + '\t' + str(acc) + '\n')
    

In [None]:
#Test the model
# Score the trained network on the held-out arrays; verbose=2 prints a single
# summary line. The result is kept in `history`.
history = model.evaluate(x=x_test, y=y_test, verbose=2)