# Vehicle Detection, Tracking and Speed Estimation

In [1]:
# STEP 1: Mount Google Drive and Load Video
from google.colab import drive
import os
import cv2
import numpy as np
import time

def mount_and_prepare_video(relative_drive_path='MyDrive/highway_video.mp4'):
    drive.mount('/content/drive', force_remount=True)
    input_path = os.path.join('/content/drive', relative_drive_path)
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise FileNotFoundError(f"Could not open video at: {input_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    print(f"Loaded video: {input_path}\nResolution: {width}x{height}, FPS: {fps}")
    return input_path, fps, width, height


In [2]:
input_video_path, fps, width, height = mount_and_prepare_video('MyDrive/highway_video.mp4')

Mounted at /content/drive
Loaded video: /content/drive/MyDrive/highway_video.mp4
Resolution: 640x360, FPS: 25.0


In [3]:
# STEP 2: Prepare Output Directory
def prepare_output_dir(base_dir='/content/vehicle_speed_outputs'):
    os.makedirs(base_dir, exist_ok=True)
    print(f"Output will be saved to: {base_dir}")
    return base_dir

In [4]:
output_dir = prepare_output_dir()

Output will be saved to: /content/vehicle_speed_outputs


In [5]:
# STEP 3: Load Models
!pip install ultralytics
import torch
from torchvision.models.detection import ssdlite320_mobilenet_v3_large, fasterrcnn_resnet50_fpn
from torchvision import transforms
from ultralytics import YOLO

def load_models():
    models = {
        'yolo': YOLO('yolov8n.pt'),
        'ssd': ssdlite320_mobilenet_v3_large(pretrained=True).eval(),
        'frcnn': fasterrcnn_resnet50_fpn(pretrained=True).eval()
    }
    transform = transforms.Compose([transforms.ToTensor()])
    print("Models loaded: YOLOv8, SSD, Faster R-CNN")
    return models, transform



In [6]:
models, transform = load_models()



Models loaded: YOLOv8, SSD, Faster R-CNN


In [7]:
# STEP 4: COCO Category Names
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter',
    'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
    'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis',
    'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard',
    'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon',
    'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog',
    'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table',
    'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave',
    'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors',
    'teddy bear', 'hair drier', 'toothbrush'
]

# Sources:
# COCO Labels: https://github.com/nightrome/cocostuff/blob/master/labels.md
# PyTorch Reference: https://github.com/pytorch/vision/blob/main/references/detection/coco_utils.py


In [20]:
# STEP 5: Detection Functions

# Run YOLOv8 model to detect vehicles in a frame
# Filters for object classes: car (2), bus (5), and truck (7)
def detect_yolov8(yolo_model, frame):
    results = yolo_model(frame)[0]  # Run YOLO inference
    detections = []
    for r in results.boxes.data.tolist():
        x1, y1, x2, y2, score, cls_id = r
        if int(cls_id) in [2, 5, 7]:  # Filter: car, bus, truck
            detections.append([int(x1), int(y1), int(x2), int(y2), score])
    return detections


# Run SSD (Single Shot Detector) on a frame using torchvision
# Converts frame to tensor, runs inference, filters by class label and confidence
# COCO class labels are used to identify vehicle types

def detect_ssd(ssd_model, transform, frame):
    image = transform(frame).unsqueeze(0)  # Convert to tensor and batch
    with torch.no_grad():
        outputs = ssd_model(image)[0]  # Run SSD inference
    detections = []
    for idx in range(len(outputs['boxes'])):
        label = outputs['labels'][idx].item()
        score = outputs['scores'][idx].item()
        # Lowered confidence threshold to 0.3 for SSD to improve detection coverage
        if COCO_INSTANCE_CATEGORY_NAMES[label] in ['car', 'bus', 'truck'] and score > 0.3:
            x1, y1, x2, y2 = outputs['boxes'][idx].int().tolist()
            detections.append([x1, y1, x2, y2, score])
    return detections


# Run Faster R-CNN on a frame using torchvision
# Similar to SSD, uses COCO labels to filter for car, bus, truck
# Returns bounding boxes and scores for valid detections

def detect_frcnn(frcnn_model, transform, frame):
    image = transform(frame).unsqueeze(0)
    with torch.no_grad():
        outputs = frcnn_model(image)[0]  # Run Faster R-CNN inference
    detections = []
    for idx in range(len(outputs['boxes'])):
        label = outputs['labels'][idx].item()
        score = outputs['scores'][idx].item()
        # check label index is within COCO list
        if label < len(COCO_INSTANCE_CATEGORY_NAMES) and COCO_INSTANCE_CATEGORY_NAMES[label] in ['car', 'bus', 'truck'] and score > 0.5:
            x1, y1, x2, y2 = outputs['boxes'][idx].int().tolist()
            detections.append([x1, y1, x2, y2, score])
    return detections


In [21]:
# Utility: Extract the actual video FPS to ensure consistent speed estimation
import cv2
import subprocess

# Convert video to a supported format with FFmpeg before metadata extraction

def convert_video_with_ffmpeg(input_path, output_path):
    command = [
        'ffmpeg', '-y', '-i', input_path,
        '-vf', 'fps=25',
        '-c:v', 'libx264', '-crf', '23', '-preset', 'veryfast',
        '-c:a', 'copy', output_path
    ]
    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)


# Check if video is readable and fix loading issue if needed

def validate_video_readability(video_path):
    cap = cv2.VideoCapture(video_path)
    ret, _ = cap.read()
    cap.release()
    if not ret:
        print("Warning: Could not read frames. Re-encoding might be necessary.")
        return False
    return True


# Get video metadata using OpenCV

def get_video_metadata(video_path):
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return fps, width, height, total_frames



In [22]:
# Set your original video path
original_video = "/content/drive/MyDrive/highway_video.mp4"
converted_video = "/content/converted_video.mp4"

# Check readability and convert if needed
if not validate_video_readability(original_video):
    convert_video_with_ffmpeg(original_video, converted_video)
    input_video_path = converted_video
else:
    input_video_path = original_video

# Load metadata
fps, width, height, total_frames = get_video_metadata(input_video_path)

# Print results
print(f"Using: {input_video_path}")
print(f"FPS: {fps}, Resolution: {width}x{height}, Total Frames: {total_frames}")


Using: /content/drive/MyDrive/highway_video.mp4
FPS: 25.0, Resolution: 640x360, Total Frames: 1725


In [23]:
# STEP 6: Tracking and Speed Estimation
trackers = {}
next_id = 0

# Simple object tracker using nearest-neighbor logic
# Tracks are identified by comparing center point proximity across frames

def track_objects(detections, frame_idx):
    global next_id
    objects = []
    for det in detections:
        x1, y1, x2, y2, score = det
        cx = int((x1 + x2) / 2)
        cy = int((y1 + y2) / 2)
        matched_id = None

        # Try to match current object to an existing tracker ID
        for obj_id, history in trackers.items():
            if abs(cx - history[-1][0]) < 30 and abs(cy - history[-1][1]) < 30:
                matched_id = obj_id
                trackers[obj_id].append((cx, cy, frame_idx))
                break

        # If no match is found, assign a new tracker ID
        if matched_id is None:
            trackers[next_id] = [(cx, cy, frame_idx)]
            matched_id = next_id
            next_id += 1

        objects.append((matched_id, (x1, y1, x2, y2)))
    return objects


# Estimate speed from tracked object history
# Uses displacement in pixels between last two frames, scaled to meters
# scale_mpp is the assumed meters-per-pixel ratio
# former scale_mpp=0.05
# scale_mpp = 0.25 is tuned for this highway video and improves realism

def estimate_speed(track, fps, scale_mpp=0.25):
    if len(track) < 2:
        return 0
    (x1, y1, f1), (x2, y2, f2) = track[-2], track[-1]
    dx = x2 - x1
    dy = y2 - y1
    dist_pix = np.sqrt(dx**2 + dy**2)
    dist_m = dist_pix * scale_mpp
    dt = (f2 - f1) / fps
    if dt == 0:
        return 0
    return (dist_m / dt) * 3.6  # Convert m/s to km/h


In [27]:
# STEP 7: Run All Models and Generate Output Videos

# Run speed tracking and alert pipeline for each model
# Saves annotated output videos and logs average FPS and speed
# Also tracks frame count, total runtime, and detection count for quantitative evaluation

def run_all_models(input_video_path, output_dir, fps, width, height, models, transform):
    detectors = {
        'yolo': lambda frame: detect_yolov8(models['yolo'], frame),
        'ssd': lambda frame: detect_ssd(models['ssd'], transform, frame),
        'frcnn': lambda frame: detect_frcnn(models['frcnn'], transform, frame),
    }

    results_summary = []

    for model_name, detect_func in detectors.items():
        print(f"\n Running {model_name.upper()} model...")

        cap = cv2.VideoCapture(input_video_path)
        output_path = os.path.join(output_dir, f'output_{model_name}.mp4')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_idx = 0
        trackers.clear()
        global next_id
        next_id = 0

        start_time = time.time()
        total_speeds = []
        frame_times = []
        total_detections = 0
        track_lengths = {}

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            t0 = time.time()
            detections = detect_func(frame)
            total_detections += len(detections)
            objects = track_objects(detections, frame_idx)

            # Annotate each tracked vehicle
            for obj_id, (x1, y1, x2, y2) in objects:
                track = trackers[obj_id]
                speed = estimate_speed(track, fps)
                total_speeds.append(speed)
                track_lengths[obj_id] = len(track)
                color = (0, 0, 255) if speed > 100 else (0, 255, 0)
                label = f'ID:{obj_id} Speed:{int(speed)} km/h'
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            out.write(frame)
            frame_idx += 1
            frame_times.append(time.time() - t0)

        cap.release()
        out.release()
        total_time = time.time() - start_time

        # Log performance statistics
        avg_fps = round(1 / np.mean(frame_times), 2) if frame_times else 0
        avg_speed = round(np.mean(total_speeds), 2) if total_speeds else 0
        avg_track_len = round(np.mean(list(track_lengths.values())), 2) if track_lengths else 0

        print(f"Done: {model_name} → saved to {output_path}")
        print(f"Avg FPS: {avg_fps}")
        print(f"Avg Speed Estimate: {avg_speed} km/h")
        print(f"Total Detections: {total_detections}")
        print(f"Avg Track Length: {avg_track_len} frames")
        print(f"Total Runtime: {round(total_time, 2)} seconds")

        results_summary.append({
            'model': model_name,
            'fps': avg_fps,
            'avg_speed_kmh': avg_speed,
            'detections': total_detections,
            'avg_track_length': avg_track_len,
            'runtime_sec': round(total_time, 2),
            'video_path': output_path
        })

    return results_summary


In [28]:
results = run_all_models(input_video_path, output_dir, fps, width, height, models, transform)

[1;30;43mDie letzten 5000 Zeilen der Streamingausgabe wurden abgeschnitten.[0m
Speed: 1.6ms preprocess, 9.5ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 1 train, 3 trucks, 8.6ms
Speed: 1.7ms preprocess, 8.6ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 train, 2 trucks, 8.4ms
Speed: 1.5ms preprocess, 8.4ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 train, 3 trucks, 10.1ms
Speed: 1.4ms preprocess, 10.1ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 1 bus, 1 train, 3 trucks, 8.1ms
Speed: 1.6ms preprocess, 8.1ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 2 trains, 2 trucks, 8.3ms
Speed: 1.4ms preprocess, 8.3ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 1 train, 2 trucks, 8.3ms
Speed: 1.4ms preprocess, 8.3ms inference, 1

In [29]:
import pandas as pd
df = pd.DataFrame(results)
df

Unnamed: 0,model,fps,avg_speed_kmh,detections,avg_track_length,runtime_sec,video_path
0,yolo,53.0,48.85,22092,279.65,33.27,/content/vehicle_speed_outputs/output_yolo.mp4
1,ssd,13.93,72.95,1842,54.18,124.59,/content/vehicle_speed_outputs/output_ssd.mp4
2,frcnn,0.58,31.3,36652,407.24,2953.15,/content/vehicle_speed_outputs/output_frcnn.mp4
