In [1]:
import json
import os
import time
from collections import defaultdict
from typing import Callable, Optional

import torch
import cv2
import numpy as np
import supervision as sv
from ultralytics import YOLO
from supervision import VideoInfo, VideoSink, Detections

In [2]:
import sys
sys.path.append('../YOLOv6')

In [3]:
from yolov6.data.data_augment import letterbox
from yolov6.layers.common import DetectBackend
from yolov6.utils.nms import non_max_suppression
from yolov6.utils.events import load_yaml

In [4]:
from supervision import VideoInfo, VideoSink, Detections

In [None]:
def process_video(
    source_path: str,
    target_path: str,
    callback: Callable[[np.ndarray, int], np.ndarray],
    start: int = 0,
    end: Optional[int] = None,
    stride: int = 1,
) -> None:
    """
    Process a video file by applying a callback function on each frame
        and saving the result to a target video file.

    Args:
        source_path (str): The path to the source video file.
        target_path (str): The path to the target video file.
        callback (Callable[[np.ndarray, int], np.ndarray]): A function that takes in
            a numpy ndarray representation of a video frame and an
            int index of the frame and returns a processed numpy ndarray
            representation of the frame. The index is the 0-based position
            in the sequence of frames that are actually read, not the frame
            number inside the source video.
        start (int): Forwarded to `sv.get_video_frames_generator` as the
            first frame position to read.
        end (Optional[int]): Forwarded to `sv.get_video_frames_generator` as
            the frame position at which reading stops. `None` (the default)
            reads through the end of the video; note that `0` selects an
            empty range, so no frame would be written.
        stride (int): Forwarded to `sv.get_video_frames_generator` as the
            step between consecutive frames that are read.

    Examples:
        ```python
        import supervision as sv

        def callback(scene: np.ndarray, index: int) -> np.ndarray:
            ...

        process_video(
            source_path='...',
            target_path='...',
            callback=callback
        )
        ```
    """
    # NOTE(review): this function is defined twice in the notebook with the
    # same signature; the later cell shadows this one. Consider keeping one.

    # The sink is configured from the *source* video info, so the output keeps
    # the source resolution and fps regardless of `stride`.
    source_video_info = VideoInfo.from_video_path(video_path=source_path)
    with VideoSink(target_path=target_path, video_info=source_video_info) as sink:
        frames = sv.get_video_frames_generator(
            source_path, start=start, end=end, stride=stride
        )
        for index, frame in enumerate(frames):
            sink.write_frame(frame=callback(frame, index))

In [6]:
def process_video(
    source_path: str,
    target_path: str,
    callback: Callable[[np.ndarray, int], np.ndarray],
    start: int = 0,
    end: Optional[int] = None,
    stride: int = 1,
) -> None:
    """
    Process a video file by applying a callback function on each frame
        and saving the result to a target video file.

    Args:
        source_path (str): The path to the source video file.
        target_path (str): The path to the target video file.
        callback (Callable[[np.ndarray, int], np.ndarray]): A function that takes in
            a numpy ndarray representation of a video frame and an
            int index of the frame and returns a processed numpy ndarray
            representation of the frame. The index is the 0-based position
            in the sequence of frames that are actually read, not the frame
            number inside the source video.
        start (int): Forwarded to `sv.get_video_frames_generator` as the
            first frame position to read.
        end (Optional[int]): Forwarded to `sv.get_video_frames_generator` as
            the frame position at which reading stops. `None` (the default)
            reads through the end of the video; note that `0` selects an
            empty range, so no frame would be written.
        stride (int): Forwarded to `sv.get_video_frames_generator` as the
            step between consecutive frames that are read.

    Examples:
        ```python
        import supervision as sv

        def callback(scene: np.ndarray, index: int) -> np.ndarray:
            ...

        process_video(
            source_path='...',
            target_path='...',
            callback=callback
        )
        ```
    """
    # The sink is configured from the *source* video info, so the output keeps
    # the source resolution and fps regardless of `stride`.
    source_video_info = VideoInfo.from_video_path(video_path=source_path)
    with VideoSink(target_path=target_path, video_info=source_video_info) as sink:
        frames = sv.get_video_frames_generator(
            source_path, start=start, end=end, stride=stride
        )
        for index, frame in enumerate(frames):
            sink.write_frame(frame=callback(frame, index))

In [7]:
# Use the Metal (MPS) backend when PyTorch reports it as available,
# otherwise fall back to the CPU.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
device

device(type='mps')

In [8]:
# Load the Ultralytics weights file and move the model to the selected device.
# Earlier YOLOv6 variant, kept for reference:
#   model = DetectBackend('../models/yolov6s.pt', device=device)
model = YOLO("yolo11s.pt").to(device)

In [9]:
# Plain settings first, model-dependent values afterwards.
img_size = [1920, 1088]  # standard size
half = False             # when True, process_frame casts its tensor to fp16
frame_stride = 2         # frame sampling step (per its name; confirm where it is consumed)

stride = model.stride    # passed to letterbox() by process_frame
model.model.float()      # keep the underlying network in fp32

# Mapping of class id -> class name taken from the dataset description file.
class_names = load_yaml('../datasets/coco.yaml')['names']

In [10]:
class_names

{0: 'person',
 1: 'bicycle',
 2: 'car',
 3: 'motorcycle',
 4: 'airplane',
 5: 'bus',
 6: 'train',
 7: 'truck',
 8: 'boat',
 9: 'traffic light',
 10: 'fire hydrant',
 11: 'stop sign',
 12: 'parking meter',
 13: 'bench',
 14: 'bird',
 15: 'cat',
 16: 'dog',
 17: 'horse',
 18: 'sheep',
 19: 'cow',
 20: 'elephant',
 21: 'bear',
 22: 'zebra',
 23: 'giraffe',
 24: 'backpack',
 25: 'umbrella',
 26: 'handbag',
 27: 'tie',
 28: 'suitcase',
 29: 'frisbee',
 30: 'skis',
 31: 'snowboard',
 32: 'sports ball',
 33: 'kite',
 34: 'baseball bat',
 35: 'baseball glove',
 36: 'skateboard',
 37: 'surfboard',
 38: 'tennis racket',
 39: 'bottle',
 40: 'wine glass',
 41: 'cup',
 42: 'fork',
 43: 'knife',
 44: 'spoon',
 45: 'bowl',
 46: 'banana',
 47: 'apple',
 48: 'sandwich',
 49: 'orange',
 50: 'broccoli',
 51: 'carrot',
 52: 'hot dog',
 53: 'pizza',
 54: 'donut',
 55: 'cake',
 56: 'chair',
 57: 'couch',
 58: 'potted plant',
 59: 'bed',
 60: 'dining table',
 61: 'toilet',
 62: 'tv',
 63: 'laptop',
 64: 'mou

In [11]:
# Class names that are tracked and counted; every other class is filtered out.
SELECTED_CLASS_NAMES = [
    'person',
    'bicycle',
    'car',
    'motorcycle',
    'bus',
    'truck',
]

In [12]:
# Invert the id -> name mapping once instead of rebuilding it for every
# selected name. A name missing from `class_names` raises KeyError.
_CLASS_ID_BY_NAME = {name: class_id for class_id, name in class_names.items()}

# Numeric ids of the selected classes, in the order of SELECTED_CLASS_NAMES.
SELECTED_CLASS_IDS = [
    _CLASS_ID_BY_NAME[class_name]
    for class_name
    in SELECTED_CLASS_NAMES
]

In [13]:
# Read the per-video line definitions (video path -> list of point pairs)
# and echo them so the loaded geometry can be checked by eye.
with open('../src/coords.json') as coords_file:
    coords = json.load(coords_file)

print(coords)

{'/Users/grzegorzsmereczniak/Documents/MyPW/data/monitoring_pw/15_04_25/7-8/aula1/resized/Aula_1_192.168.5.1_20250415074041_20250415083559_500661093.mp4': [[[0, 1088], [1380, 1088]], [[1380, 1088], [1570, 380]], [[1570, 380], [1200, 215]], [[1200, 215], [1070, 205]], [[1070, 205], [500, 350]], [[500, 350], [300, 450]], [[300, 450], [0, 850]], [[0, 1088], [0, 850]]], '/Users/grzegorzsmereczniak/Documents/MyPW/data/monitoring_pw/15_04_25/7-8/aula2/resized/Aula_2_192.168.5.1_20250415074052_20250415083558_500637076.mp4': [[[0, 1088], [1400, 1088]], [[1400, 1088], [1550, 800]], [[1550, 800], [1470, 450]], [[1470, 450], [1350, 340]], [[1350, 340], [720, 250]], [[720, 250], [680, 280]], [[680, 280], [680, 360]], [[680, 360], [50, 700]], [[0, 1088], [50, 700]]], '/Users/grzegorzsmereczniak/Documents/MyPW/data/monitoring_pw/15_04_25/7-8/Br_A1/resized/BRAMA_A1_Rejestrator_2_20250415074006_20250415074556_502832788.mp4': [[[830, 685], [30, 900]], [[990, 600], [830, 685]], [[1920, 800], [990, 600]]

In [14]:
def process_frame(frame):
    """
    Turn a single video frame into a model-ready input tensor.

    The frame is letterboxed to the notebook-level `img_size` (using the
    notebook-level `stride`), reordered from HWC/BGR to CHW/RGB, scaled to
    the [0, 1] range, given a leading batch dimension, optionally cast to
    half precision (when the notebook-level `half` flag is set) and moved
    to the notebook-level `device`.

    Args:
        frame: Image array as read from the video (HWC, BGR channel order).

    Returns:
        torch.Tensor: Batched float tensor located on `device`.
    """
    resized = letterbox(frame, img_size, stride=stride)[0]

    # HWC -> CHW, then flip the channel axis (BGR -> RGB). The reversed view
    # is copied so that torch receives a regular contiguous array.
    chw_rgb = resized.transpose((2, 0, 1))[::-1].copy()

    # Scale to [0, 1] and add the batch dimension.
    batch = (torch.from_numpy(chw_rgb).float() / 255.0).unsqueeze(0)

    return (batch.half() if half else batch).to(device)

In [15]:
def get_results(model, frame, conf_thres=0.25, iou_thres=0.45, classes=(0,)):
    """
    Run the model on a preprocessed frame and unpack the NMS output.

    Args:
        model: Callable model whose raw output is accepted by
            `non_max_suppression` (assumes a YOLOv6-style backend — TODO
            confirm; an Ultralytics `YOLO` object returns a different type).
        frame: Input passed to the model as-is (e.g. the tensor produced by
            `process_frame`).
        conf_thres (float): Confidence threshold forwarded to NMS.
        iou_thres (float): IoU threshold forwarded to NMS.
        classes: Iterable of class ids to keep, or `None` to keep every
            class. The default keeps only class 0, matching the previous
            hard-coded behaviour.

    Returns:
        tuple: `(xyxy, confidence, class_id, mask, tracker_id)` where `xyxy`
        is a float32 array built from columns 0-3 of the detections,
        `confidence` a float32 array from column 4, `class_id` an int array
        from column 5, and `mask` / `tracker_id` are always `None`.
        NOTE(review): the boxes are not rescaled here, so they are presumably
        in the coordinates of the model input rather than the original frame.
    """
    class_filter = list(classes) if classes is not None else None

    with torch.no_grad():
        pred = model(frame)
        # Only the first image of the batch is used.
        det = non_max_suppression(
            pred, conf_thres=conf_thres, iou_thres=iou_thres, classes=class_filter
        )[0]

    xyxy = det[:, :4].cpu().numpy().astype('float32')
    confidence = det[:, 4].cpu().numpy().astype('float32')
    class_id = det[:, 5].cpu().numpy().astype(int)
    mask = None
    tracker_id = None

    return (xyxy, confidence, class_id, mask, tracker_id)

In [16]:
def get_detections(xyxy, confidence, class_id, mask, tracker_id):
    """
    Wrap raw detection arrays in a supervision `Detections` object.

    Args:
        xyxy: Box coordinates, forwarded as `Detections.xyxy`.
        confidence: Per-box confidence, forwarded as `Detections.confidence`.
        class_id: Per-box class id, forwarded as `Detections.class_id`.
        mask: Per-box masks or `None`, forwarded as `Detections.mask`.
        tracker_id: Per-box tracker ids or `None`, forwarded as
            `Detections.tracker_id`.

    Returns:
        Detections: Object built from the given arrays.
    """
    return Detections(
        xyxy=xyxy,
        confidence=confidence,
        class_id=class_id,
        # Forward the caller's mask instead of silently discarding it.
        mask=mask,
        tracker_id=tracker_id
    )

In [17]:
def draw_in_out_window(frame: np.ndarray, in_counts: dict, out_counts: dict) -> np.ndarray:
    """
    Draw a semi-transparent panel with per-class in/out counts on a frame.

    The panel is placed near the top-right corner and contains one text row
    for every entry of the notebook-level `SELECTED_CLASS_NAMES`.

    Args:
        frame (np.ndarray): Image to draw on.
        in_counts (dict): Mapping of class id -> number of "in" crossings.
        out_counts (dict): Mapping of class id -> number of "out" crossings.
            For both mappings a missing class id is shown as 0; the mappings
            are only read, never modified.

    Returns:
        np.ndarray: The blended image with the panel and text drawn on it.
    """
    x, y = frame.shape[1] - 300, 30
    width, height = 270, 160
    overlay = frame.copy()

    # Semi-transparent background: paint a filled white box on a copy and
    # blend that copy with the untouched frame.
    cv2.rectangle(overlay, (x, y), (x + width, y + height), (255, 255, 255), -1)
    alpha = 0.6
    frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)

    # Text settings
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.6
    font_color = (0, 0, 0)
    line_height = 25
    offset_y = y + 25

    # Build the name -> id lookup once per call; setdefault keeps the first
    # id seen for a name.
    class_id_by_name = {}
    for known_id, known_name in class_names.items():
        class_id_by_name.setdefault(known_name, known_id)

    for class_name in SELECTED_CLASS_NAMES:
        class_id = class_id_by_name.get(class_name)
        if class_id is None:
            in_count = out_count = 0
        else:
            # .get() works for plain dicts and does not insert keys into
            # defaultdicts.
            in_count = in_counts.get(class_id, 0)
            out_count = out_counts.get(class_id, 0)
        cv2.putText(frame, f"{class_name} In: {in_count}  Out: {out_count}",
                    (x + 10, offset_y), font, font_scale, font_color, 2)
        offset_y += line_height

    return frame


In [20]:
# Process one selected video: detect and track objects, count line crossings
# per class for every configured line, and write an annotated copy.
for source_path, coords_list in coords.items():
    if source_path == '/Users/grzegorzsmereczniak/Documents/MyPW/data/monitoring_pw/15_04_25/7-8/Br_Koszykowa/resized/Brama_Koszykowa_192.168.5.149_20250415074005_20250415075242_501697548.mp4':
        video_info = sv.VideoInfo.from_video_path(source_path)
        print(f'Video info: {video_info}')

        # str.replace swaps *every* occurrence of 'data' in the path.
        target_path = source_path.replace('data', 'results')

        # Make sure the output folder exists before any frame is written;
        # the video writer does not create missing directories.
        target_dir = os.path.dirname(target_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # Not updated below; retained in case other cells read them.
        total_in = defaultdict(int)
        total_out = defaultdict(int)

        print(f'Processing {source_path} -> {target_path}')

        # Each entry of coords_list is a pair of [x, y] points describing one line.
        line_coords = [
            (sv.Point(*cs[0]), sv.Point(*cs[1]))
            for cs in coords_list
        ]
        print(f'Lines coords: {line_coords}')

        line_zones = [
            sv.LineZone(start=line_start, end=line_end)
            for line_start, line_end in line_coords
        ]

        # One {"in": ..., "out": ...} pair of per-class counters for every line.
        zone_class_counts = [
            {"in": defaultdict(int), "out": defaultdict(int)}
            for _ in line_zones
        ]

        # create instance of BoxAnnotator, LabelAnnotator, and TraceAnnotator
        box_annotator = sv.BoxAnnotator(thickness=4)
        label_annotator = sv.LabelAnnotator(text_thickness=2, text_scale=1.5, text_color=sv.Color.BLACK)
        trace_annotator = sv.TraceAnnotator(thickness=4, trace_length=50)

        # create one LineZoneAnnotator per line
        line_zone_annotators = [
            sv.LineZoneAnnotator(thickness=4, text_thickness=4, text_scale=0, text_orient_to_line=True)
            for _ in line_zones
        ]

        start_time = time.time()

        # create BYTETracker instance
        # NOTE(review): only every `frame_stride`-th frame reaches the tracker,
        # so the effective rate is presumably video_info.fps / frame_stride —
        # confirm whether frame_rate should be scaled accordingly.
        byte_tracker = sv.ByteTrack(
            track_activation_threshold=0.25,
            lost_track_buffer=30,
            minimum_matching_threshold=0.8,
            frame_rate=video_info.fps,
            minimum_consecutive_frames=3)

        byte_tracker.reset()

        def callback(frame: np.ndarray, index: int) -> np.ndarray:
            """Detect, track, count line crossings and annotate one frame.

            Detection runs the Ultralytics model directly on the raw frame;
            the YOLOv6 helpers (process_frame / get_results / get_detections)
            are not used here.
            """
            results = model(frame, conf=0.1, verbose=False)[0]
            detections = sv.Detections.from_ultralytics(results)
            detections = detections[np.isin(detections.class_id, SELECTED_CLASS_IDS)]
            detections = byte_tracker.update_with_detections(detections)

            labels = [
                f"#{tracker_id} {class_names[class_id]} {confidence:0.2f}"
                for confidence, class_id, tracker_id
                in zip(detections.confidence, detections.class_id, detections.tracker_id)
            ]

            annotated_frame = frame.copy()
            annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
            annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)
            annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

            # For every line: trigger the crossing check, draw the line, and
            # add the crossings of this frame to the per-class counters.
            for i, (zone, annotator) in enumerate(zip(line_zones, line_zone_annotators)):
                crossed_in, crossed_out = zone.trigger(detections)
                annotated_frame = annotator.annotate(annotated_frame, line_counter=zone)

                for class_id, is_in in zip(detections.class_id, crossed_in):
                    if is_in:
                        zone_class_counts[i]["in"][class_id] += 1

                for class_id, is_out in zip(detections.class_id, crossed_out):
                    if is_out:
                        zone_class_counts[i]["out"][class_id] += 1

            # The panel shows, for each class, the largest count over all lines.
            max_in = defaultdict(int)
            max_out = defaultdict(int)

            for zone_counts in zone_class_counts:
                for class_id, count in zone_counts["in"].items():
                    max_in[class_id] = max(max_in[class_id], count)
                for class_id, count in zone_counts["out"].items():
                    max_out[class_id] = max(max_out[class_id], count)

            annotated_frame = draw_in_out_window(annotated_frame, max_in, max_out)

            return annotated_frame

        # end=None is passed explicitly so the whole video is read regardless
        # of process_video's default for `end`.
        process_video(
            source_path=source_path,
            target_path=target_path,
            callback=callback,
            start=0,
            end=None,
            stride=frame_stride
        )

        end_time = time.time()
        elapsed_time = end_time - start_time

        print(f"Processed {source_path} in {elapsed_time:.2f} seconds")
        break

Video info: VideoInfo(width=1920, height=1088, fps=14, total_frames=11344)
Processing /Users/grzegorzsmereczniak/Documents/MyPW/data/monitoring_pw/15_04_25/7-8/Br_Koszykowa/resized/Brama_Koszykowa_192.168.5.149_20250415074005_20250415075242_501697548.mp4 -> /Users/grzegorzsmereczniak/Documents/MyPW/results/monitoring_pw/15_04_25/7-8/Br_Koszykowa/resized/Brama_Koszykowa_192.168.5.149_20250415074005_20250415075242_501697548.mp4
Lines coords: [(Point(x=1010, y=230), Point(x=0, y=800)), (Point(x=1500, y=200), Point(x=1010, y=230)), (Point(x=1800, y=260), Point(x=500, y=1088))]
Processed /Users/grzegorzsmereczniak/Documents/MyPW/data/monitoring_pw/15_04_25/7-8/Br_Koszykowa/resized/Brama_Koszykowa_192.168.5.149_20250415074005_20250415075242_501697548.mp4 in 236.78 seconds
