In [None]:
import os
import math
import random

import cv2
import dlib
import torch
import face_recognition

import numpy as np
import pandas as pd
import scipy as sp

from imutils import face_utils
from concurrent.futures import ThreadPoolExecutor

In [None]:
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

In [None]:
shape_predictor_path = "eameo-faceswap-generator/shape_predictor_68_face_landmarks.dat"
try:
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(shape_predictor_path)
    print("Successfully loaded")
except Exception as e:
    print(f"Error loading models: {e}")
    exit()

def eye_aspect_ratio_torch(eye):
    A = torch.norm(eye[1] - eye[5])
    B = torch.norm(eye[2] - eye[4])
    C = torch.norm(eye[0] - eye[3])
    ear = (A + B) / (2.0 * C)
    return ear

def get_head_pose_torch(shape, size, device='cpu'):
    model_points = torch.tensor([
        [0.0, 0.0, 0.0], [0.0, -330.0, -65.0], [-225.0, 170.0, -135.0],
        [225.0, 170.0, -135.0], [-150.0, -150.0, -125.0], [150.0, -150.0, -125.0]
    ], dtype=torch.float32).to(device)

    image_points = shape[[30, 8, 36, 45, 48, 54], :]

    if image_points.shape[0] < 4:
        raise ValueError("Not enough points to perform solvePnP")

    focal_length = size[1]
    center = (size[1] / 2, size[0] / 2)
    camera_matrix = torch.tensor([
        [focal_length, 0, center[0]],
        [0, focal_length, center[1]],
        [0, 0, 1]
    ], dtype=torch.float32).to(device)

    dist_coeffs = torch.zeros((4, 1), dtype=torch.float32).to(device)
    
    image_points_np = image_points.cpu().numpy()
    model_points_np = model_points.cpu().numpy()
    camera_matrix_np = camera_matrix.cpu().numpy()
    dist_coeffs_np = dist_coeffs.cpu().numpy()

    success, rotation_vector, translation_vector = cv2.solvePnP(
        model_points_np, image_points_np, camera_matrix_np, dist_coeffs_np
    )

    if not success:
        raise ValueError("solvePnP failed to find a solution")

    rotation_vector = torch.tensor(rotation_vector, dtype=torch.float32).to(device)
    translation_vector = torch.tensor(translation_vector, dtype=torch.float32).to(device)
    
    return rotation_vector, translation_vector

def rotation_vector_to_euler_angles(rotation_vector):
    rotation_matrix, _ = cv2.Rodrigues(rotation_vector)
    sy = np.sqrt(rotation_matrix[2, 1] ** 2 + rotation_matrix[2, 2] ** 2)
    singular = sy < 1e-6

    if not singular:
        x = np.arctan2(rotation_matrix[2, 1], rotation_matrix[2, 2])
        y = np.arctan2(-rotation_matrix[2, 0], sy)
        z = np.arctan2(rotation_matrix[1, 0], rotation_matrix[0, 0])
    else:
        x = np.arctan2(-rotation_matrix[1, 2], rotation_matrix[1, 1])
        y = np.arctan2(-rotation_matrix[2, 0], sy)
        z = 0

    return np.degrees(x), np.degrees(y), np.degrees(z)

def pad_features(features, target_length):
    """
    Pads or truncates each feature vector in `features` to `target_length`.

    Parameters:
    - features: List of feature vectors, each of which can be a list or tuple.
    - target_length: Desired length of each feature vector.

    Returns:
    - List of padded or truncated feature vectors.
    """
    padded_features = []
    for feature_vector in features:
        if not isinstance(feature_vector, (list, tuple)):
            raise ValueError("Each feature vector should be a list or tuple.")
        
        if isinstance(feature_vector, tuple):
            feature_vector = list(feature_vector)
        
        if len(feature_vector) < target_length:
            padded_vector = feature_vector + [0] * (target_length - len(feature_vector))
        else:
            padded_vector = feature_vector[:target_length]
        
        padded_features.append(padded_vector)
    
    return padded_features


def classify_eye_openness(ear, threshold_open=0.35, threshold_closed=0.2):
    if ear >= 0.5:
        return 0
    if ear > threshold_open:
        return 0 
    elif threshold_closed <= ear <= threshold_open:
        return 1
    else:
        return 2 

def compute_gaze_direction(shape_tensor, rotation_vector, image_size):
    left_eye_landmarks = shape_tensor[36:42]
    right_eye_landmarks = shape_tensor[42:48]
    
    left_eye_center = left_eye_landmarks.mean(dim=0)
    right_eye_center = right_eye_landmarks.mean(dim=0)
    
    eye_center = (left_eye_center + right_eye_center) / 2
    
    image_center = torch.tensor([image_size[1] / 2, image_size[0] / 2], dtype=torch.float32)
    
    gaze_vector = image_center - eye_center[:2]
    
    pitch, yaw, roll = rotation_vector_to_euler_angles(rotation_vector.cpu().numpy())
    
    return pitch, yaw, gaze_vector.tolist()

def classify_head_pose(pitch, yaw, roll, gaze_vector):
    """
    Classifies head pose based on pitch, yaw, roll, and gaze direction.
    
    Parameters:
        pitch (float): The pitch angle.
        yaw (float): The yaw angle.
        roll (float): The roll angle.
        gaze_vector (list): The gaze direction vector [x, y].
    
    Returns:
        str: A classification string representing the head pose.
    """
    gaze_direction = 0
    if gaze_vector[0] > 0.5:
        gaze_direction = 1 
    elif gaze_vector[0] < -0.5:
        gaze_direction = 2 
    
    if (pitch > 10 and pitch <= 180) or (pitch <= -180):
        position = 3 
    elif (pitch < -10 and pitch > -180) or (pitch > 180):
        position = 4 
    elif yaw > 10:
        position = 5
    elif yaw < -10:
        position = 6 
    elif roll > 10:
        position = 7
    elif roll < -10:
        position = 8
    else:
        position = 0
    
    return position, gaze_direction

def process_frame(frame):
    frame_np = frame
    gray = cv2.cvtColor(frame_np, cv2.COLOR_RGB2GRAY)
    rects = detector(gray, 0)
    features = []

    if len(rects) == 0:
        return features

    shapes = [face_utils.shape_to_np(predictor(gray, rect)) for rect in rects]
    max_width = 0
    best_features = None
    MIN_FACE_SIZE = 5000
    for i, rect in enumerate(rects):
        if rect.width() * rect.height() < MIN_FACE_SIZE:
            continue 
        shape = shapes[i]
        shape_tensor = torch.tensor(shape, dtype=torch.float32)

        leftEye = shape_tensor[42:48]
        rightEye = shape_tensor[36:42]
        ear = (eye_aspect_ratio_torch(leftEye) + eye_aspect_ratio_torch(rightEye)) / 2.0

        size = (frame_np.shape[1], frame_np.shape[0])
        rotation_vector, translation_vector = get_head_pose_torch(shape_tensor, size)
        pitch, yaw, roll = rotation_vector_to_euler_angles(rotation_vector.numpy())
        gaze_vector = compute_gaze_direction(shape_tensor, rotation_vector, size)
        position, gaze_direction = classify_head_pose(pitch, yaw, roll, gaze_vector)
        eye_category = classify_eye_openness(ear.item())

        pitch = np.clip(pitch, -90, 90)

        current_features = [eye_category, position, gaze_direction, ear.item(), pitch, yaw, roll]

        face_width = rect.width() * rect.height()
        if face_width > max_width:
            max_width = face_width
            best_features = current_features

    if best_features is not None:
        features = best_features

    return features


In [None]:
def get_forehead_roi_from_frame(frame):
    frame = frame.numpy()
    if not isinstance(frame, np.ndarray):
        raise ValueError("Frame must be a NumPy array.")

    frame_rgb = frame.astype(np.uint8)
    face_locations = face_recognition.face_locations(frame_rgb)
    roi = (0, 0, frame.shape[0], frame.shape[1])

    if face_locations:
        top, right, bottom, left = face_locations[0]
        forehead_height = (bottom - top) // 5
        roi = (left, top, right, top + forehead_height)

    return roi

def detrend(input_signal, lambda_value):
    signal_length = input_signal.shape[0]
    H = np.identity(signal_length)
    ones = np.ones(signal_length)
    minus_twos = -2 * np.ones(signal_length)
    diags_data = np.array([ones, minus_twos, ones])
    diags_index = np.array([0, 1, 2])
    D = sp.sparse.spdiags(diags_data, diags_index, (signal_length - 2), signal_length).toarray()
    filtered_signal = np.dot(
        (H - np.linalg.inv(H + (lambda_value ** 2) * np.dot(D.T, D))), input_signal)
    return filtered_signal

def _process_video(frames):
    if isinstance(frames, torch.Tensor):
        frames = frames.numpy() 

    RGB = []
    for frame in frames:
        frame_tensor = torch.tensor(frame, dtype=torch.float32)
        summation = torch.sum(frame_tensor, dim=(0, 1)).numpy()
        RGB.append(summation / (frame_tensor.shape[0] * frame_tensor.shape[1]))
    
    return np.asarray(RGB)

def green_pos_features(frames, fps=30):
    if isinstance(frames, list):
        frames = torch.tensor(np.array(frames), dtype=torch.float32)
    elif isinstance(frames, np.ndarray):
        frames = torch.tensor(frames, dtype=torch.float32)
    frames = frames.float()
    
    bvp_no_roi = []
    for frame in frames:
        roi_frame = frame[:, :, 1]
        bvp_no_roi.append(torch.mean(roi_frame).item())
    bvp_no_roi = np.array(bvp_no_roi)

    bvp_roi = []
    for frame in frames:
        roi = get_forehead_roi_from_frame(frame)
        left, top, right, bottom = roi
        roi_frame = frame[top:bottom, left:right, 1]
        bvp_roi.append(torch.mean(roi_frame).item())
    bvp_roi = np.array(bvp_roi)

    LPF = 0.7
    HPF = 2.5
    RGB = _process_video(frames)
    N = RGB.shape[0]
    H = np.zeros((1, N))
    WinSec = 1.6
    l = math.ceil(WinSec * fps)

    for n in range(N):
        m = n - l
        if m >= 0:
            Cn = np.true_divide(RGB[m:n, :], np.mean(RGB[m:n, :], axis=0))
            Cn = np.asmatrix(Cn).H
            S = np.matmul(np.array([[0, 1, -1], [-2, 1, 1]]), Cn)
            h = S[0, :] + (np.std(S[0, :]) / np.std(S[1, :])) * S[1, :]
            mean_h = np.mean(h)
            for temp in range(h.shape[1]):
                h[0, temp] = h[0, temp] - mean_h
            H[0, m:n] = H[0, m:n] + (h[0])

    BVP_pos = H
    BVP_pos = detrend(np.asmatrix(BVP_pos).H, 100)
    BVP_pos = np.asarray(np.transpose(BVP_pos))[0]
    b, a = sp.signal.butter(1, [0.75 / fps * 2, 3 / fps * 2], btype='bandpass')
    BVP_pos = sp.signal.filtfilt(b, a, BVP_pos.astype(np.double))
    BVP_pos = BVP_pos.copy()

    nyquistF = fps / 2
    lowCutOffFreq = LPF / nyquistF
    highCutOffFreq = HPF / nyquistF
    b, a = sp.signal.butter(3, [lowCutOffFreq, highCutOffFreq], btype='band')
    bvp_no_roi_filtered = sp.signal.filtfilt(b, a, bvp_no_roi - np.mean(bvp_no_roi, axis=-1, keepdims=True), axis=-1)
    bvp_roi_filtered = sp.signal.filtfilt(b, a, bvp_roi - np.mean(bvp_roi, axis=-1, keepdims=True), axis=-1)

    def calculate_features(bvp_filtered, fps = 30):
        peaks, _ = sp.signal.find_peaks(bvp_filtered, distance=fps*0.6) 
        RR_intervals = np.diff(peaks) / fps
        heart_rate = 60 / RR_intervals
        average_heart_rate = np.mean(heart_rate)
        systolic_peaks, _ = sp.signal.find_peaks(bvp_filtered)
        diastolic_peaks, _ = sp.signal.find_peaks(-bvp_filtered)
        if len(systolic_peaks) > 1:
            peak_intervals = np.diff(systolic_peaks) / fps
            avg_peak_interval = np.mean(peak_intervals)
        else:
            avg_peak_interval = np.nan
        systolic_peak_sum = np.sum(bvp_filtered[systolic_peaks])
        diastolic_peak_sum = np.sum(-bvp_filtered[diastolic_peaks])
        

        return avg_peak_interval, systolic_peak_sum, diastolic_peak_sum,average_heart_rate

    features_no_roi = np.array(calculate_features(bvp_no_roi_filtered))
    features_roi = np.array(calculate_features(bvp_roi_filtered))
    features_pos = np.array(calculate_features(BVP_pos))

    return np.concatenate([features_no_roi, features_roi, features_pos])

In [None]:
def process_subject_batch(batch, label, n=(FPS*VIDEO_LENGTH)//(FRAME_INTERVAL)):
    ear_choices = np.array([1, 2])
    eye_gaze_choices = np.array([1, 2])
    head_pose_choices = np.array([3, 4, 5, 6, 7, 8])
    subject_frames = []
    
    try:
        for frame in batch:
            frame_features = process_frame(frame)
            if frame_features:
                subject_frames.append(frame_features)
            else:
                subject_frames.append([np.random.choice(ear_choices), np.random.choice(eye_gaze_choices), np.random.choice(head_pose_choices),0, 0, 0, 0 ])
        
        if subject_frames:
            max_length = max(len(f) for f in subject_frames) if subject_frames else 0
            subject_frames = pad_features(subject_frames, max_length)
            subject_frames = np.array(subject_frames)
        else:
            subject_frames = np.zeros((n, 7))  
            
        
        if isinstance(subject_frames, np.ndarray) and subject_frames.size > 0:
            l = subject_frames.shape[0] // n
            subject_frames = subject_frames[:l * n].reshape(l, n, -1)

            categorical_features = subject_frames[:, :, :3]
            numerical_features = subject_frames[:, :, 3:]

            mode_categorical = sp.stats.mode(categorical_features, axis=1, keepdims=False).mode

            mean_numerical = numerical_features.mean(axis=1)

            subject_frames = np.concatenate((mode_categorical, mean_numerical), axis=1)

        X_batch_N = np.array(green_pos_features(batch))

        return subject_frames, X_batch_N, label
    except Exception as e:
        print(f"Error processing batch: {e}")
        return np.array([]), np.array([]), label

In [7]:
X_batch_A = []
X_batch_N = []
all_labels = []

iters = GLOBAL_BATCH_SIZE // BATCH_SIZE

with ThreadPoolExecutor(max_workers=256) as executor:
    futures = []

    for i in range(iters):
        X_batch, Y_batch = load_data(PATH, DATASET, BATCH_SIZE)

        print(f'{i+1} iterations done')
        
        for batch, label in zip(X_batch, Y_batch):
            futures.append(executor.submit(process_subject_batch, batch, label))

    for future in futures:
        subject_frames, phys_features, label = future.result()
        X_batch_A.append(subject_frames)
        X_batch_N.append(phys_features)
        all_labels.append(label)

X_batch_A = np.array(X_batch_A)
X_batch_A = np.squeeze(X_batch_A, axis=1)
X_batch_N = np.array(X_batch_N)

X_batch_A_df = pd.DataFrame(X_batch_A)
X_batch_N_df = pd.DataFrame(X_batch_N)


X_batch_A_df.fillna(X_batch_A_df.median(numeric_only=True), inplace=True)
X_batch_N_df.fillna(X_batch_N_df.median(numeric_only=True), inplace=True)

X_batch_A = X_batch_A_df.values
X_batch_N = X_batch_N_df.values

In [None]:
combined_features = np.hstack((X_batch_A, X_batch_N))
combined_features_with_labels = np.hstack((combined_features,all_labels))
df = pd.DataFrame(combined_features_with_labels, columns=['eye_category', 'eye_position', 'gaze_direction', 'ear', 'pitch', 'yaw', 'roll', 'features_no_roi_avg_peak_interval', 'features_no_roi_systolic_peak_sum', 'features_no_roi_diastolic_peak_sum', 'features_no_roi_average_heart_rate', 'features_roi_avg_peak_interval', 'features_roi_systolic_peak_sum', 'features_roi_diastolic_peak_sum', 'features_roi_average_heart_rate', 'features_pos_avg_peak_interval', 'features_pos_systolic_peak_sum', 'features_pos_diastolic_peak_sum', 'features_pos_average_heart_rate', 'engagement_labels'])

filename = f'features_{DATASET}_ablation_.csv'
df.to_csv(filename, index=False)
print(f'Successfully saved to {filename}')