Installing Packages

In [None]:

# Install the third-party dependencies with the same interpreter that runs
# this notebook; "-q" keeps pip's output quiet. check_call raises if pip
# exits with a non-zero status.
print("📦 Installing packages...")
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                      "ultralytics", "opencv-python-headless", "filterpy",
                      "scipy", "numpy", "matplotlib"])
print("✅ Packages installed!\n")

Import Libraries

In [None]:
import cv2
import numpy as np
from collections import defaultdict, deque
import os
import time
from ultralytics import YOLO
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment

# Create the folder used by the default output paths of process_video.
os.makedirs('output', exist_ok=True)
print("✅ Libraries imported!\n")

Kalman Box Tracker

In [None]:
class KalmanBoxTracker:
    """Track one bounding box with a Kalman filter.

    The 7-element filter state is ``[x, y, s, r, vx, vy, vs]``: box centre
    ``(x, y)``, area ``s``, width/height ratio ``r`` and the per-step change
    of ``x``, ``y`` and ``s``. Only ``[x, y, s, r]`` is measured.
    """
    # Next id to hand out; shared by all instances and never reset here.
    count = 0

    def __init__(self, bbox):
        """Start a track from ``bbox`` (``[x1, y1, x2, y2, ...]``)."""
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        # Transition: x, y and s each advance by their own rate every step;
        # r has no rate term and is carried over unchanged.
        self.kf.F = np.array([[1,0,0,0,1,0,0],[0,1,0,0,0,1,0],[0,0,1,0,0,0,1],
                              [0,0,0,1,0,0,0],[0,0,0,0,1,0,0],[0,0,0,0,0,1,0],
                              [0,0,0,0,0,0,1]])
        # Measurement picks the first four state entries.
        self.kf.H = np.array([[1,0,0,0,0,0,0],[0,1,0,0,0,0,0],
                              [0,0,1,0,0,0,0],[0,0,0,1,0,0,0]])
        # Scale measurement noise for area/ratio, the initial uncertainty
        # (most strongly on the unobserved rates) and the process noise of
        # the rate terms.
        self.kf.R[2:, 2:] *= 10.0
        self.kf.P[4:, 4:] *= 1000.0
        self.kf.P *= 10.0
        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
        self.kf.x[:4] = self.convert_bbox_to_z(bbox)
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def update(self, bbox):
        """Correct the filter with an observed box and reset the miss counter."""
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(self.convert_bbox_to_z(bbox))

    def predict(self):
        """Advance the filter one step and return the predicted box, shape (1, 4)."""
        # Zero the area rate if it would push the area to zero or below.
        if self.kf.x[6] + self.kf.x[2] <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        # A second consecutive predict without an update breaks the streak.
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(self.convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """Return the current box estimate as a (1, 4) array."""
        return self.convert_x_to_bbox(self.kf.x)

    @staticmethod
    def convert_bbox_to_z(bbox):
        """Convert ``[x1, y1, x2, y2, ...]`` to a (4, 1) ``[x, y, s, r]`` column.

        The box height must be non-zero because the ratio divides by it.
        """
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]
        x = bbox[0] + w / 2.0
        y = bbox[1] + h / 2.0
        s = w * h
        r = w / float(h)
        return np.array([x, y, s, r]).reshape((4, 1))

    @staticmethod
    def convert_x_to_bbox(x, score=None):
        """Convert a state whose first entries are ``[x, y, s, r]`` to corners.

        Returns ``[[x1, y1, x2, y2]]`` or, when ``score`` is given,
        ``[[x1, y1, x2, y2, score]]``.
        """
        # Flatten first: with a column-vector state each x[i] is a length-1
        # array, and mixing those with a scalar score yields a ragged list
        # that recent NumPy refuses to turn into an array.
        state = np.asarray(x, dtype=float).reshape(-1)
        cx, cy, area, ratio = state[:4]
        w = np.sqrt(area * ratio)
        h = area / w
        corners = [cx - w / 2., cy - h / 2., cx + w / 2., cy + h / 2.]
        if score is None:
            return np.array(corners).reshape((1, 4))
        return np.array(corners + [score]).reshape((1, 5))

print("✅ Kalman Box Tracker defined\n")

Define IoU and association

In [None]:
def iou_batch(bb_test, bb_gt):
    """Compute pairwise intersection-over-union between two sets of boxes.

    Args:
        bb_test: array of shape (N, >=4) whose first columns are x1, y1, x2, y2.
        bb_gt: array of shape (M, >=4) in the same layout.

    Returns:
        (N, M) array; entry [i, j] is the IoU of bb_test[i] and bb_gt[j].
        Pairs whose union area is not positive get 0 rather than NaN.
    """
    # Broadcast to (N, 1, C) against (1, M, C) so every pair is compared.
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    intersection = w * h
    area_test = (bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
    area_gt = (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1])
    union = area_test + area_gt - intersection
    # Two zero-area boxes give 0/0; a NaN here would later be rejected by the
    # assignment solver, so report "no overlap" instead.
    valid = union > 0
    safe_union = np.where(valid, union, 1.0)
    return np.where(valid, intersection / safe_union, 0.0)

def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """Pair detections with predicted tracker boxes by IoU.

    Returns a tuple ``(matches, unmatched_detections, unmatched_trackers)``
    where ``matches`` is a (K, 2) array of ``[detection_idx, tracker_idx]``
    rows and the other two are arrays of indices left without a partner.
    """
    # Nothing to match against: every detection is unmatched.
    if len(trackers) == 0:
        no_matches = np.empty((0, 2), dtype=int)
        every_detection = np.arange(len(detections))
        no_trackers = np.empty((0, 5), dtype=int)
        return no_matches, every_detection, no_trackers

    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) == 0:
        pairs = np.empty(shape=(0, 2))
    else:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        unambiguous = above.sum(1).max() == 1 and above.sum(0).max() == 1
        if unambiguous:
            # Each row and column has at most one candidate: take them as-is.
            pairs = np.stack(np.where(above), axis=1)
        else:
            # Otherwise solve the assignment that maximises total IoU.
            det_idx, trk_idx = linear_sum_assignment(-iou_matrix)
            pairs = np.array(list(zip(det_idx, trk_idx)))

    paired_dets = pairs[:, 0]
    paired_trks = pairs[:, 1]
    unmatched_detections = [d for d in range(len(detections)) if d not in paired_dets]
    unmatched_trackers = [t for t in range(len(trackers)) if t not in paired_trks]

    # Reject assigned pairs whose overlap is below the threshold.
    accepted = []
    for pair in pairs:
        if iou_matrix[pair[0], pair[1]] < iou_threshold:
            unmatched_detections.append(pair[0])
            unmatched_trackers.append(pair[1])
            continue
        accepted.append(pair.reshape(1, 2))

    if len(accepted) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(accepted, axis=0)
    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)

print("✅ Association functions defined\n")

Deep SORT

In [None]:
class DeepSORT:
    """Multi-object tracker built from per-object Kalman filters and IoU matching.

    NOTE(review): despite the name, association in this class uses box
    overlap only; no appearance features are computed here.

    Attributes:
        trackers: live KalmanBoxTracker instances.
        trajectories: recent centre points per track, keyed by the tracker's
            own 0-based ``id`` (the ids returned by ``update`` are ``id + 1``).
    """

    def __init__(self, max_age=30, min_hits=3, iou_threshold=0.3):
        """
        Args:
            max_age: frames a track may go unmatched before it is dropped.
            min_hits: consecutive matches needed before a track is reported
                (not enforced during the first ``min_hits`` frames).
            iou_threshold: minimum IoU for a detection/track match.
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0
        self.trajectories = defaultdict(lambda: deque(maxlen=30))

    def update(self, detections):
        """Advance all tracks by one frame and match them to ``detections``.

        Args:
            detections: (N, 5) array of ``[x1, y1, x2, y2, score]`` rows; pass
                an empty (0, 5) array for a frame without detections.

        Returns:
            (K, 5) array of ``[x1, y1, x2, y2, track_id]`` rows for the tracks
            reported this frame, where ``track_id`` is the tracker id plus one.
        """
        self.frame_count += 1
        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        for t, trk in enumerate(trks):
            pos = self.trackers[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            # masked_invalid below drops rows containing NaN *or* inf, so the
            # same rows must be removed from self.trackers to keep the row
            # index of `trks` aligned with the tracker list.
            if not np.all(np.isfinite(pos)):
                to_del.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(
            detections, trks, self.iou_threshold)
        for m in matched:
            self.trackers[m[1]].update(detections[m[0], :])
        # Every detection without a partner starts a new track.
        for i in unmatched_dets:
            trk = KalmanBoxTracker(detections[i, :])
            self.trackers.append(trk)
        ret = []
        # Walk backwards so popping a stale track does not shift the indices
        # of the tracks still to be visited.
        i = len(self.trackers)
        for trk in reversed(self.trackers):
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))
                center = ((d[0] + d[2]) / 2, (d[1] + d[3]) / 2)
                self.trajectories[trk.id].append(center)
            i -= 1
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))

    def predict_trajectory(self, track_id, num_frames=10):
        """Extrapolate the centre of a live track without changing its state.

        Args:
            track_id: the tracker's own 0-based ``id``.
            num_frames: number of future steps to predict.

        Returns:
            List of ``(x, y)`` centre tuples, or ``[]`` if no live track has
            that id.
        """
        for tracker in self.trackers:
            if tracker.id != track_id:
                continue
            kf = tracker.kf
            saved_x = kf.x.copy()
            saved_P = kf.P.copy()
            predictions = []
            try:
                for _ in range(num_frames):
                    kf.predict()
                    # Read the centre straight from the state. Going through
                    # the corner conversion takes sqrt(area * ratio), which
                    # turns the centre into NaN once the extrapolated area
                    # drops below zero.
                    state = np.asarray(kf.x, dtype=float).reshape(-1)
                    predictions.append((float(state[0]), float(state[1])))
            finally:
                # Always roll the filter back, even if predict() raised.
                kf.x = saved_x
                kf.P = saved_P
            return predictions
        return []

print("✅ Deep SORT tracker defined\n")

YOLO Model

In [None]:
# Detector used by process_video for every frame.
yolo_model = YOLO('yolov8n.pt')
# Random colour palette; a track's colour is picked by id modulo its length.
COLORS = np.random.randint(0, 255, size=(200, 3), dtype=np.uint8)
print("✅ YOLO model loaded!\n")

Visualization

In [None]:
def draw_boxes_and_trajectories(frame, tracked_objects, tracker, show_predictions=True):
    """Draw boxes, id labels, past trails and predicted paths onto ``frame``.

    Args:
        frame: image to draw on; modified in place.
        tracked_objects: iterable of ``[x1, y1, x2, y2, track_id]`` rows as
            returned by ``tracker.update``.
        tracker: object exposing ``trajectories`` and ``predict_trajectory``.
        show_predictions: when true, also draw the extrapolated path.

    Returns:
        The same ``frame`` object.
    """
    for track in tracked_objects:
        x1, y1, x2, y2, track_id = track
        track_id = int(track_id)
        # Reported ids are the tracker's internal id plus one, while both
        # `trajectories` and `predict_trajectory` use the internal id.
        internal_id = track_id - 1
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        color = COLORS[track_id % len(COLORS)].tolist()
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        label = f'ID: {track_id}'
        (label_w, label_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        # Filled background strip above the box so the label stays readable.
        cv2.rectangle(frame, (x1, y1 - label_h - 10), (x1 + label_w, y1), color, -1)
        cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        # Membership test first: indexing the defaultdict directly would
        # create an empty entry for ids that have no history.
        if internal_id in tracker.trajectories:
            points = list(tracker.trajectories[internal_id])
            for i in range(1, len(points)):
                if points[i - 1] is None or points[i] is None:
                    continue
                # Segments are drawn thinner as the index grows.
                thickness = int(np.sqrt(64 / float(i + 1)) * 1.5)
                cv2.line(frame, (int(points[i-1][0]), int(points[i-1][1])),
                        (int(points[i][0]), int(points[i][1])), color, thickness)
        if show_predictions:
            predictions = tracker.predict_trajectory(internal_id, num_frames=10)
            # Keep the leading run of finite points: int() raises on NaN/inf.
            path = []
            for px, py in predictions:
                if not (np.isfinite(px) and np.isfinite(py)):
                    break
                path.append((int(px), int(py)))
            for start, end in zip(path, path[1:]):
                cv2.line(frame, start, end, color, 2, lineType=cv2.LINE_AA)
            if path:
                cv2.circle(frame, path[-1], 5, color, -1)
    return frame

print("✅ Visualization function defined\n")

Processing Function

In [None]:
def process_video(video_path, output_path='output/tracked_video.mp4',
                  conf_threshold=0.4, max_frames=300, show_predictions=True):
    """Detect and track objects in a video and write an annotated copy.

    Args:
        video_path: path of the input video.
        output_path: path of the annotated video to write.
        conf_threshold: confidence threshold passed to the detector.
        max_frames: stop after this many frames; a falsy value (``None`` or
            0) processes the whole video.
        show_predictions: draw each track's extrapolated path.

    Returns:
        ``output_path`` on success, or ``None`` if the input video or the
        output writer could not be opened.
    """
    # Detector class ids that are kept.
    # NOTE(review): presumably the COCO ids for person, bicycle, car,
    # motorcycle, bus and truck — confirm against the model's class map.
    tracked_classes = (0, 1, 2, 3, 5, 7)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"❌ Error: Could not open video {video_path}")
        return None

    out = None
    frame_count = 0
    total_detections = 0
    processing_times = []
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"📹 Video info: {width}x{height} @ {fps}fps, {total_frames} frames")
        # Some sources report no frame rate; a writer needs a positive one.
        if fps <= 0:
            fps = 30

        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"❌ Error: Could not open output video {output_path}")
            return None

        tracker = DeepSORT(max_age=30, min_hits=3, iou_threshold=0.3)
        print(f"🚀 Processing video (max {max_frames} frames)...\n")
        # Check the frame limit before reading so no frame is decoded and
        # then thrown away.
        while not (max_frames and frame_count >= max_frames):
            ret, frame = cap.read()
            if not ret:
                break
            start_time = time.time()
            results = yolo_model(frame, conf=conf_threshold, verbose=False)
            detections = []
            for result in results:
                for box in result.boxes:
                    if int(box.cls[0]) not in tracked_classes:
                        continue
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    detections.append([x1, y1, x2, y2, float(box.conf[0])])
            # The tracker expects a 2-D array even when nothing was detected.
            detections = np.array(detections) if len(detections) > 0 else np.empty((0, 5))
            tracked_objects = tracker.update(detections)
            frame = draw_boxes_and_trajectories(frame, tracked_objects, tracker, show_predictions)
            elapsed = time.time() - start_time
            frame_fps = 1 / elapsed if elapsed > 0 else 0.0
            info_text = f"Frame: {frame_count + 1} | Objects: {len(tracked_objects)} | FPS: {frame_fps:.1f}"
            cv2.putText(frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            out.write(frame)
            processing_times.append(time.time() - start_time)
            # Counts reported tracks per frame, summed over all frames.
            total_detections += len(tracked_objects)
            frame_count += 1
            if frame_count % 30 == 0:
                print(f"   ✓ Processed {frame_count} frames...")
    finally:
        # Release the capture and writer even if detection or drawing raised.
        cap.release()
        if out is not None:
            out.release()

    mean_time = np.mean(processing_times) if processing_times else 0.0
    avg_fps = 1 / mean_time if mean_time > 0 else 0.0
    print(f"\n{'='*60}")
    print(f"✅ Processing complete!")
    print(f"{'='*60}")
    print(f"  Frames processed: {frame_count}")
    print(f"  Average FPS: {avg_fps:.2f}")
    print(f"  Total detections: {total_detections}")
    print(f"  Output saved to: {output_path}")
    print(f"{'='*60}\n")
    return output_path

print("✅ Processing function defined\n")

Input Processing

In [None]:
# Menu of video sources; options 1-3 match the keys of SAMPLE_VIDEOS.
print("📹 VIDEO SOURCE OPTIONS")
print("="*60)
print("Sample Videos Available:")
print("  1. MOT17-04 - Crowded street (pedestrians) ⭐ RECOMMENDED")
print("  2. MOT17-02 - Shopping area (pedestrians)")
print("  3. MOT17-11 - Highway (cars + people)")
print("  4. Upload your own video")
print("="*60)

from google.colab import files
import urllib.request

# Downloadable sample clips keyed by menu choice. The URL and the local
# filename are both derived from the sequence name.
SAMPLE_VIDEOS = {
    key: {
        'name': name,
        'url': f'https://motchallenge.net/sequenceVideos/{name}-DPM-raw.mp4',
        'description': description,
        'filename': f'{name}-sample.mp4',
    }
    for key, name, description in (
        ('1', 'MOT17-04', 'Crowded street - many pedestrians'),
        ('2', 'MOT17-02', 'Shopping area - indoor pedestrians'),
        ('3', 'MOT17-11', 'Highway - vehicles and people'),
    )
}

choice = input("\nEnter your choice (1-4): ").strip()
input_video = None

if choice in ['1', '2', '3']:
    video_info = SAMPLE_VIDEOS[choice]
    print(f"\n📥 Downloading {video_info['name']}...")
    print(f"   Description: {video_info['description']}")

    def download_progress(block_num, block_size, total_size):
        """Report hook for urlretrieve: print the percentage fetched so far."""
        downloaded = block_num * block_size
        if total_size > 0:
            # The last block may overshoot the total, so cap at 100%.
            percent = min(downloaded * 100 / total_size, 100)
            print(f"\r   Progress: {percent:.1f}%", end='')

    try:
        urllib.request.urlretrieve(
            video_info['url'],
            video_info['filename'],
            download_progress
        )
    except Exception as e:
        # Any download problem falls back to the manual upload below.
        print(f"\n❌ Download failed: {e}")
        print("   Please choose option 4 to upload your own video")
        choice = '4'
    else:
        print("\n✅ Download complete!")
        input_video = video_info['filename']
        print(f"   File: {input_video}")
        print(f"   Source: MOT Challenge Dataset")

# Also reached for any unrecognised choice, since input_video is still None.
if choice == '4' or input_video is None:
    print("\n📤 UPLOAD YOUR VIDEO")
    print("-" * 60)
    uploaded = files.upload()
    if uploaded:
        # Use the first uploaded file.
        input_video = list(uploaded.keys())[0]
        print(f"✅ Video uploaded: {input_video}")
    else:
        print("❌ No video provided")

# Run the tracking pipeline on the selected video and offer the result for
# download through the Colab file helper.
if input_video and os.path.exists(input_video):
    print("\n" + "="*60)
    print("🚀 STARTING PROCESSING")
    print("="*60 + "\n")

    output_video = process_video(
        input_video,
        output_path='output/tracked_output.mp4',
        conf_threshold=0.4,
        max_frames=300,
        show_predictions=True
    )

    # process_video returns None when the video could not be opened.
    if output_video:
        print("\n📥 Downloading result...")
        files.download(output_video)
        print("✅ Done! Your tracked video is ready!")

        print("\n" + "="*60)
        print("💡 TIPS:")
        print("="*60)
        print("• Full video: Set max_frames=None")
        print("• Faster: Increase conf_threshold to 0.6")
        print("• More detections: Decrease conf_threshold to 0.3")
        print("• MOT17 videos are ~750 frames at 30 FPS")
        print("="*60)
else:
    print("\n❌ No video to process")

print("\n" + "="*60)
print("🎉 ALL DONE!")
print("="*60)