In [2]:
import cv2
import os
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import torch

from pathlib import Path
import sklearn
import skimage
from skimage import __version__ as skimage_version
from sklearn import __version__ as sklearn_version


path = "/content/"
debug = True

from google.colab import drive
drive.mount(path+'/drive/')

if debug == True:
    A5_savepath = path+'/drive/MyDrive/03 McGill EE Semester 5 (Fall 2025)/ECSE 415 (Clark)/A5/'

print(f"Python version:           {os.sys.version.split()[0]}")
print(f"OpenCV version:           {cv2.__version__}")
print(f"NumPy version:            {np.__version__}")
print(f"Matplotlib version:       {matplotlib.__version__}")
print(f"PyTorch version:          {torch.__version__}")
print(f"scikit-image version:     {skimage_version}")
print(f"scikit-learn version:     {sklearn_version}")
print("Path: " + path)


ModuleNotFoundError: No module named 'google.colab'

In [None]:
if debug == True:
    os.chdir(path)
    !pip install kaggle
    !mkdir -p ~/.kaggle
    !cp /content/drive/MyDrive/Kaggle_API/kaggle.json ~/.kaggle/
    !chmod 600 ~/.kaggle/kaggle.json
    !kaggle competitions download -c ecse-415-video-analysis
    !unzip -q ecse-415-video-analysis.zip -d .

In [None]:
# ===== PART 1: Data Preparation (10 points) =====
# Convert Task1 images to video at 14 FPS and save as task1_input.mp4

task1_dir = Path(path, "Object_Tracking", "Task1")
csv1_path = Path(task1_dir, "gt", "gt.txt")
ground_truth_task1 = np.loadtxt(csv1_path, delimiter=",")

images1_path = Path(task1_dir, "images")

# Get all image files and sort them by filename to ensure correct order
image_files = sorted([f for f in os.listdir(images1_path) if f.endswith(('.jpg', '.png', '.jpeg'))])

print(f"Found {len(image_files)} images in {images1_path}")

if len(image_files) == 0:
    print("ERROR: No images found!")
else:
    # Read first image to get dimensions
    first_img_path = Path(images1_path, image_files[0])
    first_frame = cv2.imread(str(first_img_path))
    H, W, _ = first_frame.shape
    
    print(f"Video dimensions: {W}x{H}")
    
    # Set up VideoWriter with better codec compatibility
    fps = 14.0  # Required FPS for Part 1
    frame_size = (W, H)
    
    # Set output path
    if debug == True:
        mp4_path = os.path.join(A5_savepath, 'task1_input.mp4')
    else:
        mp4_path = 'task1_input.mp4'
    
    # Try different codecs for better compatibility
    codecs_to_try = [
        ('avc1', 'H.264 (avc1)'),
        ('H264', 'H.264 (H264)'),
        ('X264', 'H.264 (X264)'),
        ('mp4v', 'MPEG-4'),
        ('XVID', 'XVID')
    ]
    
    writer = None
    codec_used = None
    
    for codec, codec_name in codecs_to_try:
        fourcc = cv2.VideoWriter_fourcc(*codec)
        test_writer = cv2.VideoWriter(mp4_path, fourcc, fps, frame_size)
        if test_writer.isOpened():
            writer = test_writer
            codec_used = codec_name
            print(f"Using codec: {codec_name}")
            break
        else:
            test_writer.release()
    
    if writer is None or not writer.isOpened():
        print("ERROR: Could not open VideoWriter with any codec!")
    else:
        # Write all frames to video
        frame_count = 0
        for img_file in image_files:
            img_path = Path(images1_path, img_file)
            img_bgr = cv2.imread(str(img_path))
            
            if img_bgr is not None:
                # Ensure frame is correct size
                if img_bgr.shape[:2] != (H, W):
                    img_bgr = cv2.resize(img_bgr, (W, H))
                
                # cv2.VideoWriter expects BGR format
                writer.write(img_bgr)
                frame_count += 1
            else:
                print(f"Warning: Could not read {img_file}")
        
        # IMPORTANT: Properly release the writer
        writer.release()
        
        # Verify file was created
        if os.path.exists(mp4_path):
            file_size_mb = os.path.getsize(mp4_path) / (1024 * 1024)
            print(f"\n✓ Part 1 Complete: Video created successfully!")
            print(f"  Output: {mp4_path}")
            print(f"  Codec: {codec_used}")
            print(f"  Frames written: {frame_count}/{len(image_files)}")
            print(f"  FPS: {fps}")
            print(f"  Duration: {frame_count/fps:.2f} seconds")
            print(f"  File size: {file_size_mb:.2f} MB")
        else:
            print("ERROR: Video file was not created!")

In [None]:
# ===== PART 2: Model Implementation (40 points) =====
# Install required packages for YOLOv8 + DeepSORT

# Install ultralytics (YOLOv8)
!pip install ultralytics

# Install deep-sort-realtime
!pip install deep-sort-realtime

print("✓ Packages installed successfully!")

In [None]:
# ===== PART 2: YOLOv8 + DeepSORT Tracking Implementation =====

from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# Initialize YOLOv8 model
print("Loading YOLOv8 model...")
model = YOLO('yolov8n.pt')  # Using nano model for speed, can use yolov8s.pt, yolov8m.pt for better accuracy
print("✓ YOLOv8 model loaded")

# Initialize DeepSORT tracker
print("Initializing DeepSORT tracker...")
tracker = DeepSort(
    max_age=30,           # Maximum frames to keep track alive without detections
    n_init=3,             # Number of consecutive frames for track confirmation
    max_iou_distance=0.7, # Maximum IOU distance for matching
    embedder="mobilenet", # Feature extractor
    half=True,            # Use FP16 for speed
    embedder_gpu=torch.cuda.is_available()
)
print("✓ DeepSORT tracker initialized")

# Open input video
cap = cv2.VideoCapture(mp4_path)

# Get video properties
fps_input = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"\nInput Video Properties:")
print(f"  Resolution: {width}x{height}")
print(f"  FPS: {fps_input}")
print(f"  Total frames: {total_frames}")

# Set up output video writer
if debug == True:
    output_path = os.path.join(A5_savepath, 'task1.mp4')
    tracking_results_path = os.path.join(A5_savepath, 'task1_tracking.txt')
else:
    output_path = 'task1.mp4'
    tracking_results_path = 'task1_tracking.txt'

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps_input, (width, height))

# Open file to save tracking results
tracking_file = open(tracking_results_path, 'w')

# Process video frame by frame
frame_idx = 0
tracking_data = []

print("\nProcessing video with YOLOv8 + DeepSORT...")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_idx += 1
    
    # Run YOLOv8 detection (class 0 is 'person' in COCO dataset)
    results = model(frame, classes=[0], verbose=False)  # Only detect persons
    
    # Extract detections for DeepSORT
    detections = []
    for result in results:
        boxes = result.boxes
        for box in boxes:
            # Get bounding box coordinates
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = box.conf[0].cpu().numpy()
            
            # Convert to [left, top, width, height] format for DeepSORT
            bbox = [x1, y1, x2 - x1, y2 - y1]
            
            # DeepSORT expects: ([left, top, width, height], confidence, class_name)
            detections.append((bbox, conf, 'person'))
    
    # Update tracker with detections
    tracks = tracker.update_tracks(detections, frame=frame)
    
    # Draw bounding boxes and IDs on frame
    for track in tracks:
        if not track.is_confirmed():
            continue
        
        track_id = track.track_id
        ltrb = track.to_ltrb()  # Get [left, top, right, bottom]
        
        x1, y1, x2, y2 = map(int, ltrb)
        bb_left = x1
        bb_top = y1
        bb_width = x2 - x1
        bb_height = y2 - y1
        
        # Save tracking result: <frame>, <id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>
        tracking_file.write(f"{frame_idx},{track_id},{bb_left},{bb_top},{bb_width},{bb_height}\n")
        tracking_data.append([frame_idx, track_id, bb_left, bb_top, bb_width, bb_height])
        
        # Draw bounding box
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        
        # Draw tracking ID
        label = f'ID: {track_id}'
        label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        label_y = max(y1 - 10, label_size[1] + 10)
        
        # Draw background for text
        cv2.rectangle(frame, (x1, label_y - label_size[1] - 10), 
                     (x1 + label_size[0], label_y), (0, 255, 0), -1)
        
        # Draw text
        cv2.putText(frame, label, (x1, label_y - 5), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
    
    # Add frame number to video
    cv2.putText(frame, f'Frame: {frame_idx}/{total_frames}', (10, 30), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    
    # Write frame to output video
    out.write(frame)
    
    # Progress indicator
    if frame_idx % 50 == 0:
        print(f"  Processed frame {frame_idx}/{total_frames} ({frame_idx/total_frames*100:.1f}%)")

# Release resources
cap.release()
out.release()
tracking_file.close()

print(f"Output video: {output_path}")
print(f"Tracking results: {tracking_results_path}")
print(f"Total frames processed: {frame_idx}")
print(f"Total tracked detections: {len(tracking_data)}")

In [None]:
# ===== Display Sample Tracked Frame and Statistics =====

import pandas as pd

# Load tracking results
tracking_df = pd.DataFrame(tracking_data, columns=['frame', 'id', 'bb_left', 'bb_top', 'bb_width', 'bb_height'])

print("Tracking Statistics:")
print(f"  Total detections: {len(tracking_df)}")
print(f"  Unique track IDs: {tracking_df['id'].nunique()}")
print(f"  Frames with detections: {tracking_df['frame'].nunique()}")
print(f"  Average detections per frame: {len(tracking_df) / tracking_df['frame'].nunique():.2f}")

# Show distribution of track IDs
print("\nTrack ID distribution (top 10):")
print(tracking_df['id'].value_counts().head(10))

# Display a sample tracked frame
sample_frame_num = total_frames // 2  # Middle frame
cap_sample = cv2.VideoCapture(output_path)
cap_sample.set(cv2.CAP_PROP_POS_FRAMES, sample_frame_num)
ret, sample_frame = cap_sample.read()
cap_sample.release()

if ret:
    # Convert BGR to RGB for matplotlib
    sample_frame_rgb = cv2.cvtColor(sample_frame, cv2.COLOR_BGR2RGB)
    
    plt.figure(figsize=(12, 8))
    plt.imshow(sample_frame_rgb)
    plt.title(f'Sample Tracked Frame (Frame {sample_frame_num})')
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    


In [None]:
# ===== PART 3: Model Evaluation (40 points) =====
# Calculate MOTA metric using ground truth and predictions

from scipy.optimize import linear_sum_assignment

# box format: [x, y, width, height]
def compute_iou(box1, box2):
    x1_min = box1[0]
    y1_min = box1[1]
    x1_max = box1[0] + box1[2]
    y1_max = box1[1] + box1[3]

    x2_min = box2[0]
    y2_min = box2[1]
    x2_max = box2[0] + box2[2]
    y2_max = box2[1] + box2[3]
    
    overlap_x_min = max(x1_min, x2_min)
    overlap_y_min = max(y1_min, y2_min)
    overlap_x_max = min(x1_max, x2_max)
    overlap_y_max = min(y1_max, y2_max)

    overlap_width = overlap_x_max - overlap_x_min
    overlap_height = overlap_y_max - overlap_y_min

    if overlap_width <= 0 or overlap_height <= 0:
        return 0.0
    
    intersection_area = overlap_width * overlap_height
    union_area = box1[2] * box1[3] + box2[2] * box2[3] - intersection_area

    return intersection_area / union_area


# gt/pred boxes format: [[x, y, w, h], [x, y, w, h], ...]
# returns IoU matrix (rows=GT, cols=Pred)
def compute_iou_one_frame(gt_boxes, pred_boxes):
    matrix = np.zeros((len(gt_boxes), len(pred_boxes)))
    for i, gt_box in enumerate(gt_boxes):
        for j, pred_box in enumerate(pred_boxes):
            matrix[i, j] = compute_iou(gt_box, pred_box)
    return matrix


# Match GT boxes to predicted boxes using Hungarian algorithm
# Returns: matches, unmatched_gt, unmatched_pred
def match_boxes(gt_boxes, pred_boxes, iou_threshold=0.5):
    # Handle edge cases
    if len(gt_boxes) == 0 or len(pred_boxes) == 0:
        return [], list(range(len(gt_boxes))), list(range(len(pred_boxes)))
    
    # Compute IoU matrix
    iou_matrix = compute_iou_one_frame(gt_boxes, pred_boxes)
    
    # Hungarian algorithm (minimizes cost, so negate IoU)
    cost_matrix = -iou_matrix
    gt_indices, pred_indices = linear_sum_assignment(cost_matrix)
    
    # Filter matches by IoU threshold
    matches = []
    matched_gt = set()
    matched_pred = set()
    
    for gt_idx, pred_idx in zip(gt_indices, pred_indices):
        iou = iou_matrix[gt_idx, pred_idx]
        if iou >= iou_threshold:
            matches.append((gt_idx, pred_idx, iou))
            matched_gt.add(gt_idx)
            matched_pred.add(pred_idx)
    
    # Find unmatched boxes
    unmatched_gt = [i for i in range(len(gt_boxes)) if i not in matched_gt]
    unmatched_pred = [i for i in range(len(pred_boxes)) if i not in matched_pred]
    
    return matches, unmatched_gt, unmatched_pred



In [None]:
# ===== PART 3: Calculate MOTA =====

print("We have the following data:")

# Load ground truth data
gt_data = ground_truth_task1  # Already loaded in Part 1
print(f"  Ground truth: {len(gt_data)} detections")

# Load prediction data
if debug == True:
    pred_path = os.path.join(A5_savepath, 'task1_tracking.txt')
else:
    pred_path = 'task1_tracking.txt'

pred_data = np.loadtxt(pred_path, delimiter=',')
print(f"  Predictions: {len(pred_data)} detections")

# Get frame range
max_frame = int(max(gt_data[:, 0].max(), pred_data[:, 0].max()))
min_frame = 1
print(f"  Frame range: {min_frame} to {max_frame}")

# Initialize counters
total_FN = 0
total_FP = 0
total_IDSW = 0
total_GT = 0

# Track GT object IDs across frames for identity switch detection
# Maps: gt_id -> pred_id from previous frame
gt_to_pred_id = {}

print("\nMOTA Metrics processing")

# Process each frame
for frame_num in range(min_frame, max_frame + 1):
    # Get boxes for this frame
    gt_frame = gt_data[gt_data[:, 0] == frame_num]
    pred_frame = pred_data[pred_data[:, 0] == frame_num]
    
    # Extract box coordinates [x, y, w, h]
    gt_boxes = gt_frame[:, 2:6]  # columns: bb_left, bb_top, bb_width, bb_height
    pred_boxes = pred_frame[:, 2:6]
    
    # Extract IDs
    gt_ids = gt_frame[:, 1] if len(gt_frame) > 0 else []
    pred_ids = pred_frame[:, 1] if len(pred_frame) > 0 else []
    
    # Match boxes using Hungarian algorithm
    matches, unmatched_gt, unmatched_pred = match_boxes(gt_boxes, pred_boxes, iou_threshold=0.5)
    
    # Count false negatives and false positives
    FN = len(unmatched_gt)
    FP = len(unmatched_pred)
    GT = len(gt_boxes)
    
    total_FN += FN
    total_FP += FP
    total_GT += GT
    
    # Count identity switches
    IDSW = 0
    for gt_idx, pred_idx, iou in matches:
        gt_id = gt_ids[gt_idx]
        pred_id = pred_ids[pred_idx]
        
        # Check if this GT object was tracked before
        if gt_id in gt_to_pred_id:
            # If it was matched to a different prediction ID, it's an identity switch
            if gt_to_pred_id[gt_id] != pred_id:
                IDSW += 1
        
        # Update tracking
        gt_to_pred_id[gt_id] = pred_id
    
    total_IDSW += IDSW
    
    # Progress indicator
    if frame_num % 50 == 0:
        print(f"  Processed frame {frame_num}/{max_frame}")

# Calculate MOTA
if total_GT > 0:
    MOTA = 1 - (total_FN + total_FP + total_IDSW) / total_GT
else:
    MOTA = 0.0

# Display results
print("\n" + "="*50)
print("MOTA EVALUATION RESULTS")
print("="*50)
print(f"False Negatives (FN):    {total_FN}")
print(f"False Positives (FP):    {total_FP}")
print(f"Identity Switches (IDSW): {total_IDSW}")
print(f"Ground Truth Objects (GT): {total_GT}")
print("-"*50)
print(f"MOTA Score: {MOTA:.4f} ({MOTA*100:.2f}%)")
print("="*50)

# Additional statistics
print("\nBreakdown:")
print(f"  Miss Rate (FN/GT):     {total_FN/total_GT:.2%}" if total_GT > 0 else "  Miss Rate: N/A")
print(f"  False Alarm Rate:      {total_FP/total_GT:.2%}" if total_GT > 0 else "  False Alarm Rate: N/A")
print(f"  ID Switch Rate:        {total_IDSW/total_GT:.2%}" if total_GT > 0 else "  ID Switch Rate: N/A")



In [None]:
# ===== Visualize MOTA Results =====

import matplotlib.pyplot as plt

# Create visualization
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Error breakdown
error_types = ['False\nNegatives', 'False\nPositives', 'Identity\nSwitches']
error_counts = [total_FN, total_FP, total_IDSW]
colors = ['#ff6b6b', '#feca57', '#48dbfb']

axes[0].bar(error_types, error_counts, color=colors)
axes[0].set_ylabel('Count')
axes[0].set_title('Error Breakdown')
axes[0].grid(axis='y', alpha=0.3)

for i, count in enumerate(error_counts):
    axes[0].text(i, count + max(error_counts)*0.02, str(count), 
                ha='center', va='bottom', fontweight='bold')

# Plot 2: MOTA components as percentages
if total_GT > 0:
    percentages = [
        total_FN / total_GT * 100,
        total_FP / total_GT * 100,
        total_IDSW / total_GT * 100
    ]
    
    axes[1].bar(error_types, percentages, color=colors)
    axes[1].set_ylabel('Percentage of GT (%)')
    axes[1].set_title('Error Rates')
    axes[1].grid(axis='y', alpha=0.3)
    
    for i, pct in enumerate(percentages):
        axes[1].text(i, pct + max(percentages)*0.02, f'{pct:.1f}%', 
                    ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

# Summary table
print("\nSummary Table:")
print("-" * 60)
print(f"{'Metric':<25} {'Count':<15} {'% of GT':<15}")
print("-" * 60)
if total_GT > 0:
    print(f"{'False Negatives (FN)':<25} {total_FN:<15} {total_FN/total_GT*100:>6.2f}%")
    print(f"{'False Positives (FP)':<25} {total_FP:<15} {total_FP/total_GT*100:>6.2f}%")
    print(f"{'Identity Switches (IDSW)':<25} {total_IDSW:<15} {total_IDSW/total_GT*100:>6.2f}%")
    print(f"{'Ground Truth (GT)':<25} {total_GT:<15} {'100.00%':>11}")
    print("-" * 60)
    print(f"{'MOTA Score':<25} {MOTA:<15.4f} {MOTA*100:>6.2f}%")
print("-" * 60)

In [None]:
# ===== PART 2: YOLOv8 + ByteTrack Tracking Implementation =====

from ultralytics import YOLO

# Initialize YOLOv8 model (medium for better accuracy at distance)
print("Loading YOLOv8 model...")
model = YOLO('yolov8m.pt')  # Can also try yolov8l.pt for even better accuracy
print("✓ YOLOv8 model loaded")

# Open input video
cap = cv2.VideoCapture(mp4_path)

# Get video properties
fps_input = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"\nInput Video Properties:")
print(f"  Resolution: {width}x{height}")
print(f"  FPS: {fps_input}")
print(f"  Total frames: {total_frames}")

# Set up output video writer
if debug == True:
    output_path = os.path.join(A5_savepath, 'task2.mp4')
    tracking_results_path = os.path.join(A5_savepath, 'task2_tracking.txt')
else:
    output_path = 'task2.mp4'
    tracking_results_path = 'task2_tracking.txt'

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps_input, (width, height))

# Open file to save tracking results
tracking_file = open(tracking_results_path, 'w')

# Process video frame by frame
frame_idx = 0
tracking_data = []

print("\nProcessing video with YOLOv8 + ByteTrack...")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_idx += 1
    
    # Run YOLOv8 tracking (ByteTrack built-in)
    results = model.track(
        source=frame,
        classes=[0],              # Only track people (class 0)
        conf=0.35,                # Lower confidence for better recall
        iou=0.7,                  # NMS IoU threshold
        tracker='bytetrack.yaml', # Use ByteTrack
        persist=True,             # Persist tracks across frames
        verbose=False,
        imgsz=640                 # Can increase to 1280 for small objects
    )
    
    # Extract tracking results
    for result in results:
        boxes = result.boxes
        if boxes is not None and boxes.id is not None:
            for box_coords, track_id in zip(boxes.xyxy, boxes.id):
                x1, y1, x2, y2 = box_coords.cpu().numpy()
                track_id = int(track_id.cpu().numpy())
                
                bb_left = int(x1)
                bb_top = int(y1)
                bb_width = int(x2 - x1)
                bb_height = int(y2 - y1)
                
                # Save tracking result
                tracking_file.write(f"{frame_idx},{track_id},{bb_left},{bb_top},{bb_width},{bb_height}\n")
                tracking_data.append([frame_idx, track_id, bb_left, bb_top, bb_width, bb_height])
                
                # Draw on frame
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                
                label = f'ID: {track_id}'
                label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
                label_y = max(int(y1) - 10, label_size[1] + 10)
                
                cv2.rectangle(frame, (int(x1), label_y - label_size[1] - 10), 
                             (int(x1) + label_size[0], label_y), (0, 255, 0), -1)
                cv2.putText(frame, label, (int(x1), label_y - 5), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
    
    # Add frame number
    cv2.putText(frame, f'Frame: {frame_idx}/{total_frames}', (10, 30), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    
    out.write(frame)
    
    if frame_idx % 50 == 0:
        print(f"  Processed frame {frame_idx}/{total_frames} ({frame_idx/total_frames*100:.1f}%)")

cap.release()
out.release()
tracking_file.close()

print(f"\n✓ Tracking complete!")
print(f"Output video: {output_path}")
print(f"Tracking results: {tracking_results_path}")
print(f"Total frames processed: {frame_idx}")
print(f"Total tracked detections: {len(tracking_data)}")