In [1]:
pip install ultralytics opencv-python numpy


Note: you may need to restart the kernel to use updated packages.


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from IPython.display import display, clear_output
import matplotlib.pyplot as plt

# INPUT FILES

In [None]:
VIDEO_PATH = "video.mp4"
MODEL_PATH = "yolov8n.pt"

# VEHICLE SETTINGS

In [None]:
VEHICLE_CLASSES = [2,3,5,7]

# TRAFFIC / CONGESTION PARAMETERS

In [None]:
LOW_SPEED_THRESHOLD = 25
MIN_VEHICLES = 8
MIN_LOW_SPEED = 5
CONGESTION_FRAMES = 40

# MEMORY DICTIONARIES

In [7]:
prev_positions = {}
speed_memory = {}
speed_history = {}
last_seen = {}

MAX_MISSING = 30
congestion_counter = 0

## LOAD YOLO MODEL

In [None]:
model = YOLO(MODEL_PATH)
cap = cv2.VideoCapture(VIDEO_PATH)

fps = cap.get(cv2.CAP_PROP_FPS)
if fps == 0:
    fps = 30

width = 960
height = 540


# VIDEO OUTPUT SETUP

In [None]:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))

print("Processing Video...")


## MAIN VIDEO LOOP

In [None]:
while cap.isOpened():

    ret, frame = cap.read()

    # Stop if video ends
    if not ret:
        break

    # Resize frame for faster processing
    frame = cv2.resize(frame, (width, height))

    # Detect and track objects using YOLO
    results = model.track(frame, persist=True, conf=0.3, iou=0.5, verbose=False)

    # Check if detections exist
    if results[0].boxes is not None and results[0].boxes.id is not None:

        boxes = results[0].boxes.xyxy.cpu().numpy()  # bounding boxes
        ids = results[0].boxes.id.cpu().numpy()      # tracking IDs
        clss = results[0].boxes.cls.cpu().numpy()    # class IDs

        current_ids = set()

        # Loop through detected objects
        for box, track_id, cls in zip(boxes, ids, clss):

            # Ignore non-vehicle classes
            if int(cls) not in VEHICLE_CLASSES:
                continue

            # Bounding box coordinates
            x1,y1,x2,y2 = map(int,box)

            # Calculate center point
            cx,cy = int((x1+x2)/2), int((y1+y2)/2)

            current_ids.add(track_id)
            last_seen[track_id] = 0

            # ------------------------------
            # SPEED CALCULATION
            # Based on pixel distance between frames
            # ------------------------------

            if track_id in prev_positions:

                px,py = prev_positions[track_id]

                # Distance moved between frames
                distance = np.sqrt((cx-px)**2 + (cy-py)**2)

                # Speed = distance * FPS
                instant_speed = distance * fps

                # Smooth speed using moving average
                if track_id not in speed_history:
                    speed_history[track_id] = []

                speed_history[track_id].append(instant_speed)

                if len(speed_history[track_id]) > 5:
                    speed_history[track_id].pop(0)

                speed = sum(speed_history[track_id]) / len(speed_history[track_id])
                speed_memory[track_id] = speed

            else:
                speed_memory[track_id] = 0

            # Save current position
            prev_positions[track_id] = (cx,cy)

            # ------------------------------
            # DRAW VISUAL OUTPUT
            # ------------------------------

            cv2.rectangle(frame,(x1,y1),(x2,y2),(0,255,0),2)
            cv2.putText(frame,f"ID:{int(track_id)}",(x1,y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,255,0),2)

            cv2.putText(frame,f"Spd:{int(speed_memory[track_id])}",
                        (x1,y2+15),
                        cv2.FONT_HERSHEY_SIMPLEX,0.5,(255,0,0),2)

        # ------------------------------
        # REMOVE LOST VEHICLES
        # ------------------------------

        for tid in list(last_seen.keys()):
            if tid not in current_ids:
                last_seen[tid]+=1
                if last_seen[tid] > MAX_MISSING*2:
                    last_seen.pop(tid,None)
                    prev_positions.pop(tid,None)
                    speed_memory.pop(tid,None)
                    speed_history.pop(tid,None)

        # ------------------------------
        # TRAFFIC ANALYSIS
        # ------------------------------

        vehicle_count=len(speed_memory)

        # Count slow vehicles
        low_speed_count=sum(1 for s in speed_memory.values() if s<LOW_SPEED_THRESHOLD)

        # Congestion logic
        if vehicle_count>MIN_VEHICLES and low_speed_count>MIN_LOW_SPEED:
            congestion_counter+=1
        else:
            congestion_counter=0

        # Display congestion warning
        if congestion_counter>CONGESTION_FRAMES:
            cv2.putText(frame,"CONGESTION DETECTED",(50,50),
                        cv2.FONT_HERSHEY_SIMPLEX,1.2,(0,0,255),3)

        # ------------------------------
        # TRAFFIC LEVEL CLASSIFICATION
        # ------------------------------

        if vehicle_count<5:
            level="FREE FLOW"
        elif vehicle_count<12:
            level="MODERATE"
        else:
            level="HEAVY"

        # Display traffic stats
        cv2.putText(frame,f"Vehicles:{vehicle_count}",(50,90),
                    cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,255,255),2)

        cv2.putText(frame,f"Slow:{low_speed_count}",(50,120),
                    cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,255,255),2)

        cv2.putText(frame,level,(50,150),
                    cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,0),2)

    # Save processed frame into output video
    out.write(frame)

    # Display frame inside Jupyter Notebook
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    clear_output(wait=True)
    plt.imshow(frame_rgb)
    plt.axis("off")
    display(plt.gcf())

# Release resources
cap.release()
out.release()

print("Processing Finished! Video saved as output.mp4")