In [5]:
from google.colab.patches import cv2_imshow

In [7]:
!pip install ultralytics opencv-python numpy



In [10]:
# =========================================================
# Highway Congestion Monitoring System (Colab Optimized)
# Requirements:
# pip install ultralytics opencv-python numpy
# =========================================================

import cv2
import numpy as np
from ultralytics import YOLO
from IPython.display import Video

# -----------------------------
# CONFIGURATION
# -----------------------------
# Input clip (upload it to the session first) and the YOLO weights file.
VIDEO_PATH = "traffic.mp4"
MODEL_PATH = "yolov8n.pt"

# COCO class indices treated as vehicles:
#   2 = car, 3 = motorcycle, 5 = bus, 7 = truck
VEHICLE_CLASSES = [2, 3, 5, 7]

# Congestion rule (evaluated once per frame in the main loop):
#   - a vehicle is "slow" when its speed is below LOW_SPEED_THRESHOLD (pixels/second)
#   - congestion is flagged when the tracked-vehicle count is strictly greater than
#     MIN_VEHICLES and the slow-vehicle count is strictly greater than MIN_LOW_SPEED
LOW_SPEED_THRESHOLD = 10
MIN_VEHICLES, MIN_LOW_SPEED = 8, 5

# A track is forgotten once it has been missing for more than this many frames.
MAX_MISSING = 30

# Every frame is resized to this (width, height); the output video uses the same size.
RESIZE_WIDTH, RESIZE_HEIGHT = 960, 540

# -----------------------------
# LOAD MODEL
# -----------------------------
# Build the YOLO model once from the weights named by MODEL_PATH; the main loop
# reuses this single instance for every frame through model.track().
model = YOLO(MODEL_PATH)

# -----------------------------
# VIDEO CAPTURE
# -----------------------------
cap = cv2.VideoCapture(VIDEO_PATH)
# Fail loudly here: with an unopened capture the main loop would simply not run,
# leaving an empty output.mp4 and a misleading "Processing Complete" message.
if not cap.isOpened():
    raise IOError(
        f"Could not open video {VIDEO_PATH!r}; upload it to the session or fix VIDEO_PATH."
    )

# Some containers/backends report no usable frame rate (0, negative or NaN).
# `not fps > 0` is true for all three, so fall back to a nominal 30 fps.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    fps = 30

print("FPS:", fps)

# Output video writer: same frame size the main loop resizes every frame to.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, fps, (RESIZE_WIDTH, RESIZE_HEIGHT))
if not out.isOpened():
    # Do not leave the capture handle dangling when the writer cannot be created.
    cap.release()
    raise IOError("Could not open 'output.mp4' for writing with the 'mp4v' codec.")

# -----------------------------
# TRACKING MEMORY
# -----------------------------
# Per-track state, keyed by tracker ID (three independent dictionaries):
#   prev_positions - last recorded box center (cx, cy) of the track
#   speed_memory   - most recent speed estimate for the track, in pixels/second
#   last_seen      - consecutive frames the track has gone undetected (0 = seen now)
prev_positions, speed_memory, last_seen = {}, {}, {}

# =========================================================
# MAIN LOOP
# =========================================================
# Frame-by-frame pipeline: track vehicles, estimate per-track speed, flag
# congestion, and append the annotated frame to the output video.
# try/finally guarantees the capture and writer are released (closing
# output.mp4) even when tracking or drawing raises part-way through the clip.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # no more frames could be read

        frame = cv2.resize(frame, (RESIZE_WIDTH, RESIZE_HEIGHT))

        results = model.track(frame, persist=True, conf=0.3, iou=0.5, verbose=False)

        current_ids = set()
        detections = results[0].boxes

        # No track IDs on this frame -> nothing to measure or draw.
        if detections.id is not None:
            boxes = detections.xyxy.cpu().numpy()
            ids = detections.id.cpu().numpy()
            classes = detections.cls.cpu().numpy()

            for box, raw_id, cls in zip(boxes, ids, classes):

                if int(cls) not in VEHICLE_CLASSES:
                    continue

                # Plain int keys instead of numpy floats in the state dicts.
                track_id = int(raw_id)

                # Integer corners are only needed for drawing.
                x1, y1, x2, y2 = map(int, box)

                # Keep the center in float precision: with integer centers the
                # smallest non-zero displacement is 1 px/frame (= fps px/s), so
                # any threshold below fps could only ever match zero motion.
                cx = float(box[0] + box[2]) / 2.0
                cy = float(box[1] + box[3]) / 2.0

                # last_seen holds how many frames the track was missing, so the
                # stored previous position is that many frames + 1 old. Read it
                # BEFORE resetting the counter below.
                frames_elapsed = last_seen.get(track_id, 0) + 1

                current_ids.add(track_id)
                last_seen[track_id] = 0

                # Speed Calculation
                if track_id in prev_positions:
                    px, py = prev_positions[track_id]
                    distance = float(np.sqrt((cx - px) ** 2 + (cy - py) ** 2))
                    # Pixels per second, normalised by the real frame gap so a
                    # track that reappears after a dropout is not over-estimated.
                    speed = distance * fps / frames_elapsed
                    speed_memory[track_id] = speed
                else:
                    # First sighting: no displacement to measure yet. Show 0 on
                    # the overlay but do NOT store it, otherwise every newly
                    # appearing vehicle would be counted as a slow vehicle.
                    speed = 0

                prev_positions[track_id] = (cx, cy)

                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"ID:{track_id}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                cv2.putText(frame, f"Speed:{int(speed)}", (x1, y2 + 15),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        # -----------------------------
        # Remove Missing Vehicles
        # -----------------------------
        # Iterate over a snapshot of the keys because entries are removed below.
        for tid in list(last_seen.keys()):
            if tid not in current_ids:
                last_seen[tid] += 1
                if last_seen[tid] > MAX_MISSING:
                    last_seen.pop(tid, None)
                    prev_positions.pop(tid, None)
                    speed_memory.pop(tid, None)

        # -----------------------------
        # Congestion Detection
        # -----------------------------
        # last_seen has one entry per remembered track (including tracks missing
        # for up to MAX_MISSING frames); speed_memory only holds tracks with a
        # real speed measurement.
        vehicle_count = len(last_seen)
        low_speed_count = sum(
            1 for s in speed_memory.values() if s < LOW_SPEED_THRESHOLD
        )

        # NOTE(review): both comparisons are strict, so counts equal to
        # MIN_VEHICLES / MIN_LOW_SPEED do not trigger -- confirm this is intended.
        if vehicle_count > MIN_VEHICLES and low_speed_count > MIN_LOW_SPEED:
            cv2.putText(frame, "CONGESTION DETECTED",
                        (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.2,
                        (0, 0, 255),
                        3)

        cv2.putText(frame, f"Vehicles: {vehicle_count}",
                    (50, 90),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.8,
                    (0, 255, 255),
                    2)

        cv2.putText(frame, f"Slow Vehicles: {low_speed_count}",
                    (50, 120),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.8,
                    (0, 255, 255),
                    2)

        # Write frame to output video
        out.write(frame)
finally:
    # =========================================================
    # RELEASE RESOURCES
    # =========================================================
    cap.release()
    out.release()
    cv2.destroyAllWindows()

print("Processing Complete ✅")

# Display video inside Colab.
# embed=True inlines the file's bytes into the cell output. Without it the player
# only references "output.mp4" as a relative URL, which the Colab frontend cannot
# fetch from the runtime's filesystem, so nothing plays.
# NOTE(review): the file is written with the 'mp4v' codec, which some browsers may
# not decode; if the player stays blank, re-encode to H.264 first -- confirm.
Video("output.mp4", embed=True)


FPS: 24.0
Processing Complete ✅
