In [None]:
pip install ultralytics opencv-python

In [None]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import imageio
from ultralytics import YOLO
import cv2
import random

**Video credit:** [March Networks on YouTube](https://youtu.be/5rkwqp6nnr4?si=jdkWhgZ9adCqKLTk)

Although I had only one video, I simulated a multi-camera setup by dividing the video into two zones — representing different camera views. By tracking people as they move from one zone to another using unique IDs, I ensured accurate counting without duplication and enabled real-time queue monitoring.

In [None]:
# Source clip to analyse and destination of the annotated result.
# Plain literals are enough here: the paths contain no backslashes.
input_path = "/kaggle/input/quevideos/que.mp4"
output_path = "/kaggle/working/step4_fully_customizable_output.mp4"

def get_video_details(video_path):
    """Print basic metadata for a video file.

    Reads the resolution, frame rate and frame count through OpenCV, derives
    the duration from them, and prints everything followed by a coarse
    quality label chosen from the frame height.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        None. All information is printed. If the file cannot be opened, an
        error message is printed instead.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Error: Cannot open video file.")
            return

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # A non-positive frame rate would make the division meaningless.
        duration = frame_count / fps if fps > 0 else 0

        # print details
        print(f"Resolution: {width}x{height}")
        print(f"FPS: {fps}")
        print(f"Total Frames: {frame_count}")
        print(f"Duration (seconds): {duration:.2f}")

        # Bucket by frame height, highest threshold first.
        if height >= 1080:
            quality = "1080p (Full HD or higher)"
        elif height >= 720:
            quality = "720p (HD)"
        elif height >= 480:
            quality = "480p (SD)"
        else:
            quality = "Lower than 480p"

        print(f"Quality Category: {quality}")
    finally:
        # Release the capture on every path, including the early return above
        # and any exception raised while reading properties.
        cap.release()


# Print resolution, FPS, frame count, duration and quality label of the input clip.
get_video_details(input_path)

def display_middle_300(video_path, num_frames=300, interval=50):
    """Build a matplotlib animation from the frames around the middle of a video.

    The clip is centred on the middle frame of the video. With the default
    `num_frames=300` it starts 150 frames before the midpoint, clamped to the
    first frame for short videos.

    Args:
        video_path: Path of the video file to preview.
        num_frames: Maximum number of consecutive frames to include.
        interval: Delay between animation frames in milliseconds.

    Returns:
        A `matplotlib.animation.ArtistAnimation` holding the selected frames.

    Raises:
        ValueError: If `num_frames` is not positive.
    """
    if num_frames <= 0:
        raise ValueError("num_frames must be a positive integer")

    reader = imageio.get_reader(video_path)
    try:
        total_frames = reader.count_frames()
        start_frame = max(0, total_frames // 2 - num_frames // 2)
        stop_frame = start_frame + num_frames

        # Explicit figure/axes instead of the implicit pyplot "current" state.
        fig, ax = plt.subplots(figsize=(6, 6), dpi=100)
        fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
        ax.axis('off')

        mov = []
        for i, frame in enumerate(reader):
            if i < start_frame:
                continue
            if i >= stop_frame:
                break
            mov.append([ax.imshow(frame, animated=True)])
    finally:
        # Close the reader even if decoding or drawing fails part-way through.
        reader.close()

    anime = animation.ArtistAnimation(fig, mov, interval=interval, repeat_delay=1000)
    # Close this specific figure so the notebook does not also render a static copy.
    plt.close(fig)
    return anime

# Render the mid-video preview of the raw input as an inline HTML5 video (this cell's output).
HTML(display_middle_300(input_path).to_html5_video())

## Conceptualize the Camera Setup

### Video Source

The video used in this project is taken from **YouTube**, uploaded by **March Networks**. It shows a queue from a **top-side camera view** set up above a cashier window.


### Camera Setup (based on the video)

* The camera is placed **above and behind the cashier**.
* It looks down at the line of people **coming from the left and moving toward the right**.
* We can see people’s **full bodies** from this angle.

### Number of Cameras

* The original task asks for at least **2 camera views**, but I only had **one video**.
* To solve this, I **split the video frame into two zones**:

  * **Camera 1** (left side) = where people enter the queue.
  * **Camera 2** (right side) = where people reach the counter.
* These zones act like **two different cameras**, helping us **track people across the queue**.


### Visibility and Method Used

* I used **YOLOv8m** (an object-detection model) together with its built-in tracker to **detect and track people**; each tracked person is drawn with a bounding box.
* The tracker gives each person a persistent ID, so no one gets counted twice.


In [None]:
from ultralytics import YOLO
import cv2
import random

# Load YOLOv8 model
model = YOLO("yolov8m.pt")
output_path = r"/kaggle/working/step4_fully_customizable_output.mp4"

cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
    # Fail fast: with an unopened capture the processing loop never runs,
    # yet the cell would still print its success message.
    raise RuntimeError(f"Cannot open video file: {input_path}")

fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# Keep the frame rate as a float. Truncating e.g. 29.97 to 29 changes the
# playback speed of the written video and skews the wait-time computation.
# A zero fps would make the later division by fps fail, hence the fallback.
FALLBACK_FPS = 30.0
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    fps = FALLBACK_FPS

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Vertical split of the frame into the two simulated camera zones.
zone_divider = width // 2
# track_id -> frame number at which that ID was first observed.
id_first_seen = {}
frame_number = 0

# configuration controls
BOX_THICKNESS = 2
BOX_FONT_SCALE = 0.5
BOX_FONT_THICKNESS = 1
BOX_TEXT_BG_PADDING = 3
FONT = cv2.FONT_HERSHEY_SIMPLEX

CASHIER_BOX_THICKNESS = 2
CASHIER_BOX_COLOR = (0, 0, 255)  # Red
CASHIER_TEXT = "Cashier"

COUNT_FONT_SCALE = 0.8
COUNT_FONT_THICKNESS = 2
ZONE1_COLOR = (255, 100, 100)
ZONE2_COLOR = (100, 255, 100)


QUEUE_AREA_COLOR = (0, 255, 255)
QUEUE_AREA_THICKNESS = 2
QUEUE_COLOR = (255, 255, 0)

# How far (in pixels) the queue strip extends above the top edge of the cashier area.
QUEUE_AREA_HEIGHT_ABOVE_CASHIER = 135


# cashier area (bottom right corner), stored as (x1, y1, x2, y2)
cashier_area = (int(width * 0.75), int(height * 0.7), width, height)

# define the queue area as the full width and vertical region just in front of cashier area
# Clamp at 0 so the strip never starts above the frame on low-resolution input.
queue_area_top = max(0, cashier_area[1] - QUEUE_AREA_HEIGHT_ABOVE_CASHIER)
queue_area_bottom = cashier_area[3]
queue_area = (0, queue_area_top, width, queue_area_bottom)

def generate_color(id_num):
    """Return a deterministic colour triple for a track ID.

    The same `id_num` always yields the same colour, so a tracked person keeps
    one box colour for the whole video. A private `random.Random` instance is
    seeded with the ID instead of calling `random.seed`, which would reset the
    process-wide generator on every call. The colours produced are the same as
    with the module-level seed.

    Args:
        id_num: Track identifier used as the seed.

    Returns:
        A tuple of three ints, each in the range 50..255 inclusive.
    """
    rng = random.Random(id_num)
    return (rng.randint(50, 255), rng.randint(50, 255), rng.randint(50, 255))

def is_inside_area(cx, cy, area):
    """Tell whether the point (cx, cy) lies within `area`, edges included.

    Args:
        cx: X coordinate of the point.
        cy: Y coordinate of the point.
        area: Rectangle given as an (x1, y1, x2, y2) sequence.

    Returns:
        True if x1 <= cx <= x2 and y1 <= cy <= y2, otherwise False.
    """
    left, top, right, bottom = area
    horizontally_inside = left <= cx <= right
    vertically_inside = top <= cy <= bottom
    return horizontally_inside and vertically_inside

def boxes_intersect(box1, box2):
    """Report whether two axis-aligned boxes overlap or touch.

    Args:
        box1: First box as (x_min, y_min, x_max, y_max).
        box2: Second box in the same layout.

    Returns:
        True when the boxes share at least one point (touching edges count),
        False when they are separated along either axis.
    """
    a_left, a_top, a_right, a_bottom = box1
    b_left, b_top, b_right, b_bottom = box2

    # The boxes overlap on an axis when neither one ends before the other begins.
    overlap_x = a_right >= b_left and b_right >= a_left
    overlap_y = a_bottom >= b_top and b_bottom >= a_top
    return overlap_x and overlap_y

# start processing the video
# Read frames one by one, track people, annotate the frame and append it to the
# output video. The try/finally ensures the capture and the writer are released
# even when tracking or drawing raises part-way through.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_number += 1

        # classes=[0] limits detections to class index 0 (the "person" class of
        # the COCO-pretrained weights); persist=True asks the tracker to carry
        # its state over from the previous call so IDs stay stable.
        results = model.track(frame, persist=True, classes=[0], verbose=False)
        annotated_frame = frame.copy()

        # Per-frame counters, rebuilt from scratch for every frame.
        zone1_count = 0
        zone2_count = 0
        total_in_queue = 0

        for box in results[0].boxes:
            # Detections without a tracker ID cannot be timed; skip them.
            if box.id is None:
                continue

            track_id = int(box.id.item())
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2

            # Cashier membership uses the box centre; queue membership uses any
            # overlap between the person's box and the queue strip.
            in_cashier = is_inside_area(cx, cy, cashier_area)
            in_queue_area = boxes_intersect((x1, y1, x2, y2), queue_area)

            # start timer for new IDs
            if track_id not in id_first_seen:
                id_first_seen[track_id] = frame_number

            elapsed_frames = frame_number - id_first_seen[track_id]
            # in seconds; guarded so an unknown (zero) frame rate cannot raise
            time_waited = elapsed_frames / fps if fps > 0 else 0.0
            confidence = box.conf.item() if box.conf is not None else 0

            # only draw boxes if person is in cashier or queue area
            if in_cashier or in_queue_area:

                box_color = CASHIER_BOX_COLOR if in_cashier else generate_color(track_id)
                label_text = f"ID {track_id} | {confidence * 100:.1f}% | "

                if in_cashier:
                    label_text += "Cashier"
                else:
                    label_text += f"Queue | {time_waited:.1f}s"

                    # Count zones for queue
                    if cx < zone_divider:
                        zone1_count += 1
                    else:
                        zone2_count += 1
                    total_in_queue += 1

                # draw bounding box
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), box_color, BOX_THICKNESS)

                # draw label background and text
                (text_width, text_height), _ = cv2.getTextSize(label_text, FONT, BOX_FONT_SCALE, BOX_FONT_THICKNESS)
                text_x = x1
                # Put the label just inside the top of the box when it fits,
                # otherwise just above the box.
                text_y = y1 + text_height + 5 if y1 + text_height + 5 < y2 else y1 - 10
                bg_top_left = (text_x, text_y - text_height - BOX_TEXT_BG_PADDING)
                bg_bottom_right = (text_x + text_width, text_y + BOX_TEXT_BG_PADDING)
                cv2.rectangle(annotated_frame, bg_top_left, bg_bottom_right, box_color, -1)
                cv2.putText(annotated_frame, label_text, (text_x, text_y), FONT, BOX_FONT_SCALE, (255, 255, 255), BOX_FONT_THICKNESS)

        # draw cashier area
        cv2.rectangle(annotated_frame,
                      (cashier_area[0], cashier_area[1]),
                      (cashier_area[2], cashier_area[3]),
                      CASHIER_BOX_COLOR, CASHIER_BOX_THICKNESS)
        cv2.putText(annotated_frame, "Cashier Area", (cashier_area[0], cashier_area[1] - 10),
                    FONT, 0.7, CASHIER_BOX_COLOR, 1)

        # draw queue area
        cv2.rectangle(annotated_frame,
                      (queue_area[0], queue_area[1]),
                      (queue_area[2], queue_area[3]),
                      QUEUE_AREA_COLOR, QUEUE_AREA_THICKNESS)
        cv2.putText(annotated_frame, "Queue Area", (queue_area[0], queue_area[1] - 10),
                    FONT, 0.7, QUEUE_AREA_COLOR, 1)

        # draw divider line between zones
        cv2.line(annotated_frame, (zone_divider, 0), (zone_divider, height), QUEUE_AREA_COLOR, 1)

        # display counts on frame
        cv2.putText(annotated_frame, f"Camera 1 Person Count: {zone1_count}", (20, 40),
                    FONT, COUNT_FONT_SCALE, ZONE1_COLOR, COUNT_FONT_THICKNESS)
        cv2.putText(annotated_frame, f"Camera 2 Person Count: {zone2_count}", (20, 80),
                    FONT, COUNT_FONT_SCALE, ZONE2_COLOR, COUNT_FONT_THICKNESS)
        cv2.putText(annotated_frame, f"Total persons in Queue: {total_in_queue}", (20, 120),
                    FONT, COUNT_FONT_SCALE, (255, 255, 255), COUNT_FONT_THICKNESS)

        out.write(annotated_frame)
finally:
    # Release both handles on every exit path, not only after a clean run.
    cap.release()
    out.release()

print(f"✅ Updated annotated video with waiting time saved to: {output_path}")

In [None]:
output_path = r"/kaggle/working/step4_fully_customizable_output.mp4"

# Preview the annotated result with the display_middle_300 helper defined in the
# earlier preview cell. An identical second copy of the function was removed
# here, because duplicate definitions silently shadow each other and drift apart.
HTML(display_middle_300(output_path).to_html5_video())

In [None]:
# NOTE: despite its name, output_path is re-pointed here at a clip under
# /kaggle/input (a dataset path), not at the file written by the processing cell.
output_path = r"/kaggle/input/clear-detection/clear_boundary.mp4"

# Reuse the display_middle_300 helper defined in the earlier preview cell
# instead of redefining an identical copy in this cell.
HTML(display_middle_300(output_path).to_html5_video())