In [1]:
!pip -q install ultralytics supervision opencv-python

In [5]:
import cv2
import numpy as np
import pandas as pd
from ultralytics import YOLO
from pathlib import Path
from tqdm.notebook import tqdm
import os

_MODEL_CACHE = {}

def get_cached_model(weights: str) -> YOLO:
    """Load model once and cache it for reuse across videos."""
    if weights not in _MODEL_CACHE:
        _MODEL_CACHE[weights] = YOLO(weights)
    return _MODEL_CACHE[weights]


def vehicle_counting(
    video_file_dir: str,
    video_output: bool = True,
    video_root_dir: str = "",
    show_progress: bool = True,
    model_weights: str = "yolov8x.pt",
    conf_threshold: float = 0.30,
):
    
    VIDEO_FILE_DIR = Path(video_root_dir) / video_file_dir
    if not os.path.exists(VIDEO_FILE_DIR):
        print("File not found")
        print(VIDEO_FILE_DIR)
        return False
    TRACKER_CFG = "bytetrack.yaml"

    CR_X, CR_Y, CR_W, CR_H = 38, 220, 288, 160

    RX_TL, RY_TL =  90, 300 
    RX_TR, RY_TR = 305, 297
    RX_BL, RY_BL =  45, 354
    RX_BR, RY_BR = 300, 353
    LANE_X_THRESHOLD = 200
    FRAMERATE_REDUCTION_FACTOR = 2
    
    ROUNDABOUT_POLY = np.array(
        [[RX_TL, RY_TL], [RX_TR, RY_TR], [RX_BR, RY_BR], [RX_BL, RY_BL]],
        dtype=np.float32
    ).reshape((-1, 1, 2))
    
    CROP_OFFSET = np.array([CR_X, CR_Y, CR_X, CR_Y], dtype=np.float32)
    
    CLASSES_TO_TRACK = [2, 5, 7]  # car, bus, truck
    
    OUTPUT_VIDEO = "annotated_video.mp4"
    
    lane_segment = None
    if video_output:
        poly_flat = ROUNDABOUT_POLY.reshape(-1, 2)
        y_min, y_max = int(poly_flat[:, 1].min()), int(poly_flat[:, 1].max())
        inside_ys = []
        for y in range(y_min, y_max + 1):
            if cv2.pointPolygonTest(ROUNDABOUT_POLY, (float(LANE_X_THRESHOLD), float(y)), False) >= 0:
                inside_ys.append(y)
        if inside_ys:
            lane_segment = ((LANE_X_THRESHOLD, min(inside_ys)), (LANE_X_THRESHOLD, max(inside_ys)))
        
        DRAW_POLY = np.array(
            [[RX_TL, RY_TL], [RX_TR, RY_TR], [RX_BR, RY_BR], [RX_BL, RY_BL]],
            dtype=np.int32
        )
    
    model = get_cached_model(model_weights)
    
    cap = cv2.VideoCapture(str(VIDEO_FILE_DIR))
    if not cap.isOpened():
        return False
    
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        fps = 30.0
    
    output_fps = fps / FRAMERATE_REDUCTION_FACTOR
    if fps % FRAMERATE_REDUCTION_FACTOR != 0:
        cap.release()
        raise ValueError(f"FPS {fps} is not divisible by {FRAMERATE_REDUCTION_FACTOR}")
    
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    fps_inv = 1.0 / fps
    
    writer = None
    if video_output:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, int(output_fps), (w, h))
        if not writer.isOpened():
            cap.release()
            raise Exception("Could not open video writer")
    
    track_state = {}
    counts = {"outer_lane_count": 0, "inner_lane_count": 0, "total_count": 0}
    
    frame_idx = 0
    
    with tqdm(total=total_frames, desc="Processing frames", disable=not show_progress) as pbar:
        while True:
            if frame_idx % FRAMERATE_REDUCTION_FACTOR != 0:
                cap.grab()
                frame_idx += 1
                pbar.update(1)
                continue
            
            ret, frame = cap.read()
            if not ret:
                break
            

            
            roi = frame[CR_Y:CR_Y + CR_H, CR_X:CR_X + CR_W]
            
            results = model.track(
                roi,
                persist=True,
                tracker=TRACKER_CFG,
                verbose=False,
                classes=CLASSES_TO_TRACK,
                conf=conf_threshold,
                imgsz=(CR_W, CR_H)
            )
            r = results[0]
            
            if video_output:
                cv2.polylines(frame, [DRAW_POLY], True, (0, 255, 255), 2)
                cv2.rectangle(frame, (CR_X, CR_Y), (CR_X + CR_W, CR_Y + CR_H), (0, 0, 255), 1)
                
                if lane_segment:
                    (lx1, ly1), (lx2, ly2) = lane_segment
                    cv2.line(frame, (lx1, ly1), (lx2, ly2), (255, 255, 255), 2)
                    cv2.putText(
                        frame, f"x={LANE_X_THRESHOLD}",
                        (lx1 + 5, min(ly1, ly2) - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA
                    )
            
            if r.boxes is not None and len(r.boxes) > 0 and r.boxes.id is not None:
                boxes_xyxy = r.boxes.xyxy.cpu().numpy() + CROP_OFFSET
                clses = r.boxes.cls.cpu().numpy().astype(np.int32)
                ids = r.boxes.id.cpu().numpy().astype(np.int32)
                
                for i in range(len(ids)):
                    track_id = ids[i]
                    x1, y1, x2, y2 = boxes_xyxy[i]
                    brx, bry = float(int(((x1+x2)/2))), float(y2)
                    
                    rb_inside_now = cv2.pointPolygonTest(ROUNDABOUT_POLY, (brx, bry), False) >= 0
                    
                    if track_id not in track_state:
                        track_state[track_id] = {
                            "rb_inside": False,
                            "rb_max_x_inside": -np.inf,
                        }
                    st = track_state[track_id]
                    
                    if not st["rb_inside"] and rb_inside_now:
                        st["rb_inside"] = True
                        st["rb_max_x_inside"] = brx
                    elif st["rb_inside"] and rb_inside_now:
                        if brx > st["rb_max_x_inside"]:
                            st["rb_max_x_inside"] = brx
                    elif st["rb_inside"] and not rb_inside_now:
                        lane_is_inner = st["rb_max_x_inside"] > LANE_X_THRESHOLD
                        
                        if lane_is_inner:
                            counts["inner_lane_count"] += 1
                        else:
                            counts["outer_lane_count"] += 1
                        counts["total_count"] += 1
                        
                        st["rb_inside"] = False
                        st["rb_max_x_inside"] = -np.inf
                    
                    if video_output:
                        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                        pt_color = (0, 0, 255) if rb_inside_now else (255, 0, 0)
                        cv2.circle(frame, (int(brx), int(bry)), 4, pt_color, -1)
                        
                        label = f"id={track_id} cls={clses[i]}"
                        if rb_inside_now:
                            label += " RB_IN"
                        cv2.putText(
                            frame, label,
                            (int(x1), max(0, int(y1) - 7)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 2, cv2.LINE_AA
                        )
            
            if video_output:
                text = f"Outer={counts['outer_lane_count']} Inner={counts['inner_lane_count']} Total={counts['total_count']}"
                cv2.putText(frame, text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2, cv2.LINE_AA)
                writer.write(frame)
            
            frame_idx += 1
            pbar.update(1)
    
    cap.release()
    if writer:
        writer.release()
    
    return pd.DataFrame([{
        "videos": video_file_dir,
        "outer_lane_count": counts["outer_lane_count"],
        "inner_lane_count": counts["inner_lane_count"],
        "total_count": counts["total_count"],
    }])


In [7]:
result = vehicle_counting(
    "video.mp4",
    model_weights="yolo11x.pt",
    video_output=True,
)
print(result)

Processing frames:   0%|          | 0/1933 [00:00<?, ?it/s]

      videos  outer_lane_count  inner_lane_count  total_count
0  video.mp4                23                27           50
