# Avg Speed and Vehicle Detection

### Workflow
1. Read frames sequentially from the video stream.
2. Resize frames for consistent processing.
3. Detect vehicles using YOLO object detection.
4. Extract bounding boxes of detected vehicles.
5. Track vehicles to obtain persistent IDs and centroids.
6. Monitor centroid movement across virtual lines.
7. Measure time taken to travel between lines.
8. Compute individual vehicle speeds.
9. Calculate average speed for traffic analysis.
10. Determine congestion level from average speed.
11. Visualize detections, speeds, and congestion on frames.

In [1]:
!pip install ultralytics opencv-python pandas numpy

Collecting ultralytics
  Downloading ultralytics-8.4.14-py3-none-any.whl.metadata (39 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.4.14-py3-none-any.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m35.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.4.14 ultralytics-thop-2.0.18


In [2]:
from google.colab.patches import cv2_imshow

In [3]:
import cv2
import numpy as np
import pandas as pd
import math
import time
from ultralytics import YOLO


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [4]:
model = YOLO("yolov8n.pt")   # nano = fast for Colab


[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 342.2MB/s 0.0s


In [12]:
from google.colab import files
uploaded = files.upload()


Saving traffic2.mp4 to traffic2.mp4


In [6]:
class Tracker:
    def __init__(self):
        self.next_id = 0
        self.objects = {}

    def update(self, detections):
        updated_objects = {}

        for box in detections:
            x1, y1, x2, y2 = box
            cx = int((x1 + x2) / 2)
            cy = int((y1 + y2) / 2)

            matched_id = None

            for obj_id, (px, py) in self.objects.items():
                distance = math.hypot(cx - px, cy - py)
                if distance < 35:     # threshold
                    matched_id = obj_id
                    break

            if matched_id is None:
                matched_id = self.next_id
                self.next_id += 1

            updated_objects[matched_id] = (cx, cy)

        self.objects = updated_objects
        return updated_objects



In [7]:
LINE_Y1 = 250   # entry line
LINE_Y2 = 320   # exit line
PIXEL_DISTANCE = abs(LINE_Y2 - LINE_Y1)

PIXEL_TO_METER = 0.05   # adjust depending on camera view


In [8]:
tracker = Tracker()

entry_time = {}
speeds = {}

stopped_counter = {}
STOP_THRESHOLD_FRAMES = 20
MIN_MOVEMENT = 2


In [24]:
video_path = list(uploaded.keys())[0]
cap = cv2.VideoCapture(video_path)
print(list(uploaded.keys())[0])

frame_id = 0


traffic2.mp4


In [None]:
while True:

    # Read next frame from the video
    ret, frame = cap.read()

    # Stop if video has ended
    if not ret:
        break

    # Increment processed frame counter
    frame_id += 1

    # Resize frame for consistent computation & speed
    frame = cv2.resize(frame, (960, 540))

    # Run YOLO inference on the frame
    results = model(frame)

    detections = []   # Store vehicle bounding boxes

    # -------- OBJECT DETECTION --------
    for r in results:
        for box in r.boxes:

            # Extract detected class ID
            cls = int(box.cls[0])

            # Filter only vehicle classes (car, bike, bus, truck)
            if cls in [2, 3, 5, 7]:

                # Extract bounding box coordinates
                x1, y1, x2, y2 = map(int, box.xyxy[0])

                # Save detection for tracking stage
                detections.append([x1, y1, x2, y2])

                # Draw bounding box on frame
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)

    # -------- TRACKING STAGE --------
    # Assign persistent IDs & compute centroids
    objects = tracker.update(detections)

    # Process each tracked vehicle
    for obj_id, (cx, cy) in objects.items():

        # Draw centroid of tracked vehicle
        cv2.circle(frame, (cx, cy), 4, (0,0,255), -1)

        # Display tracking ID
        cv2.putText(frame, f"ID {obj_id}", (cx, cy-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2)

        # -------- SPEED MEASUREMENT LOGIC --------

        # Record entry time when centroid crosses first line
        if cy > LINE_Y1 and obj_id not in entry_time:
            entry_time[obj_id] = time.time()

        # Compute speed when centroid crosses second line
        if cy > LINE_Y2 and obj_id in entry_time and obj_id not in speeds:

            MIN_TIME = 0.3  # Prevent noise / instant jumps

            # Time taken between the two lines
            elapsed = time.time() - entry_time[obj_id]

            if elapsed > MIN_TIME:

                # Convert pixel distance → real-world meters
                meters = PIXEL_DISTANCE * PIXEL_TO_METER

                # Speed formula (m/s → km/h)
                speed = (meters / elapsed) * 3.6

                # Ignore unrealistic speeds
                if 0 < speed < 200:
                    speeds[obj_id] = speed

        # Display speed if available
        if obj_id in speeds:
            cv2.putText(frame, f"{int(speeds[obj_id])} km/h",
                        (cx, cy+20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,255), 2)

    # -------- TRAFFIC ANALYSIS --------
    if len(speeds) > 0:

        # Compute average vehicle speed
        avg_speed = sum(speeds.values()) / len(speeds)

        # Display average speed
        cv2.putText(frame, f"Average Speed: {int(avg_speed)} km/h",
                    (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)

        # Detect congestion condition
        if avg_speed < 20:
            cv2.putText(frame, "HEAVY CONGESTION",
                        (20, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)

    # -------- VISUAL AIDS --------
    # Draw speed measurement reference lines
    cv2.line(frame, (0, LINE_Y1), (960, LINE_Y1), (255,0,0), 2)
    cv2.line(frame, (0, LINE_Y2), (960, LINE_Y2), (0,255,255), 2)

    # Display every 10th frame for performance
    if frame_id % 10 == 0:
        cv2_imshow(frame)

    # Exit on ESC key press
    if cv2.waitKey(1) & 0xFF == 27:
        break

# Release video resources
cap.release()

# Close OpenCV windows
cv2.destroyAllWindows()


# Stopped Vehicle

### Workflow
1. Open the video file and read frames sequentially.
2. Resize each frame for consistent processing.
3. Run YOLO detection to identify vehicles.
4. Extract bounding boxes for detected vehicles.
5. Update the tracker to obtain vehicle IDs and centroids.
6. Measure centroid movement between consecutive frames.
7. Determine if movement is below a stopping threshold.
8. Count consecutive still frames for each vehicle.
9. Classify vehicle as stopped after sufficient still frames.
10. Visualize tracking results and stop status on the frame.
11. Display processed frames at intervals for efficiency.

In [None]:
# ---------------- IMPORTS ----------------
import cv2          # OpenCV for video processing & drawing
import time         # Time utilities (not strictly required here)
import math         # Distance calculations

# ---------------- PARAMETERS ----------------

VIDEO_SIZE = (960, 540)   # Standardized frame resolution

# YOLO class IDs representing vehicles
VEHICLE_CLASSES = [2, 3, 5, 7]   # car, motorcycle, bus, truck

STOP_DISTANCE = 6     # Pixel movement tolerance (noise filter)
STOP_FRAMES = 20      # Frames required to declare STOPPED

# ---------------- MEMORY STRUCTURES ----------------

previous_positions = {}   # Stores last centroid per vehicle ID
stop_counter = {}         # Counts consecutive still frames
frame_id = 0              # Frame counter for display control

# Open the video file
cap = cv2.VideoCapture(video_path)

# ---------------- MAIN VIDEO LOOP ----------------
while True:

    # Read next frame from video
    ret, frame = cap.read()

    # Exit loop if video ends
    if not ret:
        break

    # Increment frame counter
    frame_id += 1

    # Resize frame for consistent calculations
    frame = cv2.resize(frame, VIDEO_SIZE)

    # Run YOLO detection on current frame
    results = model(frame)

    detections = []   # Store bounding boxes for tracker

    # ---------------- YOLO DETECTION ----------------
    for r in results:
        for box in r.boxes:

            # Extract predicted class ID
            cls = int(box.cls[0])

            # Keep only vehicle detections
            if cls in VEHICLE_CLASSES:

                # Extract bounding box coordinates
                x1, y1, x2, y2 = map(int, box.xyxy[0])

                # Add detection for tracking
                detections.append([x1, y1, x2, y2])

                # Draw bounding box for visualization
                cv2.rectangle(frame, (x1, y1), (x2, y2),
                              (0, 255, 0), 2)

    # ---------------- TRACKER UPDATE ----------------
    # Assign persistent IDs & compute centroids
    objects = tracker.update(detections)

    # ---------------- STOPPED VEHICLE LOGIC ----------------
    for obj_id, (cx, cy) in objects.items():

        stopped = False   # Default state

        # Check if vehicle existed in previous frame
        if obj_id in previous_positions:

            # Retrieve previous centroid
            px, py = previous_positions[obj_id]

            # Compute centroid displacement
            distance = math.hypot(cx - px, cy - py)

            # If movement is very small → vehicle considered still
            if distance < STOP_DISTANCE:
                stop_counter[obj_id] = stop_counter.get(obj_id, 0) + 1
            else:
                # Reset counter if vehicle moves
                stop_counter[obj_id] = 0

            # Declare STOPPED after enough still frames
            if stop_counter[obj_id] >= STOP_FRAMES:
                stopped = True

        # Store centroid for next frame comparison
        previous_positions[obj_id] = (cx, cy)

        # ---------------- VISUALIZATION ----------------
        # Red → stopped, Green → moving
        color = (0, 0, 255) if stopped else (0, 255, 0)

        # Draw centroid marker
        cv2.circle(frame, (cx, cy), 5, color, -1)

        # Display status label
        label = f"STOPPED ID {obj_id}" if stopped else f"ID {obj_id}"

        cv2.putText(frame,
                    label,
                    (cx, cy - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    (255, 255, 255),
                    2)

    # ---------------- DISPLAY CONTROL ----------------
    # Show only some frames to keep runtime fast
    if frame_id % 10 == 0:
        cv2_imshow(frame)

# Release video resources
cap.release()

# Close OpenCV windows
cv2.destroyAllWindows()
