# **SETUP**

In [None]:
!pip install filterpy



In [2]:
from collections import defaultdict

import cv2
import numpy as np


import xml.etree.ElementTree as ET

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib import rc as rc
import IPython.display
import matplotlib as mpl

# Set the animation embed limit to 1000 MB (the rcParam value is expressed in MB)
mpl.rcParams['animation.embed_limit'] = 1000
rc('animation', html='jshtml')

# from google.colab import drive
# drive.mount('/content/drive')

# **VIDEO PLAYER**

In [3]:
def draw_text(img, text,
          pos=(0, 0),
          font=cv2.FONT_HERSHEY_PLAIN,
          font_scale=3.0,
          font_thickness=2,
          text_color=(0, 255, 0),
          text_color_bg=(0, 0, 0)
          ):
    """
    Draws `text` on `img` (in place) over a filled background rectangle.

    :param img: Image array that OpenCV draws into.
    :param text: String to render.
    :param pos: (x, y) of the top-left corner of the background rectangle.
    :param font: OpenCV font identifier.
    :param font_scale: OpenCV font scale factor.
    :param font_thickness: Stroke thickness of the glyphs.
    :param text_color: Colour of the glyphs.
    :param text_color_bg: Colour of the filled background rectangle.
    :return: The (width, height) of the text as reported by cv2.getTextSize.
    """
    left, top = pos
    size, _baseline = cv2.getTextSize(text, font, font_scale, font_thickness)
    width, height = size

    # Filled rectangle (thickness -1) sized to the measured text.
    bottom_right = (left + width, top + height)
    cv2.rectangle(img, pos, bottom_right, text_color_bg, -1)

    # putText anchors at the text's bottom-left, hence the downward shift.
    anchor = (left, round(top + height + font_scale - 1))
    cv2.putText(img, text, anchor, font, font_scale, text_color, font_thickness)

    return size

In [4]:
class VideoPlayer:
    """
    Reads frames from a video file and renders them as a Matplotlib animation,
    optionally overlaying ground-truth and predicted bounding boxes.
    """

    def __init__(self, video_path):
        """
        Opens the video once to cache its metadata, then releases the capture.

        :param video_path: Path to the video file.
        :raises ValueError: If OpenCV cannot open the file.
        """
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        if not self.cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        # Fixed 5-inch width; the height follows the aspect ratio but is kept
        # at 1 inch minimum so a very wide video never yields a zero-height figure.
        self.figsize = (5, max(1, round(5 / self.width * self.height)))

        print(self.figsize)

        # Only the metadata is needed here; get_frames() opens its own capture.
        self.cap.release()

    def get_frames(self, start_frame, end_frame):
        """
        Returns all frames in RGB from start_frame (inclusive) to end_frame (exclusive).

        Reading stops early at the end of the video or on the first failed read.
        An empty array is returned when start_frame is past the last frame.
        """
        if start_frame >= self.frame_count:
            return np.array([])

        cap = cv2.VideoCapture(self.video_path)
        frames = []
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            for frame_idx in range(start_frame, end_frame):
                if frame_idx >= self.frame_count:
                    break
                ret, frame = cap.read()
                if not ret:
                    break
                # OpenCV decodes to BGR; Matplotlib expects RGB.
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Release the capture even if decoding/conversion raises.
            cap.release()
        return np.array(frames)

    @staticmethod
    def _boxes_for_frame(boxes_by_frame, frame_no):
        """Returns the boxes stored for `frame_no`, or [] when the frame has none."""
        if hasattr(boxes_by_frame, "get"):
            # .get avoids a KeyError on plain dicts and avoids inserting
            # empty entries into a defaultdict.
            return boxes_by_frame.get(frame_no, [])
        try:
            return boxes_by_frame[frame_no]
        except IndexError:
            return []

    def _draw_boxes(self, canvas, boxes, color, thickness, label_x_fraction):
        """Draws each box (and its track ID label, if any) onto `canvas` in place."""
        for box in boxes:
            x, y, w, h = box["bbox"]
            cv2.rectangle(
                canvas,
                (int(x), int(y)), (int(x + w), int(y + h)),
                color=color,
                thickness=thickness
            )
            if "track_id" in box:
                # The label sits just below the box, at the given fraction of its width.
                label_pos = (int(x + label_x_fraction * w), int(y + h + 5))
                draw_text(
                    canvas,
                    str(box["track_id"]),
                    pos=label_pos,
                    font=cv2.FONT_HERSHEY_SIMPLEX,
                    font_scale=1.25,
                    text_color=(0, 0, 0),  # black text on the box-coloured background
                    font_thickness=3,
                    text_color_bg=color
                )

    def plot(
        self,
        start_frame=0,
        end_frame=100,
        bounding_boxes_gt=None,
        bounding_boxes_pred=None,
        gt_color=(0, 255, 0),
        pred_color=(255, 0, 0),
        bbox_thickness=5,
        save_file=None,
    ):
        """
        Displays (or saves) an animation of the video frames within [start_frame, end_frame).

        Optionally draws bounding boxes for ground-truth and predicted data.
        Track IDs, when present, are rendered in black on a filled label of the
        box colour placed just below the box:
        - GT ID at 25% of the box width
        - Pred ID at 75% of the box width

        :param bounding_boxes_gt: Per-frame boxes ({frame_idx: [ {"bbox": [x, y, w, h], ...}, ... ]}
                                  or a sequence indexed by frame). Frames without an entry draw nothing.
        :param bounding_boxes_pred: Same structure as bounding_boxes_gt.
        :param save_file: If given, the animation is written to this path
                          (PillowWriter for '.gif', FFMpegWriter otherwise) instead of displayed.
        """
        frames = self.get_frames(start_frame, end_frame)
        if len(frames) == 0:
            print("No frames to display for the specified frame range.")
            return

        processed_frames = []
        for offset, frame in enumerate(frames):
            # astype() copies, so drawing never touches the decoded frames.
            canvas = frame.astype(np.uint8)
            frame_no = start_frame + offset

            if bounding_boxes_gt is not None:
                self._draw_boxes(
                    canvas, self._boxes_for_frame(bounding_boxes_gt, frame_no),
                    gt_color, bbox_thickness, 0.25
                )
            if bounding_boxes_pred is not None:
                self._draw_boxes(
                    canvas, self._boxes_for_frame(bounding_boxes_pred, frame_no),
                    pred_color, bbox_thickness, 0.75
                )

            processed_frames.append(canvas)

        processed_frames = np.array(processed_frames)

        # Create the Matplotlib animation
        fig, ax = plt.subplots(figsize=self.figsize)
        img_display = ax.imshow(processed_frames[0])
        ax.set_title(f"Frame: {start_frame + 1}/{self.frame_count}")
        ax.axis("off")
        plt.tight_layout()

        def update(frame_idx):
            img_display.set_data(processed_frames[frame_idx])
            ax.set_title(f"Frame: {start_frame + frame_idx + 1}/{self.frame_count}")
            return (img_display,)

        anim = FuncAnimation(
            fig,
            update,
            frames=len(processed_frames),
            # Fall back to 40 ms per frame when the container reports no FPS.
            interval=1000 / self.fps if self.fps else 40,
            blit=True
        )

        if save_file is not None:
            from matplotlib.animation import FFMpegWriter, PillowWriter
            ext = save_file.lower().rsplit('.', 1)[-1]
            fps_for_save = int(self.fps) if self.fps else 30

            if ext == 'gif':
                writer = PillowWriter(fps=fps_for_save)
            else:
                writer = FFMpegWriter(fps=fps_for_save, metadata={'title': 'Video Output'})

            anim.save(save_file, writer=writer)
            print(f"Animation saved to {save_file}")
        else:
            IPython.display.display(anim)

        # Close the figure in both branches so repeated calls do not pile up open figures.
        plt.close(fig)

# Relative path of the video analysed in this notebook.
video_path = '../train/S03/c010/vdo.avi'
# Constructing the player reads the video metadata and prints the figure size.
video_player = VideoPlayer(video_path)

(5, 3)


# **I/O**

## XML

In [5]:
def _cvat_box_is_parked(box):
    """Returns True if the <box> element has a 'parked' attribute whose text is 'true'."""
    return any(
        node.attrib.get("name") == "parked"
        # node.text is None for an empty element; treat that as "not parked".
        and (node.text or "").strip().lower() == "true"
        for node in box.findall("attribute")
    )


def parse_cvat_annotations(xml_path, skip_parked=False, ignored_labels=("bike",)):
    """
    Reads a CVAT annotation XML file and returns ground truth data
    in a defaultdict(list) of the form:
        {
            frame_idx: [
                {
                    "bbox": [x, y, w, h],
                    "category_id": <int>,
                    "track_id": <str>,
                    "conf": 1
                },
                ...
            ],
            ...
        }

    :param xml_path: Path (or file object) of the CVAT XML file.
    :param skip_parked: When True, boxes whose <attribute name="parked"> is "true"
                        are dropped. Defaults to False, i.e. parked objects are kept.
    :param ignored_labels: Track labels to drop entirely. Defaults to ("bike",).
    :return: defaultdict mapping frame index -> list of annotation dicts.

    Assumptions/notes:
      - Only 'outside="0"' (visible) objects will be returned.
      - Known labels are seeded in label_to_id below ("car" and "bike" both map
        to 1); unrecognized labels get new IDs assigned automatically.
      - Frame indices are taken from the 'box' element's 'frame' attribute in CVAT.
    """
    label_to_id = {
        "car": 1,
        "bike": 1,
    }
    ignored = set(ignored_labels or ())

    frames_dict = defaultdict(list)

    root = ET.parse(xml_path).getroot()

    # Iterate over each "track" element in <annotations>
    for track in root.findall("track"):
        label = track.attrib["label"]  # e.g. "car" or "bike"
        if label not in label_to_id:
            # Assign a new ID if the label is not recognized
            label_to_id[label] = len(label_to_id) + 1

        if label in ignored:
            continue

        track_id = track.attrib["id"]  # track identifier as string
        cat_id = label_to_id[label]

        for box in track.findall("box"):
            # Skip 'outside' (invisible) objects
            if box.attrib["outside"] == "1":
                continue
            if skip_parked and _cvat_box_is_parked(box):
                continue

            xtl = float(box.attrib["xtl"])
            ytl = float(box.attrib["ytl"])
            xbr = float(box.attrib["xbr"])
            ybr = float(box.attrib["ybr"])

            frames_dict[int(box.attrib["frame"])].append({
                # Corner format converted to [x, y, w, h]
                "bbox": [xtl, ytl, xbr - xtl, ybr - ytl],
                "category_id": cat_id,
                "track_id": track_id,
                "conf": 1,
            })
    return frames_dict

## TXT

In [6]:
def parse_detection_file(filepath):
    """
    Parses a comma-separated detection file into per-frame detections.

    Each non-empty line is expected to look like:
        frame, -1, left, top, width, height, conf, -1, -1, -1

    The frame number in the file is decremented by one before being used as key.

    :param filepath: Path to the input text file.
    :return: defaultdict(list) mapping frame index -> list of dicts with keys
             'bbox' -> [left, top, width, height] and 'conf' -> confidence.
    """
    detections_by_frame = defaultdict(list)

    with open(filepath, 'r') as handle:
        for raw_line in handle:
            row = raw_line.strip()
            if row == "":
                # Blank lines carry no detection
                continue

            parts = row.split(',')

            frame_idx = int(parts[0].strip()) - 1
            # Columns 2..6 hold left, top, width, height, conf (column 1 is skipped)
            left, top, width, height, conf = (
                float(parts[col].strip()) for col in range(2, 7)
            )

            detections_by_frame[frame_idx].append(
                {'bbox': [left, top, width, height], 'conf': conf}
            )

    return detections_by_frame


def save_tracking_data(filepath, tracking_data):
    """
    Saves tracking data to a file in the format:
      frame,id,left,top,width,height,conf,-1,-1,-1

    Frame numbers are written 1-based, mirroring parse_detection_file, which
    subtracts one when reading.

    :param filepath: Path to the output text file.
    :param tracking_data: Either
                          - a dict {frame_idx: [detections]} with 0-based frame
                            indices (the structure returned by the trackers), or
                          - a list of frames, where position 0 is written as frame 1.
                          Each detection is a dictionary with keys:
                            {
                                'track_id': <integer ID>,
                                'bbox': [left, top, width, height],
                                'conf': <float confidence>  (optional, defaults to 1.0)
                            }
    """
    if hasattr(tracking_data, "items"):
        # Dict input: sort by frame so the output is in temporal order.
        frames = ((frame_idx + 1, dets) for frame_idx, dets in sorted(tracking_data.items()))
    else:
        frames = enumerate(tracking_data, start=1)

    lines = []
    for frame_no, detections in frames:
        for det in detections:
            left, top, width, height = det['bbox']
            # Tracker outputs do not always carry a confidence.
            conf = det.get('conf', 1.0)
            lines.append(
                f"{frame_no},{det['track_id']},{left:.2f},{top:.2f},"
                f"{width:.2f},{height:.2f},{conf:.2f},-1,-1,-1\n"
            )

    with open(filepath, 'w') as f:
        f.writelines(lines)

# **TRACKING**

## Utils

In [7]:
def iou(box_a, box_b):
    """
    Intersection-over-Union of two axis-aligned boxes given as [x, y, w, h].

    Returns 0.0 when the union has zero area.
    """
    a_left, a_top, a_w, a_h = box_a[0], box_a[1], box_a[2], box_a[3]
    b_left, b_top, b_w, b_h = box_b[0], box_b[1], box_b[2], box_b[3]

    # Overlap extent along each axis, clamped at zero for disjoint boxes
    overlap_w = max(0, min(a_left + a_w, b_left + b_w) - max(a_left, b_left))
    overlap_h = max(0, min(a_top + a_h, b_top + b_h) - max(a_top, b_top))
    shared = overlap_w * overlap_h

    total = a_w * a_h + b_w * b_h - shared
    return 0.0 if total == 0 else shared / total


def union_box(box_a, box_b):
    """
    Smallest [x, y, w, h] rectangle that encloses both input boxes.
    Both inputs are in [x, y, w, h] format.
    """
    left = min(box_a[0], box_b[0])
    top = min(box_a[1], box_b[1])
    right = max(box_a[0] + box_a[2], box_b[0] + box_b[2])
    bottom = max(box_a[1] + box_a[3], box_b[1] + box_b[3])

    return [left, top, right - left, bottom - top]


def union_of_boxes(list_of_boxes):
    """
    Bounding box, as [x, y, w, h], enclosing every [x, y, w, h] box in the list.
    Returns None for an empty list.
    """
    if not list_of_boxes:
        return None

    # Express every box by its corners (left, top, right, bottom)
    corners = [(x, y, x + w, y + h) for (x, y, w, h) in list_of_boxes]

    left = min(c[0] for c in corners)
    top = min(c[1] for c in corners)
    right = max(c[2] for c in corners)
    bottom = max(c[3] for c in corners)

    return [left, top, right - left, bottom - top]


def merge_overlapping_boxes(bboxes, iou_threshold=0.5):
    """
    Merges detections of one frame that overlap strongly.

    :param bboxes: List of detection dicts, each with 'bbox' ([x, y, w, h]) and 'conf'.
    :param iou_threshold: Two detections are merged when their IoU is strictly
                          greater than this value.
    :return: List of detections after merging. A merged detection holds only
             'bbox' (union of the pair) and 'conf' (maximum of the pair).

    Passes are repeated until one completes without any merge.
    """
    pending = bboxes[:]
    changed = True

    while changed:
        changed = False
        kept = []
        while pending:
            candidate = pending.pop()
            # Index of the first already-kept detection that overlaps enough
            hit = next(
                (
                    idx for idx, other in enumerate(kept)
                    if iou(candidate['bbox'], other['bbox']) > iou_threshold
                ),
                None,
            )
            if hit is None:
                kept.append(candidate)
                continue

            partner = kept[hit]
            kept[hit] = {
                'bbox': union_box(candidate['bbox'], partner['bbox']),
                'conf': max(candidate['conf'], partner['conf']),
            }
            changed = True
        pending = kept

    return pending


def preprocess_detections_dict(detections_per_frame, iou_threshold=0.9):
    """
    Applies merge_overlapping_boxes to every frame of a detections dictionary.

    :param detections_per_frame: {frame_idx: [detection, ...]}; each frame's list
                                 is passed unchanged to merge_overlapping_boxes.
    :param iou_threshold: IoU above which two detections of a frame are merged.
    :return: A plain dict with the same keys and the merged lists as values.
    """
    return {
        frame_idx: merge_overlapping_boxes(frame_dets, iou_threshold)
        for frame_idx, frame_dets in detections_per_frame.items()
    }

## Maximum Overlap Method

In [8]:
def tracking_postprocessing(tracking_by_frame, min_frames=3):
    """
    Cleans up raw tracking output.

    - Track IDs seen in fewer than `min_frames` distinct frames are discarded.
    - Within a frame, all boxes sharing a track ID are replaced by a single
      box: their enclosing union.

    :param tracking_by_frame: {frame_idx: [ {"bbox": [x, y, w, h], "track_id": ..., ...}, ... ]}
    :param min_frames: Minimum number of distinct frames a track ID must appear in.
    :return: A new plain dict with one entry per input frame; each detection
             holds only "track_id" and "bbox".
    """
    # Distinct frames in which each track ID occurs
    frames_seen = defaultdict(set)
    for frame_idx, detections in tracking_by_frame.items():
        for det in detections:
            frames_seen[det["track_id"]].add(frame_idx)

    keep_ids = {
        tid for tid, seen in frames_seen.items() if len(seen) >= min_frames
    }

    result = {}
    for frame_idx, detections in tracking_by_frame.items():
        # Collect the boxes of every surviving track ID in this frame
        grouped = {}
        for det in detections:
            tid = det["track_id"]
            if tid not in keep_ids:
                continue
            grouped.setdefault(tid, []).append(det["bbox"])

        result[frame_idx] = [
            {"track_id": tid, "bbox": union_of_boxes(boxes)}
            for tid, boxes in grouped.items()
        ]

    return result

def track_by_maximum_overlap(detections_per_frame, iou_threshold=0.2, min_hits=3):
    """
    Assigns track IDs frame by frame using the best IoU with the previous frame.

    Each detection inherits the ID of the detection at frame_idx-1 with which it
    has the highest IoU, provided that IoU is strictly above `iou_threshold`;
    otherwise it starts a new track. Several detections of one frame may inherit
    the same ID; tracking_postprocessing merges them afterwards.

    :param detections_per_frame: {frame_idx: [ { "bbox": [x, y, w, h], ... }, ... ]}
    :param iou_threshold: Minimum IoU for a match with a previous-frame detection.
    :param min_hits: Forwarded to tracking_postprocessing as `min_frames`.
    :return: The result of tracking_postprocessing on the per-frame assignments.

    NOTE: previous-frame results are read through a defaultdict, so an empty
    entry is created for frame_idx-1 whenever that frame has no results yet
    (e.g. key -1 when the first frame is 0). Those empty entries are part of
    the returned dictionary.
    """
    tracks = defaultdict(list)
    id_counter = 1

    for frame_idx in sorted(detections_per_frame.keys()):
        prior = tracks[frame_idx - 1]  # inserts [] if missing (see NOTE)
        assigned = []

        for detection in detections_per_frame[frame_idx]:
            matched_id = None
            top_overlap = 0.0
            for candidate in prior:
                score = iou(detection['bbox'], candidate['bbox'])
                if score > iou_threshold and score > top_overlap:
                    top_overlap = score
                    matched_id = candidate['track_id']

            if matched_id is None:
                # No sufficiently overlapping predecessor: open a new track
                matched_id = id_counter
                id_counter += 1

            tagged = dict(detection)
            tagged["track_id"] = matched_id
            assigned.append(tagged)

        tracks[frame_idx] = assigned

    return tracking_postprocessing(tracks, min_hits)

## SORT Method

### SORT Code

In [9]:
import os
import numpy as np
from skimage import io

import glob
import time
import argparse
from filterpy.kalman import KalmanFilter

np.random.seed(0)


def linear_assignment(cost_matrix):
    """
    Solves the assignment problem for `cost_matrix` with SciPy and returns the
    optimal (row, column) index pairs as an array, one pair per row.
    """
    from scipy.optimize import linear_sum_assignment
    row_idx, col_idx = linear_sum_assignment(cost_matrix)
    pairs = list(zip(row_idx, col_idx))
    return np.array(pairs)


def iou_batch(bb_test, bb_gt):
    """
    From SORT: pairwise IoU between two sets of boxes in [x1,y1,x2,y2] form.

    The result has one row per box in `bb_test` and one column per box in `bb_gt`.
    """
    # Insert broadcast axes so every test box meets every gt box
    gt = np.expand_dims(bb_gt, 0)
    test = np.expand_dims(bb_test, 1)

    left = np.maximum(test[..., 0], gt[..., 0])
    top = np.maximum(test[..., 1], gt[..., 1])
    right = np.minimum(test[..., 2], gt[..., 2])
    bottom = np.minimum(test[..., 3], gt[..., 3])

    inter = np.maximum(0., right - left) * np.maximum(0., bottom - top)

    area_test = (test[..., 2] - test[..., 0]) * (test[..., 3] - test[..., 1])
    area_gt = (gt[..., 2] - gt[..., 0]) * (gt[..., 3] - gt[..., 1])

    return inter / (area_test + area_gt - inter)


def convert_bbox_to_z(bbox):
    """
    Converts a box [x1,y1,x2,y2] into the measurement vector [x,y,s,r]
    (as a 4x1 column), where x,y is the box centre, s its area and r its
    width/height aspect ratio. Elements past the fourth are ignored.
    """
    x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
    width = x2 - x1
    height = y2 - y1
    centre_x = x1 + width/2.
    centre_y = y1 + height/2.
    area = width * height
    aspect = width / float(height)
    return np.array([centre_x, centre_y, area, aspect]).reshape((4, 1))


def convert_x_to_bbox(x,score=None):
    """
    Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
    [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right.

    :param x: State/measurement whose first four entries are [x, y, s, r];
              may be flat or a column vector (e.g. the 7x1 Kalman state).
    :param score: Optional value appended as a fifth column.
    :return: Array of shape (1, 4), or (1, 5) when `score` is given.
    """
    # Flatten first so a column-vector state yields scalars; otherwise
    # mixing (1,)-arrays with a scalar score builds a ragged array.
    cx, cy, s, r = np.asarray(x, dtype=float).reshape(-1)[:4]
    w = np.sqrt(s * r)
    h = s / w
    corners = [cx - w/2., cy - h/2., cx + w/2., cy + h/2.]
    if score is None:
        return np.array(corners).reshape((1, 4))
    return np.array(corners + [score]).reshape((1, 5))


class KalmanBoxTracker(object):
    """
    This class represents the internal state of individual tracked objects observed as bbox.

    The filter state has 7 entries: the measured [x, y, s, r] (see
    convert_bbox_to_z) followed by three velocity terms that F adds to x, y
    and s respectively; r has no velocity term.
    """
    # Class-level counter used to hand out ids; shared by all instances and
    # never reset here, so ids keep increasing across Sort objects.
    count = 0
    def __init__(self,bbox):
        """
        Initialises a tracker using initial bounding box.

        :param bbox: Box in [x1,y1,x2,y2] form (extra trailing entries are ignored
                     by convert_bbox_to_z).
        """
        #define constant velocity model
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        # Transition matrix: each of x, y, s is advanced by its velocity entry.
        self.kf.F = np.array([[1,0,0,0,1,0,0],[0,1,0,0,0,1,0],[0,0,1,0,0,0,1],[0,0,0,1,0,0,0],  [0,0,0,0,1,0,0],[0,0,0,0,0,1,0],[0,0,0,0,0,0,1]])
        # Measurement matrix: only the first four state entries are observed.
        self.kf.H = np.array([[1,0,0,0,0,0,0],[0,1,0,0,0,0,0],[0,0,1,0,0,0,0],[0,0,0,1,0,0,0]])

        # The scalings below are applied in place and in this order; note that
        # the velocity block of P is scaled twice (x1000, then x10 with the rest)
        # and Q[-1,-1] is scaled twice as well (x0.01, then x0.01 with its block).
        self.kf.R[2:,2:] *= 10.
        self.kf.P[4:,4:] *= 1000. #give high uncertainty to the unobservable initial velocities
        self.kf.P *= 10.
        self.kf.Q[-1,-1] *= 0.01
        self.kf.Q[4:,4:] *= 0.01

        self.kf.x[:4] = convert_bbox_to_z(bbox)
        self.time_since_update = 0  # number of predict() calls since the last update()
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []  # predicted boxes accumulated since the last update()
        self.hits = 0  # total number of update() calls
        self.hit_streak = 0  # update() calls since the streak was last reset in predict()
        self.age = 0  # total number of predict() calls

    def update(self,bbox):
        """
        Updates the state vector with observed bbox.

        Resets time_since_update and the prediction history, and increments
        the hit counters.
        """
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(convert_bbox_to_z(bbox))

    def predict(self):
        """
        Advances the state vector and returns the predicted bounding box estimate.

        The returned value is the (1, 4) array produced by convert_x_to_bbox.
        """
        # If applying the area velocity would make the area non-positive,
        # zero that velocity before predicting.
        if((self.kf.x[6]+self.kf.x[2])<=0):
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        # A predict() without an update() since the previous predict() breaks the streak.
        if(self.time_since_update>0):
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """
        Returns the current bounding box estimate.
        """
        return convert_x_to_bbox(self.kf.x)


def associate_detections_to_trackers(detections, trackers, iou_threshold = 0.3):
    """
    Assigns detections to tracked object (both represented as bounding boxes)

    :param detections: Array of detection boxes, one per row, passed to iou_batch
                       as [x1,y1,x2,y2,...].
    :param trackers: Array of predicted tracker boxes in the same corner form.
    :param iou_threshold: Matches whose IoU is below this value are rejected.

    Returns 3 arrays: matches (rows of [detection_idx, tracker_idx]),
    unmatched_detections and unmatched_trackers (index arrays).
    """
    # Without trackers every detection is unmatched.
    if(len(trackers)==0):
        return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int)

    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)
        # Shortcut: if thresholding already gives at most one candidate per
        # row and per column, those pairs are the matching; otherwise solve
        # the assignment problem that maximises total IoU.
        if a.sum(1).max() == 1 and a.sum(0).max() == 1:
            matched_indices = np.stack(np.where(a), axis=1)
        else:
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0,2))

    # Detections / trackers that do not appear in any candidate pair
    unmatched_detections = []
    for d, det in enumerate(detections):
        if(d not in matched_indices[:,0]):
            unmatched_detections.append(d)
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if(t not in matched_indices[:,1]):
            unmatched_trackers.append(t)

    #filter out matched with low IOU
    matches = []
    for m in matched_indices:
        if(iou_matrix[m[0], m[1]]<iou_threshold):
            # The assignment may pair boxes that barely overlap; treat both as unmatched.
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1,2))
    if(len(matches)==0):
        matches = np.empty((0,2),dtype=int)
    else:
        matches = np.concatenate(matches,axis=0)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


class Sort(object):
  """
  Multi-object tracker that keeps a list of KalmanBoxTracker instances and
  associates them with the detections of each frame by IoU.
  """
  def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
    """
    Sets key parameters for SORT

    :param max_age: A tracker is removed once its time_since_update exceeds this value.
    :param min_hits: Hit streak required before a tracker is reported (not
                     enforced while frame_count <= min_hits).
    :param iou_threshold: IoU threshold passed to associate_detections_to_trackers.
    """
    self.max_age = max_age
    self.min_hits = min_hits
    self.iou_threshold = iou_threshold
    self.trackers = []
    self.frame_count = 0  # number of update() calls so far

  def update(self, dets=np.empty((0, 5))):
    """
    Params:
      dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
    Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections).
    Returns a similar array, where the last column is the object ID.

    NOTE: The number of objects returned may differ from the number of detections provided.
    """
    self.frame_count += 1
    # get predicted locations from existing trackers.
    trks = np.zeros((len(self.trackers), 5))
    to_del = []
    ret = []
    for t, trk in enumerate(trks):
      pos = self.trackers[t].predict()[0]
      trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
      if np.any(np.isnan(pos)):
        to_del.append(t)
    # Drop rows holding invalid (NaN) predictions, and the trackers that produced them.
    trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
    # Pop from the end so the remaining indices in to_del stay valid.
    for t in reversed(to_del):
      self.trackers.pop(t)
    matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets,trks, self.iou_threshold)

    # update matched trackers with assigned detections
    for m in matched:
      self.trackers[m[1]].update(dets[m[0], :])

    # create and initialise new trackers for unmatched detections
    for i in unmatched_dets:
        trk = KalmanBoxTracker(dets[i,:])
        self.trackers.append(trk)
    # Walk the trackers backwards with i as the matching list index, so
    # popping a dead tracker does not shift the ones still to be visited.
    i = len(self.trackers)
    for trk in reversed(self.trackers):
        d = trk.get_state()[0]
        # Report only trackers updated in this call that have a long enough
        # hit streak (the streak requirement is waived during the first min_hits frames).
        if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
          ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) # +1 as MOT benchmark requires positive
        i -= 1
        # remove dead tracklet
        if(trk.time_since_update > self.max_age):
          self.trackers.pop(i)
    if(len(ret)>0):
      return np.concatenate(ret)
    return np.empty((0,5))

### Tracking with SORT

In [10]:
def track_by_sort(detections_per_frame, iou_threshold=0.2, min_hits=3, max_age=1):
    """
    Uses the SORT tracker to assign track IDs to detections from frame to frame.

    SORT must be updated once per frame, including frames without detections,
    so every frame index between the smallest and largest key is processed;
    frames missing from the input are fed an empty detection array. This lets
    tracks age (and expire after `max_age`) across detection gaps.

    :param detections_per_frame: Dictionary {frame_idx: [ { "bbox": [x, y, w, h], "conf": ... }, ... ]}
                                 with integer frame indices.
    :param iou_threshold: IoU threshold for data association in SORT.
    :param min_hits: Minimum number of hits before a track is output.
    :param max_age: Maximum frames to keep alive a track without seeing it again.
    :return: Dictionary {frame_idx: [ { "bbox": [x, y, w, h], "track_id": <int> }, ... ] }
             with one entry per processed frame (possibly an empty list).
             Boxes are the tracker's state estimates, not the raw detections.
    """
    if not detections_per_frame:
        return {}

    tracker = Sort(max_age=max_age, min_hits=min_hits, iou_threshold=iou_threshold)

    first_frame = min(detections_per_frame)
    last_frame = max(detections_per_frame)

    tracking_by_frame = {}

    for frame_idx in range(first_frame, last_frame + 1):
        # .get so that gaps yield no detections (and a defaultdict input is not mutated)
        current_detections = detections_per_frame.get(frame_idx, [])

        # Convert [x, y, w, h] dicts -> rows of [x1, y1, x2, y2, score]
        rows = []
        for det in current_detections:
            x, y, w, h = det["bbox"]
            score = det.get("conf", 1.0)  # Default to 1.0 if not provided
            rows.append([x, y, x + w, y + h, score])
        dets_array = np.array(rows, dtype=float) if rows else np.empty((0, 5))

        # 'tracked' is an array of shape (N, 5) -> [x1, y1, x2, y2, track_id]
        tracked = tracker.update(dets_array)

        frame_output = []
        for x1, y1, x2, y2, track_id in tracked:
            frame_output.append({
                # Back to [x, y, w, h]
                "bbox": [float(x1), float(y1), float(x2) - float(x1), float(y2) - float(y1)],
                "track_id": int(track_id)
            })

        tracking_by_frame[frame_idx] = frame_output

    return tracking_by_frame

# RESULTS

In [11]:
# Load the detections (frame indices become 0-based in the parser) and the CVAT ground truth.
detections = parse_detection_file("../train/S03/c010/det/det_mask_rcnn.txt")
annotations = parse_cvat_annotations("../ai_challenge_s03_c010-full_annotation.xml")

# Merge strongly overlapping detections within each frame (default IoU threshold 0.9).
detections_f = preprocess_detections_dict(detections)

# Run both trackers on the merged detections with their default parameters.
tracking_1 = track_by_maximum_overlap(detections_f)
tracking_2 = track_by_sort(detections_f)

In [None]:
# Compare ground truth against the raw detections, then against the merged detections.
video_player.plot(start_frame=861, end_frame=921, bounding_boxes_gt=annotations, bounding_boxes_pred=detections)
video_player.plot(start_frame=861, end_frame=921, bounding_boxes_gt=annotations, bounding_boxes_pred=detections_f)

In [None]:
# First three frames with the SORT tracks only.
video_player.plot(start_frame=0, end_frame=3, bounding_boxes_pred=tracking_2)

In [None]:
# First three frames: ground truth together with the maximum-overlap tracks.
video_player.plot(start_frame=0, end_frame=3, bounding_boxes_gt=annotations, bounding_boxes_pred=tracking_1)

In [None]:
# First three frames with the maximum-overlap tracks only.
video_player.plot(start_frame=0, end_frame=3, bounding_boxes_pred=tracking_1)

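* The maximum-overlap tracking result contains an empty entry for frame -1 (there is no such frame in the video); it has to be removed before evaluation.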

In [15]:
# Drop the entry keyed -1; raises KeyError if the key is absent (e.g. when this cell is re-run).
tracking_1.pop(-1)

[]

# Eval

* Install the package

In [1]:
!cd TrackEval/ && pip install -e .

Defaulting to user installation because normal site-packages is not writeable
Obtaining file:///mnt/home/mcv-c6-2025-team5/Week2/TrackEval
doneuild dependencies ... [?25l
doneGetting requirements to build wheel ... [?25l
done  Preparing wheel metadata ... [?25l
Installing collected packages: trackeval
  Attempting uninstall: trackeval
    Found existing installation: trackeval 1.0.dev1
    Uninstalling trackeval-1.0.dev1:
      Successfully uninstalled trackeval-1.0.dev1
  Running setup.py develop for trackeval
Successfully installed trackeval


* Run the eval

In [12]:
from trackeval.metrics.hota import HOTA
from trackeval.metrics.identity import Identity

In [17]:
def _build_id_mapping(data_by_frame):
    """Map every distinct ``track_id`` found in *data_by_frame* to a 0-based index.

    The IDs are sorted before numbering, so the mapping is deterministic and
    the new indices are contiguous (``0 .. number_of_ids - 1``).
    """
    unique_ids = sorted(
        {det['track_id'] for dets in data_by_frame.values() for det in dets}
    )
    return {old_id: new_id for new_id, old_id in enumerate(unique_ids)}


def calculate_metrics(tracker_data, gt_data):
    """Evaluate a tracking result against the ground truth with HOTA and Identity.

    Args:
        tracker_data: Mapping ``frame -> list of detection dicts`` produced by
            a tracker. Each detection dict must provide ``'track_id'`` and
            ``'bbox'``.
        gt_data: Ground-truth mapping with the same structure.

    Returns:
        A tuple ``(result_hota, result_identity)`` holding whatever
        ``HOTA().eval_sequence`` and ``Identity().eval_sequence`` return for
        the sequence.

    Notes:
        * Frames are taken from the union of both mappings. A frame missing
          from one side is treated as having no detections on that side; the
          inputs are only read, never modified.
        * The similarity between a ground-truth box and a tracker box is
          ``iou(gt['bbox'], tr['bbox'])``; both boxes must therefore be in the
          format expected by ``iou``.
    """
    # Remap the raw track IDs of each side to contiguous 0-based indices.
    tracker_id_mapping_tr = _build_id_mapping(tracker_data)
    tracker_id_mapping_gt = _build_id_mapping(gt_data)

    all_frames = sorted(set(gt_data) | set(tracker_data))

    gt_ids_list = []
    tracker_ids_list = []
    similarity_scores_list = []
    total_tracker_dets = 0
    total_gt_dets = 0

    for frame in all_frames:
        # .get() keeps a frame that exists on only one side from raising
        # KeyError (plain dict) or from inserting a new key (defaultdict).
        gt_dets = gt_data.get(frame, [])
        tr_dets = tracker_data.get(frame, [])

        total_gt_dets += len(gt_dets)
        total_tracker_dets += len(tr_dets)

        # Explicit integer dtype: an empty list would otherwise become a
        # float64 array, which NumPy does not accept as an index array.
        gt_ids = np.array(
            [tracker_id_mapping_gt[det['track_id']] for det in gt_dets], dtype=int
        )
        tr_ids = np.array(
            [tracker_id_mapping_tr[det['track_id']] for det in tr_dets], dtype=int
        )

        # Pairwise IoU, rows = ground truth, columns = tracker. When either
        # side is empty the loops do not run and the (empty) zero matrix stays.
        sim_matrix = np.zeros((len(gt_dets), len(tr_dets)), dtype=float)
        for i, gt in enumerate(gt_dets):
            for j, tr in enumerate(tr_dets):
                sim_matrix[i, j] = iou(gt['bbox'], tr['bbox'])

        gt_ids_list.append(gt_ids)
        tracker_ids_list.append(tr_ids)
        similarity_scores_list.append(sim_matrix)

    # Per-sequence data dictionary consumed by the TrackEval metrics.
    data = {
        'num_tracker_dets': total_tracker_dets,
        'num_gt_dets': total_gt_dets,
        'num_gt_ids': len(tracker_id_mapping_gt),
        'num_tracker_ids': len(tracker_id_mapping_tr),
        'gt_ids': gt_ids_list,
        'tracker_ids': tracker_ids_list,
        'similarity_scores': similarity_scores_list
    }

    hota_metric = HOTA()
    identity_metric = Identity()
    result_hota = hota_metric.eval_sequence(data)
    result_identity = identity_metric.eval_sequence(data)
    return result_hota, result_identity

In [18]:
# Evaluate the maximum-overlap tracker against the ground truth.
# `result` is the (HOTA, Identity) tuple returned by calculate_metrics.
result = calculate_metrics(tracking_1, dict(annotations))
result


Identity Config:
THRESHOLD            : 0.5                           
PRINT_CONFIG         : True                          


({'HOTA': array([0.66116379, 0.66064385, 0.65922814, 0.65668806, 0.65401819,
         0.65045638, 0.62927106, 0.54019389, 0.52443087, 0.51698678,
         0.51149629, 0.50422555, 0.49195857, 0.48366427, 0.47339261,
         0.45232037, 0.4028273 , 0.30987649, 0.0406153 ]),
  'DetA': array([0.5631787 , 0.56218499, 0.55935276, 0.55449123, 0.54896223,
         0.54160505, 0.5171988 , 0.40470358, 0.36345262, 0.34785504,
         0.33966746, 0.32639371, 0.29494128, 0.27786651, 0.2655043 ,
         0.24751614, 0.21273221, 0.14718239, 0.01394005]),
  'AssA': array([0.77619689, 0.77634641, 0.7769368 , 0.77772052, 0.77917891,
         0.78118456, 0.76562836, 0.72104487, 0.75670863, 0.7683526 ,
         0.7702488 , 0.77894703, 0.82058108, 0.84188312, 0.84405622,
         0.82658739, 0.76278922, 0.6524112 , 0.11833547]),
  'DetRe': array([0.5752541 , 0.57460435, 0.57274795, 0.56954564, 0.56587924,
         0.56095976, 0.54429851, 0.46001764, 0.4256277 , 0.41207593,
         0.40483594, 0.39290853

In [19]:
# Evaluate the SORT tracker against the ground truth.
# `result` is the (HOTA, Identity) tuple returned by calculate_metrics.
result = calculate_metrics(tracking_2, dict(annotations))
result


Identity Config:
THRESHOLD            : 0.5                           
PRINT_CONFIG         : True                          


({'HOTA': array([0.52786818, 0.52724032, 0.52643001, 0.52514403, 0.52352546,
         0.52126253, 0.50693996, 0.42940318, 0.41445576, 0.40563123,
         0.3995102 , 0.390666  , 0.38022795, 0.37068593, 0.35561858,
         0.3301802 , 0.29609274, 0.26471752, 0.02968189]),
  'DetA': array([0.63658788, 0.63299919, 0.6284809 , 0.62167807, 0.61222055,
         0.60294311, 0.57908174, 0.45151154, 0.3878071 , 0.35719865,
         0.34128501, 0.32276072, 0.29437429, 0.27475023, 0.2545467 ,
         0.21970051, 0.17112231, 0.11709423, 0.00962842]),
  'AssA': array([0.43771618, 0.4391512 , 0.44094985, 0.44359977, 0.44768002,
         0.4506472 , 0.44378558, 0.40837737, 0.4429356 , 0.46063078,
         0.46766895, 0.4728578 , 0.49112066, 0.5001199 , 0.49682267,
         0.49621625, 0.51232895, 0.5984528 , 0.09150147]),
  'DetRe': array([0.658746  , 0.6564719 , 0.65359447, 0.64923191, 0.64310577,
         0.63702604, 0.62106094, 0.52680187, 0.47324454, 0.4457233 ,
         0.43091846, 0.41323618