In [4]:
!conda activate --name gpu python=3.9
!conda activate gpu # or connect kernel to gpu env
!conda install -c conda-forge cudatoolkit=11.2 cudnn=8.1.0
!pip install tensorflow==2.10 # must be below 2.11

In [None]:
!pip install opencv-python pandas numpy mediapipe pynvml

In [8]:
import pathlib
import tensorflow as tf
import cv2
import mediapipe as mp
import numpy as np
import time
from datetime import datetime
import pandas as pd

import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"

temp = pathlib.PosixPath
pathlib.PosixPath = pathlib.WindowsPath

In [None]:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5, max_num_faces=1)

def get_head_pose_direction(face_region):
    image_rgb = cv2.cvtColor(face_region, cv2.COLOR_BGR2RGB)
    image_rgb.flags.writeable = False
    results = face_mesh.process(image_rgb)
    image_rgb.flags.writeable = True
    face_2d = []
    if results.multi_face_landmarks:
        for face_landmarks in results.multi_face_landmarks:
            for idx, lm in enumerate(face_landmarks.landmark):
                x, y = int(lm.x * 100), int(lm.y * 100)
                if idx == 1:
                    nose_x, nose_y = x, y
                face_2d.append([x, y])
            if nose_x < 40:
                direction = "Left"
            elif nose_x > 60:
                direction = "Right"
            elif nose_y < 40:
                direction = "Up"
            elif nose_y > 60:
                direction = "Down"
            else:
                direction = "Forward"
            
            return direction


def load_models():
    # Create the face detector with GPU support
    detector = cv2.FaceDetectorYN.create(
        "models/face_detection_yunet_2023mar.onnx", "", (0, 0),
        backend_id=cv2.dnn.DNN_BACKEND_CUDA,
        target_id=cv2.dnn.DNN_TARGET_CUDA
    )

    model = tf.keras.models.load_model("models/model.h5", compile=False)
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return detector, model

def enhance_image(image):
    image = cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
    image = cv2.filter2D(src=image, ddepth=-1, kernel=kernel)
    img_yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)
    img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
    image = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2BGR)
    
    return image

def detect_eye_contact(right_eye, left_eye, model):
    # Enhance images
    right_eye = enhance_image(right_eye)
    left_eye = enhance_image(left_eye)
    
    # Resize images and normalize pixel values
    right_eye = cv2.resize(right_eye, (60, 60)) / 255.0
    left_eye = cv2.resize(left_eye, (60, 60)) / 255.0
    
    # Convert to TensorFlow tensors
    right_eye_tensor = tf.convert_to_tensor(right_eye, dtype=tf.float32)
    left_eye_tensor = tf.convert_to_tensor(left_eye, dtype=tf.float32)
    
    # Expand dimensions to add batch dimension
    right_eye_tensor = tf.expand_dims(right_eye_tensor, axis=0)
    left_eye_tensor = tf.expand_dims(left_eye_tensor, axis=0)
    
    # Convert grayscale images to RGB if needed
    if right_eye_tensor.shape[-1] == 1:
        right_eye_tensor = tf.image.grayscale_to_rgb(right_eye_tensor)
    if left_eye_tensor.shape[-1] == 1:
        left_eye_tensor = tf.image.grayscale_to_rgb(left_eye_tensor)
    
    # Concatenate right and left eyes to form a batch
    batch = tf.concat([right_eye_tensor, left_eye_tensor], axis=0)
    
    # Move batch tensor to GPU if available
    if tf.test.is_gpu_available():
        with tf.device('/GPU:0'):
            predictions = model.predict(batch)
            print('with GPU')
    else:
        predictions = model.predict(batch)
    
    # Get predicted labels for right and left eyes
    labels = ['ClosingEye', 'EyeContact', 'LeftLook', 'RightLook']
    right_eye_pred_label = labels[int(predictions[0].argmax(axis=-1))]
    left_eye_pred_label = labels[int(predictions[1].argmax(axis=-1))]
    
    # Check if either eye predicts "EyeContact"
    return right_eye_pred_label == "EyeContact" or left_eye_pred_label == "EyeContact"


In [6]:
import pynvml

def gpu_working_check():
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # Use index 0 for the first GPU
    gpu_info = pynvml.nvmlDeviceGetUtilizationRates(handle)
    print("GPU utilization:", gpu_info.gpu)
    pynvml.nvmlShutdown()

In [None]:
# Load the models
detector, model = load_models()

# Initialize variables for tracking eye contact time
user_eye_contact_data = []

# Initialize a user count
user_count = 0

In [11]:
def detectFacEye(image, conf, eye_crop_size=10):
    global user_count, start_time
    height, width, _ = image.shape
    detector.setInputSize((width, height))
    
    _, faces = detector.detect(image)
    if faces is not None:
        for face in faces:
            x, y, w, h = map(int, face[:4])
            
            right_eye_x, right_eye_y = int(face[4]), int(face[5])
            left_eye_x, left_eye_y = int(face[6]), int(face[7])
            right_eye = image[right_eye_y - eye_crop_size:right_eye_y + eye_crop_size, right_eye_x - eye_crop_size:right_eye_x + eye_crop_size]
            left_eye = image[left_eye_y - eye_crop_size:left_eye_y + eye_crop_size, left_eye_x - eye_crop_size:left_eye_x + eye_crop_size]
            eye_contact = detect_eye_contact(right_eye, left_eye, model)
            
            factor = 0.9
            expand_x = int(factor * w)
            expand_y = int(factor * h)
            new_x = max(0, x - expand_x)
            new_y = max(0, y - expand_y)
            new_w = min(width, w + 2 * expand_x)
            new_h = min(height, h + 2 * expand_y)

            face_region = image[new_y:new_y + new_h, new_x:new_x + new_w]
            face_region = enhance_image(face_region)
            # pred = detect_face_direction(face_region)
            pred = get_head_pose_direction(face_region)
            
            
            # if pred in ['Front', 'FrontLeft', 'FrontRight']:
            if pred in ["Forward", "Up"]:
                if eye_contact:
                    # Log the eye contact time for this user
                    elapsed_time = time.time() - start_time
                    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    user_count += 1
                    user_id = f"user_{user_count}"
                    
                    user_eye_contact_data.append({
                        "User ID": user_id,
                        "Eye Contact Duration (seconds)": elapsed_time,
                        "Date and Time": current_time
                    })
                    
                    color = (0, 255, 0)
                    text = f"EyeContact {user_id} {elapsed_time:.2f}s {pred}"
                    
                    start_time = time.time()
                else:
                        color = (0, 0, 255)
                        text = f"NoEyeContact {pred}"
            else:
                color = (0, 0, 255)
                text = f"NoEyeContact {pred}"
            
            cv2.rectangle(image, (x, y), (x + w, y + h), color, 2)
            cv2.rectangle(image, (right_eye_x - 5, right_eye_y - 5), (right_eye_x + 5, right_eye_y + 5), color, 2)
            cv2.rectangle(image, (left_eye_x - 5, left_eye_y - 5), (left_eye_x + 5, left_eye_y + 5), color, 2)
            cv2.putText(image, text, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2, cv2.LINE_AA)

In [22]:
def process_video(input_video_path, conf, eye_crop_size=10):
    global start_time
    cap = cv2.VideoCapture(input_video_path)
    
    #width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    #height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    #fourcc = cv2.VideoWriter_fourcc(*'XVID')
    #out = cv2.VideoWriter(output_video_path, fourcc, 20.0, (width, height))

    # Initialize start time
    start_time = time.time()
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        #gpu_working_check()  # Check GPU utilization
        
        detectFacEye(frame, conf=conf, eye_crop_size=eye_crop_size)
        cv2.imshow('Face-Detection', frame)
        
        print('Processed frame {}/{}'.format(cap.get(cv2.CAP_PROP_POS_FRAMES), total_frames))
        
        #out.write(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    #out.release()
    cv2.destroyAllWindows()
    
    # Save the eye contact data to an Excel file
    #df = pd.DataFrame(user_eye_contact_data)
    #df.to_excel("data.xlsx", index=False)
    #print("Eye contact times saved to eye_contact_times.xlsx")

In [50]:
process_video('rtsp://admin:hM571632@192.168.1.64/Streaming/Channels/102', conf=0.5, eye_crop_size=10)

GPU utilization: 22
Processed frame 1.0/-2562047788015215
GPU utilization: 22
Processed frame 2.0/-2562047788015215
GPU utilization: 22
Processed frame 3.0/-2562047788015215
GPU utilization: 22
Processed frame 4.0/-2562047788015215
GPU utilization: 22
Processed frame 5.0/-2562047788015215
GPU utilization: 22
Processed frame 6.0/-2562047788015215
GPU utilization: 22
Processed frame 7.0/-2562047788015215
GPU utilization: 22
Processed frame 8.0/-2562047788015215
GPU utilization: 22
Processed frame 9.0/-2562047788015215
GPU utilization: 22
Processed frame 10.0/-2562047788015215
GPU utilization: 22
Processed frame 11.0/-2562047788015215
GPU utilization: 22
Processed frame 12.0/-2562047788015215
GPU utilization: 22
Processed frame 13.0/-2562047788015215
GPU utilization: 22
Processed frame 14.0/-2562047788015215
GPU utilization: 22
Processed frame 15.0/-2562047788015215
GPU utilization: 13
Processed frame 16.0/-2562047788015215
GPU utilization: 13
Processed frame 17.0/-2562047788015215
GPU ut