In [None]:
# D. Install Dependencies (Run this in a single code cell)
# The initial '!' tells Colab to run the command in the terminal.

# 1. Install PyTorch, OpenCV, Pandas, Matplotlib (already in Colab but good practice)
!pip install torch torchvision torchaudio opencv-python-headless pandas matplotlib pillow

# 2. Install Ultralytics (YOLOv8)
!pip install ultralytics

# 3. Install PaddleOCR and its dependencies
# The --force-reinstall and the order here help prevent the numpy conflicts.
!pip install paddlepaddle --force-reinstall
!pip install paddleocr

# 4. Install Gradio
!pip install gradio

# The 'restart runtime' issue you mentioned is often related to one package
# overwriting numpy/torch. Running all installations in one cell often helps.

In [1]:
%%writefile custom_botsort.yaml
tracker_type: botsort
track_high_thresh: 0.5
track_low_thresh: 0.1
new_track_thresh: 0.6
match_thresh: 0.8
# gmc_method: iou

# This is the "patience" setting for BoTSORT, just like track_buffer was
# It's called 'track_buffer' here too, but it's part of a different tracker's config.
track_buffer: 120

# Appeasing THIS FUCKING COLAB.
with_reid: False
gmc_method: sparseOptFlow
proximity_thresh: 0.5
appearance_thresh: 0.25
fuse_score: True

Writing custom_botsort.yaml


In [None]:
# app.py â€” Vehicle Detection, Speed Estimation & Counting (no plate OCR)

# ------------------- MAIN IMPORTS --------------------------
import gradio as gr
import cv2
import numpy as np
import math, os, tempfile, io
import pandas as pd
from collections import deque, defaultdict
from ultralytics import YOLO
import matplotlib.pyplot as plt
from PIL import Image
import logging
import torch

# ----------------- CONFIG / DEVICE ------------------------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

logging.getLogger("ultralytics").setLevel(logging.ERROR)

# ----------------- MODELS (LOAD ONCE) ---------------------
vehicle_model = YOLO("yolov8n.pt")  # COCO model for vehicles

# ----------------- CONSTANTS & HELPERS -------------------
ACCEPTED_VEHICLE_NAMES = {"car", "motorcycle", "bus", "truck"}
CLASS_MAP = {
    "car": "car",
    "motorcycle": "motorcycle",
    "bus": "bus",
    "truck": "truck",
}


# ----------------- CORE PROCESSING ------------------------

def process_video(video_path: str, speed_threshold: float, distance_m: float = 10.0):
    """
    Process video:
      - detect & track vehicles
      - estimate speed
      - count by class
      - write processed video + csv (no plate column)
    """

    prev_positions = {}
    prev_center_y = {}
    speed_memory = defaultdict(lambda: deque(maxlen=15))
    track_label = {}

    t_in, t_out = {}, {}
    line_speed_done = set()

    counted_ids_by_class = {name: set() for name in CLASS_MAP.values()}

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    out_path = os.path.join(tempfile.gettempdir(), "processed.mp4")
    out = cv2.VideoWriter(
        out_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (width, height),
    )

    line_in_y = int(height * 0.40)
    line_out_y = int(height * 0.60)
    count_line_y = line_out_y

    px_between_lines = abs(line_out_y - line_in_y)
    px_per_meter = (px_between_lines / distance_m) if distance_m > 0 else None

    vehicle_summary = {}
    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1

        # Run vehicle tracking on full frame
        v_results = vehicle_model.track(
            frame,
            imgsz=640,
            conf=0.35,
            device=DEVICE,
            verbose=False,
            persist=True,
            tracker="custom_botsort.yaml",
        )

        boxes = v_results[0].boxes

        if boxes is None or boxes.id is None:
            # draw lines and continue
            cv2.line(frame, (0, line_in_y), (width, line_in_y), (0, 0, 255), 2)
            cv2.line(frame, (0, line_out_y), (width, line_out_y), (255, 0, 0), 2)
            out.write(frame)
            continue

        names_map = getattr(v_results[0], "names", None) or vehicle_model.names

        for b in boxes:
            try:
                tid = int(b.id.item())
                coords = b.xyxy[0].cpu().numpy().astype(int)
                cls_idx = int(b.cls[0].item())
            except Exception:
                continue

            x1, y1, x2, y2 = coords
            x1o, y1o, x2o, y2o = x1, y1, x2, y2

            name = names_map.get(cls_idx, str(cls_idx))
            if name not in ACCEPTED_VEHICLE_NAMES:
                continue

            label = CLASS_MAP.get(name, "car")
            track_label[tid] = label

            cx, cy = (x1o + x2o) // 2, (y1o + y2o) // 2
            now_s = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0

            # ------------ speed estimation ------------
            if tid in prev_positions:
                px, py, pt = prev_positions[tid]
                dist_px = math.hypot(cx - px, cy - py)
                dt = max(1e-6, now_s - pt)
                dist_m = dist_px / px_per_meter if px_per_meter else dist_px * 0.01
                inst_speed_kmh = (dist_m / dt) * 3.6
                speed_memory[tid].append(inst_speed_kmh)
            prev_positions[tid] = (cx, cy, now_s)

            # ------------ line crossing & count ------------
            if tid in prev_center_y:
                py = prev_center_y[tid]
                if py < line_in_y <= cy and tid not in t_in:
                    t_in[tid] = now_s
                if py < line_out_y <= cy and tid not in t_out:
                    t_out[tid] = now_s
                if py < count_line_y <= cy:
                    if label not in counted_ids_by_class:
                        counted_ids_by_class[label] = set()
                    if tid not in counted_ids_by_class[label]:
                        counted_ids_by_class[label].add(tid)
            prev_center_y[tid] = cy

            # speed from line-crossing (more stable)
            if tid in t_in and tid in t_out and tid not in line_speed_done:
                dt = max(1e-6, t_out[tid] - t_in[tid])
                line_speed_kmh = (distance_m / dt) * 3.6
                for _ in range(5):
                    speed_memory[tid].append(line_speed_kmh)
                line_speed_done.add(tid)

            avg_speed = float(np.mean(speed_memory[tid])) if len(speed_memory[tid]) else 0.0
            is_overspeeding = avg_speed > speed_threshold

            # draw overlays
            color = (0, 0, 255) if is_overspeeding else (0, 255, 0)
            cv2.rectangle(frame, (x1o, y1o), (x2o, y2o), color, 2)
            label_text = f"ID {tid} {label} {avg_speed:.1f} km/h"
            cv2.putText(
                frame,
                label_text,
                (x1o, max(20, y1o - 8)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.55,
                (255, 255, 0),
                2,
            )

            # summary (NO plate field)
            current_summary = vehicle_summary.get(
                tid,
                {
                    "track_id": tid,
                    "class": label,
                    "speed_kmh": 0.0,
                    "overspeeding": False,
                },
            )
            current_summary["class"] = label
            current_summary["overspeeding"] = bool(is_overspeeding)
            current_summary["speed_kmh"] = round(avg_speed, 2)
            vehicle_summary[tid] = current_summary

        # draw reference lines
        cv2.line(frame, (0, line_in_y), (width, line_in_y), (0, 0, 255), 2)
        cv2.line(frame, (0, line_out_y), (width, line_out_y), (255, 0, 0), 2)

        out.write(frame)

    cap.release()
    out.release()

    # Final per-class counts (only vehicles with some speed info)
    final_counts = defaultdict(int)
    for item in vehicle_summary.values():
        if item["speed_kmh"] > 0:
            final_counts[item["class"]] += 1

    vehicle_counts = {cls: final_counts[cls] for cls in CLASS_MAP.values()}
    for cls in final_counts:
        if cls not in vehicle_counts:
            vehicle_counts[cls] = final_counts[cls]

    # CSV without plate column
    csv_path = os.path.join(tempfile.gettempdir(), "sort_log.csv")
    pd.DataFrame(list(vehicle_summary.values())).to_csv(csv_path, index=False)

    return out_path, csv_path, vehicle_counts


# ----------------- PLOTTING HELPERS ------------------------

def plot_vehicle_count_graph(vehicle_counts):
    plt.figure(figsize=(6, 4))
    labels = list(vehicle_counts.keys())
    values = list(vehicle_counts.values())
    plt.bar(labels, values)
    plt.title("Vehicle Count by Class")
    plt.xlabel("Vehicle Class")
    plt.ylabel("Count")
    plt.grid(axis="y")
    buf = io.BytesIO()
    plt.savefig(buf, format="png", bbox_inches="tight")
    plt.close()
    buf.seek(0)
    return Image.open(buf)


# ----------------- UI ------------------------

def run_ui():
    def run_detection(video, threshold, distance_m):
        # Need to return 3 outputs even when video is None
        if video is None:
            return None, None, None

        if isinstance(video, str):
            tmp_path = video
        else:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                tmp.write(video.read())
                tmp_path = tmp.name

        processed_path, csv_path, vehicle_counts = process_video(
            tmp_path, threshold, distance_m
        )

        count_plot = plot_vehicle_count_graph(vehicle_counts)

        # CSV file, processed video, bar chart image
        return csv_path, processed_path, count_plot

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# ðŸš˜ Vehicle Speed & Vehicle Counting System")

        with gr.Tabs():
            # -------- Tab 1: upload & detect ----------
            with gr.Tab("ðŸ“¤ Upload & Detect"):
                with gr.Row():
                    with gr.Column():
                        video_input = gr.Video(label="Upload Vehicle Video")
                        threshold_input = gr.Slider(
                            10,
                            200,
                            step=5,
                            value=60,
                            label="Speed Threshold (km/h)",
                        )
                        distance_input = gr.Slider(
                            3,
                            50,
                            step=1,
                            value=10,
                            label="Distance between red & blue lines (m)",
                        )
                        detect_btn = gr.Button(
                            "Run Detection",
                            variant="primary",
                        )
                    with gr.Column():
                        processed_video_output = gr.Video(
                            label="Processed Video (Detections + Speed)",
                        )

            # -------- Tab 2: results & data ----------
            with gr.Tab("ðŸ“Š Results & Data"):
                with gr.Row():
                    count_graph = gr.Image(label="Vehicle Count Bar Chart")
                with gr.Row():
                    csv_output = gr.File(label="Download Full Log (.csv)")

        detect_btn.click(
            fn=run_detection,
            inputs=[video_input, threshold_input, distance_input],
            outputs=[
                csv_output,
                processed_video_output,
                count_graph,
            ],
        )

    demo.launch(debug=False, share=True, show_error=True)


if __name__ == "__main__":
    run_ui()
