This version takes the computation time and the frame rate into account, so that a new frame is processed only after the previous frame has been fully evaluated.

This enables real-time video generation: the intermediate frames that arrive while the previous frame is still being evaluated are dropped.

In [1]:
from ultralytics import YOLO
import os
import cv2
import numpy as np
import easyocr
import math
from paddleocr import PaddleOCR
import time

# --------------------------
# Utility Functions
# --------------------------

def ocr_detect(reader, ocr, idx, x_center, y_center, radius, ocr_regions_folder, image, frame_id):
    """
    Read a numeric value from a circular region of ``image``.

    Everything outside the circle is blacked out, the result is lightly
    blurred, and the blurred image is passed to PaddleOCR first and, if that
    does not yield a number, to EasyOCR. The blurred image is also written to
    ``ocr_regions_folder`` for debugging.

    Args:
        reader: EasyOCR reader; only ``reader.readtext(img)`` is called.
        ocr: PaddleOCR instance; only ``ocr.ocr(img, cls=True)`` is called.
        idx: Index embedded in the debug file name (distinguishes regions
            within one frame).
        x_center, y_center, radius: Circle to read, in pixel coordinates of
            ``image``; truncated to int.
        ocr_regions_folder: Directory that receives the debug image.
        image: Image to read from (2-D grayscale or H x W x C).
        frame_id: Frame identifier embedded in the debug file name.

    Returns:
        The recognised value as a float, or None when neither OCR engine
        produced text that parses as a float.
    """
    x_center, y_center, radius = int(x_center), int(y_center), int(radius)

    # Single-channel mask (white disc on black). Building it from the first
    # two dimensions keeps this working for grayscale images as well.
    mask = np.zeros(image.shape[:2], dtype=np.uint8)
    cv2.circle(mask, (x_center, y_center), radius, 255, thickness=-1)
    circular_region = cv2.bitwise_and(image, image, mask=mask)
    denoised = cv2.GaussianBlur(circular_region, (3, 3), 0)

    # Save exactly what the OCR engines will see, and only claim success when
    # the write actually succeeded.
    denoised_path = os.path.join(ocr_regions_folder, f"denoised_circle_{idx}_frame_{frame_id}.jpg")
    if cv2.imwrite(denoised_path, denoised):
        print(f"Saved denoised circular region {idx} for frame {frame_id} to: {denoised_path}")
    else:
        print(f"Failed to save denoised circular region {idx} for frame {frame_id} to: {denoised_path}")

    # First attempt: PaddleOCR, using the text of the first detected line.
    result = ocr.ocr(denoised, cls=True)
    if result and result[0]:
        paddle_text = result[0][0][1][0]
        try:
            return float(paddle_text)
        except ValueError:
            print(f"PaddleOCR could not convert the text to float: {paddle_text}")

    # Fallback: EasyOCR, concatenating every recognised fragment.
    value = "".join(text for _bbox, text, _confidence in reader.readtext(denoised))
    try:
        return float(value)
    except ValueError:
        print(f"EasyOCR could not convert the text to float: {value}")
    return None

def rotate_point(point, angle_of_inclination):
    """
    Rotate a 2-D point about the origin.

    ``angle_of_inclination`` is passed straight to ``math.cos``/``math.sin``,
    so it is interpreted in radians. Returns the rotated point as a NumPy
    array.
    """
    cos_a = math.cos(angle_of_inclination)
    sin_a = math.sin(angle_of_inclination)
    # Standard 2x2 rotation matrix applied to the point as a column vector.
    rotation = np.array([[cos_a, -sin_a],
                         [sin_a, cos_a]])
    return rotation.dot(point)

def stretch_point(point, major_axis_length, minor_axis_length):
    """
    Scale the x-coordinate of ``point`` by ``major_axis_length / minor_axis_length``.

    The y-coordinate is returned unchanged. The result is a NumPy array.
    """
    x_coord, y_coord = point[0], point[1]
    axis_ratio = major_axis_length / minor_axis_length
    return np.array([x_coord * axis_ratio, y_coord])

def get_angle(point1, point2, major_axis_length, minor_axis_length, ellipse_inclination):
    """
    Return the direction from ``point1`` to ``point2`` as an angle in radians.

    The raw ``atan2`` direction is normalised to a non-negative angle, then
    multiplied by ``major_axis_length / minor_axis_length`` and wrapped
    modulo 2*pi.

    Args:
        point1: (x, y) origin of the direction, or None.
        point2: (x, y) target of the direction, or None.
        major_axis_length: Numerator of the ellipse correction factor.
        minor_axis_length: Denominator of the ellipse correction factor.
        ellipse_inclination: Accepted for interface compatibility only; it
            does not influence the returned angle.

    Returns:
        The corrected angle, or None if either point is None.

    Raises:
        ZeroDivisionError: if ``minor_axis_length`` is 0.
    """
    if point1 is None or point2 is None:
        return None

    # NOTE(review): earlier revisions also rotated and stretched both points
    # here but never used the results, so that dead computation was removed.
    # Multiplying the angle by the axis ratio is kept to preserve the current
    # output, but it is not a true geometric ellipse correction; confirm the
    # intended transform before relying on it for strongly tilted gauges.
    x1, y1 = point1
    x2, y2 = point2
    full_turn = 2 * math.pi
    raw_angle = (math.atan2(y2 - y1, x2 - x1) + full_turn) % full_turn
    ellipse_correction_factor = major_axis_length / minor_axis_length
    return (raw_angle * ellipse_correction_factor) % full_turn

def get_values(max_angle, min_angle, pointer_angle):
    """
    Derive the gauge sweep and the pointer's offset within it.

    The three angles are classified by their strict ordering. Each of the six
    orderings selects how the sweep between the minimum and maximum marks
    (``angle_range``) and the pointer's angular distance from the minimum
    mark (``val_angle``) are computed, wrapping through 2*pi where required.
    If no strict ordering applies (e.g. two angles are equal), both results
    are None.

    Returns:
        Tuple ``(angle_range, val_angle)`` in radians, or ``(None, None)``.
    """
    full_turn = 2 * math.pi
    case_label = 'case (vii) : '
    angle_range = None
    val_angle = None

    if min_angle < max_angle:
        if max_angle < pointer_angle:
            case_label = 'case (i) : '
            angle_range = full_turn - (max_angle - min_angle)
            val_angle = full_turn - (pointer_angle - min_angle)
        elif pointer_angle < min_angle:
            case_label = 'case (ii) : '
            angle_range = full_turn - (max_angle - min_angle)
            val_angle = min_angle - pointer_angle
        elif min_angle < pointer_angle < max_angle:
            case_label = 'case (iv) : '
            angle_range = max_angle - min_angle
            val_angle = pointer_angle - min_angle
    elif max_angle < min_angle:
        if max_angle < pointer_angle < min_angle:
            case_label = 'case (iii) : '
            angle_range = min_angle - max_angle
            val_angle = min_angle - pointer_angle
        elif pointer_angle < max_angle:
            case_label = 'case (v) : '
            angle_range = full_turn - (min_angle - max_angle)
            val_angle = full_turn - (min_angle - pointer_angle)
        elif min_angle < pointer_angle:
            case_label = 'case (vi) : '
            angle_range = full_turn - (min_angle - max_angle)
            val_angle = pointer_angle - min_angle

    print(case_label)

    # Report both quantities in degrees (or None when undetermined).
    range_in_degrees = None if angle_range is None else math.degrees(angle_range)
    value_in_degrees = None if val_angle is None else math.degrees(val_angle)
    print(f'angle_range = {range_in_degrees}')
    print(f'val_angle = {value_in_degrees}')
    return angle_range, val_angle

# --------------------------
# Main Frame Processing Function
# --------------------------

def _fit_gauge_ellipse(segmentation_results, frame_size, frame_id, clock_class_id=74):
    """
    Fit an ellipse to the gauge face reported by the segmentation model.

    Uses the first mask whose class id equals ``clock_class_id``, resizes it
    to ``frame_size`` (width, height), cleans it with a morphological
    close/open and fits an ellipse to the largest contour whose area exceeds
    500. Prints the reason and returns None when any of these steps finds
    nothing; otherwise returns the ``cv2.fitEllipse`` result
    ``(center, axes, angle)``.
    """
    width, height = frame_size
    segmentation = segmentation_results[0]
    if segmentation.masks is None:
        print(f"No segmentation results for frame {frame_id}")
        return None

    masks = segmentation.masks.data.cpu().numpy()
    class_ids = segmentation.boxes.cls.cpu().numpy()
    clock_mask = next(
        (mask for mask, class_id in zip(masks, class_ids) if int(class_id) == clock_class_id),
        None,
    )
    if clock_mask is None:
        print(f"No 'clock' mask found in frame {frame_id}")
        return None

    clock_mask = (clock_mask * 255).astype(np.uint8)
    mask_resized = cv2.resize(clock_mask, (width, height), interpolation=cv2.INTER_LINEAR)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    cleaned_mask = cv2.morphologyEx(mask_resized, cv2.MORPH_CLOSE, kernel)
    cleaned_mask = cv2.morphologyEx(cleaned_mask, cv2.MORPH_OPEN, kernel)
    contours, _ = cv2.findContours(cleaned_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    valid_contours = [c for c in contours if cv2.contourArea(c) > 500]
    if not valid_contours:
        print(f"No valid contours found in frame {frame_id}")
        return None
    return cv2.fitEllipse(max(valid_contours, key=cv2.contourArea))


def _best_label_centers(result):
    """Map each detected class id to the box centre of its most confident detection."""
    boxes = result.boxes.xyxy.cpu().numpy()
    confs = result.boxes.conf.cpu().numpy()
    labels = result.boxes.cls.cpu().numpy()
    best = {}
    for box, confidence, label in zip(boxes, confs, labels):
        label = int(label)
        if label not in best or confidence > best[label]['confidence']:
            center = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)
            best[label] = {'center': center, 'confidence': confidence}
    return best


def _draw_reading_box(frame, final_value):
    """Draw the reading (or "N/A") in a filled black box at the bottom-right of ``frame``."""
    text = f"Detected Reading : {final_value:.2f}" if final_value is not None else "Detected Reading : N/A"
    (text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
    margin = 10
    x_box = frame.shape[1] - text_width - margin
    y_box = frame.shape[0] - margin
    cv2.rectangle(frame, (x_box - 5, y_box - text_height - 5), (x_box + text_width + 5, y_box + 5), (0, 0, 0), thickness=-1)
    cv2.putText(frame, text, (x_box, y_box), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)


def process_frame(frame, frame_id, segmentation_model, detection_model, reader, ocr, ocr_regions_folder):
    """
    Read the gauge visible in one video frame and annotate the frame.

    Steps: segment the gauge face and fit an ellipse to it, detect the gauge
    key points (class 0 = pointer base, 1 = maximum mark, 2 = minimum mark,
    3 = pointer tip), OCR the numbers at the minimum and maximum marks, and
    interpolate the reading from the pointer angle.

    Args:
        frame: BGR image; annotated IN PLACE with markers and the reading.
        frame_id: Identifier used in log messages and debug file names.
        segmentation_model: Callable returning YOLO segmentation results.
        detection_model: Callable returning YOLO detection results.
        reader: EasyOCR reader forwarded to ``ocr_detect``.
        ocr: PaddleOCR instance forwarded to ``ocr_detect``.
        ocr_regions_folder: Directory for OCR debug crops.

    Returns:
        Tuple ``(frame, final_value)`` where ``final_value`` is the reading
        as a float, or None when it could not be determined.
    """
    height, width = frame.shape[:2]

    # Untouched pixels for OCR: the markers drawn on ``frame`` below sit right
    # on top of the scale numbers and would otherwise corrupt the text.
    clean_frame = frame.copy()

    segmentation_results = segmentation_model(frame)
    ellipse = _fit_gauge_ellipse(segmentation_results, (width, height), frame_id)

    major_axis_length = None
    minor_axis_length = None
    ellipse_inclination = 0
    detection_input = clean_frame
    if ellipse is not None:
        # NOTE(review): cv2.fitEllipse is documented to report its angle in
        # degrees - confirm the unit expected by get_angle.
        _, axes, ellipse_inclination = ellipse
        major_axis_length = max(axes)
        minor_axis_length = min(axes)
        # The detector sees the frame with the fitted ellipse outlined on it.
        detection_input = clean_frame.copy()
        cv2.ellipse(detection_input, ellipse, (0, 255, 0), 2)

    results = detection_model(detection_input)

    final_value = None
    if results is None:
        return frame, final_value

    marker_radius = int(major_axis_length / 10) if major_axis_length else 5
    ocr_radius = major_axis_length / 9 if major_axis_length else 10
    major_for_angle = major_axis_length if major_axis_length else 1
    minor_for_angle = minor_axis_length if minor_axis_length else 1

    for result in results:
        label_centers = _best_label_centers(result)
        for data in label_centers.values():
            cv2.circle(frame, (int(data['center'][0]), int(data['center'][1])), radius=5, color=(0, 255, 0), thickness=-1)

        pointer_base_1 = label_centers.get(0, {}).get('center')
        maximum_1 = label_centers.get(1, {}).get('center')
        minimum_1 = label_centers.get(2, {}).get('center')
        pointer_tip_1 = label_centers.get(3, {}).get('center')
        if pointer_base_1 is None or pointer_tip_1 is None or maximum_1 is None or minimum_1 is None:
            continue

        # Flip y so angles follow the mathematical (y-up) convention.
        pointer_base = [pointer_base_1[0], height - pointer_base_1[1]]
        pointer_tip = [pointer_tip_1[0], height - pointer_tip_1[1]]
        maximum = [maximum_1[0], height - maximum_1[1]]
        minimum = [minimum_1[0], height - minimum_1[1]]

        cv2.circle(frame, (int(minimum_1[0]), int(minimum_1[1])),
                   radius=marker_radius, color=(0, 0, 255), thickness=2)
        min_val = ocr_detect(reader, ocr, 0, minimum_1[0], minimum_1[1],
                             ocr_radius, ocr_regions_folder, clean_frame, frame_id)
        print(f"Frame {frame_id} - Minimum Value = {min_val}")

        cv2.circle(frame, (int(maximum_1[0]), int(maximum_1[1])),
                   radius=marker_radius, color=(0, 0, 255), thickness=2)
        max_val = ocr_detect(reader, ocr, 1, maximum_1[0], maximum_1[1],
                             ocr_radius, ocr_regions_folder, clean_frame, frame_id)
        print(f"Frame {frame_id} - Maximum Value = {max_val}")

        min_angle = get_angle(pointer_base, minimum, major_for_angle, minor_for_angle, ellipse_inclination)
        max_angle = get_angle(pointer_base, maximum, major_for_angle, minor_for_angle, ellipse_inclination)
        pointer_angle = get_angle(pointer_base, pointer_tip, major_for_angle, minor_for_angle, ellipse_inclination)

        if None in (min_angle, max_angle, pointer_angle) or max_val is None or min_val is None:
            continue

        print(f"Frame {frame_id} - min_angle = {math.degrees(min_angle)}")
        print(f"Frame {frame_id} - max_angle = {math.degrees(max_angle)}")
        print(f"Frame {frame_id} - pointer_angle = {math.degrees(pointer_angle)}")
        angle_range, val_angle = get_values(max_angle, min_angle, pointer_angle)
        if angle_range is not None and val_angle is not None:
            # Linear interpolation across the sweep, offset by the scale's
            # minimum so gauges that do not start at 0 read correctly.
            final_value = min_val + ((max_val - min_val) * val_angle) / angle_range
            print(f"Frame {frame_id} - Final Gauge Reading = {final_value}")

    # Always stamp the overlay, so frames without a usable detection show
    # "N/A" instead of silently carrying no reading.
    _draw_reading_box(frame, final_value)
    return frame, final_value

# --------------------------
# Main Video Processing Loop
# --------------------------

def main():
    """
    Run the gauge reader over a video and write an annotated copy.

    Each decoded frame is processed with ``process_frame`` and shown in a
    window. The time spent on it is converted into a number of source frames
    that elapsed meanwhile; those frames are grabbed without decoding and the
    processed frame is written once for itself plus once per skipped frame,
    so the output keeps the source timing. Press ``q`` in the window to stop.
    """
    # Load your YOLO models.
    segmentation_model = YOLO("yolo11n-seg.pt")
    detection_model = YOLO("C:\\Users\\Harsh\\Desktop\\PTZ Camera Zoom with Gauge Detection\\pretrained_models\\4\\last.pt")

    # Define output folders for results and OCR regions.
    # ``output_folder`` stays a module-level global for backward compatibility.
    global output_folder
    output_folder = "C:\\Users\\Harsh\\Desktop\\PTZ Camera Zoom with Gauge Detection\\result_frames_video"
    os.makedirs(output_folder, exist_ok=True)
    ocr_regions_folder = "C:\\Users\\Harsh\\Desktop\\PTZ Camera Zoom with Gauge Detection\\ocr_regions_video"
    os.makedirs(ocr_regions_folder, exist_ok=True)

    # Initialize OCR readers.
    reader = easyocr.Reader(['en'], gpu=True)
    ocr = PaddleOCR(use_angle_cls=True, lang='en', rec=True, device='cpu')

    # Open a video file or webcam stream (use 0 for webcam).
    cap = cv2.VideoCapture("C:\\Users\\Harsh\\Desktop\\PTZ Camera Zoom with Gauge Detection\\test_videos\\195199132_main_xxl.mp4")
    if not cap.isOpened():
        print("Error: VideoCapture not opened!")
        cap.release()
        return

    # Set up video writer using the same properties as the input video.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        # Some sources (e.g. webcams) report 0 or NaN; fall back to a sane
        # rate so the writer opens and the skip computation stays valid.
        fps = 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter("C:\\Users\\Harsh\\Desktop\\PTZ Camera Zoom with Gauge Detection\\result_frames_video\\final_output_2.mp4",
                          fourcc, fps, (width, height))
    if not out.isOpened():
        # Best effort: keep displaying frames even if nothing can be saved.
        print("Error: VideoWriter not opened!")

    frame_id = 0
    try:
        while True:
            start_time = time.perf_counter()

            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1

            # Process the frame.
            processed_frame, _gauge_value = process_frame(
                frame, frame_id, segmentation_model, detection_model, reader, ocr, ocr_regions_folder)

            # Display the processed frame.
            cv2.imshow("Gauge Reading", processed_frame)

            # Source frames that went by while this one was being processed.
            processing_time = time.perf_counter() - start_time
            frames_to_skip = int(processing_time * fps)

            # Skip them without decoding, counting only frames that really
            # exist so the output is not padded past the end of the input.
            skipped = 0
            for _ in range(frames_to_skip):
                if not cap.grab():
                    break
                skipped += 1

            # One copy for the processed frame plus one per skipped frame.
            for _ in range(skipped + 1):
                out.write(processed_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release resources even if processing raises midway.
        cap.release()
        out.release()
        cv2.destroyAllWindows()

# Run the video pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()


[2025/06/06 12:56:15] ppocr DEBUG: Namespace(help='==SUPPRESS==', use_gpu=False, use_xpu=False, use_npu=False, use_mlu=False, ir_optim=True, use_tensorrt=False, min_subgraph_size=15, precision='fp32', gpu_mem=500, gpu_id=0, image_dir=None, page_num=0, det_algorithm='DB', det_model_dir='C:\\Users\\Harsh/.paddleocr/whl\\det\\en\\en_PP-OCRv3_det_infer', det_limit_side_len=960, det_limit_type='max', det_box_type='quad', det_db_thresh=0.3, det_db_box_thresh=0.6, det_db_unclip_ratio=1.5, max_batch_size=10, use_dilation=False, det_db_score_mode='fast', det_east_score_thresh=0.8, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_sast_score_thresh=0.5, det_sast_nms_thresh=0.2, det_pse_thresh=0, det_pse_box_thresh=0.85, det_pse_min_area=16, det_pse_scale=1, scales=[8, 16, 32], alpha=1.0, beta=1.0, fourier_degree=5, rec_algorithm='SVTR_LCNet', rec_model_dir='C:\\Users\\Harsh/.paddleocr/whl\\rec\\en\\en_PP-OCRv4_rec_infer', rec_image_inverse=True, rec_image_shape='3, 48, 320', rec_batch_num=