In [1]:
import cv2
from deepface import DeepFace
import numpy as np
from scipy.spatial.distance import cosine
import yt_dlp
import time

# Configuration constants
CASCADE_PATH = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
SCALE_FACTOR = 1.1
MIN_NEIGHBORS = 5
MIN_FACE_SIZE = (30, 30)
FRAME_SKIP = 20  # Perform analysis every 20 frames
PERSISTENCE_TIME = 15  # Number of skipped frames to persist bounding boxes
KNOWN_FACES = {
    "girl": "girl_in_yt.jpg",
    "guy": "guy_in_yt.jpg"
}

# Global profiles to store emotions for each known face
profiles = {name: {"emotions": []} for name in KNOWN_FACES.keys()}
active_faces = {}  # Dictionary to store bounding boxes and counters for detected faces

# Load and store profiles
def load_known_faces(image_paths):
    """Generate embeddings for provided images."""
    known_faces = {}
    for name, image_path in image_paths.items():
        try:
            # Load the image
            face_image = cv2.imread(image_path)
            if face_image is None:
                print(f"Error: Could not read image {image_path}")
                continue

            # Generate embedding
            embedding = DeepFace.represent(face_image, model_name='Facenet512', enforce_detection=False)
            known_faces[name] = np.array(embedding[0]['embedding'])
            print(f"Loaded embedding for {name}")
        except Exception as e:
            print(f"Error processing {name}: {e}")
    return known_faces

# Classify faces uniquely to known names
def classify_faces_uniquely(face_embeddings, known_faces):
    """Classify multiple detected faces uniquely to the known names."""
    matches = []  # To store matched names and distances
    used_names = set()  # Keep track of assigned names

    for embedding in face_embeddings:
        best_match = None
        best_distance = float('inf')

        # Find the closest match for the embedding among remaining names
        for name, known_embedding in known_faces.items():
            if name in used_names:
                continue  # Skip already assigned names
            distance = cosine(known_embedding, embedding)
            if distance < best_distance:
                best_match = name
                best_distance = distance

        if best_match:
            matches.append((best_match, best_distance))
            used_names.add(best_match)  # Mark the name as used

    return matches

# Detect faces in a frame
def detect_faces(face_cascade, frame):
    """Detect faces in the given frame."""
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(
        gray_frame, 
        scaleFactor=SCALE_FACTOR, 
        minNeighbors=MIN_NEIGHBORS, 
        minSize=MIN_FACE_SIZE
    )
    return faces

# Generate face embedding
def get_face_embedding(face_roi):
    """Generate an embedding for the given face ROI."""
    try:
        embedding = DeepFace.represent(face_roi, model_name='Facenet512', enforce_detection=False)
        return np.array(embedding[0]['embedding'])
    except Exception as e:
        print(f"Embedding error: {e}")
        return None

# Analyze emotions in the face ROI
def analyze_emotions(face_roi):
    """Analyze emotions using DeepFace and return all emotion scores."""
    try:
        result = DeepFace.analyze(face_roi, actions=['emotion'], enforce_detection=False)
        return result[0]['emotion']  # Return the entire emotion dictionary
    except Exception as e:
        print(f"Emotion analysis error: {e}")
        return {}

def get_video_url(youtube_url):
    """Fetch the direct video URL using yt-dlp."""
    ydl_opts = {
        'quiet': True,
        'format': 'best[ext=mp4]'
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(youtube_url, download=False)
        return info['url']

# Main script
def main():
    # YouTube video URL
    youtube_url = "https://www.youtube.com/watch?v=96Y6mc3C1Bg"  # Replace with your video URL
    video_url = get_video_url(youtube_url)
    print("Video URL:", video_url)

    # Open the YouTube video stream
    cap = cv2.VideoCapture(video_url)
    if not cap.isOpened():
        raise RuntimeError("Error: Could not open the YouTube video stream.")

    # Load known face embeddings
    known_faces = load_known_faces(KNOWN_FACES)

    # Load face cascade classifier
    face_cascade = cv2.CascadeClassifier(CASCADE_PATH)

    frame_counter = 0  # Counter to keep track of skipped frames

    try:
        print("Processing YouTube video. Press 'q' to quit.")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("End of video or error reading the video stream.")
                break

            # Increment frame counter
            frame_counter += 1

            if frame_counter % FRAME_SKIP == 0:
                # Perform face detection and analysis every `FRAME_SKIP` frames
                faces = detect_faces(face_cascade, frame)

                face_embeddings = []  # List to store embeddings for all detected faces
                bounding_boxes = []  # List to store bounding boxes for all detected faces

                for (x, y, w, h) in faces:
                    # Extract the face ROI
                    face_roi = frame[y:y + h, x:x + w]
                    if face_roi.size == 0:
                        continue

                    # Get face embedding
                    embedding = get_face_embedding(face_roi)
                    if embedding is None:
                        continue

                    face_embeddings.append(embedding)
                    bounding_boxes.append((x, y, w, h))

                # Classify faces uniquely to known names
                matches = classify_faces_uniquely(face_embeddings, known_faces)

                for i, (name, distance) in enumerate(matches):
                    x, y, w, h = bounding_boxes[i]

                    # Analyze emotions
                    emotions = analyze_emotions(frame[y:y + h, x:x + w])

                    # Update profile with the detected emotions
                    if name in profiles:
                        profiles[name]["emotions"].append(emotions)

                    # Store the detected face and reset its persistence counter
                    active_faces[name] = {
                        "box": (x, y, w, h),
                        "emotions": emotions,
                        "counter": PERSISTENCE_TIME  # Reset the persistence counter
                    }

            # Decrement counters for inactive faces
            for name in list(active_faces.keys()):
                active_faces[name]["counter"] -= 1
                if active_faces[name]["counter"] <= 0:
                    del active_faces[name]  # Remove expired bounding boxes

            # Draw all active bounding boxes
            for name, data in active_faces.items():
                x, y, w, h = data["box"]
                emotions = data["emotions"]
                color = (0, 255, 0)  # Green for bounding box
                cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)

                # Display the name and emotion scores
                label = f"{name}"  # Add the name to the label
                y_offset = y - 10
                cv2.putText(frame, label, (x, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
                y_offset -= 20
                for emotion, score in emotions.items():
                    cv2.putText(frame, f"{emotion}: {score:.2f}%", (x, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
                    y_offset -= 15

            # Display the resulting frame
            cv2.imshow('Face Recognition with Emotion Scores', frame)

            # Adjustable frame delay to match playback speed
            time.sleep(0.04)  # Adjust this to sync the video with the actual playback rate

            # Press 'q' to exit
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Exiting video processing.")
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


2025-01-24 00:49:56.280567: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Video URL: https://rr1---sn-n4v7snls.googlevideo.com/videoplayback?expire=1737730217&ei=SFSTZ4PTO--KsfIP1tWBsAk&ip=2601%3A644%3A601%3Afa0%3Afdcd%3A289b%3A9a7d%3A5a7c&id=o-ADqQIUZK-DcFAw-osEzKKdKPfuAxdWTg2DAINd7omDMn&itag=18&source=youtube&requiressl=yes&xpc=EgVo2aDSNQ%3D%3D&met=1737708617%2C&mh=KN&mm=31%2C29&mn=sn-n4v7snls%2Csn-o097znzr&ms=au%2Crdu&mv=m&mvi=1&pl=48&rms=au%2Cau&initcwndbps=4846250&bui=AY2Et-Nu1dZ8JJs_WbU-SgmGDop10U0u8FSW_cl8v8BUMKJEqpq1jpc53eqIootsj63fKdt85MQXhDky&spc=9kzgDYjkdeuw-K8FCjhuj6V3Zeg3Y8Czg1WaugGspUUgvWBY7eEfRrDxSERscBwJ7w&vprv=1&svpuc=1&mime=video%2Fmp4&ns=Me9V-kZjS8m4bzyDkKC7XyAQ&rqh=1&cnr=14&ratebypass=yes&dur=5104.187&lmt=1737219722122408&mt=1737708285&fvip=5&fexp=51326932%2C51331020%2C51335594%2C51353498%2C51371294%2C51384461&c=MWEB&sefc=1&txp=5538534&n=FQeY18xEzA30sw&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cxpc%2Cbui%2Cspc%2Cvprv%2Csvpuc%2Cmime%2Cns%2Crqh%2Ccnr%2Cratebypass%2Cdur%2Clmt&sig=AJfQdSswRgIhANApxxnuxM7EFqX_vI1cY-1hiNMhhCt8

In [1]:
import cv2
from deepface import DeepFace
import numpy as np
from scipy.spatial.distance import cosine
import yt_dlp
import time
import torch
import torchvision.transforms as T

# Configuration constants
CASCADE_PATH = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
SCALE_FACTOR = 1.1
MIN_NEIGHBORS = 5
MIN_FACE_SIZE = (30, 30)
FRAME_SKIP = 20
PERSISTENCE_TIME = 15
KNOWN_FACES = {
    "girl": "girl_in_yt.jpg",
    "guy": "guy_in_yt.jpg"
}

# Initialize gaze detection model
def initialize_gaze_model():
    print("Loading Gaze-LLE model...")
    model, _ = torch.hub.load('fkryan/gazelle', 'gazelle_dinov2_vitb14', trust_repo=True)
    model.eval()  # Set the model to evaluation mode
    transform = T.Compose([
        T.Resize((448, 448)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    print("Model and transform loaded successfully.")
    return model, transform

# Detect gaze target
def detect_gaze(model, transform, frame, bounding_boxes):
    """Estimate gaze direction and return gaze points."""
    gaze_results = []
    for bbox in bounding_boxes:
        x, y, w, h = bbox
        face_roi = frame[y:y + h, x:x + w]
        face_pil = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
        face_pil = Image.fromarray(face_pil)

        # Preprocess the face for gaze estimation
        face_tensor = transform(face_pil).unsqueeze(0)
        with torch.no_grad():
            output = model({"images": face_tensor})
            heatmap = output['heatmap'][0].detach().cpu().numpy()
            max_index = np.unravel_index(np.argmax(heatmap), heatmap.shape)
            gaze_x = max_index[1] / heatmap.shape[1] * frame.shape[1]
            gaze_y = max_index[0] / heatmap.shape[0] * frame.shape[0]

        gaze_results.append((gaze_x, gaze_y))
    return gaze_results

# Check who the gaze is directed at
def determine_gaze_target(gaze_points, bounding_boxes, names):
    """Determine who each person is looking at based on gaze points."""
    gaze_targets = []
    for i, (gx, gy) in enumerate(gaze_points):
        target = None
        min_distance = float('inf')
        for j, bbox in enumerate(bounding_boxes):
            if i == j:
                continue
            x, y, w, h = bbox
            bbox_center_x = x + w / 2
            bbox_center_y = y + h / 2
            distance = np.sqrt((gx - bbox_center_x) ** 2 + (gy - bbox_center_y) ** 2)
            if distance < min_distance:
                min_distance = distance
                target = names[j]
        gaze_targets.append(target)
    return gaze_targets

# Main script
def main():
    # Initialize models
    model, transform = initialize_gaze_model()

    # YouTube video URL
    youtube_url = "https://www.youtube.com/watch?v=96Y6mc3C1Bg"
    video_url = get_video_url(youtube_url)
    print("Video URL:", video_url)

    # Open the YouTube video stream
    cap = cv2.VideoCapture(video_url)
    if not cap.isOpened():
        raise RuntimeError("Error: Could not open the YouTube video stream.")

    # Load known face embeddings
    known_faces = load_known_faces(KNOWN_FACES)

    # Load face cascade classifier
    face_cascade = cv2.CascadeClassifier(CASCADE_PATH)

    frame_counter = 0

    try:
        print("Processing YouTube video. Press 'q' to quit.")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("End of video or error reading the video stream.")
                break

            frame_counter += 1

            if frame_counter % FRAME_SKIP == 0:
                faces = detect_faces(face_cascade, frame)
                face_embeddings = []
                bounding_boxes = []
                face_names = []

                for (x, y, w, h) in faces:
                    face_roi = frame[y:y + h, x:x + w]
                    if face_roi.size == 0:
                        continue

                    embedding = get_face_embedding(face_roi)
                    if embedding is None:
                        continue

                    match = classify_faces_uniquely([embedding], known_faces)
                    if match:
                        name, _ = match[0]
                        face_names.append(name)
                        bounding_boxes.append((x, y, w, h))
                        face_embeddings.append(embedding)

                # Detect gaze
                gaze_points = detect_gaze(model, transform, frame, bounding_boxes)
                gaze_targets = determine_gaze_target(gaze_points, bounding_boxes, face_names)

                # Draw bounding boxes and gaze lines
                for i, bbox in enumerate(bounding_boxes):
                    x, y, w, h = bbox
                    name = face_names[i]
                    gaze_x, gaze_y = gaze_points[i]
                    target = gaze_targets[i]

                    color = (0, 255, 0)
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                    cv2.putText(frame, f"{name} -> {target}", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
                    cv2.line(frame, (x + w // 2, y + h // 2), (int(gaze_x), int(gaze_y)), color, 2)

            # Display the resulting frame
            cv2.imshow('Face Recognition with Emotion and Gaze', frame)
            time.sleep(0.04)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Exiting video processing.")
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


2025-01-24 01:31:18.311165: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


ImportError: cannot import name '__version__' from 'retinaface' (unknown location)

In [2]:
%pip install deepface
%pip install tensorflow


KeyboardInterrupt: 