In [None]:
from pathlib import Path
from typing import Generator

import cv2
import numpy as np
import pandas as pd
import pytesseract
from PIL import Image
from tqdm.notebook import tqdm

# Input video and output directory, both given as relative paths.
VIDEO_PATH = Path("videos", "mk27.webm")
RAW_DATA_PATH = Path("data", "raw")

In [None]:
# --- Sampling -------------------------------------------------------------
# Frame rate used to convert frame numbers into seconds in the results.
# NOTE(review): presumably the frame rate of the source video -- confirm.
FPS: int = 60
# Only frames whose number is a multiple of this are analysed.
FRAME_INTERVAL: int = 15
# OCR is attempted on frames whose number is a multiple of this; it also sets
# how many sampled frames are buffered (TEXT_INTERVAL / FRAME_INTERVAL).
TEXT_INTERVAL: int = 120

# --- Run control ----------------------------------------------------------
# Frames numbered below this are read from the video but not processed.
RESUME_FROM: int = 0
# Stop after this many frames past RESUME_FROM; None processes the whole video.
DEBUG_LIMIT = None


class MyProcessor:
    """Detect battle segments in a video and OCR the captions shown in them.

    Frames are fed one at a time to ``process_frame``. A "battle" is any run of
    sampled frames whose caption region (bottom-right corner of the frame) is
    dark. While a battle is in progress the caption region is OCR'd every
    ``TEXT_INTERVAL`` frames; when the text differs from the previous result,
    the buffered in-between frames are OCR'd as well for finer timing.

    Relies on the module-level constants ``FRAME_INTERVAL``, ``TEXT_INTERVAL``
    (expected to be a multiple of ``FRAME_INTERVAL``) and ``FPS``.

    A battle that is still in progress when processing stops is not part of
    ``result()``, because battles are only recorded when their end is seen.
    """

    # Caption region as fractions of the frame size (left and top edges; the
    # region extends to the right and bottom edges). Manually determined;
    # still cuts off some longer text.
    CAPTION_LEFT = 0.57
    CAPTION_TOP = 0.88
    # Arbitrary threshold: caption region darker than this means "in battle".
    BATTLE_MAX_INTENSITY = 50.0
    # Arbitrary threshold: caption region brighter than this may contain text.
    TEXT_MIN_INTENSITY = 4
    # OCR words at or below this confidence are discarded.
    MIN_WORD_CONFIDENCE = 30

    def __init__(self):
        # Frame number at which the battle in progress began, or None while no
        # battle is being tracked.
        self.track_battle_start: int | None = None
        # Sliding window of the latest (frame number, caption crop) pairs.
        self.caption_box_buffer: list[tuple[int, Image.Image]] = []

        # Finished battles as (start frame, end frame).
        self.battles: list[tuple[int, int]] = []
        # OCR results as (frame number, caption lines).
        self.captions: list[tuple[int, list[str]]] = []

    @staticmethod
    def get_caption(caption_box: Image.Image) -> list[str]:
        """OCR ``caption_box`` and return the recognised text, one string per line.

        Words whose confidence is not above ``MIN_WORD_CONFIDENCE`` are dropped.
        Returns an empty list when nothing is recognised.
        """
        caption_data: pd.DataFrame = pytesseract.image_to_data(
            caption_box,
            output_type=pytesseract.Output.DATAFRAME,
            # Force the text column to stay textual: left to inference, a
            # caption made only of digits is parsed as float and the join
            # below raises TypeError.
            pandas_config={"dtype": {"text": str}},
        )
        words = caption_data[
            caption_data["conf"] > MyProcessor.MIN_WORD_CONFIDENCE
        ].dropna(subset=["text"])
        # Whitespace-only tokens carry no caption text.
        words = words[words["text"].str.strip() != ""]

        # line_num restarts in every paragraph/block, so a line is identified
        # by the full (block, paragraph, line) key; grouping on line_num alone
        # would interleave words from unrelated lines.
        line_key = ["block_num", "par_num", "line_num"]
        words = words.sort_values(line_key + ["word_num"])
        return [" ".join(line_words) for _, line_words in words.groupby(line_key)["text"]]

    def process_caption(self) -> None:
        """OCR the newest buffered caption crop and record the result.

        If the text differs from the most recently recorded caption, the older
        crops in the buffer are OCR'd and recorded first, so the change can be
        located between two ``TEXT_INTERVAL`` checkpoints.
        """
        current_frame_num, current_box = self.caption_box_buffer[-1]
        prev_caption = self.captions[-1][1] if self.captions else None
        current_caption = MyProcessor.get_caption(current_box)

        if prev_caption != current_caption:
            print(prev_caption, current_caption)
            # Something changed: look at the skipped frames in more detail.
            for frame_num, caption_box in self.caption_box_buffer[:-1]:
                caption = MyProcessor.get_caption(caption_box)
                self.captions.append((frame_num, caption))
        self.captions.append((current_frame_num, current_caption))

    def process_frame(self, frame: cv2.typing.MatLike, frame_num: int) -> None:
        """Update battle tracking and captions from one video frame.

        Frames whose number is not a multiple of ``FRAME_INTERVAL`` are ignored.

        Args:
            frame: The frame in BGR channel order (it is converted to RGB here).
            frame_num: Index of the frame within the video.
        """
        if frame_num % FRAME_INTERVAL != 0:
            return

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(frame_rgb)

        width, height = image.size
        caption_box = image.crop(
            (width * self.CAPTION_LEFT, height * self.CAPTION_TOP, width, height)
        )

        # Mean over all pixels and channels of the caption region.
        avg_intensity = float(np.array(caption_box).mean())

        if avg_intensity < self.BATTLE_MAX_INTENSITY:
            if self.track_battle_start is None:
                print(f"BEGIN BATTLE: {frame_num}")
                self.track_battle_start = frame_num
                self.caption_box_buffer = []

            # Keep only the samples taken since the previous OCR checkpoint.
            if len(self.caption_box_buffer) >= TEXT_INTERVAL / FRAME_INTERVAL:
                self.caption_box_buffer.pop(0)
            self.caption_box_buffer.append((frame_num, caption_box))
        elif self.track_battle_start is not None:
            # Compare against None explicitly: a battle that began at frame 0
            # is falsy and would otherwise never be closed.
            print(f"END BATTLE: {frame_num}")
            self.battles.append((self.track_battle_start, frame_num))
            self.track_battle_start = None

        if (
            self.track_battle_start is not None
            and frame_num % TEXT_INTERVAL == 0
            and avg_intensity > self.TEXT_MIN_INTENSITY
            and frame_num > self.track_battle_start + FRAME_INTERVAL
        ):
            self.process_caption()

    def result(self) -> dict[str, pd.DataFrame]:
        """Return the collected data with frame numbers converted to seconds.

        Returns:
            A dict with two frames:
            ``"battles"`` -- columns ``begin`` and ``end``, sorted by ``begin``;
            ``"captions"`` -- columns ``time`` and ``text``, one row per caption
            line, skipping empty lines and lines equal to the previously kept
            line.
        """
        battles = sorted(
            ((start / FPS, end / FPS) for start, end in self.battles),
            key=lambda battle: battle[0],
        )

        captions = []
        prev_caption = None
        for frame_num, caption_list in self.captions:
            for caption in caption_list:
                caption = caption.strip()
                if caption != prev_caption and caption != "":
                    prev_caption = caption
                    captions.append((frame_num / FPS, caption))

        return {
            "battles": pd.DataFrame(battles, columns=["begin", "end"]),
            "captions": pd.DataFrame(captions, columns=["time", "text"]),
        }


def iterate_video() -> Generator[tuple[int, "cv2.typing.MatLike"], None, None]:
    """Yield ``(frame_number, frame)`` for the frames of ``VIDEO_PATH``.

    Frames numbered below ``RESUME_FROM`` are read but not yielded, so the
    yielded frame numbers always count from the start of the video. When
    ``DEBUG_LIMIT`` is not None, reading stops once frame number
    ``RESUME_FROM + DEBUG_LIMIT`` is reached, i.e. at most ``DEBUG_LIMIT``
    frames are yielded.

    The capture is released and the stopping frame number is printed when the
    generator finishes, is closed, or fails.

    Raises:
        OSError: If OpenCV cannot open the video. Raised on first iteration.
    """
    # Pass a plain string: not every OpenCV build accepts a pathlib.Path.
    cap = cv2.VideoCapture(str(VIDEO_PATH))
    stop_at = None if DEBUG_LIMIT is None else RESUME_FROM + DEBUG_LIMIT
    i = 0
    try:
        # Fail loudly instead of silently yielding nothing, which would let
        # the caller write out empty results.
        if not cap.isOpened():
            raise OSError(f"could not open video: {VIDEO_PATH}")

        while cap.isOpened():
            # Checked before reading so that a limit of 0 yields no frames.
            if stop_at is not None and i >= stop_at:
                break

            ret, frame = cap.read()
            if not ret:
                break

            if i >= RESUME_FROM:
                yield i, frame

            i += 1
    finally:
        print(f"stopped at frame no: {i}")
        cap.release()

In [None]:
# Make sure the output directory exists before the long video pass, so the
# CSV writes at the end cannot fail on a missing folder.
RAW_DATA_PATH.mkdir(parents=True, exist_ok=True)

processor = MyProcessor()
for i, frame in tqdm(iterate_video()):
    processor.process_frame(frame, i)
result = processor.result()
result["battles"].to_csv(RAW_DATA_PATH / "battles.csv", index=False)
result["captions"].to_csv(RAW_DATA_PATH / "captions.csv", index=False)