Objective: Traffic Monitoring

- Red-light cameras --> Track vehicle trajectory after detection --> Determine whether it crossed the line during red.
- Highway congestion analysis --> Follow groups of cars --> Estimate average speed, detect stopped vehicles.
- Wrong-way driving alert --> Track detection --> If a car goes opposite to the flow --> alarm.
- Toll booth/ANPR --> Track vehicle from far --> Zoom in when close for plate reading.

Steps:

1. Import Video File
2. Video Selection
3. Create Frame
4. Object tracking
5. Drawing Box
6. Testing



# 1. Environment Setup & Installation

In [5]:
!pip install -q ultralytics opencv-python-headless numpy matplotlib lap pytesseract

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m38.2 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.7 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m97.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [6]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from ultralytics import YOLO
import pytesseract

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [1]:
# =========================================
# 2. Configuration (Edit Only This Cell)
# =========================================

class CFG:
    """Single place for every tunable value used by the notebook's modules.

    All attributes are plain class-level constants; the notebook creates one
    instance (``cfg``) right after the class and the other cells read it.
    """

    # --- Model ---
    yolo_model = "yolov8n.pt"   # weights file handed to YOLO(); e.g. 'yolov8s.pt' also works
    tracker_cfg = "bytetrack.yaml"  # tracker config passed to model.track(); or 'botsort.yaml'

    # --- Paths (update with your files) ---
    # Place your videos in /content and set names here.
    # NOTE: all four modules currently point at the same example file.
    red_light_video = "/content/4.mp4"
    highway_video   = "/content/4.mp4"
    wrong_way_video = "/content/4.mp4"
    toll_video      = "/content/4.mp4"

    # --- General Detection Filters ---
    # Class ids kept by every module; other detections are skipped.
    vehicle_classes = [2, 3, 5, 7]  # YOLO COCO: car, motorcycle, bus, truck (id depends on model)

    # Forwarded to model.track() as conf= and iou=.
    conf_thres = 0.4
    iou_thres  = 0.5

    # --- Red-light Configuration ---
    # Stop-line: (x1, y1, x2, y2) in image pixel coords
    stop_line = (200, 400, 1000, 400)  # example; adjust per video
    # Region where traffic light is located (x1, y1, x2, y2).
    # It is only drawn as an overlay; the signal state comes from manual_signal_state.
    signal_roi = (20, 20, 150, 150)
    # For simple demo: manually set current signal state if you don't have a trained light-color model
    # options: "red", "yellow", "green"
    manual_signal_state = "red"

    # --- Highway Speed Estimation ---
    # Reference lines for speed measurement (entering and leaving line), (x1, y1, x2, y2)
    enter_line = (200, 350, 1000, 350)
    exit_line  = (200, 450, 1000, 450)
    # Approx real-world distance between enter_line and exit_line (meters)
    distance_m = 30.0
    # Fallback frame rate, used when the video file reports no FPS.
    fps_assumed = 30.0  # update if you know true fps

    # NOTE(review): nothing in the visible cells reads stopped_speed_thresh; the
    # highway module decides "stopped" from pixel displacement instead.
    stopped_speed_thresh = 1.0  # km/h considered 'stopped'
    stopped_frame_thresh = 30   # consecutive low-movement frames before a vehicle is labelled STOPPED

    # --- Wrong-way Driving ---
    # Assume allowed direction is "top-to-bottom" (dy > 0).
    # Minimum number of stored centre points before a track's direction is judged.
    wrong_way_min_frames = 15

    # --- Toll / ANPR ---
    # ROI (x1, y1, x2, y2) in which we zoom when vehicle close to booth
    toll_close_roi = (500, 300, 900, 700)  # example

    # Plate OCR config (for pytesseract); passed as config= to image_to_string.
    tesseract_config = "--oem 3 --psm 7"

# Module-level instance read by the other cells.
cfg = CFG()
print("Config loaded. Edit CFG class above for customization.")


Config loaded. Edit CFG class above for customization.


In [2]:
# =========================================
# 3. Utility Functions (Geometry & Helpers)
# =========================================

def line_intersection_y(p1, p2, y_line):
    """Return the x coordinate where segment p1 -> p2 meets the horizontal line y = y_line.

    Returns None when the segment is itself horizontal or when both endpoints
    lie strictly on the same side of ``y_line``.
    """
    start_x, start_y = p1
    end_x, end_y = p2

    # A horizontal segment has no single crossing point.
    if start_y == end_y:
        return None

    both_above = start_y < y_line and end_y < y_line
    both_below = start_y > y_line and end_y > y_line
    if both_above or both_below:
        return None

    # Linear interpolation along the segment.
    fraction = (y_line - start_y) / (end_y - start_y)
    return start_x + fraction * (end_x - start_x)

def is_crossing_line(prev_center, center, line):
    """Return True when a point moved from one side of ``line`` onto or past it.

    Args:
        prev_center: (x, y) position on the previous observation.
        center: (x, y) position on the current observation.
        line: (x1, y1, x2, y2) endpoints. The line is treated as infinite in
            both directions; its endpoints only define the orientation.

    Returns:
        True if ``prev_center`` was strictly on one side and ``center`` is on
        the line or on the other side; False otherwise (including when the
        previous point already sat exactly on the line).

    Horizontal lines keep the original y-comparison semantics. Lines of any
    other orientation, which previously never reported a crossing, are handled
    with a signed side-of-line test.
    """
    x1, y1, x2, y2 = line

    if y1 == y2:
        # Horizontal line: the sign of (y - y_line) tells which side we are on.
        side_prev = prev_center[1] - y1
        side_curr = center[1] - y1
    else:
        # General line: sign of the 2-D cross product of the line direction
        # with the vector from (x1, y1) to the point.
        dir_x, dir_y = x2 - x1, y2 - y1
        side_prev = dir_x * (prev_center[1] - y1) - dir_y * (prev_center[0] - x1)
        side_curr = dir_x * (center[1] - y1) - dir_y * (center[0] - x1)

    return (side_prev < 0 and side_curr >= 0) or (side_prev > 0 and side_curr <= 0)

def draw_line(img, line, color=(0, 255, 255), thickness=2, label=None):
    """Draw ``line`` (x1, y1, x2, y2) on ``img`` in place, with an optional text label.

    The label, when given and non-empty, is written 10 px above the line's
    first endpoint in the same colour as the line.
    """
    x_start, y_start, x_end, y_end = line
    cv2.line(img, (x_start, y_start), (x_end, y_end), color, thickness)

    if not label:
        return

    label_origin = (x_start, y_start - 10)
    cv2.putText(img, label, label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

def crop_roi(frame, roi):
    """Return the sub-array of ``frame`` covered by ``roi`` = (x1, y1, x2, y2).

    Rows are selected by the y range and columns by the x range, using
    ordinary slice semantics (end-exclusive).
    """
    left, top, right, bottom = roi
    row_span = slice(top, bottom)
    col_span = slice(left, right)
    return frame[row_span, col_span]

def put_centered_text(img, text, y=50, color=(0,0,255), scale=1.0, thickness=2):
    """Write ``text`` on ``img`` in place, horizontally centred at baseline row ``y``.

    The x origin is derived from the image width and the rendered text width
    reported by ``cv2.getTextSize`` for the same font, scale and thickness.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    (text_width, _text_height), _baseline = cv2.getTextSize(text, font, scale, thickness)

    image_width = img.shape[1]
    origin = ((image_width - text_width) // 2, y)
    cv2.putText(img, text, origin, font, scale, color, thickness)


In [7]:
# =========================================
# 4. Load YOLO Model (with Tracker)
# =========================================

# Load the detector once at module level from the weights named in the config;
# the result is bound to the global name `model`.
model = YOLO(cfg.yolo_model)
print("YOLO model loaded:", cfg.yolo_model)


[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 144.0MB/s 0.0s
YOLO model loaded: yolov8n.pt


In [8]:
# =========================================
# 5. Base Detection + Tracking Wrapper
# =========================================
# Ultralytics provides built-in multi-object tracking with BoT-SORT / ByteTrack.

def _reset_tracker_state(yolo_model):
    """Clear tracker state left on ``yolo_model`` by an earlier ``track(persist=True)`` run.

    Best-effort: a model that has not tracked anything yet (no predictor or
    trackers) is left untouched, as is any tracker without a ``reset`` method.
    """
    predictor = getattr(yolo_model, "predictor", None)
    for tracker in getattr(predictor, "trackers", None) or []:
        reset = getattr(tracker, "reset", None)
        if callable(reset):
            reset()


def yolo_track_video(
    video_path,
    tracker_cfg=cfg.tracker_cfg,
    show=False,
    save_output_path=None,
    process_frame_fn=None
):
    """Run YOLO tracking over a video, frame by frame, with an optional per-frame callback.

    Args:
        video_path: Path of the video to read with ``cv2.VideoCapture``.
        tracker_cfg: Tracker config name passed to ``model.track``.
        show: When True, display each processed frame in an OpenCV window;
            pressing ``q`` stops early.
        save_output_path: When set, processed frames are written there as an
            ``mp4v`` video with the input's FPS and frame size.
        process_frame_fn: Optional ``fn(frame, result, state)`` called once per
            frame. ``result`` is the first element returned by ``model.track``
            (or None), and ``state`` is a dict that persists across frames of
            this run (pre-filled with ``fps``, ``frame_idx``, ``width``,
            ``height``). Its return value replaces the frame; a callback that
            annotates in place and returns None leaves the frame as is.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Failed to open video:", video_path)
        cap.release()
        return

    out = None
    try:
        # Some containers report 0 FPS; fall back to the configured value.
        fps = cap.get(cv2.CAP_PROP_FPS) or cfg.fps_assumed
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        if save_output_path:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            out = cv2.VideoWriter(save_output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                # Without this check a bad path/codec silently produces no file.
                print("Failed to open video writer:", save_output_path)
                out.release()
                out = None

        # State dictionary the callback can use to keep data between frames.
        state = {
            "fps": fps,
            "frame_idx": 0,
            "width": width,
            "height": height
        }

        # persist=True keeps tracker state on the shared model, so start every
        # video from a clean tracker instead of inheriting the previous run's tracks.
        _reset_tracker_state(model)

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_idx += 1
            state["frame_idx"] = frame_idx

            # 'track' returns detections with track IDs attached.
            results = model.track(
                frame,
                tracker=tracker_cfg,
                conf=cfg.conf_thres,
                iou=cfg.iou_thres,
                persist=True,
                verbose=False
            )
            r = results[0] if len(results) > 0 else None

            # The callback can annotate the frame, update metrics, etc.
            if process_frame_fn is not None:
                processed = process_frame_fn(frame, r, state)
                if processed is not None:
                    frame = processed

            if out is not None:
                out.write(frame)

            if show:
                cv2.imshow("Traffic Monitoring", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        # Release handles even if the tracker or the callback raised.
        cap.release()
        if out is not None:
            out.release()
        if show:
            cv2.destroyAllWindows()

    print("Done:", video_path, "->", save_output_path)


In [9]:
# =========================================
# 6. Red-Light Violation Detection
# =========================================
# Idea:
# - Track vehicles with IDs.
# - Maintain their center positions.
# - When signal is red and trajectory crosses the stop-line from "before" to "after", log violation.

from collections import defaultdict

def red_light_process_frame(frame, result, state):
    """Annotate one frame and record vehicles that cross the stop line while the signal is red.

    Args:
        frame: Image for the current frame; it is drawn on in place.
        result: Tracking result for this frame (or None). Its ``boxes`` are
            iterated and each box's ``cls``, ``id`` and ``xyxy`` are read.
        state: Per-run dict. ``track_history`` (id -> recent centres) and
            ``violations`` (set of ids) are created on first use;
            ``state["height"]`` is read to place the counter.

    Returns:
        The annotated frame.

    A violation is recorded when the box centre crosses ``cfg.stop_line`` in
    either direction while ``cfg.manual_signal_state`` is "red". Each track id
    is counted once and stays labelled on every later frame where it appears.
    """
    stop_line = cfg.stop_line
    draw_line(frame, stop_line, (0, 255, 255), 2, "STOP LINE")

    # For now the signal state is set manually in the config; a trained
    # light-colour detector could replace this.
    signal_state = cfg.manual_signal_state
    signal_colors = {
        "red": (0, 0, 255),
        "yellow": (0, 255, 255),
        "green": (0, 255, 0),
    }
    # Always a flat BGR triple, whatever the state string is.
    signal_color = signal_colors.get(signal_state, (255, 255, 255))

    x1, y1, x2, y2 = cfg.signal_roi
    cv2.rectangle(frame, (x1, y1), (x2, y2), signal_color, 2)
    cv2.putText(frame, f"Signal: {signal_state.upper()}", (x1, y1-10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, signal_color, 2)

    if "track_history" not in state:
        state["track_history"] = defaultdict(list)
    if "violations" not in state:
        state["violations"] = set()

    track_history = state["track_history"]
    violations = state["violations"]

    boxes = result.boxes if result is not None else None
    if boxes is not None:
        for box in boxes:
            cls = int(box.cls[0])
            if cls not in cfg.vehicle_classes:
                continue
            # Detections without a track id cannot be followed across frames.
            if box.id is None:
                continue
            id_ = int(box.id[0])

            x1b, y1b, x2b, y2b = map(int, box.xyxy[0])
            cx, cy = int((x1b + x2b) / 2), int((y1b + y2b) / 2)

            # draw box and id
            cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), (0, 255, 0), 2)
            cv2.circle(frame, (cx, cy), 3, (0, 0, 255), -1)
            cv2.putText(frame, f"ID {id_}", (x1b, y1b-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Only the previous and current centre are needed for the test.
            history = track_history[id_]
            history.append((cx, cy))
            del history[:-2]

            # Two points are enough to test a crossing.
            if (
                len(history) >= 2
                and signal_state == "red"
                and id_ not in violations
                and is_crossing_line(history[-2], (cx, cy), stop_line)
            ):
                violations.add(id_)

            # Keep the label on for as long as the offender is visible.
            if id_ in violations:
                cv2.putText(frame, "VIOLATION!", (x1b, y1b-30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    # overlay count
    cv2.putText(frame, f"Violations: {len(violations)}", (20, state["height"]-20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    return frame

def run_red_light_module():
    """Process ``cfg.red_light_video`` with the red-light callback and save the annotated video."""
    print("Running Red-Light Violation Detection...")
    output_file = "/content/red_light_output.mp4"
    yolo_track_video(
        video_path=cfg.red_light_video,
        process_frame_fn=red_light_process_frame,
        save_output_path=output_file,
        tracker_cfg=cfg.tracker_cfg,
        show=False,
    )

# Runs the red-light module as soon as this cell executes.
run_red_light_module()

Running Red-Light Violation Detection...
Done: /content/4.mp4 -> /content/red_light_output.mp4


In [10]:
# =========================================
# 7. Highway Congestion & Speed Estimation
# =========================================
# Idea:
# - Maintain entry & exit timestamps per track when crossing two lines.
# - Estimate speed = distance / time.
# - Detect stopped vehicles if speed below threshold for N frames.
# - Compute rolling average speed to infer congestion.

import time
from collections import deque

def highway_process_frame(frame, result, state):
    """Annotate one frame with per-vehicle speed, stopped-vehicle labels and the average speed.

    Args:
        frame: Image for the current frame; it is drawn on in place.
        result: Tracking result for this frame (or None). Its ``boxes`` are
            iterated and each box's ``cls``, ``id`` and ``xyxy`` are read.
        state: Per-run dict. Reads ``fps`` and ``frame_idx``; creates and
            updates ``track_history``, ``enter_time``, ``exit_time``,
            ``speeds_kmh``, ``stopped_frames`` and ``avg_speed_window``.

    Returns:
        The annotated frame.

    Speed is ``cfg.distance_m`` divided by the time between the frames on
    which a track's centre crossed ``cfg.enter_line`` and ``cfg.exit_line``.
    The two lines may be crossed in either order, so traffic moving in both
    directions is measured. A track is labelled STOPPED after its centre has
    moved less than 2 px over the last 5 observations for
    ``cfg.stopped_frame_thresh`` consecutive frames.
    """
    draw_line(frame, cfg.enter_line, (255, 0, 0), 2, "ENTER")
    draw_line(frame, cfg.exit_line, (0, 255, 0), 2, "EXIT")

    track_history = state.setdefault("track_history", {})
    enter_time = state.setdefault("enter_time", {})
    exit_time = state.setdefault("exit_time", {})
    speeds_kmh = state.setdefault("speeds_kmh", {})
    stopped_frames = state.setdefault("stopped_frames", {})
    if "avg_speed_window" not in state:
        # Rolling window over the most recent measured speeds.
        state["avg_speed_window"] = deque(maxlen=100)
    avg_speed_window = state["avg_speed_window"]

    fps = state["fps"]
    frame_no = state["frame_idx"]

    boxes = result.boxes if result is not None else None
    if boxes is not None:
        for box in boxes:
            cls = int(box.cls[0])
            if cls not in cfg.vehicle_classes:
                continue

            # Detections without a track id cannot be followed across frames.
            if box.id is None:
                continue
            id_ = int(box.id[0])

            x1b, y1b, x2b, y2b = map(int, box.xyxy[0])
            cx, cy = int((x1b + x2b) / 2), int((y1b + y2b) / 2)

            cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), (255, 255, 0), 2)
            cv2.putText(frame, f"ID {id_}", (x1b, y1b-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Keep only the 5 most recent centres: that is all the crossing
            # and stopped checks below ever look at.
            hist = track_history.setdefault(id_, [])
            hist.append((cx, cy))
            del hist[:-5]

            # Two points are enough to test a crossing.
            if len(hist) >= 2:
                prev = hist[-2]
                if id_ not in enter_time and is_crossing_line(prev, (cx, cy), cfg.enter_line):
                    enter_time[id_] = frame_no
                if id_ not in exit_time and is_crossing_line(prev, (cx, cy), cfg.exit_line):
                    exit_time[id_] = frame_no

                # Compute the speed once both lines have been crossed,
                # whichever came first.
                if id_ not in speeds_kmh and id_ in enter_time and id_ in exit_time:
                    dt_sec = abs(exit_time[id_] - enter_time[id_]) / fps
                    if dt_sec > 0:
                        speed_kmh = (cfg.distance_m / dt_sec) * 3.6
                        speeds_kmh[id_] = speed_kmh
                        avg_speed_window.append(speed_kmh)

            # stopped detection (using short baseline of history)
            if len(hist) >= 5:
                # displacement between the current centre and 4 observations ago
                x0, y0 = hist[-5]
                dist_pix = np.hypot(cx - x0, cy - y0)
                if dist_pix < 2:  # low movement -> candidate stopped
                    stopped_frames[id_] = stopped_frames.get(id_, 0) + 1
                else:
                    stopped_frames[id_] = 0

                if stopped_frames[id_] >= cfg.stopped_frame_thresh:
                    cv2.putText(frame, "STOPPED", (x1b, y2b+20),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2)

            # draw speed if known
            if id_ in speeds_kmh:
                cv2.putText(frame, f"{speeds_kmh[id_]:.1f} km/h", (x1b, y2b+40),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)

    # global congestion metric: mean of the recent measured speeds
    avg_speed = np.mean(avg_speed_window) if len(avg_speed_window) > 0 else 0.0

    congestion_text = f"Avg speed: {avg_speed:.1f} km/h"
    cv2.putText(frame, congestion_text, (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,255), 2)

    return frame

def run_highway_module():
    """Process ``cfg.highway_video`` with the highway callback and save the annotated video."""
    print("Running Highway Congestion & Speed...")
    output_file = "/content/highway_output.mp4"
    yolo_track_video(
        video_path=cfg.highway_video,
        process_frame_fn=highway_process_frame,
        save_output_path=output_file,
        tracker_cfg=cfg.tracker_cfg,
        show=False,
    )

# Runs the highway module as soon as this cell executes.
run_highway_module()


Running Highway Congestion & Speed...
Done: /content/4.mp4 -> /content/highway_output.mp4


In [11]:
# =========================================
# 8. Wrong-Way Driving Detection
# =========================================
# Idea:
# - For each track, accumulate last N centers.
# - Compute average direction vector (dx, dy).
# - If allowed direction is downwards (dy > 0) but average dy < 0 for long enough, mark as wrong-way.

def wrong_way_process_frame(frame, result, state):
    """Annotate one frame and flag tracks whose net motion is upward in the image.

    For every tracked vehicle the last 30 box centres are kept in
    ``state["track_history"]``. Once at least ``cfg.wrong_way_min_frames``
    centres are stored, a track whose newest centre lies more than 10 px above
    its oldest stored centre is added to ``state["wrong_way_ids"]`` and stays
    flagged. Flagged tracks are drawn in red with a "WRONG WAY!" label, others
    in green. Returns the annotated frame.
    """
    trails = state.setdefault("track_history", {})
    offenders = state.setdefault("wrong_way_ids", set())
    font = cv2.FONT_HERSHEY_SIMPLEX

    detections = result.boxes if result is not None else None
    for det in (detections if detections is not None else ()):
        if int(det.cls[0]) not in cfg.vehicle_classes:
            continue
        if det.id is None:
            continue
        track_id = int(det.id[0])

        left, top, right, bottom = map(int, det.xyxy[0])
        center = (int((left + right) / 2), int((top + bottom) / 2))

        # Append the new centre and keep only the last 30 points.
        trail = (trails.get(track_id, []) + [center])[-30:]
        trails[track_id] = trail

        if len(trail) >= cfg.wrong_way_min_frames:
            # Allowed direction is downwards (dy > 0); a clearly negative net
            # dy means the vehicle is moving up the image.
            net_dy = trail[-1][1] - trail[0][1]
            if net_dy < -10:
                offenders.add(track_id)

        flagged = track_id in offenders
        box_color = (0, 0, 255) if flagged else (0, 255, 0)
        if flagged:
            cv2.putText(frame, "WRONG WAY!", (left, top - 30), font, 0.7, (0, 0, 255), 2)

        cv2.rectangle(frame, (left, top), (right, bottom), box_color, 2)
        cv2.putText(frame, f"ID {track_id}", (left, top - 10), font, 0.6, (255, 255, 255), 2)

    put_centered_text(frame, f"Wrong-way IDs: {len(offenders)}", y=40, color=(0, 0, 255))
    return frame

def run_wrong_way_module():
    """Process ``cfg.wrong_way_video`` with the wrong-way callback and save the annotated video."""
    print("Running Wrong-Way Driving Detection...")
    output_file = "/content/wrong_way_output.mp4"
    yolo_track_video(
        video_path=cfg.wrong_way_video,
        process_frame_fn=wrong_way_process_frame,
        save_output_path=output_file,
        tracker_cfg=cfg.tracker_cfg,
        show=False,
    )

# Runs the wrong-way module as soon as this cell executes.
run_wrong_way_module()

Running Wrong-Way Driving Detection...
Done: /content/4.mp4 -> /content/wrong_way_output.mp4


In [12]:
# =========================================
# 9. Toll Booth / ANPR (Zoom + Plate OCR)
# =========================================
# Idea:
# - Track leading vehicle approaching booth.
# - When its bounding box center is inside toll_close_roi, crop area around vehicle.
# - Run plate detector (could be YOLO plate model or classical methods) + OCR.
# - For demo: simple bbox-based zoom + OCR on whole bbox.

def toll_anpr_process_frame(frame, result, state):
    """Annotate one frame and OCR vehicles whose centre lies inside the toll ROI.

    Args:
        frame: Image for the current frame; it is drawn on in place.
        result: Tracking result for this frame (or None). Its ``boxes`` are
            iterated and each box's ``cls``, ``id`` and ``xyxy`` are read.
        state: Per-run dict; ``last_plate`` is created on first use and
            updated with the most recent non-empty OCR text.

    Returns:
        The annotated frame.

    The work is done in two passes so that OCR only ever sees pixels from the
    video: first every qualifying vehicle is cropped and read from the
    still-unannotated frame, then the ROI, boxes, ids and plate texts are
    drawn. Crop bounds are clamped to the frame so a box that extends past
    the image edge cannot produce a wrapped or empty slice.

    This is a placeholder pipeline: OCR runs on the whole enlarged vehicle
    box, on every frame the vehicle spends inside the ROI. A dedicated plate
    detector would be needed for proper ANPR.
    """
    x1r, y1r, x2r, y2r = cfg.toll_close_roi
    frame_h, frame_w = frame.shape[:2]

    if "last_plate" not in state:
        state["last_plate"] = ""

    # Collect the tracked vehicles of interest.
    vehicles = []
    boxes = result.boxes if result is not None else None
    if boxes is not None:
        for box in boxes:
            cls = int(box.cls[0])
            if cls not in cfg.vehicle_classes:
                continue
            # Detections without a track id are skipped.
            if box.id is None:
                continue
            x1b, y1b, x2b, y2b = map(int, box.xyxy[0])
            vehicles.append((int(box.id[0]), x1b, y1b, x2b, y2b))

    # Pass 1: OCR on the untouched frame.
    plates = {}
    for id_, x1b, y1b, x2b, y2b in vehicles:
        cx, cy = int((x1b + x2b) / 2), int((y1b + y2b) / 2)
        if not (x1r < cx < x2r and y1r < cy < y2r):
            continue

        veh_crop = frame[max(y1b, 0):min(y2b, frame_h),
                         max(x1b, 0):min(x2b, frame_w)]
        if veh_crop.size == 0:
            continue

        # Enlarge the crop to give the OCR engine more pixels per character.
        scale = 3
        veh_zoom = cv2.resize(veh_crop, None, fx=scale, fy=scale,
                              interpolation=cv2.INTER_CUBIC)

        gray = cv2.cvtColor(veh_zoom, cv2.COLOR_BGR2GRAY)
        _, thr = cv2.threshold(gray, 0, 255,
                               cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        text = pytesseract.image_to_string(thr, config=cfg.tesseract_config)
        # Keep only letters and digits, upper-cased.
        text = "".join(ch for ch in text if ch.isalnum()).upper()

        if text:
            plates[id_] = text
            state["last_plate"] = text

    # Pass 2: draw all overlays.
    cv2.rectangle(frame, (x1r, y1r), (x2r, y2r), (255, 0, 0), 2)
    cv2.putText(frame, "TOLL ROI", (x1r, y1r-10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,0,0), 2)

    for id_, x1b, y1b, x2b, y2b in vehicles:
        cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), (0, 255, 0), 2)
        cv2.putText(frame, f"ID {id_}", (x1b, y1b-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
        if id_ in plates:
            cv2.putText(frame, plates[id_], (x1b, y2b+20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)

    if state["last_plate"]:
        put_centered_text(frame, f"Last plate: {state['last_plate']}",
                          y=40, color=(0,255,0))

    return frame

def run_toll_module():
    """Process ``cfg.toll_video`` with the toll/ANPR callback and save the annotated video."""
    print("Running Toll / ANPR module...")
    output_file = "/content/toll_output.mp4"
    yolo_track_video(
        video_path=cfg.toll_video,
        process_frame_fn=toll_anpr_process_frame,
        save_output_path=output_file,
        tracker_cfg=cfg.tracker_cfg,
        show=False,
    )

# Runs the toll/ANPR module as soon as this cell executes.
run_toll_module()


Running Toll / ANPR module...
Done: /content/4.mp4 -> /content/toll_output.mp4
