In [1]:
from ultralytics import YOLO
import cv2
import numpy as np
from datetime import datetime
from heapq import heappush, heappushpop
from collections import Counter
from typing import List, Tuple, Optional
import sys
sys.path.append('../')

In [2]:
from src.OCR_model import ocr_model

In [None]:
# Detection results handed to ocr_model() by process_video().
# A `global` statement at module level is a no-op and does not create the
# name, so bind it explicitly; it stays None until something assigns it.
plate_results = None

In [4]:
# Load the YOLO model
# Relative path to the plate-detection weights (resolved against the current
# working directory when the model is loaded).
MODEL_PATH = r'../models/PlateYOLO.pt'
# Shared module-level model instance; PlateDetector.process_frame calls it
# once per frame.
model = YOLO(MODEL_PATH)

In [None]:
class PlateDetector:
    """Detect license plates frame by frame and keep the best crops seen.

    Each call to ``process_frame`` runs the module-level ``model`` on the
    frame, annotates a copy of it, and (subject to a cooldown) records the
    thresholded crop of any sufficiently confident plate. Only the ``top_k``
    highest-confidence crops are retained.
    """

    def __init__(self, conf_threshold: float = 0.85, cooldown_frames: int = 1,
                 top_k: int = 5, initial_cooldown: int = 60):
        """
        Initialize the PlateDetector.
        Args:
            conf_threshold: Minimum confidence score to consider a detection
            cooldown_frames: Number of frames to wait between captures
            top_k: Number of best frames to maintain
            initial_cooldown: Value the cooldown counter starts at; no plate
                is captured until it has counted down to zero (one step per
                processed frame)
        """
        self.conf_threshold = conf_threshold
        self.cooldown_frames = cooldown_frames
        self.current_cooldown = initial_cooldown
        self.top_k = top_k
        # Min-heap of (confidence, sequence, processed_plate). The root is the
        # weakest retained detection, so it is the one evicted when full.
        self.top_detections = []
        # Monotonic tie-breaker: guarantees tuple comparison never reaches the
        # ndarray element (comparing arrays raises ValueError).
        self._sequence = 0

    def _process_plate_region(self, plate_region: np.ndarray) -> Optional[np.ndarray]:
        """Return an Otsu-binarized grayscale copy of the crop, or None if it is empty."""
        if plate_region.size == 0:
            return None

        plate_gray = cv2.cvtColor(plate_region, cv2.COLOR_BGR2GRAY)
        # Threshold value 0 is ignored: THRESH_OTSU picks the threshold itself.
        _, plate_thresh = cv2.threshold(plate_gray, 0, 255,
                                        cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return plate_thresh

    def _add_detection(self, conf: float, processed_plate: np.ndarray) -> None:
        """Record a detection, keeping only the ``top_k`` highest confidences."""
        entry = (conf, self._sequence, processed_plate)
        self._sequence += 1
        if len(self.top_detections) < self.top_k:
            heappush(self.top_detections, entry)
        else:
            # Pushes the new entry, then drops the smallest one, i.e. the
            # lowest-confidence detection (possibly the new entry itself).
            heappushpop(self.top_detections, entry)

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process a single frame and update top detections.

        Args:
            frame: Image to run detection on; it is not modified.

        Returns:
            A copy of ``frame`` with detection boxes, confidences and status
            text drawn on it.
        """
        # process_video() reads the module-level name `plate_results` when it
        # calls the OCR model, so publish the latest results there instead of
        # keeping them in a local that nobody else can see.
        global plate_results
        plate_results = model(frame)
        processed_frame = frame.copy()

        # Decrease cooldown counter
        if self.current_cooldown > 0:
            self.current_cooldown -= 1

        for result in plate_results:
            for box in result.boxes:
                if int(box.cls[0]) != 0:  # Only class 0 (license plate)
                    continue

                conf = float(box.conf[0])
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)

                # Green box when the detection meets the threshold, orange otherwise
                color = (0, 255, 0) if conf >= self.conf_threshold else (0, 165, 255)
                cv2.rectangle(processed_frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(processed_frame, f'Conf: {conf:.2f}', (x1, y1-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                # Capture only confident detections, and only outside the cooldown
                if conf < self.conf_threshold or self.current_cooldown != 0:
                    continue

                # Crop from the untouched input so the drawn overlay is not included
                processed_plate = self._process_plate_region(frame[y1:y2, x1:x2])
                if processed_plate is None:
                    continue

                self._add_detection(conf, processed_plate)
                # Restart the cooldown; this also blocks further captures
                # from the remaining boxes of this same frame.
                self.current_cooldown = self.cooldown_frames

        # Add info to frame
        cv2.putText(processed_frame, f'Threshold: {self.conf_threshold}',
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(processed_frame, f'Top Detections: {len(self.top_detections)}/{self.top_k}',
                    (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        return processed_frame

    def get_top_plates(self) -> List[Tuple[float, np.ndarray]]:
        """Return the retained plates as (confidence, plate), highest confidence first."""
        ranked = sorted(self.top_detections,
                        key=lambda entry: (entry[0], entry[1]),
                        reverse=True)
        return [(conf, plate) for conf, _, plate in ranked]

    def clear_detections(self):
        """Clear stored detections from memory."""
        self.top_detections = []

def process_video(video_path: str):
    """Process video and return the most common OCR result from top plates.

    Every frame is run through a PlateDetector and shown in an OpenCV window
    until the video ends or 'q' is pressed. The detector's retained plate
    crops are then passed to ``ocr_model`` one by one.

    Args:
        video_path: Path of the video file to open.

    Returns:
        The OCR result that occurs most often among the retained plates, or
        None if the video cannot be opened, no plate was retained, or no OCR
        call produced a truthy result.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video: {video_path}")
        return None

    detector = PlateDetector(conf_threshold=0.85)
    print("Starting processing... Press 'q' to stop")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            processed_frame = detector.process_frame(frame)
            cv2.imshow('License Plate Detection', processed_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always release the capture and close the preview window
        cap.release()
        cv2.destroyAllWindows()

    # get_top_plates() returns a fresh list, so the detector's own storage can
    # be released immediately on every exit path.
    top_plates = detector.get_top_plates()
    detector.clear_detections()

    # Look the name up instead of referencing it directly: if nothing has
    # assigned the module-level `plate_results`, a bare reference raises
    # NameError on every plate and no OCR call is ever made.
    # NOTE(review): the second argument of ocr_model is assumed to be the
    # detector output - confirm against src.OCR_model.
    detection_results = globals().get("plate_results")

    ocr_results = []
    for conf, plate in top_plates:
        try:
            result = ocr_model(plate, detection_results)
        except Exception as e:
            # Best effort: one unreadable plate must not abort the others
            print(f"OCR Error: {e}")
            continue

        if result:  # Assuming OCR returns None/empty for failed reads
            ocr_results.append(result)
        print(f"Plate with confidence {conf:.2f} -> OCR result: {result}")

    if not ocr_results:
        return None

    # Majority vote across the retained plates
    final_result = Counter(ocr_results).most_common(1)[0][0]
    print(f"\nFinal plate number: {final_result}")
    return final_result

if __name__ == "__main__":
    # Run the full pipeline on a sample video path; `result` keeps whatever
    # process_video returned (the winning OCR text, or None).
    video_path = '../data/car2.mp4'
    result = process_video(video_path)

Starting processing... Press 'q' to stop

0: 480x640 1 0, 222.0ms
Speed: 3.4ms preprocess, 222.0ms inference, 1.6ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 220.5ms
Speed: 5.6ms preprocess, 220.5ms inference, 1.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 198.4ms
Speed: 3.1ms preprocess, 198.4ms inference, 1.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 218.4ms
Speed: 3.3ms preprocess, 218.4ms inference, 1.4ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 218.2ms
Speed: 3.1ms preprocess, 218.2ms inference, 1.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 201.9ms
Speed: 3.7ms preprocess, 201.9ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 188.7ms
Speed: 3.2ms preprocess, 188.7ms inference, 1.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 0, 194.3ms
Speed: 3.3ms preprocess, 194.3ms inference, 1.2ms postprocess per image a