In [None]:
import cv2
import mediapipe as mp
from collections import deque
import os

def live_pose_video_splitter(
    output_dir="output_videos",
    pose_detection_confidence=0.2,
    pose_tracking_confidence=0.2,
    buffer_size=60,
    max_frames_no_person_after=60,
    fourcc_str="mp4v",
    fps=30,
    consecutive_frames_needed=5
):
    """
    Live monitoring of webcam to split into two videos whenever a person is detected:
      1) no_person_<ID>.mp4
      2) person_<ID>.mp4

    Debounced logic:
      - Keep a rolling buffer of `buffer_size` frames.
      - Require a person to be detected for `consecutive_frames_needed` consecutive frames
        before we decide "a person is in frame".
      - Similarly, require `consecutive_frames_needed` consecutive "no-person" frames
        before deciding "the person has left."

    Process in detail:
      1) In the idle state (no videos yet), keep buffering. 
      2) When we detect a person for N consecutive frames:
         - Start new videos (no_person_<ID>.mp4, person_<ID>.mp4).
         - Dump the buffer into the no_person video (the "before" segment).
         - Write ongoing frames to the person video while the person remains in frame.
      3) Once the person is absent for N consecutive frames:
         - Start counting up to `max_frames_no_person_after`.
         - Write those frames to the no_person video.
         - Close both videos. Increment video_id.
         - Return to idle state.
    """

    os.makedirs(output_dir, exist_ok=True)

    # Initialize Mediapipe Pose
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=pose_detection_confidence,
        min_tracking_confidence=pose_tracking_confidence
    )

    # Initialize webcam capture
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return

    # Get frame dimensions
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Prepare video writer codec
    fourcc = cv2.VideoWriter_fourcc(*fourcc_str)

    # Rolling buffer for the last `buffer_size` frames
    buffer_frames = deque(maxlen=buffer_size)

    # State variables
    person_in_frame = False           # whether we've declared that a person is in frame
    consecutive_person_frames = 0     # how many consecutive frames we've seen a person
    consecutive_noperson_frames = 0   # how many consecutive frames we've seen no person

    # Video control
    video_id = 1
    out_no_person = None
    out_person = None
    recording_person = False
    recording_no_person = False
    no_person_after_count = 0

    def start_new_recordings():
        nonlocal out_no_person, out_person, video_id, recording_person, recording_no_person
        no_person_path = os.path.join(output_dir, f"no_person_{video_id}.mp4")
        person_path = os.path.join(output_dir, f"person_{video_id}.mp4")

        out_no_person = cv2.VideoWriter(no_person_path, fourcc, fps, (width, height))
        out_person = cv2.VideoWriter(person_path, fourcc, fps, (width, height))

        recording_no_person = True
        recording_person = True

        print(f"[INFO] Starting new recordings: {no_person_path}, {person_path}")

    def close_recordings():
        nonlocal out_no_person, out_person, recording_person, recording_no_person
        if out_no_person is not None:
            out_no_person.release()
            out_no_person = None
        if out_person is not None:
            out_person.release()
            out_person = None
        recording_person = False
        recording_no_person = False

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("[INFO] Cannot read from camera or end of stream.")
                break

            # Pose detection
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(rgb_frame)
            has_person = (results.pose_landmarks is not None)

            # Maintain rolling buffer
            buffer_frames.append(frame)

            # Update consecutive frame counters
            if has_person:
                consecutive_person_frames += 1
                consecutive_noperson_frames = 0
            else:
                consecutive_noperson_frames += 1
                consecutive_person_frames = 0

            # --- Handle transitions ---
            # 1. If we currently have no person_in_frame,
            #    check if we've seen a person for enough consecutive frames to trigger "enter."
            if not person_in_frame:
                if consecutive_person_frames >= consecutive_frames_needed:
                    # A person has "officially" entered
                    person_in_frame = True
                    no_person_after_count = 0  # reset

                    # Start new recordings for no_person & person
                    start_new_recordings()

                    # Dump the buffer into the no_person video
                    if out_no_person:
                        for bf in buffer_frames:
                            out_no_person.write(bf)

            else:
                # 2. If we currently have person_in_frame == True,
                #    check if we've lost the person for enough consecutive frames to trigger "leave."
                if consecutive_noperson_frames >= consecutive_frames_needed:
                    # Person has "officially" left
                    person_in_frame = False
                    no_person_after_count = 0  # Will start counting after frames for the no_person tail

            # --- Recording logic ---
            if person_in_frame:
                # We are in the "person present" phase
                if recording_person and out_person:
                    out_person.write(frame)
            else:
                # Person not in frame
                if recording_person:
                    # The person has recently left, so we need to record the tail of no_person frames
                    no_person_after_count += 1
                    if recording_no_person and out_no_person:
                        out_no_person.write(frame)

                    # Once we've written the required tail frames, close the videos
                    if no_person_after_count >= max_frames_no_person_after:
                        close_recordings()
                        video_id += 1

                # else: We haven't yet detected a person at all (still in idle),
                #       or we finished a cycle and are waiting for the next person.
                #       Just keep buffering.

            # Show the live feed (optional)
            cv2.imshow("Live Feed", frame)
            if cv2.waitKey(1) & 0xFF == 27:  # ESC to quit
                break

    except KeyboardInterrupt:
        print("[INFO] Interrupted by user.")
    finally:
        # Cleanup
        close_recordings()
        cap.release()
        cv2.destroyAllWindows()
        print("[INFO] Exiting...")

if __name__ == "__main__":
    live_pose_video_splitter()


[INFO] Starting new recordings: output_videos\no_person_1.mp4, output_videos\person_1.mp4
[INFO] Starting new recordings: output_videos\no_person_2.mp4, output_videos\person_2.mp4
[INFO] Starting new recordings: output_videos\no_person_2.mp4, output_videos\person_2.mp4
[INFO] Interrupted by user.
[INFO] Exiting...


: 

In [4]:
import cv2

for idx in range(5):
    cap = cv2.VideoCapture(idx)
    if cap.isOpened():
        print(f"Camera index {idx} works!")
        cap.release()
    else:
        print(f"Camera index {idx} NOT accessible.")


Camera index 0 works!
Camera index 1 NOT accessible.
Camera index 2 NOT accessible.
Camera index 3 NOT accessible.
Camera index 4 NOT accessible.


In [3]:
import cv2
from collections import deque
import os

# Import the YOLO class from ultralytics
from ultralytics import YOLO

def live_yolo_video_splitter(
    output_dir="output_videos",
    yolo_model_path="yolov8n.pt",
    confidence_threshold=0.5,
    buffer_size=60,
    max_frames_no_person_after=60,
    fourcc_str="mp4v",
    fps=30,
    consecutive_frames_needed=5
):
    """
    Live monitoring of webcam to split into two videos whenever a person is detected:
    1) no_person_<ID>.mp4
    2) person_<ID>.mp4

    Debounced logic:
    - Keep a rolling buffer of `buffer_size` frames.
    - Require a person to be detected for `consecutive_frames_needed` consecutive frames
      before we decide "a person is in frame".
    - Similarly, require `consecutive_frames_needed` consecutive "no-person" frames
      before deciding "the person has left."

    Process in detail:
    1) In the idle state (no videos yet), keep buffering.
    2) When we detect a person for N consecutive frames:
        - Start new videos (no_person_<ID>.mp4, person_<ID>.mp4).
        - Dump the buffer into the no_person video (the "before" segment).
        - Write ongoing frames to the person video while the person remains in frame.
    3) Once the person is absent for N consecutive frames:
        - Start counting up to `max_frames_no_person_after`.
        - Write those frames to the no_person video.
        - Close both videos. Increment video_id.
        - Return to idle state.
    """

    # Create the output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Initialize the YOLOv8 model
    model = YOLO(yolo_model_path)

    # Initialize webcam capture
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return

    # Get frame dimensions
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Prepare video writer codec
    fourcc = cv2.VideoWriter_fourcc(*fourcc_str)

    # Rolling buffer for the last `buffer_size` frames
    buffer_frames = deque(maxlen=buffer_size)

    # State variables
    person_in_frame = False
    consecutive_person_frames = 0
    consecutive_noperson_frames = 0

    # Video control
    video_id = 1
    out_no_person = None
    out_person = None
    recording_person = False
    recording_no_person = False
    no_person_after_count = 0

    def start_new_recordings():
        nonlocal out_no_person, out_person, video_id, recording_person, recording_no_person
        no_person_path = os.path.join(output_dir, f"no_person_{video_id}.mp4")
        person_path = os.path.join(output_dir, f"person_{video_id}.mp4")

        out_no_person = cv2.VideoWriter(no_person_path, fourcc, fps, (width, height))
        out_person = cv2.VideoWriter(person_path, fourcc, fps, (width, height))

        recording_no_person = True
        recording_person = True

        print(f"[INFO] Starting new recordings:\n    {no_person_path}\n    {person_path}")

    def close_recordings():
        nonlocal out_no_person, out_person, recording_person, recording_no_person
        if out_no_person is not None:
            out_no_person.release()
            out_no_person = None
        if out_person is not None:
            out_person.release()
            out_person = None
        recording_person = False
        recording_no_person = False

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("[INFO] Cannot read from camera or end of stream.")
                break

            # Maintain rolling buffer
            buffer_frames.append(frame)

            # --- YOLOv8 inference ---
            results = model(frame, conf=confidence_threshold)
            # results is a list of 'Results' objects; we take the first since there's only one image
            # Check if there is at least one detection with class=0 (person)
            yolo_boxes = results[0].boxes  # Boxes object
            has_person = any(int(box.cls[0]) == 0 for box in yolo_boxes)

            # Update consecutive frame counters
            if has_person:
                consecutive_person_frames += 1
                consecutive_noperson_frames = 0
            else:
                consecutive_noperson_frames += 1
                consecutive_person_frames = 0

            # --- Handle state transitions ---
            if not person_in_frame:
                # If no person was previously declared, check if we've seen
                # a person for enough consecutive frames
                if consecutive_person_frames >= consecutive_frames_needed:
                    person_in_frame = True
                    no_person_after_count = 0

                    # Start recordings
                    start_new_recordings()

                    # Dump the buffer into the no_person video
                    if out_no_person:
                        for bf in buffer_frames:
                            out_no_person.write(bf)

            else:
                # If a person was in frame, check if they've been absent enough frames
                if consecutive_noperson_frames >= consecutive_frames_needed:
                    person_in_frame = False
                    no_person_after_count = 0  # Will begin counting tail frames

            # --- Recording logic ---
            if person_in_frame:
                # Still in "person present" phase
                if recording_person and out_person:
                    out_person.write(frame)
            else:
                # Person not in frame
                if recording_person:
                    # We recently lost the person, so we need to record the tail of no_person frames
                    no_person_after_count += 1
                    if recording_no_person and out_no_person:
                        out_no_person.write(frame)

                    # Once we've written the required tail frames, close the videos
                    if no_person_after_count >= max_frames_no_person_after:
                        close_recordings()
                        video_id += 1

                # else: idle state or in-between cycles, just buffer

            # Show the live feed (optional)
            cv2.imshow("Live Feed", frame)
            if cv2.waitKey(1) & 0xFF == 27:  # ESC to quit
                break

    except KeyboardInterrupt:
        print("[INFO] Interrupted by user.")
    finally:
        # Cleanup
        close_recordings()
        cap.release()
        cv2.destroyAllWindows()
        print("[INFO] Exiting...")


if __name__ == "__main__":
    live_yolo_video_splitter()
    


0: 480x640 1 person, 1 chair, 26.0ms
Speed: 1.0ms preprocess, 26.0ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 chair, 1 bed, 24.3ms
Speed: 1.0ms preprocess, 24.3ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 23.4ms
Speed: 1.0ms preprocess, 23.4ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 chair, 23.6ms
Speed: 1.0ms preprocess, 23.6ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 24.3ms
Speed: 1.0ms preprocess, 24.3ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)
[INFO] Starting new recordings:
    output_videos\no_person_1.mp4
    output_videos\person_1.mp4

0: 480x640 1 person, 1 tv, 23.9ms
Speed: 1.0ms preprocess, 23.9ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 chair, 1 tv, 24.0ms
Speed: 1.0ms preprocess, 24.0ms inference, 1.0ms postprocess pe