In [11]:
import os

# Experiment setup
def get_next_exp_folder(base_path):
    """Create and return the next free numbered run folder under ``base_path``.

    Folders are named ``0``, ``1``, ``2``, ... The first number whose path
    does not exist yet is created and its path returned.

    Args:
        base_path: Directory that holds the numbered run folders. It is
            created (including parents) when missing.

    Returns:
        Path of the newly created folder, ``os.path.join(base_path, str(n))``.
    """
    os.makedirs(base_path, exist_ok=True)
    num = 0
    while True:
        exp_folder = os.path.join(base_path, str(num))
        try:
            # Create-or-fail in a single step: a separate exists() check
            # followed by makedirs() can raise (or share a folder) when two
            # runs start at the same moment.
            os.makedirs(exp_folder)
        except FileExistsError:
            # Slot already taken (by a folder or a stray file) - try the next.
            num += 1
            continue
        return exp_folder

# Allocate a fresh numbered folder for this run
expt = "results/vehicle_analysis"
exp_folder = get_next_exp_folder(expt)
print(f"Experiment folder: {exp_folder}")

# Per-run output locations: annotated video, saved crops, analysis files
output_video_dir = os.path.join(exp_folder, "videos")
output_crops_dir = os.path.join(exp_folder, "vehicle_crops")
output_data_dir = os.path.join(exp_folder, "analysis")

for _run_subdir in (output_video_dir, output_crops_dir, output_data_dir):
    os.makedirs(_run_subdir, exist_ok=True)

print("Output directories created:")
print(f"- Videos: {output_video_dir}")
print(f"- Crops: {output_crops_dir}")
print(f"- Analysis: {output_data_dir}")

Experiment folder: results/vehicle_analysis\9
Output directories created:
- Videos: results/vehicle_analysis\9\videos
- Crops: results/vehicle_analysis\9\vehicle_crops
- Analysis: results/vehicle_analysis\9\analysis


In [12]:
import ultralytics
from ultralytics import YOLO
# from IPython.display import Image
import cv2
from collections import defaultdict
import os
import numpy as np
import easyocr
import os
import re

In [13]:
# Weight files for the three networks used by the pipeline
VEHICLE_WEIGHTS = 'yolo11l.pt'
LP_WEIGHTS = "runs/detect/train2/weights/best.pt"
COLOR_WEIGHTS = "results/col/8/color_model/weights/best.pt"

# Load models: vehicle detector/tracker, license-plate detector, colour model
vehicle_model = YOLO(VEHICLE_WEIGHTS)
lp_model = YOLO(LP_WEIGHTS)
color_model = YOLO(COLOR_WEIGHTS)

# Class-index -> class-name mapping reported by the vehicle model
vehicle_classes = vehicle_model.names

In [None]:
# Utility
def crop_vehicle(frame, box):
    """Return the region of ``frame`` covered by ``box``.

    Args:
        frame: Image array indexed as ``frame[row, column]``.
        box: Four values ``(x1, y1, x2, y2)``; each is truncated to ``int``.

    Returns:
        The slice ``frame[y1:y2, x1:x2]`` with every coordinate clamped to
        be non-negative. The result is empty when the box lies entirely
        outside the frame.
    """
    # Negative slice bounds count from the END of the axis, so a box that
    # pokes past the top/left edge would silently produce an empty or wrong
    # crop. Bounds beyond the bottom/right edge are already clipped by slicing.
    x1, y1, x2, y2 = (max(0, int(v)) for v in box)
    return frame[y1:y2, x1:x2]

def detect_color(vehicle_crop):
    """Return the colour name predicted for a vehicle crop.

    Args:
        vehicle_crop: Image array of the vehicle, or ``None``.

    Returns:
        The top-1 class name from ``color_model.names``, or ``"unknown"``
        when the crop is missing/empty or the model returns no
        classification probabilities.
    """
    if vehicle_crop is None or vehicle_crop.size == 0:
        return "unknown"

    results = color_model(vehicle_crop)
    probs = results[0].probs
    if probs is None:
        return "unknown"

    # Map the class index to its name so every return path yields a string
    # (previously the success path returned the raw index while the fallback
    # returned the string "unknown").
    return color_model.names[int(probs.top1)]

def detect_license_plate(vehicle_crop):
    """Report whether the plate detector finds a plate in a vehicle crop.

    Args:
        vehicle_crop: Image array of the vehicle, or ``None``.

    Returns:
        ``True`` when ``lp_model`` returns at least one box at confidence
        0.60 or higher, otherwise ``False`` (including for a missing or
        empty crop).
    """
    # Same guard as detect_color: never hand an empty image to the detector.
    if vehicle_crop is None or vehicle_crop.size == 0:
        return False

    # NOTE(review): this helper uses conf=0.60 while the frame loop calls
    # lp_model with conf=0.25 - confirm which threshold is intended.
    results = lp_model(vehicle_crop, conf=0.60)
    plate_boxes = results[0].boxes
    return plate_boxes is not None and len(plate_boxes) > 0

def classify_vehicle_size_and_type(bbox, class_idx, frame_width, frame_height):
    """Classify a detection into a size bucket and a coarse vehicle type.

    Size comes from the box area as a percentage of the frame area:
    below 2% is "small", below 5% is "medium", anything else is "large".
    Type is a heuristic on the detector class index, the size bucket and
    the box aspect ratio (width / height).

    Class indices follow the COCO numbering used by the stock Ultralytics
    checkpoints: 2 = car, 7 = truck. COCO index 6 is "train" and there is
    no "van" class, so the earlier 6 = truck / 7 = van mapping labelled
    every truck as a van. Any other index yields the generic "vehicle".

    Args:
        bbox: ``(x1, y1, x2, y2)`` box corners in pixels.
        class_idx: Detector class index for the box.
        frame_width: Frame width in pixels.
        frame_height: Frame height in pixels.

    Returns:
        Tuple ``(size, vehicle_type)`` of strings.
    """
    x1, y1, x2, y2 = bbox

    # Calculate dimensions and area
    bbox_width = x2 - x1
    bbox_height = y2 - y1
    bbox_area = bbox_width * bbox_height

    # Area as percentage of frame
    frame_area = frame_width * frame_height
    area_percentage = (bbox_area / frame_area) * 100

    # Size classification
    if area_percentage < 2.0:
        size = "small"
    elif area_percentage < 5.0:
        size = "medium"
    else:
        size = "large"

    # Degenerate (zero-height) boxes get a neutral ratio instead of dividing by zero
    aspect_ratio = bbox_width / bbox_height if bbox_height > 0 else 1

    if class_idx == 2:  # COCO: car
        if size == "large" and aspect_ratio > 1.7:
            vehicle_type = "SUV"
        elif size == "small":
            vehicle_type = "compact_car"
        elif size == "medium":
            vehicle_type = "sedan"
        else:
            vehicle_type = "car"
    elif class_idx == 7:  # COCO: truck
        vehicle_type = "pickup_truck" if size == "medium" else "truck"
    else:
        vehicle_type = "vehicle"

    return size, vehicle_type

In [None]:
# Per-track bookkeeping shared by the processing cells
def _blank_vehicle_record():
    """Build the default record handed out for a track ID with no data yet."""
    return {
        'class': '',
        'type': '',
        'size': '',
        'color': '',
        'has_lp': False,
        'crossed': False,
        'lp_text': '',
        'first_seen_frame': 0,
    }


# track_id -> record; unseen IDs get a fresh blank record on first access
vehicle_data = defaultdict(_blank_vehicle_record)

# IDs of the tracks that have been marked as crossed
crossed_ids = set()

In [None]:
# Video setup and processing config
# OCR engine used to read plate text (easyocr is imported in the imports cell)
reader = easyocr.Reader(['en'])

# Video setup
video_path = 'videos/sam.mp4'
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Without this check a bad path yields 0x0 frames and an empty output video.
    raise IOError(f"Could not open input video: {video_path}")

# Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the
# written video play at a different speed than the source.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    # Frame rate not reported; fall back to a nominal value so the writer
    # can still be created. NOTE(review): 30 is an assumed default.
    fps = 30.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_path = os.path.join(output_video_dir, 'tracked_video.mp4')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
    cap.release()
    raise IOError(f"Could not open output video for writing: {output_path}")

# Red Line config (pixel coordinates in the source frame)
# NOTE(review): these values presumably match this specific video's
# resolution - confirm before using another input.
line_y_red = 1500
x1 = 850
x2 = 2900
thickness = 5

# Frame counter and the cap on how many frames the processing loop handles
frame_count = 0
frame_limit = 100

print(f"Processing {min(frame_limit, total_frames)} frames...")

Processing 100 frames...


In [None]:
# Main processing loop: track vehicles on each frame, analyse every track once
# when its box centre is below the counting line, then annotate and write the
# frame. try/finally guarantees the capture and the writer are released (and
# the output file finalised) even if a frame raises or the cell is interrupted.
try:
    while cap.isOpened() and frame_count < frame_limit:
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        # MODEL 1: Vehicle Detection & Tracking
        # Inference and cropping run on the clean frame. All overlays (the red
        # line included) are drawn afterwards, otherwise the line is baked into
        # the detector input and cuts through every crop taken at crossing time.
        # NOTE(review): with stock COCO weights class 6 is "train" and 7 is
        # "truck" - confirm that [2, 6, 7] is the intended class filter.
        vehicle_results = vehicle_model.track(frame, persist=True, classes=[2,6,7], verbose=False)
        detections = vehicle_results[0].boxes

        # Reset per frame so a frame without detections draws no stale boxes.
        boxes, track_ids, class_indices = [], [], []
        if detections is not None and len(detections) > 0:
            boxes = detections.xyxy.cpu()
            # No IDs assigned -> empty list, so the zips below process nothing.
            track_ids = detections.id.int().cpu().tolist() if detections.id is not None else []
            class_indices = detections.cls.int().cpu().tolist()

        # Process each detected vehicle
        for track_id, box, class_idx in zip(track_ids, boxes, class_indices):
            x1_box, y1_box, x2_box, y2_box = map(int, box)
            cy = (y1_box + y2_box) // 2

            # Get size and type classification
            size, refined_type = classify_vehicle_size_and_type(
                [x1_box, y1_box, x2_box, y2_box], class_idx, width, height
            )

            # Initialize vehicle data if new (same keys as the default record)
            if track_id not in vehicle_data:
                vehicle_data[track_id] = {
                    'class': vehicle_classes[class_idx],
                    'type': refined_type,
                    'size': size,
                    'color': 'unknown',
                    'has_lp': False,
                    'crossed': False,
                    'lp_text': '',
                    'first_seen_frame': frame_count
                }
            record = vehicle_data[track_id]

            # Analyse each track only once, the first time its centre is below the line.
            # NOTE(review): a track first seen already below the line is also
            # counted as crossed - confirm that is intended.
            if cy <= line_y_red or record['crossed']:
                continue

            record['crossed'] = True
            crossed_ids.add(track_id)

            # Crop vehicle for analysis (padded, clamped to the frame)
            padding = 20
            x1_crop = max(0, x1_box - padding)
            y1_crop = max(0, y1_box - padding)
            x2_crop = min(frame.shape[1], x2_box + padding)
            y2_crop = min(frame.shape[0], y2_box + padding)
            vehicle_crop = frame[y1_crop:y2_crop, x1_crop:x2_crop]

            if vehicle_crop.size == 0:
                # Degenerate box: nothing to classify or save, and
                # cv2.imwrite would raise on an empty image.
                continue

            # COLOR DETECTION
            try:
                color_results = color_model(vehicle_crop, verbose=False)
                probs = getattr(color_results[0], 'probs', None)
                if probs is not None:
                    record['color'] = color_model.names[probs.top1]
                else:
                    record['color'] = 'unknown'
            except Exception as e:
                # Best effort: keep processing, but make the failure visible.
                print(f"[frame {frame_count}] color detection failed for ID {track_id}: {e}")
                record['color'] = 'error'

            # LICENSE PLATE DETECTION + OCR
            try:
                lp_results = lp_model(vehicle_crop, conf=0.25, verbose=False)
                lp_boxes = lp_results[0].boxes

                if lp_boxes is not None and len(lp_boxes) > 0:
                    record['has_lp'] = True

                    # argmax() returns a 0-dim tensor; convert it to a plain
                    # int before using it as an index.
                    best_lp_idx = int(lp_boxes.conf.argmax())
                    lp_box = lp_boxes.xyxy.cpu()[best_lp_idx]
                    lp_x1, lp_y1, lp_x2, lp_y2 = map(int, lp_box)

                    lp_pad = 5
                    lp_x1 = max(0, lp_x1 - lp_pad)
                    lp_y1 = max(0, lp_y1 - lp_pad)
                    lp_x2 = min(vehicle_crop.shape[1], lp_x2 + lp_pad)
                    lp_y2 = min(vehicle_crop.shape[0], lp_y2 + lp_pad)

                    lp_crop = vehicle_crop[lp_y1:lp_y2, lp_x1:lp_x2]

                    # Skip OCR on plates too small to read
                    if lp_crop.shape[0] > 10 and lp_crop.shape[1] > 10:
                        # Upscale and convert to grayscale before OCR
                        scale_factor = 3
                        lp_resized = cv2.resize(lp_crop, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_CUBIC)
                        lp_gray = cv2.cvtColor(lp_resized, cv2.COLOR_BGR2GRAY)

                        ocr_results = reader.readtext(lp_gray)
                        plate_parts = []
                        for result in ocr_results:
                            # result[1] is used as the text, result[2] as its confidence
                            if result[2] > 0.3:
                                text = re.sub(r'[^A-Z0-9]', '', result[1].upper())
                                if len(text) >= 2:
                                    plate_parts.append(text)

                        record['lp_text'] = " ".join(plate_parts)
            except Exception as e:
                # has_lp is left as-is: it is False unless detection succeeded,
                # so an OCR failure no longer erases a detected plate.
                print(f"[frame {frame_count}] license plate analysis failed for ID {track_id}: {e}")

            # Save vehicle crop
            crop_filename = f"vehicle_{track_id}_{record['size']}_{record['type']}_{record['color']}.jpg"
            cv2.imwrite(os.path.join(output_crops_dir, crop_filename), vehicle_crop)

        # Draw red line (after analysis so it never appears in model inputs or crops)
        cv2.line(frame, (x1, line_y_red), (x2, line_y_red), (0, 0, 255), thickness)

        # Draw all tracked vehicles
        for track_id, box in zip(track_ids, boxes):
            if track_id not in vehicle_data:
                continue

            x1_box, y1_box, x2_box, y2_box = map(int, box)
            cx = (x1_box + x2_box) // 2
            cy = (y1_box + y2_box) // 2
            v_data = vehicle_data[track_id]

            # Enhanced label with size and type
            label = f"ID:{track_id} {v_data['size']} {v_data['type']} {v_data['color']}"
            if v_data['has_lp']:
                if v_data['lp_text']:
                    label += f" [{v_data['lp_text']}]"
                else:
                    # ASCII only: cv2.putText replaces characters the Hershey
                    # fonts cannot render with question marks.
                    label += " LP"

            # Colors: green for crossed, blue for tracking
            box_color = (0, 255, 0) if v_data['crossed'] else (255, 0, 0)

            cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)
            cv2.putText(frame, label, (x1_box, y1_box - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
            cv2.rectangle(frame, (x1_box, y1_box), (x2_box, y2_box), box_color, 2)

        # Calculate and display statistics on every frame, including frames
        # with no detections, so the overlay does not flicker on and off.
        crossed_data = {k: v for k, v in vehicle_data.items() if v['crossed']}
        type_counts = defaultdict(int)
        size_counts = defaultdict(int)
        lp_count = 0

        for v_data in crossed_data.values():
            type_counts[v_data['type']] += 1
            size_counts[v_data['size']] += 1
            if v_data['has_lp']:
                lp_count += 1

        # Display on frame
        y_offset = 30
        for type_name, count in type_counts.items():
            cv2.putText(frame, f"{type_name}: {count}", (50, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            y_offset += 30

        cv2.putText(frame, f"With LP: {lp_count}", (50, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)

        out.write(frame)
finally:
    cap.release()
    out.release()

print(f"Video saved to {output_path}")

Video saved to results/vehicle_analysis\9\videos\tracked_video.mp4
