In [1]:
import cv2
import numpy as np
import threading
import queue
import time

# --- Pipeline configuration ---
# Capacity of each bounded queue that links the reader, processor and display stages.
BUFFER_SIZE = 5
# Intended display frame rate.
# NOTE(review): nothing visible in this cell reads DISPLAY_FPS - confirm whether pacing is still planned.
DISPLAY_FPS = 30

# --- Completion signals ---
# video_finished: set by the reader once no more frames will be queued.
# processing_finished: set by the processor once it has stopped producing frames.
video_finished, processing_finished = threading.Event(), threading.Event()

# --- Hand-off buffers between threads ---
frame_queue = queue.Queue(BUFFER_SIZE)            # reader -> processor
processed_frame_queue = queue.Queue(BUFFER_SIZE)  # processor -> display loop

# --- Detector ---
# HOG descriptor loaded with OpenCV's bundled default people-detector SVM.
hog = cv2.HOGDescriptor()
hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

# --- Producer Thread: Reads frames from video ---
def frame_reader(video_path):
    """Producer thread body: decode ``video_path`` and queue every frame.

    Frames are pushed onto the module-level ``frame_queue`` with a blocking
    ``put``, so the reader waits whenever the queue is full.

    ``video_finished`` is always set before this function returns or raises,
    and the capture handle is always released. Without that guarantee a
    failure while reading would leave the downstream loops polling for an
    event that never arrives.

    Args:
        video_path: Path (or any source accepted by ``cv2.VideoCapture``) of
            the video to read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return

        while True:
            ret, frame = cap.read()
            if not ret:
                # read() reports failure at end of stream (or on a decode error).
                break
            frame_queue.put(frame)

        print("Frame reader finished.")
    finally:
        # Runs on normal completion, on the early return above, and on errors.
        cap.release()
        video_finished.set()

# --- Consumer Thread: Processes frames ---
def frame_processor(display_size=(640, 480), high_confidence=0.7):
    """Consumer thread body: run HOG people detection on queued frames.

    Takes frames from ``frame_queue``, resizes them, draws a green box around
    every detection whose SVM weight exceeds ``high_confidence``, stamps the
    'High' legend and forwards the annotated frame to
    ``processed_frame_queue``. Detections at or below the threshold are not
    drawn.

    The loop ends once ``video_finished`` is set and ``frame_queue`` is empty.
    ``processing_finished`` is set in a ``finally`` so it is signalled even
    when processing a frame raises, and ``task_done()`` is likewise called for
    every frame that was taken from the queue.

    Args:
        display_size: ``(width, height)`` the frame is resized to before
            detection and display.
        high_confidence: Minimum detection weight for a box to be drawn.
    """
    try:
        while True:
            if video_finished.is_set() and frame_queue.empty():
                break

            try:
                # Timed get so the exit condition above is re-checked regularly
                # instead of blocking forever on an empty queue.
                frame = frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                frame_resized = cv2.resize(frame, display_size)

                boxes, weights = hog.detectMultiScale(
                    frame_resized, winStride=(8, 8), scale=1.02, padding=(4, 4))

                for (x, y, w, h), weight in zip(boxes, weights):
                    if weight > high_confidence:
                        cv2.rectangle(frame_resized, (x, y), (x + w, y + h), (0, 255, 0), 2)

                cv2.putText(frame_resized, 'High', (10, 25),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                processed_frame_queue.put(frame_resized)
            finally:
                # Balance the get() above even if this frame failed.
                frame_queue.task_done()
    finally:
        processing_finished.set()  # Signal that no more processed frames will arrive

    print("Frame processor finished.")

def main(video_path='../853889-hd_1920_1080_25fps.mp4'):
    """Run the reader/processor threads and display processed frames.

    Starts ``frame_reader`` and ``frame_processor`` as daemon threads, then
    shows frames from ``processed_frame_queue`` until the processor reports
    completion and the queue is drained, or until ESC is pressed.

    Args:
        video_path: Video source handed to ``frame_reader``. Defaults to the
            sample clip this notebook was written against.
    """
    # Daemon threads: they never keep the interpreter alive on their own.
    reader_thread = threading.Thread(target=frame_reader, args=(video_path,), daemon=True)
    processor_thread = threading.Thread(target=frame_processor, daemon=True)
    reader_thread.start()
    processor_thread.start()

    try:
        while True:
            # The processor only finishes after the reader has, so its event
            # plus an empty output queue means nothing is left to show.
            if processing_finished.is_set() and processed_frame_queue.empty():
                break

            try:
                # The timed get already waits, so no extra sleep is needed here.
                frame_to_display = processed_frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            cv2.imshow("Frame", frame_to_display)
            processed_frame_queue.task_done()

            # Mask to the low byte so ESC is recognised even when the backend
            # sets modifier bits in the returned key code.
            if cv2.waitKey(1) & 0xFF == 27:  # ESC key to exit
                break

        print("Exiting main display loop.")
    finally:
        # Close the window even when the loop is interrupted or raises.
        cv2.destroyAllWindows()

    print("Program finished.")


if __name__ == "__main__":
    main()

Frame reader finished.
Frame processor finished.
Exiting main display loop.
Program finished.


In [None]:
import cv2
import mediapipe as mp
import numpy as np
import threading
import queue
import time

# https://ai.google.dev/edge/mediapipe/solutions/vision/hand_landmarker

# --- High-Five Detection Logic ---
def _bend_angle_deg(first_bone, second_bone, eps=0.0):
    """Return the angle in degrees between two consecutive bone vectors.

    0 means the bones are collinear (a straight joint); larger values mean a
    sharper bend. The cosine is clipped to [-1, 1] before ``np.arccos`` so
    floating-point rounding cannot push it out of domain and produce NaN.
    ``eps`` is added to the denominator to guard against zero-length vectors.
    """
    denominator = np.linalg.norm(first_bone) * np.linalg.norm(second_bone) + eps
    cosine = np.dot(first_bone, second_bone) / denominator
    return float(np.degrees(np.arccos(np.clip(cosine, -1.0, 1.0))))


def is_all_fingers_up(hand_landmarks, angle_threshold=30, thumb_angle_threshold=160):
    """
    Checks if all fingers (thumb, index, middle, ring, pinky) are extended upwards
    in a high-five pose.

    A smaller ``y`` is treated as "higher up". The thumb is judged on x/y only,
    the other four fingers on x/y/z.

    Args:
        hand_landmarks: Object exposing ``.landmark``, indexable by the
                        ``mp_hands.HandLandmark`` members, whose entries have
                        ``x``, ``y`` and ``z`` attributes. Falsy values yield False.
        angle_threshold: Maximum bend, in degrees, allowed at the PIP and DIP
                         joints of the four non-thumb fingers.
        thumb_angle_threshold: Maximum bend, in degrees, allowed at the thumb IP joint.

    Returns:
        True if all fingers are detected as up, False otherwise.
    """
    if not hand_landmarks:
        return False

    points = hand_landmarks.landmark
    joint = mp_hands.HandLandmark
    wrist_y = points[joint.WRIST].y

    # --- Thumb ---
    mcp = points[joint.THUMB_MCP]
    ip = points[joint.THUMB_IP]
    tip = points[joint.THUMB_TIP]

    # The tip may sit at most 20% of the MCP-to-wrist distance below the MCP.
    if tip.y > mcp.y + (wrist_y - mcp.y) * 0.2:
        return False

    thumb_lower = np.array([ip.x - mcp.x, ip.y - mcp.y])
    thumb_upper = np.array([tip.x - ip.x, tip.y - ip.y])
    # Degenerate (zero-length) segments carry no direction; skip the angle test.
    if np.linalg.norm(thumb_lower) != 0 and np.linalg.norm(thumb_upper) != 0:
        # NOTE(review): the bend angle is 0 for a straight thumb, so the default
        # of 160 only rejects a thumb folded almost back on itself - confirm
        # this is the intended threshold.
        if _bend_angle_deg(thumb_lower, thumb_upper) > thumb_angle_threshold:
            return False

    # --- Index, middle, ring, pinky: (MCP, PIP, DIP, TIP) chains ---
    finger_chains = (
        (joint.INDEX_FINGER_MCP, joint.INDEX_FINGER_PIP, joint.INDEX_FINGER_DIP, joint.INDEX_FINGER_TIP),
        (joint.MIDDLE_FINGER_MCP, joint.MIDDLE_FINGER_PIP, joint.MIDDLE_FINGER_DIP, joint.MIDDLE_FINGER_TIP),
        (joint.RING_FINGER_MCP, joint.RING_FINGER_PIP, joint.RING_FINGER_DIP, joint.RING_FINGER_TIP),
        (joint.PINKY_MCP, joint.PINKY_PIP, joint.PINKY_DIP, joint.PINKY_TIP),
    )

    for chain in finger_chains:
        mcp, pip, dip, tip = (points[index] for index in chain)

        # The finger must point upward: tip above PIP, and PIP above MCP.
        if tip.y > pip.y or pip.y > mcp.y:
            return False

        bone_mcp_pip = np.array([pip.x - mcp.x, pip.y - mcp.y, pip.z - mcp.z])
        bone_pip_dip = np.array([dip.x - pip.x, dip.y - pip.y, dip.z - pip.z])
        bone_dip_tip = np.array([tip.x - dip.x, tip.y - dip.y, tip.z - dip.z])

        angle_pip = _bend_angle_deg(bone_mcp_pip, bone_pip_dip, eps=1e-6)
        angle_dip = _bend_angle_deg(bone_pip_dip, bone_dip_tip, eps=1e-6)

        if angle_pip > angle_threshold or angle_dip > angle_threshold:
            return False

    return True

# --- MediaPipe handles ---
# Short aliases for the MediaPipe solution modules used by the functions in this cell.
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# --- Shared pipeline state ---
# Bounded hand-off buffers (10 entries each).
cap_queue = queue.Queue(10)        # raw frames from the camera
processed_queue = queue.Queue(10)  # processed frames waiting to be displayed
# NOTE(review): the thread functions in this cell are driven by a threading.Event
# argument rather than this flag - confirm whether `running` is still needed.
running = True

# --- Thread Functions ---

def frame_capturer(cap, frame_queue, running_flag):
    """
    Camera-reading worker: pull frames from ``cap`` until ``running_flag`` is cleared.

    Every successfully read frame is offered to ``frame_queue`` without
    blocking; if the queue is already full, that new frame is discarded. A
    failed read is reported and followed by a 10 ms pause before retrying.
    """
    while running_flag.is_set():
        grabbed, image = cap.read()
        if grabbed:
            try:
                frame_queue.put_nowait(image)
            except queue.Full:
                # Consumer is behind: drop this frame rather than stall the camera loop.
                pass
        else:
            print("Ignoring empty camera frame from capturer.")
            time.sleep(0.01)  # brief pause so a failing camera does not spin the CPU
    print("Frame capturer thread stopped.")

def frame_processor(input_queue, output_queue, running_flag):
    """
    Hand-tracking worker: run MediaPipe Hands on frames from ``input_queue``.

    While ``running_flag`` is set, each frame is mirrored, passed through the
    hand detector, annotated with the detected landmarks and offered to
    ``output_queue`` as an ``(image, high_five_detected)`` tuple. If the output
    queue is full, the result is discarded.
    """
    detector_options = dict(
        min_detection_confidence=0.7,
        min_tracking_confidence=0.5,
        max_num_hands=1,
    )
    with mp_hands.Hands(**detector_options) as hands:
        while running_flag.is_set():
            try:
                # Timed get so the running flag is re-checked even when no frames arrive.
                raw_frame = input_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            # Mirror horizontally for a selfie-style view.
            mirrored = cv2.flip(raw_frame, 1)

            mirrored.flags.writeable = False
            rgb_frame = cv2.cvtColor(mirrored, cv2.COLOR_BGR2RGB)
            results = hands.process(rgb_frame)

            rgb_frame.flags.writeable = True
            annotated = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)

            high_five_detected = False
            for hand_landmarks in results.multi_hand_landmarks or ():
                mp_drawing.draw_landmarks(
                    annotated,
                    hand_landmarks,
                    mp_hands.HAND_CONNECTIONS,
                    mp_drawing_styles.get_default_hand_landmarks_style(),
                    mp_drawing_styles.get_default_hand_connections_style())

                # Stop at the first hand showing a high five; later hands are not drawn.
                if is_all_fingers_up(hand_landmarks):
                    high_five_detected = True
                    break

            try:
                output_queue.put_nowait((annotated, high_five_detected))
            except queue.Full:
                # Display loop is behind: discard this result instead of blocking.
                pass
    print("Frame processor thread stopped.")

# --- Main Execution ---
if __name__ == "__main__":
    # Opens camera index 0, runs the capture and processing threads, and shows
    # processed frames until ESC is pressed or an error occurs.
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        # Report and fall through rather than calling exit(): inside an
        # IPython/Jupyter session exit() asks the kernel itself to shut down.
        print("Error: Could not open video stream.")
        cap.release()
    else:
        # Use a threading.Event to signal threads to stop
        running_event = threading.Event()
        running_event.set()  # Set the event to true initially

        capturer_thread = threading.Thread(target=frame_capturer, args=(cap, cap_queue, running_event))
        processor_thread = threading.Thread(target=frame_processor, args=(cap_queue, processed_queue, running_event))

        try:
            capturer_thread.start()
            processor_thread.start()

            print("Threads started. Press 'ESC' to exit.")

            while running_event.is_set():
                try:
                    # Get the next processed frame for display
                    display_image, high_five_status = processed_queue.get(timeout=0.05)
                except queue.Empty:
                    # No frame available for display yet, continue loop
                    continue

                if high_five_status:
                    cv2.putText(display_image, "HIGH FIVE!", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3, cv2.LINE_AA)
                else:
                    cv2.putText(display_image, "No High Five", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 255), 3, cv2.LINE_AA)

                cv2.imshow('MediaPipe Hands - High Five Detector (Multithreaded)', display_image)

                if cv2.waitKey(5) & 0xFF == 27:
                    break
        except Exception as e:
            print(f"An error occurred in main loop: {e}")
        finally:
            # Always stop the workers and free the camera. This also runs on a
            # kernel interrupt (KeyboardInterrupt), which `except Exception`
            # does not catch and which would otherwise leave the non-daemon
            # threads running and the camera held.
            running_event.clear()  # Signal threads to stop

            print("Main loop ending, waiting for threads to finish...")
            # join() raises on a thread that was never started, so only wait
            # for threads that are actually running.
            if capturer_thread.is_alive():
                capturer_thread.join()
            if processor_thread.is_alive():
                processor_thread.join()
            print("All threads finished.")

            cap.release()
            cv2.destroyAllWindows()
            print("Application closed.")

Threads started. Press 'ESC' to exit.
Main loop ending, waiting for threads to finish...
Frame processor thread stopped.
Frame capturer thread stopped.
All threads finished.
Application closed.
