## Model Training

U-Net Model Training. Model takes MST Maps as input and produces predicted vPPG or PPG signals. Model adapted from Yu et al (see references).

In [None]:
import os
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.preprocessing.image import img_to_array, load_img
import numpy as np
import pandas as pd
from scipy.io import loadmat
import glob
import visualkeras
import matplotlib.pyplot as plt

In [None]:
#Subjects used for training were placed in a seperate folder; use base path to define location of training subject videos

base_path = os.getcwd() #change path based on location of training folder
videos = glob.glob(os.path.join(base_path,"subject*/vid.avi"))
subjects_train =  []

for i in videos:
    subject_number = i[-10:-8]
    try:
        subject_number = subject_number.replace('t','')
    except:
        pass
    subjects_train.append(f"subject{subject_number}")

print(subjects_train)

In [None]:
X_train = []
y_train = []

for subject in subjects_train:
    iterate = len(glob.glob(f"{base_path}/{subject}/Map*")) #change based on save location of MST Maps
    for i in range(1,iterate+1):
        img_path_rgb = os.path.join(base_path, subject, f'Map{i}', 'img_rgb.png')
        img_path_yuv = os.path.join(base_path, subject, f'Map{i}', 'img_yuv.png')
        
        img_rgb = load_img(img_path_rgb,color_mode = 'rgb')
        img_yuv = load_img(img_path_yuv, color_mode = 'rgb')
        combined_image = np.concatenate((img_rgb, img_yuv),axis = -1)
        
        X_train.append(combined_image)
        
        bvp_path = os.path.join(base_path,subject, f'Map{i}', 'bvp.mat')
        bvp_data = loadmat(bvp_path)
        bvp =  bvp_data['bvp'][0]
        
        y_train.append(bvp)
    
X_train = np.array(X_train)
y_train = np.array(y_train)
print(X_train.shape)
print(y_train.shape)


In [None]:
def double_conv_block(input, num_filters):
    kernel_size = (3,3)
    stride = 1
    x = layers.Conv2D(filters=num_filters, kernel_size=(3,3), strides=1, padding='same')(input)
    x = layers.BatchNormalization()(x)
    x = layers.PReLU()(x)

    x = layers.Conv2D(filters=num_filters, kernel_size=(3,3), strides=1, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.PReLU()(x)

    return x


In [None]:
input_tensor = layers.Input(shape=(63,256,6))

x0 = layers.ZeroPadding2D(padding=((0,1),(0,0)))(input_tensor)
x1 = double_conv_block(x0,32)

x2 = layers.AveragePooling2D((2,2))(x1)
x2 = double_conv_block(x2,64)

x3 = layers.AveragePooling2D((2,2))(x2)
x3 = double_conv_block(x3,128)

x4 = layers.AveragePooling2D((2,2))(x3)
x4 = double_conv_block(x4,256)

x5 = layers.AveragePooling2D((2,2))(x4)
x5 = double_conv_block(x5,512)

x6 = layers.Conv2DTranspose(filters = int(512/2),strides = (2,2),kernel_size = (1,1),padding = 'same')(x5)

x7 = layers.Concatenate(axis=-1)([x4,x6])   #Concatenate along the channel dimension
x7 = double_conv_block(x7,256)
x7 = layers.Conv2DTranspose(filters = int(256/2),strides = (2,2),kernel_size = (1,1),padding = 'same')(x7)

x8 = layers.Concatenate(axis=-1)([x3,x7])
x8 = double_conv_block(x8,128)
x8 = layers.Conv2DTranspose(filters = int(128/2),strides = (2,2),kernel_size = (1,1),padding = 'same')(x8)

x9 = layers.Concatenate(axis=-1)([x2,x8])
x9 = double_conv_block(x8,64)
x9 = layers.Conv2DTranspose(filters = int(64/2),strides = (2,2),kernel_size = (1,1),padding = 'same')(x9)

x10 = layers.Concatenate(axis=-1)([x1,x9])        
x10 = double_conv_block(x9,32)

x11 = layers.AveragePooling2D(pool_size = (64,1), strides=(64,1),padding = 'valid')(x10) #Global Average Pooling Layer
x11 = layers.Conv2D(filters=1, kernel_size=(1,1), strides=1, padding='same')(x11)

x11 = layers.Flatten()(x11)


In [None]:
model = models.Model(inputs=input_tensor, outputs=x11)

In [None]:
model.summary()

In [None]:
def plot_fig(i, history):
    fig = plt.figure()
    plt.plot(range(1,epochs+1),history.history['val_accuracy'],label='validation')
    plt.plot(range(1,epochs+1),history.history['accuracy'],label='training')
    plt.legend()
    plt.xlabel('epochs')
    plt.ylabel('accuracy')
    plt.xlim([1,epochs])
#   plt.ylim([0,1])
    plt.grid(True)
    plt.title("Model Accuracy")
    plt.show()
    plt.close(fig)

In [None]:
def negative_pearson_loss(y_true,y_pred): 
    y_true = tf.reshape(y_true,[-1])
    y_pred = tf.reshape(y_pred,[-1])

    mean_y_true = tf.reduce_mean(y_true)
    mean_y_pred = tf.reduce_mean(y_pred)

    y_true_cent = y_true - mean_y_true
    y_pred_cent = y_pred - mean_y_pred

    numerator = tf.reduce_sum(y_true_cent * y_pred_cent)

    denominator = tf.sqrt(tf.reduce_sum(y_true_cent ** 2) * tf.reduce_sum(y_pred_cent ** 2))

    pearson_corr = numerator/(denominator + 1e-8) #prevents division by zero

    loss = 1 - pearson_corr

    return loss


In [None]:
def manhattan_and_mse_loss(y_true, y_pred):
    # Perform Fourier Transform on the predictions (keep complex64)
    y_pred_freq = tf.signal.fft(tf.cast(y_pred, tf.complex64))
    y_pred_power = tf.abs(y_pred_freq)**2  # Power spectrum of predicted

    # Perform Fourier Transform on the true values (keep complex64)
    y_true_freq = tf.signal.fft(tf.cast(y_true, tf.complex64))
    y_true_power = tf.abs(y_true_freq)**2  # Power spectrum of true values

    # Extract peak indices from the power spectra (excluding DC component)
    # Apply argmax on the magnitude (real part) of the power spectrum
    peak_index_pred = tf.argmax(tf.abs(y_pred_power[1:]), axis=-1) + 1  # Excludes DC component
    peak_index_true = tf.argmax(tf.abs(y_true_power[1:]), axis=-1) + 1  # Excludes DC component

    # Compute predicted heart rate and true heart rate (bpm)
    predicted_hr = tf.cast(peak_index_pred, tf.float32) * 60  # Multiply by 60 for bpm
    true_hr = tf.cast(peak_index_true, tf.float32) * 60  # Multiply by 60 for bpm

    # Calculate Manhattan loss
    manhattan_loss = tf.abs(predicted_hr - true_hr)

    # Calculate MSE loss based on power spectra
    mse_loss = tf.reduce_mean(tf.square(y_pred_power - y_true_power))

    # Combine losses with specified weights
    loss = 0.2 * manhattan_loss + 0.1 * mse_loss

    return loss



In [None]:
def combined_loss(y_true,y_pred):
    loss1 = negative_pearson_loss(y_true,y_pred) #negative pearson loss (coefficient = 1)
    loss2 = manhattan_and_mse_loss(y_true,y_pred) #Manhattan and MSE loss (coefficients already incorporated)

    tot_loss = loss1 + loss2

    return tot_loss

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
epochs = 50

model.compile(optimizer=optimizer,loss=combined_loss, metrics = ['root_mean_squared_error','mean_squared_error'])

history1 = model.fit(X_train,y_train,
                      batch_size = 64,
                      epochs = 50,
                      verbose = 1)


model.save('trained_rPPG_U-Net.keras')


### Optional Graphs for Visualization

In [None]:
# Plot training and validation loss over epochs
import matplotlib.pyplot as plt

# Check if 'val_loss' is available in history to plot
if 'val_loss' in history1.history:
    plt.plot(history1.history['loss'], label='Training Loss')
    plt.plot(history1.history['val_loss'], label='Validation Loss')
else:
    plt.plot(history1.history['loss'], label='Training Loss')

plt.title('Model Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
def plot_fig(i, history):
    fig = plt.figure()
    # Plot root mean squared error for training
    plt.plot(range(1, epochs + 1), history.history['root_mean_squared_error'], label='Training RMSE')
    plt.legend()
    plt.xlabel('Epochs')
    plt.ylabel('Root Mean Squared Error')
    plt.xlim([1, epochs])
    plt.grid(True)
    plt.title("Training RMSE Over Epochs")
    plt.show()
    plt.close(fig)

# After training, use this function to plot the results
plot_fig(1, history1)

### References

Yu SN, Wang CS, Yu Ping Chang. Heart Rate Estimation From Remote Photoplethysmography Based on Light-Weight U-Net and Attention Modules.  IEEE access. 2023;11:54058-54069. doi:https://doi.org/10.1109/access.2023.3281898 