In [None]:
# NOTE(review): hardcoded absolute local path — every relative path below ("pdf", "output")
# resolves against it, so the notebook only runs where this directory exists.
%cd E:/Python/book-translator/

In [None]:
import asyncio
import base64
import os
import pickle
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from traceback import format_exc
from typing import TypedDict, Tuple, List, Dict, Any, Optional, Set

import numpy as np
import pandas as pd
import pymupdf
from google.generativeai.types import HarmCategory, HarmBlockThreshold, GenerationConfig
from IPython.display import display_png, Image
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.messages.ai import UsageMetadata
from langchain_google_genai import ChatGoogleGenerativeAI
from tqdm.auto import tqdm

# Test the translator

In [None]:
# Keyword arguments shared by the Gemini chat model created in this notebook.
#
# The API key is read from the GOOGLE_API_KEY environment variable instead of being
# written into the notebook: a key stored in a cell ends up in every copy and in the
# version history of the file. Any key that was previously committed here must be
# treated as leaked and revoked.
# A missing variable fails right here (KeyError) rather than at the first API call.
GEMINI_DEFAULT_CONFIG = {
    "api_key": os.environ["GOOGLE_API_KEY"],
    # Every harm category is set to BLOCK_NONE.
    "safety_settings": {
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
    },
}

In [None]:
# Chat model used for every translation call in this notebook.
# NOTE(review): confirm that ChatGoogleGenerativeAI honours `generation_config` when it
# is passed to the constructor (as opposed to per call) in the installed version.
llm = ChatGoogleGenerativeAI(
    # Alternative models that were tried; switch by moving the comment marker.
    # model="gemini-2.5-flash-lite-preview-06-17",
    model="gemini-2.5-flash",
    # model="gemini-2.5-pro",
    **GEMINI_DEFAULT_CONFIG,
    generation_config=GenerationConfig(
        # Upper bound on the length of one reply.
        max_output_tokens=3276,
        temperature=0.0,
    ),
)
llm

In [None]:
# System prompt sent with every translation request.
# NOTE(review): the instruction text describes the input tags as <b[number]> ... </b[number]>,
# while the examples inside the prompt and the requested output format use
# <b[number]-[line_number]>. Consider aligning the description with the examples.
SYS_PROMPT = """
Act as a professional book translator.
Translate the following Chinese text into English, ensuring that the translation is accurate, fluent, and maintains the original meaning. Pay attention to the context and nuances of the text to provide a high-quality translation.
The text to be translated is enclosed within tag <b[number]> and </b[number]>, where [number] is a unique identifier for each segment.
The attached PDF file of the translating page is provided for reference to help understand the context better.


Answer output of each segment in the following format:
<b[number]-[line_number]>Translated text</b[number]-[line_number]>
Ignore the segments that:
- cannot be translated, such as those containing only symbols or noise.
- are a part of another segment, such as characters of a word.
For ignore segments, do not write tags and provide a brief explanation in English of why they are ignored outside the tags.

Make sure to keep the tags in the output.
You can draft your output outside the tags first, recheck it for accuracy, and then write the final output inside the tags.
Do not place note or explanation inside the tags.

EXAMPLE OF INPUT:
<b1-1>这是一个例子。</b1-1>
<b2-0>请将这段文字翻译成英文。</b2-0>
<b3-0>. * . . </b3-0>
<b4-1>这</b4-1>
<b4-2>是</b4-2>
<b4-3>垂</b4-3>
<b4-4>直</b4-4>
<b4-5>文</b4-5>
<b4-6>本</b4-6>

EXAMPLE OF OUTPUT:
<b1-1>This is an example.</b1-1>
<b2-0>Please translate this text into English.</b2-0>
The segment 3-0 contains only symbols that may be noise.
<b4-1>This</b4-1>
<b4-2>is</b4-2>
<b4-3>vertical</b4-3>
The segment 4-4 is omitted because it is a character of 4-3 word.
<b4-5>text</b4-5>
The segment 4-6 is omitted because it is a character of 4-5 word.

""".strip()

# Show the prompt exactly as it will be sent (leading/trailing whitespace stripped).
print(SYS_PROMPT)

# Load file

In [None]:
# Names of the entries in the "pdf" directory (relative to the working directory).
# os.listdir returns entries in arbitrary order, and entries are selected by index
# further down, so the listing is sorted to keep that selection stable between runs.
path_pdf = sorted(os.listdir("pdf"))
path_pdf

In [None]:
# Open the working document.
# NOTE(review): the file is chosen by position (index 1) in the directory listing —
# selecting it by name would survive files being added to or removed from "pdf".
raw_doc = pymupdf.open(os.path.abspath(os.path.join("pdf", path_pdf[1])))
raw_doc

# Explore Data

In [None]:
page = raw_doc[22]

blocks = page.get_text("blocks")

# Build the translation input: one <b{n}>...</b{n}> element per text block
# (block type 0) whose stripped text contains at least one Chinese character.
texts = "\n".join(
    f"<b{block_no}>\n{txt}\n</b{block_no}>"
    for block in blocks
    if block[6] == 0
    # single-element list: binds the stripped text and block number for the filter below
    for txt, block_no in [(block[4].strip(), block[5])]
    if txt
    and any(
        ("\u4e00" <= char <= "\u9fff") or ("\u3400" <= char <= "\u4dbf") for char in txt
    )
)
print(texts)

In [None]:
class LineInfo(TypedDict):
    """Text and layout information collected for one line of a text block."""

    # Bounding box of the line as reported by the page's "dict" text structure
    bbox: Tuple[float, float, float, float]
    # Number of the block the line belongs to
    block_no: int
    # Index of the line within its block
    line_no: int
    # Stripped span texts of the line joined with single spaces
    text: str
    # Font name of every span that contributed text
    font: List[str]
    # Mean font size of the contributing spans (0.0 when no size was available)
    font_size: float

In [None]:
# Prototype of the per-line extraction: walk the "dict" text structure of one page and
# record text, font names and mean font size for every line of every text block.
# NOTE: this loop rebinds the notebook-level `page` (and `data`) to page index 29.
for page in raw_doc[29:30]:
    data = page.get_text("dict")
    block_texts: Dict[Tuple[int, int], LineInfo] = {}
    for _block in data["blocks"]:
        # skip everything that is not a text block
        if _block.get("type") != 0:
            continue
        _block_no = _block["number"]
        if lines := _block.get("lines"):
            for line_number, line in enumerate(lines):
                _text: List[str] = []
                _font: List[str] = []
                _font_size: List[float] = []
                for span in line.get("spans", []):
                    text = (span.get("text", "") or "").strip()
                    if text:
                        _text.append(text)
                        # Remember size and font name of every non-empty span
                        if __font_size := span.get("size"):
                            _font_size.append(__font_size)
                        if __font := span.get("font"):
                            _font.append(__font)
                # keyed by (block number, line index); lines without text are kept here too
                block_texts[(_block_no, line_number)] = LineInfo(
                    bbox=tuple(line.get("bbox", (0.0, 0.0, 0.0, 0.0))),
                    block_no=_block_no,
                    line_no=line_number,
                    text=" ".join(_text).strip(),
                    font=_font,
                    font_size=sum(_font_size) / len(_font_size) if _font_size else 0.0,
                )
    # the slice above holds at most one page, so this only makes the single pass explicit
    break
block_texts

In [None]:
# Inspect the first line of the second block of the "dict" structure extracted above.
# `blocks` comes from get_text("blocks") and holds plain tuples, so it cannot be indexed
# with "lines"; the per-line data lives in `data["blocks"]`.
data["blocks"][1]["lines"][0]

In [None]:
# Copy the current `page` into a fresh one-page PDF and serialise it to bytes.
# NOTE(review): `page` is whatever the most recently executed cell bound it to. After the
# extraction prototype that is page index 29, whereas `texts` was built from page 22 —
# confirm both refer to the same page before sending them together.
new_pdf = pymupdf.open()
new_pdf.insert_pdf(raw_doc, from_page=page.number, to_page=page.number)
pdf_page_bytes = new_pdf.write()
len(pdf_page_bytes)

In [None]:
import base64

base64_string = base64.b64encode(pdf_page_bytes).decode("utf-8")

output = await llm.ainvoke(
    [
        SystemMessage(content=SYS_PROMPT),
        HumanMessage(
            content=[
                texts,
                {
                    "type": "media",
                    "data": base64_string,
                    "mime_type": "application/pdf",
                },
            ]
        ),
    ]
)
output

In [None]:
# Matches one <b{n}>...</b{n}> element. The closing tag must repeat the block number of
# the opening tag; content may span several lines and the tag letter is case-insensitive.
pattern = re.compile(
    r"<b(?P<block_no>\d+)>(?P<content>.*?)</b\1>",
    flags=re.IGNORECASE | re.DOTALL,
)

In [None]:
# Raw reply text, printed so that line breaks and tags are shown as returned.
print(output.content)

In [None]:
# List every tagged segment found in the reply as "<block number> <translated text>".
for match in pattern.finditer(output.content):
    print(int(match["block_no"]), match["content"].strip())

In [None]:
# Token usage reported for the trial request.
output.usage_metadata

In [None]:
# NOTE(review): this cell repeats the previous one verbatim and can be deleted.
output.usage_metadata

# Translate Whole Book

In [None]:
# Matches a single Chinese character: U+4E00–U+9FFF or U+3400–U+4DBF.
# Earlier variant that additionally covered U+F900–U+FAFF:
# pattern_cn = re.compile(r"[\u4e00-\u9fff\u3400-\u4dbf\uf900-\ufaff]")
pattern_cn = re.compile(r"[\u4e00-\u9fff\u3400-\u4dbf]")
pattern_cn

In [None]:
def max_color_normalized(sample: bytes) -> float:
    """Return the most frequent byte value of ``sample`` scaled to the range [0, 1].

    Args:
        sample: Values of one colour channel, one byte (0-255) per pixel. Any
            iterable of such integers is accepted as well.

    Returns:
        ``mode / 255`` — 0.0 for value 0 and exactly 1.0 for value 255. When several
        values are equally frequent the smallest one wins; an empty sample yields 0.0.
    """
    if isinstance(sample, (bytes, bytearray, memoryview)):
        # zero-copy view instead of iterating over the bytes one by one
        values = np.frombuffer(sample, dtype=np.uint8)
    else:
        values = np.fromiter(sample, dtype=np.uint8)
    # One counter per possible byte value. A 256-bin histogram over (0, 255) has bins of
    # width 255/256, so its bin edges map value v to v/256 instead of v/255.
    counts = np.bincount(values, minlength=256)
    return float(np.argmax(counts)) / 255.0

In [None]:
# Preview of the colour heuristic: take the first text block of the first page, sample
# its dominant colour and derive the background / text colours used for the overlay.
for page in raw_doc:
    blocks = page.get_text("blocks")
    for block in blocks:
        x0, y0, x1, y1, text, block_no, block_type = block
        if block_type == 0:  # 0 for text blocks
            block_rect = pymupdf.Rect(x0, y0, x1, y1)

            pix = page.get_pixmap(clip=block_rect, dpi=72)

            # Most frequent value of each colour channel inside the block
            if pix.width > 0 and pix.height > 0:
                r = max_color_normalized(pix.samples[0 :: pix.n])
                g = max_color_normalized(pix.samples[1 :: pix.n])
                b = max_color_normalized(pix.samples[2 :: pix.n])
            else:
                r = g = b = 1  # default to white if no size

            # if background is near gray band -> add more contrast:
            # darken the darker half of the band, lighten the brighter half
            _bg = (r + g + b) / 3
            if 0.4 < _bg < 0.5:
                r = max(r / 1.25, 0)
                g = max(g / 1.25, 0)
                b = max(b / 1.25, 0)  # lower clamp is 0; a clamp of 1 forced blue to full
            elif 0.5 <= _bg < 0.6:
                r = min(r * 1.25, 1)
                g = min(g * 1.25, 1)
                b = min(b * 1.25, 1)

            # black text on a bright background, white text on a dark one
            # (0.5 counts as bright because that is the half that gets lightened above)
            text_color = (0, 0, 0) if _bg >= 0.5 else (1, 1, 1)
            background_color = (r, g, b)
            print(
                f"Block {block_no}: Text Color: {text_color}, Background Color: {background_color}"
            )
            break
    break

# Run translation pipeline

## Define functions

In [None]:
# Matches one <b{block}-{line}>...</b{block}-{line}> element of the model reply.
# Block and line number may be separated by "-", "_" or whitespace; the closing tag has
# to repeat both numbers. Content may span several lines; matching is case-insensitive.
tag_pattern = re.compile(
    r"<b(?P<block_no>\d+)[-_\s]+(?P<line_no>\d+)>(?P<content>.*?)</b\1[-_\s]+\2>",
    flags=re.IGNORECASE | re.DOTALL,
)
tag_pattern

In [None]:
class PageTranslationResult(TypedDict):
    """Everything produced for one page by ``extract_and_translate_page``."""

    # Page number as passed in by the caller
    page_number: int
    # The source page object itself
    raw_page: pymupdf.Page
    # Extracted lines keyed by (block number, line index)
    block_texts: Dict[Tuple[int, int], LineInfo]
    # Tagged text that was sent to the model ("" when the page had nothing to translate)
    translator_input: str
    # Raw reply of the model ("" when no request was made)
    translated_text: str
    # Background fill colour per line, RGB components in 0..1
    bg_color: Dict[Tuple[int, int], Tuple[float, float, float]]
    # Text colour per line: black (0, 0, 0) or white (1, 1, 1)
    text_color: Dict[Tuple[int, int], Tuple[float, float, float]]
    # Token usage reported by the model, None when no request was made
    token_usage: Optional[UsageMetadata]


def _collect_chinese_lines(page: pymupdf.Page) -> Dict[Tuple[int, int], LineInfo]:
    """Collect every line of ``page`` that contains Chinese text, keyed by (block, line)."""
    lines_by_key: Dict[Tuple[int, int], LineInfo] = {}
    for _block in page.get_text("dict")["blocks"]:
        if _block.get("type") != 0:  # not a text block
            continue
        _block_no = _block["number"]
        for line_number, line in enumerate(_block.get("lines") or []):
            _text: List[str] = []
            _font: List[str] = []
            _font_size: List[float] = []
            for span in line.get("spans", []):
                text = (span.get("text", "") or "").strip()
                # only spans that contain Chinese characters take part in the translation
                if not (text and pattern_cn.search(text)):
                    continue
                _text.append(text)
                if span_size := span.get("size"):
                    _font_size.append(span_size)
                if span_font := span.get("font"):
                    _font.append(span_font)
            _join_text = " ".join(_text).strip()
            if not _join_text:
                continue
            lines_by_key[(_block_no, line_number)] = LineInfo(
                bbox=tuple(line.get("bbox", (0.0, 0.0, 0.0, 0.0))),
                block_no=_block_no,
                line_no=line_number,
                text=_join_text,
                font=_font,
                font_size=sum(_font_size) / len(_font_size) if _font_size else 0.0,
            )
    return lines_by_key


def _sample_line_colors(
    page: pymupdf.Page, bbox: Tuple[float, float, float, float]
) -> Tuple[Tuple[float, float, float], Tuple[float, float, float]]:
    """Return ``(background colour, text colour)`` for the line located at ``bbox``."""
    x0, y0, x1, y1 = bbox
    # sample the line plus a 10-unit margin, clipped to the page
    clip = pymupdf.Rect(
        max(0, x0 - 10),
        max(0, y0 - 10),
        min(page.rect.width, x1 + 10),
        min(page.rect.height, y1 + 10),
    )
    pix = page.get_pixmap(clip=clip, dpi=72)

    # most frequent value of each colour channel inside the sampled area
    if pix.width > 0 and pix.height > 0:
        r = max_color_normalized(pix.samples[0 :: pix.n])
        g = max_color_normalized(pix.samples[1 :: pix.n])
        b = max_color_normalized(pix.samples[2 :: pix.n])
    else:
        r = g = b = 1  # default to white if no size

    # mid-gray backgrounds give poor contrast with both black and white text:
    # darken the darker half of the band, lighten the brighter half
    _bg = (r + g + b) / 3
    if 0.4 < _bg < 0.5:
        r = max(r / 1.25, 0)
        g = max(g / 1.25, 0)
        b = max(b / 1.25, 0)  # lower clamp is 0; a clamp of 1 forced blue to full
    elif 0.5 <= _bg < 0.6:
        r = min(r * 1.25, 1)
        g = min(g * 1.25, 1)
        b = min(b * 1.25, 1)

    # black text on a bright background, white text on a dark one
    # (0.5 counts as bright because that is the half that gets lightened above)
    text_rgb = (0, 0, 0) if _bg >= 0.5 else (1, 1, 1)
    return (r, g, b), text_rgb


async def extract_and_translate_page(
    page: pymupdf.Page, page_number: int
) -> PageTranslationResult:
    """Extract the Chinese lines of ``page`` and have the model translate them.

    Args:
        page: Page to process. Its owning document (``page.parent``) is the source of
            the one-page PDF attached to the request.
        page_number: Number stored in the result to identify the page.

    Returns:
        A ``PageTranslationResult``. For a page without Chinese text no request is
        made: the text fields are empty, the colour maps are empty and ``token_usage``
        is ``None``.

    Raises:
        Whatever ``llm.ainvoke`` raises; the error is not handled here.
    """
    block_texts = _collect_chinese_lines(page)

    if not block_texts:
        return PageTranslationResult(
            page_number=page_number,
            raw_page=page,
            block_texts={},
            translator_input="",
            translated_text="",
            bg_color={},
            text_color={},
            token_usage=None,
        )

    # one <b{block}-{line}> element per line, ordered by block and line number
    texts = "\n".join(
        f"<b{_block_no}-{_line_no}>\n{_line_info['text']}\n</b{_block_no}-{_line_no}>"
        for (_block_no, _line_no), _line_info in sorted(
            block_texts.items(), key=lambda x: x[0]
        )
    )

    # The page as a stand-alone one-page PDF for the Gemini API. It is copied from the
    # page's own document, so the function does not depend on a notebook-level variable,
    # and the temporary document is closed once its bytes have been taken.
    single_page_pdf = pymupdf.open()
    try:
        single_page_pdf.insert_pdf(
            page.parent, from_page=page.number, to_page=page.number
        )
        pdf_page_bytes = single_page_pdf.write()
    finally:
        single_page_pdf.close()
    base64_string = base64.b64encode(pdf_page_bytes).decode("utf-8")

    # Colours are computed before the request is issued. This work is synchronous, so
    # nothing is gained by scheduling the request first, and a failure here can no
    # longer leave an un-awaited request task behind.
    bg_color: Dict[Tuple[int, int], Tuple[float, float, float]] = {}
    text_color: Dict[Tuple[int, int], Tuple[float, float, float]] = {}
    for _block_line, _line_info in block_texts.items():
        bg_color[_block_line], text_color[_block_line] = _sample_line_colors(
            page, _line_info["bbox"]
        )

    # call Gemini API
    output: AIMessage = await llm.ainvoke(
        [
            SystemMessage(content=SYS_PROMPT),
            HumanMessage(
                content=[
                    texts,
                    {
                        "type": "media",
                        "data": base64_string,
                        "mime_type": "application/pdf",
                    },
                ]
            ),
        ]
    )

    return PageTranslationResult(
        page_number=page_number,
        raw_page=page,
        block_texts=block_texts,
        translator_input=texts,
        translated_text=output.content,
        bg_color=bg_color,
        text_color=text_color,
        token_usage=output.usage_metadata,
    )

In [None]:
# test_page = await extract_and_translate_page(raw_doc[0], 0)
# test_page

In [None]:
def render_translated_page(result: PageTranslationResult) -> pymupdf.Page:
    """Render a copy of the source page with the translated lines drawn over it.

    The source page is placed on a new page of the same size in a new one-page
    document. For every ``<b{block}-{line}>`` element of the model reply that refers to
    a known line, the line's bounding box is filled with the stored background colour
    and the translation is written into it.

    Args:
        result: Output of ``extract_and_translate_page``.

    Returns:
        The new page; its document is reachable through ``.parent``.
    """
    page = result["raw_page"]
    block_texts = result["block_texts"]
    bg_color = result["bg_color"]
    text_color = result["text_color"]
    translated_text = result["translated_text"]

    new_pdf = pymupdf.open()
    new_page = new_pdf.new_page(
        width=page.rect.width,
        height=page.rect.height,
    )
    # page.number is the page's index inside page.parent, independent of the
    # caller-supplied page_number stored in the result
    new_page.show_pdf_page(new_page.rect, page.parent, page.number)

    for match in tag_pattern.finditer(translated_text):
        try:
            block_key = (int(match.group("block_no")), int(match.group("line_no")))
            block_text = match.group("content").strip()
            # skip empty translations and tags that do not refer to an extracted line
            if not (
                block_text
                and (block_key in block_texts)
                and (block_key in bg_color)
                and (block_key in text_color)
            ):
                continue

            _block = block_texts[block_key]
            x0, y0, x1, y1 = _block["bbox"]
            _bg_color = bg_color[block_key]
            # draw background rect
            new_page.draw_rect(
                pymupdf.Rect(x0, y0, x1, y1),
                color=_bg_color,
                fill=_bg_color,
                overlay=True,
            )
            _text_color = text_color[block_key]
            _font_size = _block["font_size"]
            _available_width = x1 - x0 - 1

            # shrink the font until the text fits the box width, but not below 10pt;
            # text that was already smaller than 10pt keeps its size instead of
            # being enlarged by that floor
            _text_width = pymupdf.get_text_length(block_text, fontsize=_font_size)
            if _text_width > _available_width:
                _adjusted_font_size = max(
                    _font_size * _available_width / _text_width,
                    min(_font_size, 10),
                )
            else:
                _adjusted_font_size = _font_size

            # still too wide at the minimum size -> widen the box around its centre
            _text_width = pymupdf.get_text_length(
                block_text, fontsize=_adjusted_font_size
            )
            if _text_width > _available_width:
                _center_x = (x0 + x1) / 2
                x0 = min(_center_x - _text_width / 2, x0)
                x1 = max(_center_x + _text_width / 2, x1)

            _unused_space = new_page.insert_textbox(
                pymupdf.Rect(x0, y0, x1, y1),
                block_text,
                color=_text_color,
                fontsize=_adjusted_font_size,
                # fontname="Helvetica",
            )
            if _unused_space < 0:
                # A negative return value means nothing was written and gives the
                # missing height. Retry with a taller box so the line is not left as
                # an empty coloured rectangle.
                new_page.insert_textbox(
                    pymupdf.Rect(x0, y0, x1, y1 - _unused_space + 1),
                    block_text,
                    color=_text_color,
                    fontsize=_adjusted_font_size,
                )
        except Exception as e:
            # best effort: a line that cannot be drawn must not abort the whole page
            print(
                f"Error processing block {match.group('block_no')}: {e}\n{format_exc()}"
            )
            continue

    return new_page

In [None]:
# display_png(Image(raw_doc[0].get_pixmap(dpi=150).pil_tobytes("png")))

In [None]:
# test_render_page = render_translated_page(test_page)
# # convert to image and render on jupyter notebook

# display_png(Image(test_render_page.get_pixmap(dpi=150).pil_tobytes("png")))

In [None]:
# NOTE(review): this class repeats, field for field, the PageTranslationResult definition
# that precedes extract_and_translate_page and silently replaces it; one of the two
# definitions can be removed.
class PageTranslationResult(TypedDict):
    """Per-page output of the extraction and translation step."""

    # Page number as passed in by the caller
    page_number: int
    # The source page object itself
    raw_page: pymupdf.Page
    # Extracted lines keyed by (block number, line index)
    block_texts: Dict[Tuple[int, int], LineInfo]
    # Tagged text that was sent to the model
    translator_input: str
    # Raw reply of the model
    translated_text: str
    # Background fill colour per line
    bg_color: Dict[Tuple[int, int], Tuple[float, float, float]]
    # Text colour per line
    text_color: Dict[Tuple[int, int], Tuple[float, float, float]]
    # Token usage reported by the model, if any
    token_usage: Optional[UsageMetadata]

In [None]:
async def process_page(
    page: pymupdf.Page,
    page_number: int,
) -> Tuple[int, Optional[pymupdf.Page], PageTranslationResult]:
    """Translate one page and render the translated overlay for it.

    Returns:
        ``(page_number, rendered page, translation result)``.
    """
    translation = await extract_and_translate_page(page, page_number)
    return page_number, render_translated_page(translation), translation

In [None]:
# End-to-end trial on page index 29: show the source page, then the rendered translation.
page_no, rendered_page, result = await process_page(raw_doc[29], 29)
display_png(Image(result["raw_page"].get_pixmap(dpi=150).pil_tobytes("png")))
display_png(Image(rendered_page.get_pixmap(dpi=150).pil_tobytes("png")))

In [None]:
# Cost estimate for the page translated above, derived from its token usage.
# NOTE(review): presumably 0.3 and 2.5 are prices per million input / output tokens and
# 40e-6 combines the per-million scaling with a currency factor — TODO confirm and move
# the numbers into named constants.
# `token_usage` is None for pages without Chinese text, in which case this cell fails.
(
    (result["token_usage"]["input_tokens"] * 0.3)
    + (result["token_usage"]["output_tokens"] * 2.5)
) * (40e-6)

## Run pipeline

In [None]:
# Maximum number of pages that are processed concurrently
batch = 2
# NOTE(review): the file is chosen by position (index 1) in the directory listing.
pdf_file = path_pdf[1]
# Re-open the document: `raw_doc` is rebound, pages obtained earlier belong to the old object
raw_doc = pymupdf.open(os.path.abspath(os.path.join("pdf", pdf_file)))

print(f"Processing PDF: {pdf_file}, total pages: {len(raw_doc)}")

In [None]:
total_pages = len(raw_doc)

# Per-page translation results, appended by the scheduling loop as pages complete.
# (The list-typed `finished_pages` that used to be declared here was dead: the
# scheduling cell rebinds the same name to a set of page numbers before using it.)
results: List[Optional[PageTranslationResult]] = []

In [None]:
running_tasks: Set[
    asyncio.Task[Tuple[int, Optional[pymupdf.Page], PageTranslationResult]]
] = set()
progress = tqdm(total=total_pages, desc=f"PDF: {pdf_file}")
processing_pages: Set[int] = set()
finished_pages: Set[int] = set()

while True:
    if (len(running_tasks) < batch) and (
        (len(finished_pages) + len(running_tasks)) < total_pages
    ):
        page_number = len(finished_pages) + len(running_tasks)
        task = asyncio.create_task(process_page(raw_doc[page_number], page_number))
        task.add_done_callback(lambda _: progress.update(1))
        running_tasks.add(task)
        progress.set_postfix_str(
            f"run:{len(running_tasks)}@{page_number}|finished:{len(finished_pages)}"
        )
        processing_pages.add(page_number)
        continue

    done, running_tasks = await asyncio.wait(
        running_tasks, return_when=asyncio.FIRST_COMPLETED, timeout=60
    )
    for d in done:
        try:
            running_tasks.remove(d)
            page_number, rendered_page, result = d.result()
            results.append(result)
            finished_pages.add(page_number)
        except Exception as e:
            print(f"Error processing page: {e}\n{format_exc()}")
            continue
    if (not running_tasks) and (len(finished_pages) >= total_pages):
        break

In [None]:
# Assemble the output document: every source page followed by its translated rendering.
new_pdf = pymupdf.open()
# results arrive in completion order; emit them in page order
ordered_results = sorted(
    (r for r in results if r is not None), key=lambda r: r["page_number"]
)
for result in tqdm(ordered_results):
    raw_page = result["raw_page"]
    # Render from the stored result. A notebook-level `rendered_page` left over from an
    # earlier cell would put the same rendering behind every source page.
    rendered_page = render_translated_page(result)
    new_pdf.insert_pdf(
        raw_page.parent,
        from_page=raw_page.number,
        to_page=raw_page.number,
    )
    new_pdf.insert_pdf(
        rendered_page.parent,
        from_page=rendered_page.number,
        to_page=rendered_page.number,
    )

In [None]:
# Write the assembled document to output/<source file name>; the directory is created
# on first use so that a fresh checkout does not fail at the very last step.
os.makedirs("output", exist_ok=True)
new_pdf.save(os.path.abspath(os.path.join("output", pdf_file)))