In [None]:
# Input video that is opened with cv2.VideoCapture in the processing cell.
PATH_TO_VIDEO = "/home/ezaborshchikov/hse_mc/Masters_tethis/videos/VID_20250415_181952.mp4"
# Weights file handed to the YOLO constructor for vehicle detection.
PATH_TO_MODEL = "/home/ezaborshchikov/hse_mc/Masters_tethis/models/yolo11n.pt"

# Destination file for the annotated video produced by cv2.VideoWriter.
output_path = "/home/ezaborshchikov/hse_mc/Masters_tethis/videos/output_results.mp4"

In [None]:
import cv2
from ultralytics import YOLO
from collections import defaultdict

# Detector used for vehicles; the brand and plate models are not enabled yet.
car_model = YOLO(PATH_TO_MODEL)
# brand_model = YOLO('models/best_brands.pt')  # Custom model for car brand recognition
# plate_model = YOLO('models/best_plates.pt')  # Custom model for license plate recognition


def _new_track_record():
    """Build the default per-vehicle record stored in ``track_history``."""
    return {
        'brand': None,        # recognition result, filled in once per track
        'plate': None,        # recognition result, filled in once per track
        'processed': False,   # True once recognition has been attempted
        'last_box': None,     # most recent (x1, y1, x2, y2) for this track
    }


# Per-track state keyed by tracker id; missing ids get a fresh default record.
track_history = defaultdict(_new_track_record)

def detect_cars(frame):
    """Run the tracking detector on a single frame.

    Args:
        frame: Image to analyse, passed straight to ``car_model.track``.

    Returns:
        The first element of the tracker output, or ``None`` when the
        tracker returns an empty result.
    """
    # Class ids kept by the detector: car, motorcycle, bus, truck
    vehicle_classes = [2, 3, 5, 7]
    detections = car_model.track(
        frame,
        persist=True,  # carry tracker state over between successive calls
        tracker="bytetrack.yaml",
        classes=vehicle_classes,
    )
    if not detections:
        return None
    return detections[0]

def recognize_brand(car_img):
    """Return the brand prediction for a cropped vehicle image.

    Placeholder: the brand model is not enabled, so no inference is run and
    the result is always ``None``.

    Args:
        car_img: Cropped vehicle image (currently unused).

    Returns:
        Always ``None``.
    """
    # Intended implementation once brand_model is enabled:
    #     brands = brand_model(car_img)
    #     return brands[0].probs.top1 if brands else None
    brand = None
    return brand

def recognize_plate(car_img):
    """Return the license-plate prediction for a cropped vehicle image.

    Placeholder: the plate model is not enabled, so no inference is run and
    the result is always ``None``.

    Args:
        car_img: Cropped vehicle image (currently unused).

    Returns:
        Always ``None``.
    """
    # Intended implementation once plate_model is enabled:
    #     plates = plate_model(car_img)
    #     return plates[0].boxes[0].cls if plates else None
    plate = None
    return plate

def process_frame(frame, frame_count):
    """Detect, track and annotate vehicles on one video frame.

    Detection runs only on even-numbered frames; odd-numbered frames reuse
    the boxes stored in ``track_history`` by the previous detection pass.
    Tracks that are absent from the latest detection pass have their stored
    box cleared, so vehicles that left the scene are no longer drawn.

    Args:
        frame: Image array for the current frame (as returned by
            ``cv2.VideoCapture.read``). It is not modified.
        frame_count: Zero-based index of the frame in the video.

    Returns:
        A copy of ``frame`` with a green rectangle drawn around every
        currently tracked vehicle.
    """
    annotated_frame = frame.copy()

    # Skip every other frame for performance (detect at 50% of original FPS)
    if frame_count % 2 == 0:
        seen_ids = set()
        car_results = detect_cars(frame)

        if car_results and car_results.boxes and car_results.boxes.id is not None:
            frame_h, frame_w = frame.shape[:2]
            boxes = car_results.boxes.xyxy.cpu().numpy()
            track_ids = car_results.boxes.id.int().cpu().numpy()

            for box, raw_id in zip(boxes, track_ids):
                # Plain int keys; also keeps a (hypothetical) id of 0 valid,
                # which a bare truthiness test would silently drop.
                track_id = int(raw_id)
                x1, y1, x2, y2 = map(int, box)
                seen_ids.add(track_id)

                record = track_history[track_id]
                # Store last known position for skipped frames
                record['last_box'] = (x1, y1, x2, y2)

                # Only process new vehicles (not yet recognized)
                if record['processed']:
                    continue

                # Clamp to the frame so a negative coordinate cannot wrap
                # around when slicing (tracker boxes may extend past the
                # image edge -- TODO confirm for the configured tracker).
                cx1, cy1 = max(x1, 0), max(y1, 0)
                cx2, cy2 = min(x2, frame_w), min(y2, frame_h)
                car_img = frame[cy1:cy2, cx1:cx2]
                if car_img.size == 0:
                    # Nothing to recognize yet; retry on a later frame.
                    continue

                record['brand'] = recognize_brand(car_img)
                record['plate'] = recognize_plate(car_img)
                record['processed'] = True

        # Forget the position of every track missing from this detection
        # pass; otherwise its last box would be drawn on all later frames.
        for track_id, data in track_history.items():
            if track_id not in seen_ids:
                data['last_box'] = None

    # Display information for all tracked vehicles (including skipped frames)
    for data in track_history.values():
        if data['last_box'] is None:
            continue
        x1, y1, x2, y2 = data['last_box']

        # Prepare display information
        info = ""
        # if data['brand'] is not None:
        #     info += f"Brand: {brand_model.names[data['brand']]} "
        # if data['plate'] is not None:
        #     info += f"Plate: {plate_model.names[data['plate']]}"

        # Draw bounding box and information
        cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        if info:
            cv2.putText(annotated_frame, info, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)

    return annotated_frame

# Video processing setup
cap = cv2.VideoCapture(PATH_TO_VIDEO)
if not cap.isOpened():
    # Fail loudly instead of producing an empty output and reporting success.
    raise RuntimeError(f"Could not open input video: {PATH_TO_VIDEO}")

# Get video properties for output
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
original_fps = cap.get(cv2.CAP_PROP_FPS)

# Video writer setup. Every input frame is written to the output (detection
# is skipped on odd frames, but the frame itself is still annotated and
# saved), so the output must keep the source frame rate; halving it would
# make the result play at half speed. Fall back to 30 FPS when the container
# does not report a usable frame rate.
output_fps = original_fps if original_fps > 0 else 30.0

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, output_fps, (frame_width, frame_height))

# Main processing loop
frame_count = 0
try:
    if not out.isOpened():
        raise RuntimeError(f"Could not open output video for writing: {output_path}")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # End of stream (or a read failure): stop processing.
            break

        processed_frame = process_frame(frame, frame_count)

        # Write processed frame to output
        out.write(processed_frame)

        # # Display processing in real-time (optional)
        # cv2.imshow('Car Recognition', processed_frame)
        # if cv2.waitKey(1) & 0xFF == ord('q'):
        #     break

        frame_count += 1
finally:
    # Cleanup: always release the capture and finalize the output file,
    # even if processing raised part-way through.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

print(f"Processing complete. Results saved to {output_path}")


0: 384x640 10 cars, 53.7ms
Speed: 2.3ms preprocess, 53.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 46.9ms
Speed: 2.3ms preprocess, 46.9ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 cars, 36.5ms
Speed: 19.0ms preprocess, 36.5ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 45.5ms
Speed: 1.5ms preprocess, 45.5ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 49.6ms
Speed: 5.1ms preprocess, 49.6ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 41.1ms
Speed: 7.1ms preprocess, 41.1ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 55.4ms
Speed: 2.6ms preprocess, 55.4ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 44.4ms
Speed: 1.8ms preprocess, 44.4ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

