In [None]:
import json
import logging
import subprocess
from typing import Optional

import cv2
import easyocr
import numpy as np
import pandas as pd
from tqdm import tqdm
from ultralytics import YOLO

In [None]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

In [8]:
def get_video_rotation(video_path):
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream_tags=rotate:stream_side_data=rotation",
        "-of",
        "json",
        video_path,
    ]
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    info = json.loads(result.stdout)

    # First, try the rotate tag (classic)
    try:
        return int(info["streams"][0]["tags"]["rotate"])
    except (KeyError, ValueError):
        pass

    # Then, check side_data_list
    try:
        side_data = info["streams"][0].get("side_data_list", [])
        for item in side_data:
            if "rotation" in item:
                return int(item["rotation"])
    except (KeyError, ValueError):
        pass

    return 0  # default: no rotation

In [21]:
def rotate_frame(frame, rotation) -> np.ndarray:
    if rotation == 90:
        frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
    elif rotation == -90:
        frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
    elif rotation == 180:
        frame = cv2.rotate(frame, cv2.ROTATE_180)

    return frame


def time_to_seconds(timestr: str) -> int:
    parts = timestr.split(":")
    parts = [int(p) for p in parts]
    if len(parts) == 2:
        minutes, seconds = parts
        return minutes * 60 + seconds
    elif len(parts) == 3:
        hours, minutes, seconds = parts
        return hours * 3600 + minutes * 60 + seconds
    else:
        raise ValueError(f"Invalid time format: {timestr}")


def get_start_and_end_frames(
    segment_time_range: tuple[str, str], start_frame: int, end_frame: int, fps: float
) -> tuple[int, int]:
    start_sec, end_sec = map(time_to_seconds, segment_time_range)
    start_sec = max(start_sec - 1, start_frame)
    end_sec = min(end_sec + 1, end_frame)
    start_frame = int(start_sec * fps)
    end_frame = int(end_sec * fps)

    return start_frame, end_frame


def extract_frames(
    video_path: str,
    target_fps: int = 5,
    segment_time_range: Optional[tuple[str, str]] = None,
):
    # Get video rotation
    rotation = get_video_rotation(video_path)

    # Load video
    cap = cv2.VideoCapture(video_path)

    fps = cap.get(cv2.CAP_PROP_FPS)
    skip = int(fps // target_fps) if fps and fps > 0 else 1
    end_frame = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    start_frame = 0

    # Limit to segment
    if segment_time_range is not None:
        start_frame, end_frame = get_start_and_end_frames(
            segment_time_range, start_frame, end_frame, fps
        )

    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    frames = []
    while (
        cap.isOpened()
        and (frame_idx := int(cap.get(cv2.CAP_PROP_POS_FRAMES))) < end_frame
    ):
        ret, frame = cap.read()

        if not ret:
            break

        if frame_idx % skip == 0:
            frame = rotate_frame(frame, rotation)
            frames.append(frame)

    cap.release()

    return frames


def check_that_airplane_is_in_frame(
    cropped_frame: np.ndarray, box_aspect_ratio_min: float = 2.0
) -> bool:
    # If the bounding box's aspect ratio is too small, the airplane is probably largely off-screen
    h, w, _ = cropped_frame.shape
    box_aspect_ratio = w / h

    logger.debug(
        f"[check_that_airplane_is_in_frame] Bounding box aspect ratio={box_aspect_ratio:.2f} (min={box_aspect_ratio_min:.2f})"
    )

    return False if box_aspect_ratio < box_aspect_ratio_min else True


def run_object_detection(
    model: YOLO,
    frames: list[np.ndarray],
    confidence_threshold: float,
) -> list[np.ndarray]:
    cropped_frames = []
    results = model(frames, verbose=False)

    logger.debug(f"[run_object_detection] Total frames to process: {len(frames)}.")

    for i, (frame, result) in enumerate(zip(frames, results), start=1):
        logger.debug(f"[run_object_detection] Frame {i}/{len(frames)}.")
        if len(result.boxes) == 0:
            logger.debug(
                "[run_object_detection] \t❌ Frame does not contain any object."
            )
            continue

        for box in result.boxes:
            object_type, conf = int(box.cls[0]), float(box.conf[0])
            label = model.names[object_type]
            logger.debug(
                f"[run_object_detection] \tObject type: {label}, confidence: {conf:.2f}"
            )

            if object_type != 4:
                logger.debug(
                    "[run_object_detection] \t❌ Frame does not contain an airplane."
                )
                continue

            if conf < confidence_threshold:
                logger.debug(
                    "[run_object_detection] \t❌ The confidence of the object being in airplane is too low."
                )
                continue

            x1, y1, x2, y2 = tuple(map(int, box.xyxy[0]))
            crop = frame[y1:y2, x1:x2]

            airplane_in_frame = check_that_airplane_is_in_frame(
                crop, box_aspect_ratio_min=2.0
            )

            logger.debug(
                f"[run_object_detection] \tAirplane in frame: {airplane_in_frame}."
            )
            if not airplane_in_frame:
                continue

            logger.debug("[run_object_detection] \t✅ Frame contains an airplane.")
            cropped_frames.append(crop)

    logger.debug(f"[run_object_detection] Total frames found: {len(cropped_frames)}.")

    return cropped_frames


def check_registration_rules(registration: str) -> bool:
    # Check
    # Nikdy imatrikulace nemůže začínat na OR, OA, CK, AE, @,
    # (NIKDY nezačíná žádným znakem, vždy budto písmenem nebo číslicí)
    # → tudíž vsechny výsledky začínající temito „prefixy" jsou automaticky chybné
    disallowed_prefixes = ["OR", "OA", "CK", "AE"]
    return True


def run_ocr(
    ocr_reader: easyocr.Reader,
    frames: list[np.ndarray],
) -> dict[str, list[float]]:
    detected_registrations = {}
    for i, frame in enumerate(frames):
        ocr_results = ocr_reader.readtext(
            frame, allowlist="ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789- "
        )
        for _, text, text_conf in ocr_results:
            cleaned = text.strip().upper()
            text_conf = float(text_conf)

            if cleaned in detected_registrations:
                detected_registrations[cleaned].append(text_conf)
            else:
                detected_registrations[cleaned] = [text_conf]

    return detected_registrations

In [17]:
logger.setLevel(logging.INFO)

In [4]:
PLANE_CONFIDENCE_THRESHOLD = 0.5
TARGET_FPS = 5

model = YOLO("yolov8m.pt", verbose=False)
ocr_reader = easyocr.Reader(["en"])

In [5]:
df = pd.read_csv("dataset.csv")

# Drop duplicates for easier segment handling
df = df.drop_duplicates(subset=["Video file"], keep="first")

# Drop rows without registration
df = df.dropna(subset=["Registration"])

df[["Prediction", "Prediction conf"]] = [None, None]

In [22]:
predictions = []
for idx, row in tqdm(df.iterrows(), total=len(df)):
    video_path = f"data/{row['Video file']}"
    segment_time_range = row["Segment start"], row["Segment end"]

    frames = extract_frames(
        video_path, target_fps=TARGET_FPS, segment_time_range=segment_time_range
    )

    cropped_frames = run_object_detection(
        model, frames, confidence_threshold=PLANE_CONFIDENCE_THRESHOLD
    )

    detected_registrations = run_ocr(ocr_reader, cropped_frames)
    best_registration, best_registration_conf = None, 0.0
    for registration, confs in detected_registrations.items():
        if max(confs) > best_registration_conf:
            best_registration = registration
            best_registration_conf = max(confs)

    predictions.append((best_registration, best_registration_conf))

df[["Prediction", "Prediction conf"]] = predictions

100%|██████████| 21/21 [04:33<00:00, 13.04s/it]


In [23]:
df

Unnamed: 0,Video file,Usable,Registration,Segment start,Segment end,Comment,Prediction,Prediction conf
0,IMG_3353.MOV,Yes,OK-LTY,00:02,00:02,,K-LIK,0.606511
1,IMG_3354.MOV,Yes,OK-BIT,00:15,00:17,,OK-BIT,0.862148
2,IMG_3355.MOV,Yes,OK-BIT,00:01,00:02,,LK-BIT,0.910789
3,IMG_3358.MOV,Yes,OK-LTY,00:02,00:03,,OK-LIY,0.742001
4,IMG_3359.MOV,Yes,OK-BIT,00:20,00:23,,QB,0.472316
5,IMG_3360.MOV,Yes,OK-FAH,00:12,00:16,,ON,0.64382
6,IMG_3363.MOV,Yes,OK-LTY,00:07,00:12,,L,0.452049
7,IMG_3367.MOV,Yes,OK-BIT,00:01,00:03,Úplně perfektní,OK-BIT,0.997488
8,IMG_3368.MOV,Yes,OK-BIT,00:03,00:05,,OK-BIT,0.998075
9,IMG_3370.MOV,Yes,OK-COK,00:10,00:12,,AERO,0.997


In [None]:
cap = cv2.VideoCapture("data/IMG_3353.MOV")
rotation = get_video_rotation("data/IMG_3353.MOV")

# Process every n-th frame
frame_skip = 20
frame_count = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % frame_skip != 0:
        continue  # skip this frame without processing

    if rotation == 90:
        frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
    elif rotation == -90:
        frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
    elif rotation == 180:
        frame = cv2.rotate(frame, cv2.ROTATE_180)

    results = model(frame)
    for result in results:
        for x1, y1, x2, y2 in map(lambda b: map(int, b), result.boxes.xyxy):
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

    cv2.imshow("Detection", frame)
    if cv2.waitKey(1) == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

Python(63139) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.



0: 640x384 (no detections), 451.4ms
Speed: 23.0ms preprocess, 451.4ms inference, 8.4ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 (no detections), 588.8ms
Speed: 2.3ms preprocess, 588.8ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 (no detections), 789.0ms
Speed: 2.1ms preprocess, 789.0ms inference, 1.8ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 (no detections), 434.9ms
Speed: 1.5ms preprocess, 434.9ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 (no detections), 456.3ms
Speed: 3.2ms preprocess, 456.3ms inference, 3.1ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 (no detections), 358.9ms
Speed: 1.6ms preprocess, 358.9ms inference, 1.1ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 airplane, 270.7ms
Speed: 2.6ms preprocess, 270.7ms inference, 17.6ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 airplane, 252.9ms
Speed: 1.3ms preprocess, 2

: 

In [3]:
get_video_rotation("data/IMG_3353.MOV")

-90