In [127]:
#Necessary imports

import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import openpyxl
import pandas as pd
import dlib
import seaborn as sns


from scipy.signal import butter, filtfilt, lfilter
from scipy.fft import fft, fftfreq
from datetime import datetime

In [128]:
#Constants
low_cutoff = 0.75
high_cutoff = 2.5
fps=0
fs = 2 * high_cutoff

#Paths
dlib_path = 'dlib_files/shape_predictor_68_face_landmarks.dat'
video_path = ''

# Load face detector and facial landmark predictor
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(dlib_path)

#pulses values
pulses_forehead=[]
pulses_leftc=[]
pulses_rightc=[]
# pulses_fullf=[]

# Create a new DataFrame with new columns data


In [129]:
try:
    os.mkdir(folder_path)
    print(folder_path)
except FileExistsError:
    print("Folder already exists!")

Folder already exists!


In [130]:
#Utility functions

def setValues():
    #Constants
    low_cutoff = 0.75
    high_cutoff = 2.5
    fs = 2 * high_cutoff    
    #pulses values
    pulses_forehead=[]
    pulses_leftc=[]
    pulses_rightc=[]
    # pulses_fullf=[]


def calculate_pulse_signal(roi,pulses):
    
    B, G, R = cv2.split(roi)
    
    #Normalizing R,G,B channels
    
    Rn = R / np.mean(R)
    Gn = G / np.mean(G)
    Bn = B / np.mean(B)    
    
    #CHROM Signals
    
    Xs = 3*Rn - 2*Gn
    Ys = 1.5*Rn + Gn - 1.5*Bn
    
    Rf = apply_bandpass_filter(Rn)
    Gf = apply_bandpass_filter(Gn)
    Bf = apply_bandpass_filter(Bn)
    Xf = apply_bandpass_filter(Xs)
    Yf = apply_bandpass_filter(Ys)

    alpha = np.std(Xf) / np.std(Yf)

    Signal = 3*(1-(alpha/2))*Rf - 2*(1 + (alpha/2))*Gf + ((3*alpha)/2)*Bf
    
    pulses.append(np.mean(Signal))  

def apply_bandpass_filter(channel, order=5):
    b, a = butter_bandpass(order)
    y = filtfilt(b, a, channel, axis=0,padlen=0)
    return y

def butter_bandpass(order=5):
    nyquist = 0.5 * fs  # Nyquist frequency
    lowcut_norm = low_cutoff / nyquist 
    highcut_norm = high_cutoff / nyquist
    b, a = butter(order, [lowcut_norm, highcut_norm-0.1], btype='band')
    return b, a

    

In [131]:
# Open video file

def calculateResult(file_path):
    print(file_path)
    data = {
    'fl': np.nan,
    'fr': np.nan,
    'rl': np.nan,
    'facecount': np.nan,
    'error':0
    }
    
    cap = cv2.VideoCapture(file_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    setValues()
    
    # Process each frame in the video
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # handle if not ret
            data['error'] = 1
            print(ret)
            return data
            break
    
        # Convert frame to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
        # Detect faces in the frame
        faces = detector(gray)
        
        if(len(faces) > 1):
            data['facecount'] = len(faces)
            return data
            break
    
        # Process each detected face
        for face in faces:
            # Detect facial landmarks
            landmarks = predictor(gray, face)
            
            # Identify forehead region based on landmarks
            forehead_r1=[0,0,0,0] #x1,x2,y1,y2
            
            forehead_r1[0] = landmarks.part(21).x # Left eyebrow-x
            forehead_r1[1] = landmarks.part(22).x  # Right eyebrow-x
            forehead_r1_height=int((forehead_r1[1] - forehead_r1[0]))
            forehead_r1[2] = landmarks.part(21).y - forehead_r1_height # Left eyebrow-y
            forehead_r1[3] = landmarks.part(22).y # Right eyebrow-y
            
            # Identify left cheek region-1 based on landmarks
            leftc_r1=[0,0,0,0] #x1,x2,y1,y2
            
            leftc_r1[0]=landmarks.part(41).x
            leftc_r1[1]=landmarks.part(39).x
            leftc_r1[2]=landmarks.part(28).y
            leftc_r1[3]=landmarks.part(30).y
    
            # Identify right cheek region-1 based on landmarks
            rightc_r1=[0,0,0,0] #x1,x2,y1,y2
            
            rightc_r1[0]=landmarks.part(42).x
            rightc_r1[1]=landmarks.part(46).x
            rightc_r1[2]=landmarks.part(28).y
            rightc_r1[3]=landmarks.part(30).y
            
            try:
                calculate_pulse_signal(frame[forehead_r1[2]:forehead_r1[3],forehead_r1[0]:forehead_r1[1]],pulses_forehead)
                calculate_pulse_signal(frame[leftc_r1[2]:leftc_r1[3],leftc_r1[0]:leftc_r1[1]],pulses_leftc)
                calculate_pulse_signal(frame[rightc_r1[2]:rightc_r1[3],rightc_r1[0]:rightc_r1[1]],pulses_rightc)   
            except Exception as e:
                print(e)
                data['error']=1
                return data
    
        # Exit if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            # handle if q pressed
            break
    # Release video capture
    cap.release()

    # data = {'full_face': pulses_fullf, 'forehead': pulses_forehead, 'left_cheek': pulses_leftc, 'right_cheek': pulses_rightc}
    datadf = {'forehead': pulses_forehead, 'left_cheek': pulses_leftc, 'right_cheek': pulses_rightc}
    df = pd.DataFrame(datadf)
    correlation_matrix = df.corr()  
    
    data = {
    'fl': round(correlation_matrix['forehead']['left_cheek'],2),
    'fr': round(correlation_matrix['forehead']['right_cheek'],2),
    'rl': round(correlation_matrix['right_cheek']['left_cheek'],2),
    'facecount': 1,
    'error':0
    }
    
    return data


In [132]:
excel_file = 'celebDF_DS.xlsx'  # Replace with the path to your Excel file
results=[]
# Read the existing Excel file    
existing_df = pd.read_excel(excel_file)
count=5
for filepath in existing_df['filepath']:
    print(filepath)
    result = calculateResult(filepath)
    results.append(result)
    print(result)
    if(count==0):
        break
    count-=1
print("Done")
# updating Results DataSet
# try:
#     results_df = pd.DataFrame(results)
#     updated_df = updated_df = pd.concat([existing_df, result_df], axis=1)
#     save_excel_file(updated_df, excel_file)        
#     print("The new columns have been added to the Excel file.")
# except Exception as e:
#     print(f"An error occurred: {e}")


C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0000.mp4
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0000.mp4
False
{'fl': nan, 'fr': nan, 'rl': nan, 'facecount': nan, 'error': 1}
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0001.mp4
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0001.mp4
False
{'fl': nan, 'fr': nan, 'rl': nan, 'facecount': nan, 'error': 1}
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0002.mp4
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0002.mp4
False
{'fl': nan, 'fr': nan, 'rl': nan, 'facecount': nan, 'error': 1}
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0003.mp4
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0003.mp4
False
{'fl': nan, 'fr': nan, 'rl': nan, 'facecount': nan, 'error': 1}
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0004.mp4
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0004.mp4
False
{'fl': nan, 'fr': nan, 'rl': nan, 'facecount': nan, 'error': 1}
C:/Users/dpava/Desktop/celeb_df_v1/Celeb-real/id0_0005.mp4
C