## Imports and Setup 


In [2]:
# ============================================
# 0. Imports & Global Settings
# ============================================
import os
from pathlib import Path
from collections import defaultdict

import cv2
import numpy as np
import pandas as pd

from scipy.optimize import linear_sum_assignment

# YOLOv8
from ultralytics import YOLO
import torch

# DeepSORT
from deep_sort_realtime.deepsort_tracker import DeepSort

# Plotting / debug (optional)
import matplotlib.pyplot as plt


In [3]:
# ============================================
# 0.1 Paths & Constants
# ============================================
# Root folder of the dataset; every input path below is built relative to it.
BASE_DIR = Path("Object_Tracking")

# Task1: image sequence plus MOT-style ground-truth annotations (used for MOTA).
TASK1_IMAGES_DIR = BASE_DIR / "Task1" / "images"
TASK1_GT_PATH    = BASE_DIR / "Task1" / "gt" / "gt.txt"

# Task2: image sequence only (used for the per-frame people counts).
TASK2_IMAGES_DIR = BASE_DIR / "Task2" / "images"

# Output paths (relative to the notebook's working directory)
TASK1_INPUT_VIDEO  = Path("task1_input.mp4")
TASK1_OUTPUT_VIDEO = Path("task1.mp4")
TASK2_OUTPUT_VIDEO = Path("task2.mp4")
TASK2_COUNTS_CSV   = Path("task2_count.csv")

# Frame rates passed to the video writers for each task.
FPS_TASK1 = 14
FPS_TASK2 = 14

# YOLO weights
YOLO_WEIGHTS = "yolov8n.pt"  # or 'yolov8s.pt' if you want a heavier model

# Pick CUDA if available, otherwise CPU
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", DEVICE)


Using device: cpu


## 1. Data Preparation (Task 1 – images → video @ 14 FPS)

In [4]:
# ============================================
# 1. Convert Task1 images to video (task1_input.mp4)
# ============================================
def images_to_video(image_dir: Path, output_path: Path, fps: int = 14):
    """
    Convert all images in image_dir to a video at the given fps.

    Assumes images are named so that lexicographic sort is correct frame order
    (e.g., 000001.jpg, 000002.jpg, ...).

    Args:
        image_dir: Directory scanned (non-recursively) for .jpg/.jpeg/.png files.
        output_path: Destination video file, written with the "mp4v" fourcc.
        fps: Frame rate of the written video.

    Raises:
        AssertionError: if no images are found, the first image cannot be
            read, or the video writer cannot be opened.

    Unreadable images are skipped with a warning. The frame size is taken from
    the first image; later images with a different size are resized to it so
    that they are not handed to the writer with a mismatching size.
    """
    image_files = sorted(
        p for p in image_dir.iterdir() if p.suffix.lower() in {".jpg", ".jpeg", ".png"}
    )
    assert len(image_files) > 0, f"No images found in {image_dir}"

    # Read first image to get frame size
    first_frame = cv2.imread(str(image_files[0]))
    assert first_frame is not None, f"Could not read first image {image_files[0]}"

    height, width = first_frame.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))

    frames_written = 0
    try:
        # Fail loudly instead of silently producing an empty/missing file.
        assert out.isOpened(), f"Could not open video writer for {output_path}"

        for img_path in image_files:
            frame = cv2.imread(str(img_path))
            if frame is None:
                print(f"Warning: could not read {img_path}, skipping.")
                continue
            # Keep every frame at the size the writer was opened with.
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            out.write(frame)
            frames_written += 1
    finally:
        # Always finalize the container, even if reading/writing raised.
        out.release()

    # Report the frames actually written, not the number of files found.
    print(f"Saved video: {output_path} ({frames_written} frames at {fps} FPS)")

# Run for Task1: encode the Task1 image sequence into task1_input.mp4,
# which is the input of the tracking step below.
images_to_video(TASK1_IMAGES_DIR, TASK1_INPUT_VIDEO, fps=FPS_TASK1)


Saved video: task1_input.mp4 (429 frames at 14 FPS)


## 2. YOLOv8 + DeepSORT Tracking (Task 2 – run on the Task1 video)

### 2.1 Initialize YOLO and DeepSORT

In [22]:
# ============================================
# 2.1 Initialize YOLOv8 and DeepSORT
# ============================================
def init_yolo(weights_path: str = YOLO_WEIGHTS, device: str = DEVICE):
    """
    Build a YOLOv8 model from ``weights_path`` and return it.

    The model is moved with ``.to(device)`` only when ``device`` is not
    ``"cpu"``; for ``"cpu"`` it is returned exactly as constructed.
    """
    detector = YOLO(weights_path)
    if device == "cpu":
        return detector
    detector.to(device)
    return detector


def init_deepsort(
    max_age: int = 30,
    n_init: int = 3,
    nn_budget: int = 100,
    max_iou_distance: float = 0.7,
):
    """
    Initialize a DeepSort tracker from deep_sort_realtime.

    All arguments are forwarded unchanged as keyword arguments to
    ``DeepSort(...)``; the defaults are the values this notebook has always
    used, so ``init_deepsort()`` behaves exactly as before. See the
    deep_sort_realtime documentation for the precise meaning of each setting.

    Returns:
        A new DeepSort tracker instance. Create one per sequence so that
        different sequences do not share a tracker object.
    """
    return DeepSort(
        max_age=max_age,
        n_init=n_init,
        nn_budget=nn_budget,
        max_iou_distance=max_iou_distance,
    )


# Detector and tracker instances used for the Task1 video.
yolo_model = init_yolo()
deepsort_tracker = init_deepsort()


[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 33.5MB/s 0.2s/s 0.1s<0.5s


### 2.2 Helper: Run tracker on a video & save results

In [None]:
# ============================================
# 2.2 Run YOLOv8 + DeepSORT on a video (Task1)
# ============================================
def run_tracking(
    input_video_path: Path,
    output_video_path: Path,
    tracker_txt_out: Path,
    yolo_model,
    deepsort_tracker,
    fps: int,
):
    """
    Run YOLOv8 + DeepSORT tracking on a video.

    Args:
        input_video_path: Video to read frames from.
        output_video_path: Annotated video to write ("mp4v" fourcc).
        tracker_txt_out: Text file receiving the tracking results.
        yolo_model: Callable detector; called as
            ``yolo_model(frame, imgsz=960, conf=0.2, verbose=False)``.
        deepsort_tracker: Tracker exposing ``update_tracks(detections, frame=...)``.
        fps: Frame rate of the annotated output video.

    Outputs:
      - Annotated video with tracking boxes & IDs
      - Text file with tracking results, one line per reported box:
        <frame>,<id>,<bb_left>,<bb_top>,<bb_width>,<bb_height>
        where <frame> is 1-based.

    Only detections of class 0 ('person') are passed to the tracker, and only
    confirmed tracks that were updated in the current frame are reported.

    Raises:
        AssertionError: if the input cannot be opened/read or the output
            writer cannot be opened.
    """
    cap = cv2.VideoCapture(str(input_video_path))
    out = None
    all_tracks = []  # (frame_idx, track_id, bb_left, bb_top, bb_width, bb_height)

    try:
        assert cap.isOpened(), f"Cannot open {input_video_path}"

        # Get frame size from first frame
        ret, first_frame = cap.read()
        assert ret, "Could not read first frame"
        height, width = first_frame.shape[:2]
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # reset to start

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(str(output_video_path), fourcc, fps, (width, height))
        assert out.isOpened(), f"Cannot open video writer for {output_video_path}"

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1

            frame_h, frame_w = frame.shape[:2]

            # YOLO inference (more generous settings)
            results = yolo_model(frame, imgsz=960, conf=0.2, verbose=False)[0]
            boxes = results.boxes

            detections = []
            if boxes is not None and len(boxes) > 0:
                xyxy = boxes.xyxy.cpu().numpy()
                confs = boxes.conf.cpu().numpy()
                clss = boxes.cls.cpu().numpy()

                for bbox, score, cls in zip(xyxy, confs, clss):
                    # COCO class 0 = 'person'
                    if int(cls) != 0:
                        continue
                    x1, y1, x2, y2 = bbox

                    # DeepSORT expects [x, y, w, h] (top-left + width/height)
                    detections.append(([x1, y1, x2 - x1, y2 - y1], float(score), "person"))

            tracks = deepsort_tracker.update_tracks(detections, frame=frame)

            for track in tracks:
                # Skip tentative tracks and tracks with no detection this frame.
                if not track.is_confirmed() or track.time_since_update > 0:
                    continue

                track_id = track.track_id

                # Use original detection box for better IoU with GT
                l, t, r, b = track.to_ltrb(orig=True)

                # Clamp to image
                l = max(0, min(int(l), frame_w - 1))
                r = max(0, min(int(r), frame_w - 1))
                t = max(0, min(int(t), frame_h - 1))
                b = max(0, min(int(b), frame_h - 1))

                # A box that collapsed to zero area after clamping carries no
                # information and can only ever be scored as a false positive.
                if r <= l or b <= t:
                    continue

                all_tracks.append(
                    (frame_idx, int(track_id), float(l), float(t), float(r - l), float(b - t))
                )

                # Draw (keep the label inside the image for boxes at the top edge)
                cv2.rectangle(frame, (l, t), (r, b), (0, 255, 0), 2)
                cv2.putText(
                    frame,
                    f"ID {track_id}",
                    (l, max(t - 5, 15)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (0, 255, 0),
                    2,
                    cv2.LINE_AA,
                )

            out.write(frame)
    finally:
        # Release the capture and finalize the video even if a frame failed.
        cap.release()
        if out is not None:
            out.release()

    # Save tracking results to txt
    tracker_txt_out = Path(tracker_txt_out)
    with tracker_txt_out.open("w") as f:
        for (frame_no, track_id, bb_left, bb_top, bb_width, bb_height) in all_tracks:
            f.write(
                f"{frame_no},{track_id},{bb_left:.2f},{bb_top:.2f},{bb_width:.2f},{bb_height:.2f}\n"
            )

    print(f"Tracking done. Saved video to {output_video_path}")
    print(f"Tracking results saved to {tracker_txt_out}")


# Run tracking for Task1
# The results file written here is read back by load_predictions() in the
# evaluation section.
# NOTE(review): this passes the module-level `deepsort_tracker` object, so
# re-running this cell alone reuses that same tracker instance — presumably
# it should be re-created first; confirm.
TASK1_TRACKS_TXT = Path("task1_tracks.txt")
run_tracking(
    TASK1_INPUT_VIDEO,
    TASK1_OUTPUT_VIDEO,
    TASK1_TRACKS_TXT,
    yolo_model,
    deepsort_tracker,
    fps=FPS_TASK1,
)


Tracking done. Saved video to task1.mp4
Tracking results saved to task1_tracks.txt


## 3. Model Evaluation: MOTA (Task 3)

### 3.1 Load ground truth

In [24]:
# ============================================
# 3.1 Load ground truth annotations (Task1/gt/gt.txt)
# Using only the first 6 columns:
# <frame>, <id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, ...
# ============================================
def load_gt(gt_path: Path):
    """
    Parse a MOT-style, comma-separated ground-truth file.

    Column layout (1-based):
      1: frame, 2: id, 3: bb_left, 4: bb_top, 5: bb_width, 6: bb_height,
      7: conf (optional), 8: class (optional, 1 = pedestrian),
      9+: ignored.

    Filtering rules:
      - lines with fewer than 6 columns (including blank lines) are skipped
      - when a 7th column exists, lines with conf <= 0 are skipped
      - when an 8th column exists, only class == 1 is kept

    Returns:
        defaultdict(list) mapping frame number -> list of
        {"id": int, "bbox": [bb_left, bb_top, bb_width, bb_height]}.
    """
    annotations = defaultdict(list)

    with open(gt_path, "r") as handle:
        for raw_line in handle:
            # A blank line splits into a single empty field and is rejected
            # by the column-count check below.
            fields = raw_line.strip().split(",")
            n_fields = len(fields)
            if n_fields < 6:
                continue

            frame_no = int(fields[0])
            object_id = int(fields[1])
            left, top, box_w, box_h = (float(value) for value in fields[2:6])

            # MOT convention: conf <= 0 marks an entry to ignore.
            if n_fields >= 7 and float(fields[6]) <= 0:
                continue

            # Keep pedestrians only when a class column is present.
            if n_fields >= 8 and int(fields[7]) != 1:
                continue

            annotations[frame_no].append(
                {"id": object_id, "bbox": [left, top, box_w, box_h]}
            )

    return annotations

# Load the Task1 ground truth and print how many frames / boxes survived
# the filtering done in load_gt().
gt_by_frame = load_gt(TASK1_GT_PATH)
print("Loaded GT frames:", len(gt_by_frame))
print("Total GT boxes:", sum(len(v) for v in gt_by_frame.values()))


Loaded GT frames: 429
Total GT boxes: 19870


### 3.2 Load predictions (tracker output)

In [25]:
# ============================================
# 3.2 Load tracking results from our tracker output txt
# Format: <frame>, <id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>
# ============================================
def load_predictions(pred_path: Path):
    """
    Load tracker output written as comma-separated lines:
      <frame>,<id>,<bb_left>,<bb_top>,<bb_width>,<bb_height>

    Args:
        pred_path: Path to the results file; a ``str`` or a ``Path`` is accepted.

    Returns:
        defaultdict(list) mapping frame number -> list of
        {"id": int, "bbox": np.ndarray([x, y, w, h], dtype=float)}.

    Blank lines and lines with fewer than 6 columns are skipped, mirroring
    load_gt(); columns beyond the sixth are ignored.
    """
    pred_by_frame = defaultdict(list)
    # Built-in open() works for both str and Path inputs.
    with open(pred_path, "r") as f:
        for line in f:
            parts = line.strip().split(",")
            # A blank line yields a single empty field and is skipped here too.
            if len(parts) < 6:
                continue

            frame = int(parts[0])
            track_id = int(parts[1])
            x, y, w, h = (float(value) for value in parts[2:6])

            pred_by_frame[frame].append(
                {
                    "id": track_id,
                    "bbox": np.array([x, y, w, h], dtype=float),
                }
            )
    return pred_by_frame

# Read back the tracker output written by run_tracking() for Task1.
pred_by_frame = load_predictions(TASK1_TRACKS_TXT)
print(f"Loaded predictions for {len(pred_by_frame)} frames")


Loaded predictions for 427 frames


### 3.3 IoU, Hungarian matching

In [26]:
# ============================================
# 3.3 IoU & matching utilities
# ============================================
def xywh_to_xyxy(box_xywh):
    """Turn a top-left/size box [x, y, w, h] into corner form [x1, y1, x2, y2]."""
    left, top, box_w, box_h = box_xywh
    right = left + box_w
    bottom = top + box_h
    return np.array([left, top, right, bottom], dtype=float)


def compute_iou_matrix(gt_boxes_xywh, pred_boxes_xywh):
    """
    Pairwise IoU between ground-truth and predicted boxes.

    Args:
        gt_boxes_xywh: sequence of N boxes, each [x, y, w, h].
        pred_boxes_xywh: sequence of M boxes, each [x, y, w, h].

    Returns:
        Float array of shape (N, M); entry (i, j) is the IoU of GT box i with
        predicted box j. Pairs whose union area is not positive get IoU 0.
        If either input is empty, an all-zero (N, M) array is returned.
    """
    n_gt, n_pred = len(gt_boxes_xywh), len(pred_boxes_xywh)
    if not (n_gt and n_pred):
        return np.zeros((n_gt, n_pred), dtype=float)

    gt_xyxy = np.array([xywh_to_xyxy(box) for box in gt_boxes_xywh], dtype=float)     # (N,4)
    pr_xyxy = np.array([xywh_to_xyxy(box) for box in pred_boxes_xywh], dtype=float)   # (M,4)

    # GT coordinates as column vectors (N,1), predictions as row vectors (1,M),
    # so every arithmetic op below broadcasts to the full (N,M) grid.
    g_x1, g_y1, g_x2, g_y2 = (gt_xyxy[:, k].reshape(-1, 1) for k in range(4))
    p_x1, p_y1, p_x2, p_y2 = (pr_xyxy[:, k].reshape(1, -1) for k in range(4))

    overlap_w = np.clip(np.minimum(g_x2, p_x2) - np.maximum(g_x1, p_x1), 0, None)
    overlap_h = np.clip(np.minimum(g_y2, p_y2) - np.maximum(g_y1, p_y1), 0, None)
    intersection = overlap_w * overlap_h

    gt_area = (g_x2 - g_x1) * (g_y2 - g_y1)   # (N,1)
    pr_area = (p_x2 - p_x1) * (p_y2 - p_y1)   # (1,M)
    union = gt_area + pr_area - intersection

    # Divide only where the union is positive; everything else stays 0.
    iou = np.zeros_like(intersection)
    np.divide(intersection, union, out=iou, where=union > 0)
    return iou

### 3.4 Compute MOTA, FP, FN, IDSW, GT

In [27]:
# ============================================
# 3.4 Compute MOTA, FP, FN, IDSW, GT
# ============================================
def compute_mota(gt_by_frame, pred_by_frame, iou_threshold=0.5):
    """
    Compute MOTA, and totals of FP, FN, IDSW, and GT.

        MOTA = 1 - (FN + FP + IDSW) / GT

    Args:
        gt_by_frame: mapping frame -> list of {"id", "bbox": [x, y, w, h]}.
        pred_by_frame: mapping frame -> list of {"id", "bbox": [x, y, w, h]}.
        iou_threshold: minimum IoU for a GT/prediction pair to count as a match.

    Returns:
        (mota, total_FP, total_FN, total_IDSW, total_GT); mota is 0.0 when
        there are no GT boxes at all.

    Per frame, GT and predictions are matched one-to-one with the Hungarian
    algorithm on cost (1 - IoU); pairs below the threshold are never accepted.
    An identity switch is counted when a GT id is matched to a different
    prediction id than the last one it was matched to.

    Frames are taken from the union of both mappings, so predictions made on
    a frame that has no GT entry are counted as false positives.
    """
    # Evaluate every frame that has GT or predictions (in temporal order).
    all_frames = sorted(set(gt_by_frame) | set(pred_by_frame))

    total_FP = 0
    total_FN = 0
    total_IDSW = 0
    total_GT = 0

    # For ID switch tracking: gt_id -> last matched pred_id
    prev_match_for_gt = {}

    for t in all_frames:
        gt_objs = gt_by_frame.get(t, [])
        pr_objs = pred_by_frame.get(t, [])

        N = len(gt_objs)
        M = len(pr_objs)
        total_GT += N

        # Accepted (gt_index, pred_index) pairs for this frame.
        matches = []
        if N > 0 and M > 0:
            iou_mat = compute_iou_matrix(
                [g["bbox"] for g in gt_objs], [p["bbox"] for p in pr_objs]
            )

            # Cost matrix for Hungarian: we want to maximize IoU,
            # so we minimize (1 - IoU). Set cost very high if IoU < threshold.
            cost = 1.0 - iou_mat
            cost[iou_mat < iou_threshold] = 1e6

            row_ind, col_ind = linear_sum_assignment(cost)

            # The solver assigns as many pairs as possible; keep only the
            # ones that actually clear the threshold.
            matches = [
                (r, c) for r, c in zip(row_ind, col_ind) if iou_mat[r, c] >= iou_threshold
            ]

        for r, c in matches:
            gt_id = gt_objs[r]["id"]
            pr_id = pr_objs[c]["id"]

            # Identity switch?
            if gt_id in prev_match_for_gt and prev_match_for_gt[gt_id] != pr_id:
                total_IDSW += 1
            prev_match_for_gt[gt_id] = pr_id

        # Unmatched GT are misses, unmatched predictions are false positives.
        # With an empty side there are no matches, so all of the other side counts.
        total_FN += N - len(matches)
        total_FP += M - len(matches)

    if total_GT == 0:
        mota = 0.0
    else:
        mota = 1.0 - (total_FN + total_FP + total_IDSW) / total_GT

    return mota, total_FP, total_FN, total_IDSW, total_GT


# Evaluate the Task1 tracker output against the ground truth at IoU >= 0.5.
mota, total_FP, total_FN, total_IDSW, total_GT = compute_mota(
    gt_by_frame, pred_by_frame, iou_threshold=0.5
)

# Report the score together with the raw error counts it is built from.
print(f"MOTA: {mota:.4f}")
print(f"Total GT:   {total_GT}")
print(f"Total FP:   {total_FP}")
print(f"Total FN:   {total_FN}")
print(f"Total IDSW: {total_IDSW}")


MOTA: 0.3938
Total GT:   19870
Total FP:   1237
Total FN:   10436
Total IDSW: 372


In [28]:
# Quick debug: how many GT vs predictions per frame?
# A large gap between the two averages points at missed detections (FN)
# rather than at matching problems.
all_gt_frames = sorted(gt_by_frame.keys())
all_pred_frames = sorted(pred_by_frame.keys())

# NOTE: this rebinds the module-level `total_GT` that the MOTA cell also assigns.
total_GT = sum(len(gt_by_frame[f]) for f in all_gt_frames)
# Predictions are only counted on frames that have GT.
total_pred = sum(len(pred_by_frame.get(f, [])) for f in all_gt_frames)

print("Frames with GT:", len(all_gt_frames))
print("Total GT boxes:", total_GT, "-> avg per frame:", total_GT / len(all_gt_frames))
print("Total pred boxes on those frames:", total_pred, "-> avg per frame:", total_pred / len(all_gt_frames))


Frames with GT: 429
Total GT boxes: 19870 -> avg per frame: 46.31701631701632
Total pred boxes on those frames: 10671 -> avg per frame: 24.874125874125873


## 4. Prediction & Kaggle Competition

### 4.1 Track people on the Task2 image sequence and save an annotated video

In [44]:
# ============================================
# 4.1 Run YOLOv8 + DeepSORT on Task2 images
#     Save annotated video (task2.mp4) and counts per frame
# ============================================
def track_on_image_sequence(
    image_dir: Path,
    output_video_path: Path,
    yolo_model,
    deepsort_tracker,
    fps: int = 14,
):
    """
    Run YOLOv8 + DeepSORT on a sequence of images in image_dir.

    Args:
        image_dir: Directory scanned (non-recursively) for .jpg/.jpeg/.png
            files; they are processed in lexicographic order.
        output_video_path: Annotated video to write ("mp4v" fourcc).
        yolo_model: Callable detector; called as
            ``yolo_model(frame, imgsz=640, conf=0.4, verbose=False)``.
        deepsort_tracker: Tracker exposing ``update_tracks(detections, frame=...)``.
        fps: Frame rate of the annotated output video.

    Returns:
        dict mapping the 1-based image index to the number of people counted
        in that image. Images that cannot be read are skipped with a warning
        and get no entry.

    A person is counted when its track is confirmed and was updated by a
    detection in the current frame (the same rule run_tracking uses), so
    tracks that are only being carried over from earlier frames are neither
    counted nor drawn.

    Raises:
        AssertionError: if no images are found, the first image cannot be
            read, or the video writer cannot be opened.
    """
    image_files = sorted(
        p for p in image_dir.iterdir() if p.suffix.lower() in {".jpg", ".jpeg", ".png"}
    )
    assert len(image_files) > 0, f"No images found in {image_dir}"

    # Read first image to get size
    first_frame = cv2.imread(str(image_files[0]))
    assert first_frame is not None, f"Could not read first image {image_files[0]}"
    height, width = first_frame.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(str(output_video_path), fourcc, fps, (width, height))

    frame_counts = {}  # frame_idx (1-based) -> count of people

    try:
        assert out.isOpened(), f"Cannot open video writer for {output_video_path}"

        for idx, img_path in enumerate(image_files, start=1):
            frame = cv2.imread(str(img_path))
            if frame is None:
                print(f"Warning: could not read {img_path}, skipping.")
                continue

            # YOLO inference
            results = yolo_model(frame, imgsz=640, conf=0.4, verbose=False)[0]
            boxes = results.boxes

            detections = []
            if boxes is not None and len(boxes) > 0:
                xyxy = boxes.xyxy.cpu().numpy()
                confs = boxes.conf.cpu().numpy()
                clss = boxes.cls.cpu().numpy()

                for bbox, score, cls in zip(xyxy, confs, clss):
                    if int(cls) != 0:   # person class only
                        continue
                    x1, y1, x2, y2 = bbox
                    # DeepSORT expects [x, y, w, h] (top-left + width/height),
                    # not corner coordinates.
                    detections.append(([x1, y1, x2 - x1, y2 - y1], float(score), "person"))

            tracks = deepsort_tracker.update_tracks(detections, frame=frame)

            # Count people as confirmed tracks that were detected in this frame
            count = 0
            for track in tracks:
                if not track.is_confirmed() or track.time_since_update > 0:
                    continue
                count += 1
                track_id = track.track_id
                l, t, r, b = (int(v) for v in track.to_ltrb())

                # Draw
                cv2.rectangle(frame, (l, t), (r, b), (0, 255, 0), 2)
                cv2.putText(
                    frame,
                    f"ID {track_id}",
                    (l, t - 5),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (0, 255, 0),
                    2,
                    cv2.LINE_AA,
                )

            frame_counts[idx] = count
            out.write(frame)
    finally:
        # Always finalize the video, even if detection/tracking raised.
        out.release()

    print(f"Saved Task2 annotated video to {output_video_path}")
    return frame_counts


# (Re)initialize YOLO + DeepSORT for Task2 if you want a fresh tracker
# Separate instances are created here, so the Task2 run does not use the
# tracker object that was passed to the Task1 run.
yolo_model_task2 = init_yolo()
deepsort_task2 = init_deepsort()

# Per-frame people counts (1-based frame index -> count); saved to CSV below.
task2_frame_counts = track_on_image_sequence(
    TASK2_IMAGES_DIR,
    TASK2_OUTPUT_VIDEO,
    yolo_model_task2,
    deepsort_task2,
    fps=FPS_TASK2
)


Saved Task2 annotated video to task2.mp4


### 4.2 Save frame counts

In [45]:
# ============================================
# 4.2 Save frame counts to CSV for Kaggle
#     Format:
#       Number,Count
#       1,12
#       2,15
#       ...
# ============================================
def save_counts_to_csv(frame_counts: dict, csv_path: Path):
    """
    Write per-frame counts to a CSV file with the header ``Number,Count``.

    Args:
        frame_counts: dict[frame_idx] -> count.
        csv_path: Destination CSV file.

    Rows are written in ascending frame index. The header is always written,
    even when ``frame_counts`` is empty.
    """
    # Naming the columns explicitly keeps the header when there are no rows.
    rows = [(frame_idx, frame_counts[frame_idx]) for frame_idx in sorted(frame_counts)]
    df = pd.DataFrame(rows, columns=["Number", "Count"])
    df.to_csv(csv_path, index=False)
    print(f"Saved counts to {csv_path}")

# Write the Task2 per-frame counts in the Kaggle submission format.
save_counts_to_csv(task2_frame_counts, TASK2_COUNTS_CSV)


Saved counts to task2_count.csv
