In [1]:
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
from tqdm import tqdm
import json
import time
import os

In [2]:
# helpers to calculate IoU and IoU distance matrix for SORT
def iou(bbox1, bbox2):
    """
    Calculate Intersection over Union between two bounding boxes
    bbox format: [x, y, width, height]
    """
    x1, y1, width1, height1 = bbox1
    x2, y2, width2, height2 = bbox2
    
    # calculate intersection
    x_left = max(x1, x2)
    y_top = max(y1, y2)
    x_right = min(x1 + width1, x2 + width2)
    y_bottom = min(y1 + height1, y2 + height2)
    
    if x_right < x_left or y_bottom < y_top:
        return 0.0
    
    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    bbox1_area = width1 * height1
    bbox2_area = width2 * height2
    union_area = bbox1_area + bbox2_area - intersection_area
    
    return intersection_area / union_area if union_area > 0 else 0.0

def iou_distance(bboxes1, bboxes2):
    """
    Calculate IoU distance matrix between two sets of bounding boxes
    Returns a cost matrix (1 - IoU) for the Hungarian algorithm
    """
    if len(bboxes1) == 0 or len(bboxes2) == 0:
        return np.zeros((len(bboxes1), len(bboxes2)))
    
    cost_matrix = np.zeros((len(bboxes1), len(bboxes2)))
    for i, bbox1 in enumerate(bboxes1):
        for j, bbox2 in enumerate(bboxes2):
            cost_matrix[i, j] = 1 - iou(bbox1, bbox2)
    
    return cost_matrix

In [3]:
# Kalman filter w/ constant velocity model
class KalmanBoxTracker:
    """
    Kalman filter-based tracker for bounding boxes
    State: [x, y, width, height, velocity_x, velocity_y, velocity_width, velocity_height]
    """
    count = 0
    
    def __init__(self, bbox):
        """Initialize Kalman filter with initial bounding box [x, y, width, height]"""
        self.kf = KalmanFilter(dim_x=8, dim_z=4)
        
        # state transition matrix
        self.kf.F = np.array([
            [1, 0, 0, 0, 1, 0, 0, 0],
            [0, 1, 0, 0, 0, 1, 0, 0],
            [0, 0, 1, 0, 0, 0, 1, 0],
            [0, 0, 0, 1, 0, 0, 0, 1],
            [0, 0, 0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 1]
        ])
        
        # measurement matrix
        self.kf.H = np.array([
            [1, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0, 0]
        ])
        
        # measurement noise
        self.kf.R *= 10.0
        
        # process noise
        self.kf.Q[-4:, -4:] *= 0.01
        self.kf.Q[:4, :4] *= 0.01
        
        # initial state covariance
        self.kf.P[4:, 4:] *= 1000.0
        self.kf.P *= 10.0
        
        # initialize state
        self.kf.x[:4] = bbox.reshape(4, 1)
        
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.hits = 1
        self.hit_streak = 1
        self.age = 1
        
    def update(self, bbox):
        """Update the Kalman filter with observed bounding box"""
        self.time_since_update = 0
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(bbox.reshape(4, 1))
        
    def predict(self):
        """Predict the next state"""
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        return self.get_state()
        
    def get_state(self):
        """Return the current bounding box estimate"""
        state = self.kf.x[:4].flatten()
        # ensure width and height are positive
        state[2] = max(state[2], 1)
        state[3] = max(state[3], 1)
        return state

In [4]:
# SORT configs
MAX_AGE = 25
MIN_HITS = 3
IOU_THRESHOLD = 0.3

class SORTTracker:
    """
    SORT-inspired multi-object tracker using Kalman filters and Hungarian algorithm
    """
    def __init__(self, max_age=MAX_AGE, min_hits=MIN_HITS, iou_threshold=IOU_THRESHOLD):
        """
        max_age: maximum frames to keep alive a track without associated detections
        min_hits: minimum number of associated detections before track is confirmed
        iou_threshold: minimum IoU for matching detections to tracks
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0
        
    def update(self, detections):
        """
        Update tracks with new detections
        detections: numpy array of shape (N, 4) with format [x, y, width, height]
        Returns: numpy array of shape (M, 5) with format [x, y, width, height, track_id]
        """
        self.frame_count += 1
        
        # get predicted locations from existing trackers
        tracks = np.zeros((len(self.trackers), 4))
        to_delete = []
        for track_index, track in enumerate(tracks):
            position = self.trackers[track_index].predict()
            track[:] = position
            if np.any(np.isnan(position)):
                to_delete.append(track_index)
        
        # remove invalid trackers
        for track_index in reversed(to_delete):
            self.trackers.pop(track_index)
        tracks = np.delete(tracks, to_delete, axis=0)
        
        # associate detections to trackers
        matched, unmatched_detections, unmatched_trackers = self.associate_detections_to_trackers(
            detections, tracks
        )
        
        # update matched trackers with assigned detections
        for match in matched:
            self.trackers[match[1]].update(detections[match[0]])
        
        # create new trackers for unmatched detections
        for detection_index in unmatched_detections:
            tracker = KalmanBoxTracker(detections[detection_index])
            self.trackers.append(tracker)
        
        # return current tracked objects
        results = []
        for tracker in self.trackers:
            if tracker.time_since_update < 1 and (tracker.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                detection = tracker.get_state()
                results.append(np.concatenate((detection, [tracker.id])))
        
        # remove dead trackers
        self.trackers = [tracker for tracker in self.trackers if tracker.time_since_update < self.max_age]
        
        return np.array(results) if len(results) > 0 else np.empty((0, 5))
    
    def associate_detections_to_trackers(self, detections, trackers):
        """
        Assigns detections to tracked objects using the Hungarian algorithm
        Returns 3 lists: matches, unmatched_detections, unmatched_trackers
        """
        if len(trackers) == 0:
            return [], list(range(len(detections))), []
        
        if len(detections) == 0:
            return [], [], list(range(len(trackers)))
        
        # calculate IoU distance matrix
        cost_matrix = iou_distance(detections, trackers)
        
        # solve assignment problem
        row_indices, column_indices = linear_sum_assignment(cost_matrix)
        
        # filter out matches with low IoU
        matches = []
        unmatched_detections = list(range(len(detections)))
        unmatched_trackers = list(range(len(trackers)))
        
        for row_index, column_index in zip(row_indices, column_indices):
            if cost_matrix[row_index, column_index] < (1 - self.iou_threshold):
                matches.append([row_index, column_index])
                unmatched_detections.remove(row_index)
                unmatched_trackers.remove(column_index)
        
        return matches, unmatched_detections, unmatched_trackers

In [5]:
# detection configs
MIN_CONTOUR_AREA = 300
MORPH_KERNEL_SIZE = (5, 5)
MORPH_OPEN_ITERATIONS = 1
MORPH_CLOSE_ITERATIONS = 2

def detect_objects(frame, background_subtractor, min_area=MIN_CONTOUR_AREA):
    """
    Detect objects in frame using background subtraction
    Returns list of bounding boxes in format [x, y, width, height]
    """
    # apply background subtraction
    foreground_mask = background_subtractor.apply(frame)
    
    # morphological operations to reduce noise
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, MORPH_KERNEL_SIZE)
    foreground_mask = cv2.morphologyEx(foreground_mask, cv2.MORPH_OPEN, kernel, iterations=MORPH_OPEN_ITERATIONS)
    foreground_mask = cv2.morphologyEx(foreground_mask, cv2.MORPH_CLOSE, kernel, iterations=MORPH_CLOSE_ITERATIONS)
    
    # find contours
    contours, _ = cv2.findContours(foreground_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    # extract bounding boxes from valid contours
    detections = []
    for contour in contours:
        area = cv2.contourArea(contour)
        if area > min_area:
            x, y, width, height = cv2.boundingRect(contour)
            detections.append([x, y, width, height])
    
    return np.array(detections) if len(detections) > 0 else np.empty((0, 4))

In [6]:
# background subtractor configs
BACKGROUND_HISTORY = 500
BACKGROUND_VAR_THRESHOLD = 16
BACKGROUND_DETECT_SHADOWS = False

# video writer 
VIDEO_CODEC = 'mp4v'

# viz constants
BBOX_THICKNESS = 2
TEXT_FONT = cv2.FONT_HERSHEY_SIMPLEX
TEXT_SCALE = 0.6
TEXT_THICKNESS = 2
TEXT_Y_OFFSET = 10

def process_video(video_path, output_dir, sequence_name):
    """
    Process a single video file with MOT tracking
    Returns processing time and average FPS
    """
    # create output directory
    os.makedirs(output_dir, exist_ok=True)
    
    # open video
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print(f"Error: Could not open video {video_path}")
        return None
    
    # get video properties
    fps = int(capture.get(cv2.CAP_PROP_FPS))
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # initialize video writer
    output_video_path = os.path.join(output_dir, f"{sequence_name}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    
    # initialize background subtractor and tracker
    background_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=BACKGROUND_HISTORY,
        varThreshold=BACKGROUND_VAR_THRESHOLD,
        detectShadows=BACKGROUND_DETECT_SHADOWS
    )
    tracker = SORTTracker(max_age=MAX_AGE, min_hits=MIN_HITS, iou_threshold=IOU_THRESHOLD)
    
    # storage for tracking results
    tracking_results = []
    
    # process video
    frame_number = 0
    start_time = time.time()
    
    progress_bar = tqdm(total=total_frames, desc=f"Processing {sequence_name}")
    
    while True:
        ret, frame = capture.read()
        if not ret:
            break
        
        frame_number += 1
        
        # detect objects
        detections = detect_objects(frame, background_subtractor, min_area=MIN_CONTOUR_AREA)
        
        # update tracker
        tracked_objects = tracker.update(detections)
        
        # draw bounding boxes and save results
        for obj in tracked_objects:
            x, y, width, height, track_id = obj
            x, y, width, height = int(x), int(y), int(width), int(height)
            track_id = int(track_id)
            
            # draw on frame
            color = tuple(int(c) for c in np.random.RandomState(track_id).randint(0, 255, 3))
            cv2.rectangle(frame, (x, y), (x + width, y + height), color, BBOX_THICKNESS)
            cv2.putText(frame, f"ID: {track_id}", (x, y - TEXT_Y_OFFSET),
                       TEXT_FONT, TEXT_SCALE, color, TEXT_THICKNESS)
            
            # save tracking result (MOT format)
            tracking_results.append([frame_number, track_id, x, y, width, height, -1, -1, -1, -1])
        
        # write frame to output video
        video_writer.write(frame)
        progress_bar.update(1)
    
    progress_bar.close()
    
    # cleanup
    capture.release()
    video_writer.release()
    
    # calculate metrics
    end_time = time.time()
    processing_time = end_time - start_time
    average_fps = frame_number / processing_time if processing_time > 0 else 0
    
    # save tracking results to file
    tracking_file = os.path.join(output_dir, f"{sequence_name}.txt")
    np.savetxt(tracking_file, tracking_results, fmt='%d,%d,%d,%d,%d,%d,%d,%d,%d,%d')
    
    print(f"Completed {sequence_name}: {frame_number} frames in {processing_time:.2f}s ({average_fps:.2f} FPS)")
    
    return {
        'sequence_name': sequence_name,
        'total_frames': frame_number,
        'processing_time': processing_time,
        'average_fps': average_fps
    }

In [7]:
# path cofigs 
TEST_DIR = "../data/soccer_side/test"
RESULTS_DIR = "../results/classical"
VIDEO_FILENAME = "img1.mp4" # name of videos in teamtrack dataset
METRICS_FILENAME = "metrics.json"

def run_classical_method():
    """
    Main function to process all videos
    """
    # find all video sequences
    video_sequences = []
    for item in os.listdir(TEST_DIR):
        sequence_dir = os.path.join(TEST_DIR, item)
        if os.path.isdir(sequence_dir):
            video_path = os.path.join(sequence_dir, VIDEO_FILENAME)
            if os.path.exists(video_path):
                video_sequences.append((item, video_path))
    
    print(f"Found {len(video_sequences)} video sequences to process")
    
    # process each video
    all_metrics = []
    for sequence_name, video_path in video_sequences:
        print(f"\nProcessing: {sequence_name}")
        output_dir = os.path.join(RESULTS_DIR, sequence_name)
        
        metrics = process_video(video_path, output_dir, sequence_name)
        if metrics:
            all_metrics.append(metrics)
    
    # save overall metrics
    metrics_file = os.path.join(RESULTS_DIR, METRICS_FILENAME)
    os.makedirs(RESULTS_DIR, exist_ok=True)
    with open(metrics_file, 'w') as f:
        json.dump(all_metrics, f, indent=2)
    
    print(f"\nTotal sequences processed: {len(all_metrics)}")
    print(f"Results saved to: {RESULTS_DIR}")
    print(f"Metrics saved to: {metrics_file}")
    
    # print summary
    if all_metrics:
        total_time = sum(metric['processing_time'] for metric in all_metrics)
        average_fps = sum(metric['average_fps'] for metric in all_metrics) / len(all_metrics)
        print(f"\nTotal processing time: {total_time:.2f}s")
        print(f"Average FPS across all sequences: {average_fps:.2f}")

# run main function
run_classical_method()

Found 10 video sequences to process

Processing: F_20220220_1_1890_1920


Processing F_20220220_1_1890_1920: 100%|██████████| 750/750 [00:31<00:00, 24.14it/s]


Completed F_20220220_1_1890_1920: 750 frames in 31.11s (24.11 FPS)

Processing: F_20220220_1_1920_1950


Processing F_20220220_1_1920_1950: 100%|██████████| 750/750 [00:28<00:00, 25.96it/s]


Completed F_20220220_1_1920_1950: 750 frames in 28.91s (25.95 FPS)

Processing: F_20220220_1_1680_1710


Processing F_20220220_1_1680_1710: 100%|██████████| 750/750 [00:28<00:00, 25.92it/s]


Completed F_20220220_1_1680_1710: 750 frames in 28.95s (25.91 FPS)

Processing: F_20220220_1_1770_1800


Processing F_20220220_1_1770_1800: 100%|██████████| 750/750 [00:29<00:00, 25.62it/s]


Completed F_20220220_1_1770_1800: 750 frames in 29.29s (25.61 FPS)

Processing: F_20220220_1_1950_1980


Processing F_20220220_1_1950_1980: 100%|██████████| 750/750 [00:28<00:00, 26.41it/s]


Completed F_20220220_1_1950_1980: 750 frames in 28.41s (26.40 FPS)

Processing: F_20220220_1_1830_1860


Processing F_20220220_1_1830_1860: 100%|██████████| 750/750 [00:30<00:00, 24.53it/s]


Completed F_20220220_1_1830_1860: 750 frames in 30.59s (24.52 FPS)

Processing: F_20220220_1_1740_1770


Processing F_20220220_1_1740_1770: 100%|██████████| 750/750 [00:28<00:00, 26.07it/s]


Completed F_20220220_1_1740_1770: 750 frames in 28.78s (26.06 FPS)

Processing: F_20220220_1_1860_1890


Processing F_20220220_1_1860_1890: 100%|██████████| 750/750 [00:29<00:00, 25.28it/s]


Completed F_20220220_1_1860_1890: 750 frames in 29.68s (25.27 FPS)

Processing: F_20220220_1_1800_1830


Processing F_20220220_1_1800_1830: 100%|██████████| 750/750 [00:29<00:00, 25.12it/s]


Completed F_20220220_1_1800_1830: 750 frames in 29.87s (25.11 FPS)

Processing: F_20220220_1_1710_1740


Processing F_20220220_1_1710_1740: 100%|██████████| 750/750 [00:28<00:00, 26.08it/s]


Completed F_20220220_1_1710_1740: 750 frames in 28.77s (26.07 FPS)

Total sequences processed: 10
Results saved to: ../results/classical
Metrics saved to: ../results/classical/metrics.json

Total processing time: 294.35s
Average FPS across all sequences: 25.50
