# Importing Libraries

In [2]:
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Set

import cv2
# Reading and writing video files.
# Displaying video frames in real-time (cv2.imshow).
# Drawing on frames.

import numpy as np

import supervision as sv
# Handling video frames.
# Annotating frames with bounding boxes, labels, and traces.
# Defining and working with polygonal zones.

from tqdm import tqdm # Tracking the progress of video frame processing.
from ultralytics import YOLO # a pretrained object detection model

In [3]:
# Shared palette: index i colours the i-th entry/exit zone pair when the zones
# are drawn, and the same palette is handed to the box, label and trace annotators.
COLORS = sv.ColorPalette.from_hex(
    [
        "#E6194B",
        "#3CB44B",
        "#FFE119",
        "#3C76D1",
    ]
)

# Zone Movement Detection

In [4]:
# Entry zones: one 4-vertex polygon ([x, y] corners in frame coordinates) for
# each road that leads INTO the roundabout.
ZONE_IN_POLYGONS = [
    np.array(corners)
    for corners in (
        [[592, 282], [900, 282], [900, 82], [592, 82]],
        [[950, 860], [1250, 860], [1250, 1060], [950, 1060]],
        [[592, 582], [592, 860], [392, 860], [392, 582]],
        [[1250, 282], [1250, 530], [1450, 530], [1450, 282]],
    )
]

# Exit zones: one 4-vertex polygon for each road that leads OUT of the
# roundabout. Index i is paired with ZONE_IN_POLYGONS[i] when zones are drawn.
ZONE_OUT_POLYGONS = [
    np.array(corners)
    for corners in (
        [[950, 282], [1250, 282], [1250, 82], [950, 82]],
        [[592, 860], [900, 860], [900, 1060], [592, 1060]],
        [[592, 282], [592, 550], [392, 550], [392, 282]],
        [[1250, 860], [1250, 560], [1450, 560], [1450, 860]],
    )
]

### Zone Initialization

- Zone initialization happens in the **initiate_polygon_zones** function. 
- It takes a list of polygon coordinates as input and wraps each polygon in an sv.PolygonZone object.
- The PolygonZone class:
  -  From the supervision library. Represents a polygon and provides utilities for detecting whether objects are inside it.
  - The triggering_anchors parameter specifies which part of the detected object's bounding box is checked for containment. Here, it's set to CENTER, meaning the center point of the bounding box must fall inside the polygon to trigger it.


In [5]:
def initiate_polygon_zones(
    polygons: List[np.ndarray],
    triggering_anchors: Iterable[sv.Position] = (sv.Position.CENTER,),
) -> List[sv.PolygonZone]:
    """Wrap every polygon in an ``sv.PolygonZone``.

    Args:
        polygons: Vertex arrays, one per zone.
        triggering_anchors: Bounding-box anchor(s) handed to every zone.
            Defaults to the box centre.

    Returns:
        One ``sv.PolygonZone`` per input polygon, in the same order.
    """
    # Materialise once into an immutable tuple: a one-shot iterator would
    # otherwise be consumed by the first zone, and the zones never end up
    # sharing a mutable default object.
    anchors = tuple(triggering_anchors)
    return [
        sv.PolygonZone(polygon=polygon, triggering_anchors=anchors)
        for polygon in polygons
    ]


### Zone Triggering

Zone triggering happens in the **process_frame** function within the **VideoProcessor** class, in the for loop:

```python
for zone_in, zone_out in zip(self.zones_in, self.zones_out):
    detections_in_zone = detections[zone_in.trigger(detections=detections)]
    detections_in_zones.append(detections_in_zone)
    detections_out_zone = detections[zone_out.trigger(detections=detections)]
    detections_out_zones.append(detections_out_zone)
```

Let's break it down using the first two lines of the loop.

- <u>zone_in.trigger(detections=detections)</u>
  -  Takes all detections at once and checks if the triggering anchor (e.g., the center of the bounding box) for each detection is inside the polygon (zone_in).
  -  Returns a boolean mask, i.e., a NumPy array of True/False values, with each value corresponding to whether the center of a detection's bounding box is inside the polygon. E.g., for 5 detections, an example output would be: [True, False, True, False, False].


-  <u>detections[zone_in.trigger(detections=detections)]</u>
   -  Returns a subset of the original detections, where the mask is True. From the above example, would return the 1st and 3rd detections.
   -  If every mask value is False, it still returns a Detections object, just with an empty tracker_id array.
   -  A sample:

```python
[
    Detections(tracker_id=[101, 102], ...),  # Detections in ZONE_IN_POLYGONS[0]
    Detections(tracker_id=[], ...),     # No detections in ZONE_IN_POLYGONS[1]
    Detections(tracker_id=[103], ...),    # Detections in ZONE_IN_POLYGONS[2]
    Detections(tracker_id=[], ...),     # No detections in ZONE_IN_POLYGONS[3]
]
```

After the above line, the detections_in_zone is appended to a list (detections_in_zones), which collects the detections for all zone_in polygons in the frame. 
The same happens for zone_out detections. After identifying detections in entry and exit zones, the DetectionsManager class determines if the object transitioned from an entry zone to an exit zone.


<hr>

# Detection Mgt.

- A supervision.Detections object, a container for detection data, is made up of:
  - Bounding box coordinates for the detections that are inside the polygon.
  - Tracker IDs for those detections.
  - Class IDs (if available).

- The DetectionsManager class keeps track of which object (by tracker ID) entered which entry zone, and whether it exited via an exit zone. It also maintains counts of these transitions.

**PROPERTIES**
-  <u>tracker_id_to_zone_id</u>
   -  Dictionary mapping each object's unique tracker ID to the ID of the zone it entered.
   -  Example: {101: 0, 102: 2} meaning: Object with tracker ID 101 entered through ZONE_IN_POLYGONS[0], and object with tracker ID 102 entered through ZONE_IN_POLYGONS[2].
-  <u>counts</u>
   -  Nested dictionary that tracks transitions between zones: {zone_out_id: {zone_in_id: set(tracker_ids)}}.
   -  Example: {0: {2: {101, 105}}, 1: {3: {102}}} meaning: Two objects (IDs 101 and 105) moved from ZONE_IN_POLYGONS[2] to ZONE_OUT_POLYGONS[0], and one object (ID 102) moved from ZONE_IN_POLYGONS[3] to ZONE_OUT_POLYGONS[1].
  
- Using dict.setdefault simplifies the process of ensuring that nested keys exist before modifying the dictionary.

In [6]:
class DetectionsManager:
    """Remember which entry zone each tracked object used and count exits.

    State is accumulated across calls to ``update`` (one call per frame).
    """

    def __init__(self) -> None:
        # tracker id -> index of the first entry zone the tracker was seen in
        self.tracker_id_to_zone_id: Dict[int, int] = {}
        # exit zone index -> entry zone index -> tracker ids that made that trip
        self.counts: Dict[int, Dict[int, Set[int]]] = {}

    def update(
        self,
        detections_all: sv.Detections,
        detections_in_zones: List[sv.Detections],
        detections_out_zones: List[sv.Detections],
    ) -> sv.Detections:
        """Record zone events for one frame and filter the frame's detections.

        Args:
            detections_all: Every tracked detection of the frame. Its
                ``class_id`` is overwritten in place with the entry-zone index
                of each tracker (``-1`` when the tracker has no entry zone yet).
            detections_in_zones: Detections inside each entry zone; the list
                position is the entry-zone index.
            detections_out_zones: Detections inside each exit zone; the list
                position is the exit-zone index.

        Returns:
            The subset of ``detections_all`` whose tracker has a recorded
            entry zone.
        """
        for zone_in_id, detections_in_zone in enumerate(detections_in_zones):
            for tracker_id in self._tracker_ids(detections_in_zone):
                # setdefault keeps the FIRST entry zone a tracker was seen in.
                self.tracker_id_to_zone_id.setdefault(tracker_id, zone_in_id)

        for zone_out_id, detections_out_zone in enumerate(detections_out_zones):
            for tracker_id in self._tracker_ids(detections_out_zone):
                zone_in_id = self.tracker_id_to_zone_id.get(tracker_id)
                if zone_in_id is None:
                    # Seen in an exit zone without ever being seen in an entry zone.
                    continue
                self.counts.setdefault(zone_out_id, {}).setdefault(
                    zone_in_id, set()
                ).add(tracker_id)

        if len(detections_all) > 0 and detections_all.tracker_id is not None:
            detections_all.class_id = np.vectorize(
                lambda x: self.tracker_id_to_zone_id.get(x, -1)
            )(detections_all.tracker_id)
        else:
            # np.vectorize cannot infer an output type from an empty input, and
            # without tracker ids nothing can be mapped: mark everything -1.
            detections_all.class_id = np.full(len(detections_all), -1, dtype=int)
        return detections_all[detections_all.class_id != -1]

    @staticmethod
    def _tracker_ids(detections: sv.Detections) -> List[int]:
        """Return the tracker ids of ``detections`` as plain ints ([] if unset)."""
        if detections.tracker_id is None:
            return []
        return [int(tracker_id) for tracker_id in detections.tracker_id]


- The final part of the update function above ensures that only detections with valid tracker_id mappings (i.e., those whose tracker has already been seen in an entry zone) are kept for further processing. It uses np.vectorize to map each tracker_id to its associated zone_in_id:
  - If the tracker_id exists in tracker_id_to_zone_id, it assigns the corresponding zone_in_id.
  - Otherwise, it assigns -1 (invalid ID).
  - Finally filters out detections with class_id = -1 (those that did not pass through any ZONE_IN_POLYGONS).
- The final output is a single filtered sv.Detections object containing only the valid detections.

<hr>

# Video Processing

- In the earlier sections we covered the **process_frame** function that serves as the starting point of video processing.
- In **process_frame**, we deal with both object and zone detections. In this section we cover the remainder of process_frame, i.e., YOLO for object detection and ByteTrack for object tracking.

### YOLO

**results = self.model(frame, verbose=False, conf=self.conf_threshold, iou=self.iou_threshold)[0]**
<br>

**detections = sv.Detections.from_ultralytics(results)**

- YOLO processes the current frame and outputs a list of detected objects (bounding boxes, confidence scores, etc.).
- sv.Detections.from_ultralytics(results) converts the YOLO results into a supervision.Detections object.

**detections.class_id = np.zeros(len(detections))**
- Initializes the class_id field for the detections.

### ByteTrack

**detections = self.tracker.update_with_detections(detections)**
- The detections for the current frame are passed to ByteTrack for tracking.
- ByteTrack updates its internal state and assigns tracker_ids to the detections.
- How ByteTrack Matches Detections Across Frames
  - Matching with High-Confidence Detections
    - ByteTrack uses a high-confidence threshold to create "active tracks." For example, detections with confidence scores above a certain threshold are considered reliable and used to update existing tracks.
  - Matching with Low-Confidence Detections
    - If no high-confidence detections match a tracked object, ByteTrack uses low-confidence detections to fill gaps. This helps prevent objects from "disappearing" momentarily due to occlusions or other issues.
  - Kalman Filter
    - ByteTrack uses a Kalman filter to predict the position of tracked objects in the next frame, improving its ability to match objects between frames.
  - Hungarian Algorithm
    - The Hungarian algorithm is used to assign detections to existing tracks based on a cost function, which evaluates how well a detection matches a track. The cost function considers:
      - IoU (Intersection over Union) between bounding boxes.
      - Distance between predicted and detected positions.
  - New Tracks
    - If a detection cannot be matched to any existing track, a new track is created with a new tracker_id.


- After processing the frame, the derived detections are passed to **annotate_frame** for annotation.

### Frame Annotation
- For every frame and list of filtered detections received from **process_frame**, we do 5 key annotations:
  - Zone Annotation
    - Draws the zone polygons on the frame with their corresponding colours.  
  - Trace Annotator
    - Shows where each object has been (movement history).
    - It uses the historical positions of objects (tracked by their tracker_id) to draw a line or "trace" showing the path each object has taken across frames.
  - Box Annotator
    - Highlights the object’s current position with a bounding box.
  - Label Annotator
    - Displays custom_text (in this case the tracker_id) for each object, for  identification.
  - Zone Statistics
    - Shows the counts for how many objects transitioned from each entry zone to each exit zone, based on DetectionsManager's counts property.

### Frame Processing
- The **sv.get_video_frames_generator** function creates a generator that yields individual frames from the input video. Total frames = video duration (seconds) × FPS.
- The method supports two modes based on whether an output video path (self.target_video_path) is specified.
  - Save Processed Video to File: Handles writing processed frames to a video file
  - Display Processed Video in Real-Time: The processed frame is displayed in a window using OpenCV's cv2.imshow
  
- tqdm provides user feedback on processing progress, especially helpful for long videos.

In [7]:
class VideoProcessor:
    """Run detection, tracking, zone counting and annotation over a video.

    Every frame goes through the YOLO model, ByteTrack, the entry/exit polygon
    zones and the ``DetectionsManager``. The annotated result is either written
    to ``target_video_path`` or shown in an OpenCV window.
    """

    def __init__(
        self,
        source_weights_path: str,
        source_video_path: str,
        target_video_path: Optional[str] = None,
        confidence_threshold: float = 0.3,
        iou_threshold: float = 0.7,
    ) -> None:
        """Load the model and build the tracker, zones and annotators.

        Args:
            source_weights_path: Path to the YOLO weights file.
            source_video_path: Path to the input video.
            target_video_path: Where to write the annotated video. When
                empty or ``None`` the frames are displayed instead.
            confidence_threshold: Passed to the model as ``conf``.
            iou_threshold: Passed to the model as ``iou``.
        """
        self.conf_threshold = confidence_threshold
        self.iou_threshold = iou_threshold
        self.source_video_path = source_video_path
        self.target_video_path = target_video_path

        self.model = YOLO(source_weights_path)
        self.tracker = sv.ByteTrack()

        self.video_info = sv.VideoInfo.from_video_path(source_video_path)
        self.zones_in = initiate_polygon_zones(ZONE_IN_POLYGONS, [sv.Position.CENTER])
        self.zones_out = initiate_polygon_zones(ZONE_OUT_POLYGONS, [sv.Position.CENTER])

        self.box_annotator = sv.BoxAnnotator(color=COLORS)
        self.label_annotator = sv.LabelAnnotator(
            color=COLORS, text_color=sv.Color.BLACK
        )
        self.trace_annotator = sv.TraceAnnotator(
            color=COLORS, position=sv.Position.CENTER, trace_length=100, thickness=2
        )
        self.detections_manager = DetectionsManager()

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Detect, track and zone-filter the objects of one frame, then annotate it.

        Args:
            frame: The frame to process.

        Returns:
            The annotated frame produced by ``annotate_frame``.
        """
        results = self.model(
            frame, verbose=False, conf=self.conf_threshold, iou=self.iou_threshold
        )[0]
        detections = sv.Detections.from_ultralytics(results)
        # Placeholder class ids; DetectionsManager.update overwrites them with
        # the entry-zone index of each tracker.
        detections.class_id = np.zeros(len(detections))
        detections = self.tracker.update_with_detections(detections)

        detections_in_zones = []
        detections_out_zones = []

        for zone_in, zone_out in zip(self.zones_in, self.zones_out):
            detections_in_zone = detections[zone_in.trigger(detections=detections)]
            detections_in_zones.append(detections_in_zone)
            detections_out_zone = detections[zone_out.trigger(detections=detections)]
            detections_out_zones.append(detections_out_zone)

        detections = self.detections_manager.update(
            detections, detections_in_zones, detections_out_zones
        )
        return self.annotate_frame(frame, detections)

    def annotate_frame(
        self, frame: np.ndarray, detections: sv.Detections
    ) -> np.ndarray:
        """Draw zones, traces, boxes, labels and transition counts on a copy of ``frame``.

        Args:
            frame: The source frame; it is copied, not modified.
            detections: The detections to draw.

        Returns:
            The annotated copy of ``frame``.
        """
        annotated_frame = frame.copy()

        # Entry and exit zone number i share palette colour i.
        for i, (zone_in, zone_out) in enumerate(zip(self.zones_in, self.zones_out)):
            for zone in (zone_in, zone_out):
                annotated_frame = sv.draw_polygon(
                    annotated_frame, zone.polygon, COLORS.colors[i]
                )

        labels = [f"#{tracker_id}" for tracker_id in detections.tracker_id]
        annotated_frame = self.trace_annotator.annotate(annotated_frame, detections)
        annotated_frame = self.box_annotator.annotate(annotated_frame, detections)
        annotated_frame = self.label_annotator.annotate(
            annotated_frame, detections, labels
        )

        # For each exit zone, stack one counter per entry zone (40 px apart),
        # using the entry zone's colour as the text background.
        for zone_out_id, zone_out in enumerate(self.zones_out):
            counts = self.detections_manager.counts.get(zone_out_id)
            if not counts:
                continue
            zone_center = sv.get_polygon_center(polygon=zone_out.polygon)
            for i, (zone_in_id, tracker_ids) in enumerate(counts.items()):
                text_anchor = sv.Point(x=zone_center.x, y=zone_center.y + 40 * i)
                annotated_frame = sv.draw_text(
                    scene=annotated_frame,
                    text=str(len(tracker_ids)),
                    text_anchor=text_anchor,
                    background_color=COLORS.colors[zone_in_id],
                )

        return annotated_frame

    def process_video(self) -> None:
        """Process every frame of the source video.

        With a ``target_video_path`` the annotated frames are written to that
        file; otherwise they are shown in an OpenCV window until the video
        ends or ``q`` is pressed.
        """
        frame_generator = sv.get_video_frames_generator(
            source_path=self.source_video_path
        )
        total_frames = self.video_info.total_frames

        if self.target_video_path:
            # NOTE(review): the video sink does not appear to create missing
            # parent folders, which would lose the whole run's output; creating
            # the folder up front is harmless either way.
            Path(self.target_video_path).parent.mkdir(parents=True, exist_ok=True)
            with sv.VideoSink(self.target_video_path, self.video_info) as sink:
                for frame in tqdm(frame_generator, total=total_frames):
                    annotated_frame = self.process_frame(frame)
                    sink.write_frame(annotated_frame)
            return

        try:
            for frame in tqdm(frame_generator, total=total_frames):
                annotated_frame = self.process_frame(frame)
                cv2.imshow("Processed Video", annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Close the preview window even when processing a frame raises.
            cv2.destroyAllWindows()

# Main

In [10]:
# Run parameters

# Path to the source weights file
source_weights_path = "data/traffic_analysis.pt"

# Path to the source video file
source_video_path = "data/traffic_analysis.mov"

# Path to the target video file (output)
target_video_path = "output/sample1_result.mp4"

# Confidence threshold for the model
confidence_threshold = 0.3

# IOU threshold for the model
iou_threshold = 0.5

In [11]:
# Build the pipeline from the run parameters defined above, then process the
# whole video (written to target_video_path when one is set).
processor = VideoProcessor(
    source_weights_path=source_weights_path,
    source_video_path=source_video_path,
    target_video_path=target_video_path,
    confidence_threshold=confidence_threshold,
    iou_threshold=iou_threshold,
)
processor.process_video()

100%|██████████| 806/806 [03:27<00:00,  3.88it/s]
