In [1]:
import json
import datetime
import os
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict

# ============= 1. FILE PATHS =============
USER_HOME = os.path.expanduser("~")
DESKTOP_PATH = os.path.join(USER_HOME, "Desktop")
OUTPUT_FOLDER = os.path.join(DESKTOP_PATH, "heavy_truck_crops")
JSON_OUTPUT_PATH = os.path.join(DESKTOP_PATH, "road_status.json")

# MODEL: YOLO11 Extra Large (Maximum Accuracy)
MODEL_PATH = 'yolo11x.pt' 
VIDEO_PATH = '/Users/mohammadrebh/Desktop/2026-02-10 02.46.45.mp4'

# ============= 2. CONFIGURATION =============
CONFIDENCE_THRESHOLD = 0.2  
LINE_POSITION = 0.5         
IOU_THRESHOLD = 0.5
HEAVY_SIZE_THRESHOLD = 40000 

if not os.path.exists(OUTPUT_FOLDER): os.makedirs(OUTPUT_FOLDER)

# ============= 3. TRAFFIC LOGIC =============
def classify_traffic(total_vehicles, heavy_count, current_hour):
    if 8 <= current_hour < 14:
        if total_vehicles > 45 or heavy_count > 5: return "CONGESTED", "HALT_HEAVY_PERMITS"
        elif total_vehicles > 30 or heavy_count > 2: return "MODERATE", "LIMIT_HEAVY_PERMITS"
        else: return "NORMAL", "ALLOW_PERMITS"
    else:
        if total_vehicles > 35: return "MODERATE", "LIMIT_PERMITS"
        else: return "NORMAL", "ALLOW_PERMITS"

# ============= 4. MAIN EXECUTION =============
def main():
    print(f"ðŸ”„ Loading Best Model: {MODEL_PATH} ...")
    model = YOLO(MODEL_PATH)
    
    # 2=Car, 3=Motorcycle, 5=Bus, 7=Truck
    TARGET_CLASSES = [2, 3, 5, 7]
    
    cap = cv2.VideoCapture(VIDEO_PATH)
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps    = cap.get(cv2.CAP_PROP_FPS)
    
    line_y = int(height * LINE_POSITION)
    
    track_history = defaultdict(lambda: [])
    counts = {"Car": 0, "Motorcycle": 0, "Bus": 0, "Truck": 0, "Heavy Truck": 0}
    crossed_ids = set()
    
    frames_processed = 0
    save_interval = int(fps * 60 * 0.5) 
    
    previous_total = 0
    previous_heavy = 0
    current_status = "CALCULATING..."

    def save_json(note=""):
        nonlocal previous_total, previous_heavy
        current_total = sum(counts.values())
        current_heavy = counts["Heavy Truck"] + counts["Bus"]
        
        interval_total = current_total - previous_total
        interval_heavy = current_heavy - previous_heavy
        
        status, rec = classify_traffic(interval_total, interval_heavy, datetime.datetime.now().hour)
        
        payload = {
            "timestamp": datetime.datetime.now().isoformat(),
            "total_since_start": current_total, # Added useful stat
            "interval_count": interval_total,
            "interval_heavy": interval_heavy,
            "status": status,
            "recommendation": rec,
            "note": note
        }
        
        try:
            if os.path.exists(JSON_OUTPUT_PATH):
                with open(JSON_OUTPUT_PATH, 'r') as f: history = json.load(f)
            else: history = []
        except: history = []
        
        history.append(payload)
        with open(JSON_OUTPUT_PATH, 'w') as f: json.dump(history, f, indent=4)
        
        previous_total = current_total
        previous_heavy = current_heavy
        return status

    print("ðŸš€ STARTED! Press 'ESC' to quit.")

    while cap.isOpened():
        success, frame = cap.read()
        if not success: break
        
        frames_processed += 1
        
        # Track
        results = model.track(frame, persist=True, conf=CONFIDENCE_THRESHOLD, iou=IOU_THRESHOLD, classes=TARGET_CLASSES, verbose=False)
        
        boxes = results[0].boxes
        if boxes.id is not None:
            track_ids = boxes.id.int().cpu().tolist()
            cls_ids = boxes.cls.int().cpu().tolist()
            xyxys = boxes.xyxy.cpu().tolist()
            
            for box, track_id, cls_id in zip(xyxys, track_ids, cls_ids):
                x1, y1, x2, y2 = map(int, box)
                w_box = x2 - x1
                h_box = y2 - y1
                area = w_box * h_box
                current_y = y2 
                
                # Classification Logic
                if cls_id == 7: 
                    if area > HEAVY_SIZE_THRESHOLD:
                        final_class = "Heavy Truck"
                        display_color = (0, 0, 255) 
                    else:
                        final_class = "Truck"
                        display_color = (0, 255, 255) 
                elif cls_id == 5:
                    final_class = "Bus"
                    display_color = (0, 0, 255) 
                elif cls_id == 2:
                    final_class = "Car"
                    display_color = (0, 255, 0)
                elif cls_id == 3:
                    final_class = "Motorcycle"
                    display_color = (0, 255, 0)
                else:
                    final_class = "Unknown"
                    display_color = (255, 255, 255)

                # Count Logic
                track = track_history[track_id]
                track.append(current_y)
                if len(track) > 30: track.pop(0)
                
                if len(track) >= 2:
                    prev_y = track[-2]
                    if prev_y < line_y and current_y > line_y:
                        if track_id not in crossed_ids:
                            crossed_ids.add(track_id)
                            counts[final_class] += 1
                            
                            if final_class == "Heavy Truck":
                                crop = frame[max(0, y1):min(height, y2), max(0, x1):min(width, x2)]
                                cv2.imwrite(f"{OUTPUT_FOLDER}/heavy_{track_id}.jpg", crop)
                                print(f"ðŸ“¸ SAVED HEAVY TRUCK #{track_id}")
                            
                            print(f"âœ… COUNTED: #{track_id} as {final_class}")

                cv2.rectangle(frame, (x1, y1), (x2, y2), display_color, 2)
                cv2.putText(frame, f"#{track_id} {final_class}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, display_color, 2)

        cv2.line(frame, (0, line_y), (width, line_y), (0, 255, 255), 2)
        
        # --- SIDEBAR UI ---
        # 1. Background
        cv2.rectangle(frame, (10, 10), (300, 400), (0, 0, 0), -1)
        
        # 2. Header
        cv2.putText(frame, "TRAFFIC MONITOR", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, f"STATUS: {current_status}", (20, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
        
        # 3. SEPARATOR LINE
        cv2.line(frame, (20, 80), (290, 80), (100, 100, 100), 1)

        # 4. TOTAL COUNTER (NEW!)
        total_count = sum(counts.values())
        cv2.putText(frame, f"TOTAL: {total_count}", (20, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)

        # 5. SEPARATOR LINE
        cv2.line(frame, (20, 130), (290, 130), (100, 100, 100), 1)
        
        # 6. Breakdown
        y_pos = 160
        order = ["Heavy Truck", "Bus", "Truck", "Car", "Motorcycle"]
        for key in order:
            count = counts[key]
            if key in ["Heavy Truck", "Bus"]: c = (0, 0, 255)
            elif key == "Truck": c = (0, 255, 255)
            else: c = (0, 255, 0)
            
            cv2.putText(frame, f"{key}: {count}", (20, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.7, c, 2)
            y_pos += 40

        if frames_processed % save_interval == 0:
            current_status = save_json() cv2.imshow("YOLO11 Smart Tracker", frame)
        if cv2.waitKey(1) & 0xFF == 27: break

    cap.release()
    cv2.destroyAllWindows()
    save_json("END")

if __name__ == "__main__":
    main()

ðŸ”„ Loading Best Model: yolo11x.pt ...
ðŸš€ STARTED! Press 'ESC' to quit.
âœ… COUNTED: #1 as Car
âœ… COUNTED: #8 as Car
âœ… COUNTED: #10 as Car
âœ… COUNTED: #14 as Car
âœ… COUNTED: #19 as Car
âœ… COUNTED: #21 as Car
âœ… COUNTED: #22 as Car
âœ… COUNTED: #24 as Car
âœ… COUNTED: #26 as Car
âœ… COUNTED: #33 as Car
âœ… COUNTED: #35 as Car
âœ… COUNTED: #36 as Car
âœ… COUNTED: #37 as Car
âœ… COUNTED: #38 as Car
âœ… COUNTED: #40 as Car
âœ… COUNTED: #42 as Car
âœ… COUNTED: #55 as Car
âœ… COUNTED: #58 as Car
âœ… COUNTED: #60 as Car
âœ… COUNTED: #63 as Car
âœ… COUNTED: #69 as Car
ðŸ“¸ SAVED HEAVY TRUCK #70
âœ… COUNTED: #70 as Heavy Truck
âœ… COUNTED: #80 as Car
âœ… COUNTED: #83 as Car
ðŸ“¸ SAVED HEAVY TRUCK #81
âœ… COUNTED: #81 as Heavy Truck
âœ… COUNTED: #88 as Car
âœ… COUNTED: #89 as Car
âœ… COUNTED: #90 as Car
âœ… COUNTED: #91 as Car
âœ… COUNTED: #92 as Car
âœ… COUNTED: #93 as Car
âœ… COUNTED: #97 as Car
âœ… COUNTED: #98 as Car
âœ… COUNTED: #105 as Car
âœ… COUNTED: #106 as Car
âœ… COUNTED: #1

: 

In [3]:
from ultralytics import YOLO
import cv2

# Load your model
model = YOLO('/Users/mohammadrebh/Desktop/vehicle_classifier/truker_train_run/weights/best.pt')

# Load the video
video_path = '/Users/mohammadrebh/Desktop/2026-02-10 02.46.45.mp4'
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()

if ret:
    # Run inference with LOW confidence and SHOW logic
    results = model(frame, conf=0.1, imgsz=1280) 
    
    # Show what the model sees natively
    res_plotted = results[0].plot()
    cv2.imshow("DEBUG VIEW", res_plotted)
    print("Press any key to close...")
    cv2.waitKey(0)
    cv2.destroyAllWindows()
else:
    print("Could not read video")


0: 736x1280 1 Auto, 1 Truck, 95.1ms
Speed: 4.7ms preprocess, 95.1ms inference, 0.7ms postprocess per image at shape (1, 3, 736, 1280)
Press any key to close...


In [13]:
from ultralytics import YOLO
import cv2
import os

# PATHS
MODEL_PATH = '/Users/mohammadrebh/Desktop/vehicle_classifier/truker_train_run/weights/best.pt'
VIDEO_PATH = '/Users/mohammadrebh/Desktop/2026-02-10 02.46.45.mp4'

# Load Model
model = YOLO(MODEL_PATH)

# PRINT THE CLASSES YOUR MODEL KNOWS
print("------------------------------------------------")
print(f"ðŸ“‹ YOUR MODEL'S CLASSES: {model.names}")
print("------------------------------------------------")

cap = cv2.VideoCapture(VIDEO_PATH)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break

    # Run inference at DEFAULT size (640) to be safe
    # conf=0.1 means "Show me anything you are even 10% sure about"
    results = model(frame, conf=0.1) 
    
    # Plot results on the frame
    annotated_frame = results[0].plot()

    cv2.imshow("DIAGNOSTIC VIEW - Press 'q' to quit", annotated_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'): break

cap.release()
cv2.destroyAllWindows()

------------------------------------------------
ðŸ“‹ YOUR MODEL'S CLASSES: {0: 'Motorcycle', 1: 'Auto', 2: 'Car', 3: 'Bus', 4: 'LCV', 5: 'Truck', 6: 'Tractor', 7: 'Heavy Truck'}
------------------------------------------------

0: 384x640 1 Car, 1 Truck, 1 Heavy Truck, 35.6ms
Speed: 1.6ms preprocess, 35.6ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Truck, 1 Heavy Truck, 33.4ms
Speed: 1.8ms preprocess, 33.4ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Truck, 1 Heavy Truck, 35.2ms
Speed: 1.5ms preprocess, 35.2ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Truck, 1 Heavy Truck, 33.7ms
Speed: 1.5ms preprocess, 33.7ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Heavy Truck, 63.1ms
Speed: 1.4ms preprocess, 63.1ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Bus, 1 Truck, 1 Heavy Truck, 32.1ms
Speed: 1.2ms preprocess,