### Import

In [None]:
import os
import base64
import json
import sys
from pathlib import Path
from pdf2image import convert_from_path
from PIL import Image
from io import BytesIO
from langchain.prompts import (
    HumanMessagePromptTemplate,
    ChatPromptTemplate,
    FewShotChatMessagePromptTemplate,
    AIMessagePromptTemplate
)
from langchain_core.messages.content_blocks import Base64ContentBlock
from langchain_core.messages.utils import count_tokens_approximately
from langchain_core.messages import (
    convert_to_openai_image_block, 
    SystemMessage,
)


from dotenv import load_dotenv
# Load environment variables from the project-level .env file (the path is
# relative to the notebook's working directory).
load_dotenv("../.env")
# Root directory of the documents to process; None when DOC_PATH is unset.
doc_path = os.environ.get("DOC_PATH")
# Extra directory to put on the import path; None when CODE_PATH is unset.
code_path = os.environ.get("CODE_PATH")
# Only extend sys.path with a real value, and only once: appending None leaves
# a non-string entry on sys.path, and re-running this cell would otherwise
# keep appending duplicates.
if code_path and code_path not in sys.path:
    sys.path.append(code_path)


### Convert to JPEG

In [None]:
def convert_pdf_to_jpeg(input_dir: str, output_dir: str) -> None:
    """Render every PDF in ``input_dir`` to JPEG files in ``output_dir``.

    A single-page PDF ``name.pdf`` is written as ``name.jpg``. A multi-page
    PDF is written as ``name_p1.jpg``, ``name_p2.jpg``, ... so that later
    pages do not overwrite earlier ones.

    Parameters:
    - input_dir: directory searched (non-recursively) for ``*.pdf`` files.
    - output_dir: directory receiving the JPEGs; created if it is missing.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Sorted so the processing order (and the log output) is deterministic.
    for pdf_file in sorted(input_path.glob("*.pdf")):
        print(f"Processing: {pdf_file.name}")
        pages = convert_from_path(str(pdf_file))
        multi_page = len(pages) > 1
        for page_no, page in enumerate(pages, start=1):
            # Keep the plain "<stem>.jpg" name for single-page documents and
            # add a page suffix only when it is needed to keep names unique.
            suffix = f"_p{page_no}" if multi_page else ""
            img_file = output_path / f"{pdf_file.stem}{suffix}.jpg"
            page.save(img_file, 'JPEG')

In [None]:
# Source PDFs are read directly from DOC_PATH; the rendered JPEGs go to its
# "IMG" subfolder.
input_dir = doc_path
output_dir = os.path.join(input_dir, "IMG")

In [None]:
# Render the PDFs found in input_dir into JPEG files under output_dir.
convert_pdf_to_jpeg(input_dir, output_dir)

### Crop Image

In [None]:
def calculate_crop_box(image: Image.Image, left_pct: float,
        upper_pct: float, right_pct: float, lower_pct: float
    ) -> tuple[int, int, int, int]:
    """
    Calculate crop box coordinates based on percentages of image dimensions.

    Each fraction is multiplied by the image width (left/right) or height
    (upper/lower) and truncated to an integer pixel coordinate.

    Parameters:
    - image: PIL Image object; only its ``size`` is read.
    - left_pct: float, fraction (0 to 1.0) of the width for the left bound.
    - upper_pct: float, fraction (0 to 1.0) of the height for the upper bound.
    - right_pct: float, fraction (0 to 1.0) of the width for the right bound.
    - lower_pct: float, fraction (0 to 1.0) of the height for the lower bound.

    Returns:
    - tuple of ints, crop box coordinates (left, upper, right, lower).
    """
    image_width, image_height = image.size
    left = int(image_width * left_pct)
    upper = int(image_height * upper_pct)
    right = int(image_width * right_pct)
    lower = int(image_height * lower_pct)

    return (left, upper, right, lower)


In [None]:
def scale_image(image: Image.Image, pct: float) -> Image.Image:
    """Return a copy of ``image`` resized by the factor ``pct``.

    Parameters:
    - image: PIL Image object to resize.
    - pct: float, positive scale factor applied to both width and height
      (e.g. 0.4 shrinks the image to 40% of its size).

    Returns:
    - the resized image produced by ``image.resize``.

    Raises:
    - ValueError: if ``pct`` is not positive.
    """
    if pct <= 0:
        raise ValueError(f"pct must be positive, got {pct!r}")
    # Truncation can yield 0 for small images or factors, which resize()
    # rejects; keep every dimension at least one pixel.
    new_size = (
        max(1, int(image.width * pct)),
        max(1, int(image.height * pct)),
    )
    return image.resize(new_size)

In [None]:
def crop_images(
    input_dir: str,
    output_dir: str,
    crop_pcts: tuple[float, float, float, float] = (0.8, 0.65, 1.0, 1.0),
    scale_pct: float = 0.4,
) -> None:
    """Crop and downscale every JPEG in ``input_dir`` into ``output_dir``.

    Each ``*.jpg`` file is cropped to the region described by ``crop_pcts``,
    scaled by ``scale_pct`` and saved under the same file stem.

    Parameters:
    - input_dir: directory searched (non-recursively) for ``*.jpg`` files.
    - output_dir: directory receiving the cropped JPEGs; created if missing.
    - crop_pcts: (left, upper, right, lower) fractions of the image size, as
      accepted by ``calculate_crop_box``. The default keeps the bottom-right
      corner (rightmost 20% of the width, lowest 35% of the height).
    - scale_pct: scale factor applied to the cropped region.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Sorted so the processing order (and the log output) is deterministic.
    for img_path in sorted(input_path.glob("*.jpg")):
        print(f"Processing: {img_path.name}")
        # The context manager closes the source file handle once the crop has
        # been produced, instead of leaving one open file per image.
        with Image.open(str(img_path)) as img:
            box = calculate_crop_box(img, *crop_pcts)
            cropped = scale_image(img.crop(box), scale_pct)

        img_file = output_path / (img_path.stem + ".jpg")
        cropped.save(img_file, 'JPEG')

In [None]:
# Read the rendered JPEGs from DOC_PATH/IMG and write the crops to
# DOC_PATH/IMG/CROP.
# NOTE: this rebinds the input_dir/output_dir globals set in the conversion
# cell above.
input_dir = os.path.join(doc_path, "IMG")
output_dir = os.path.join(input_dir, "CROP")

In [None]:
# Crop and downscale every JPEG in input_dir, saving the results to output_dir.
crop_images(input_dir, output_dir)

### Prompt Template

In [None]:
def create_system_message() -> SystemMessage:
    """Build the system message describing the title-block extraction task.

    The field list matches the fields used by the few-shot examples and the
    requested output format (Document No., Sheet, Rev, Title, Project No.).
    """
    return SystemMessage(
        content="""You are an expert in Optical Character Recognition (OCR) and information extraction 
from engineering drawings. Your task is to analyze an image of an engineering 
drawing title block and extract the following information:

Document No.: The unique identifier for the entire document.
Sheet: The sheet number within the document (e.g., 1 of 3, 2 of 3, etc.). Extract the entire string as it appears.
Rev: The revision number or letter.
Title: The title of the drawing (may be multiline).
Project No.: The project number shown in the title block.

Pay close attention to the layout and font styles within the title block. 
The information may be located in different positions within the image. 
Ensure accuracy and completeness in your extraction.
"""
    )

In [None]:
def create_example_message(examples: list[dict]) -> FewShotChatMessagePromptTemplate:
    """Build the few-shot prompt that renders each example as a human/AI pair.

    Every example dict must supply the template variables ``image64``,
    ``doc_no``, ``sheet_no``, ``revision``, ``title`` and ``project_no``.
    """
    # Human turn: a short caption followed by the example image as a data URL.
    human_turn = HumanMessagePromptTemplate.from_template([
        "Example:\n\nImage: ",
        {
            "type": "image_url",
            "image_url": "data:image/jpeg;base64,{image64}"
        }
    ])
    # AI turn: the expected extraction for that image.
    ai_turn = AIMessagePromptTemplate.from_template("""Extracted Information:

Document No.: {doc_no}
Sheet: {sheet_no}
Rev: {revision}
Title: {title}
Project No.: {project_no}
"""
    )
    return FewShotChatMessagePromptTemplate(
        example_prompt=human_turn + ai_turn,
        examples=examples,
    )

In [None]:
def create_prompt(examples: list[dict]) -> ChatPromptTemplate:
    """Assemble the full chat prompt: system message, few-shot examples, request.

    The resulting template expects one input variable, ``image64``, holding the
    base64-encoded JPEG of the image to analyze.
    """
    instruction_text = """Now, analyze the following image and extract the same information:

Images:"""
    target_image = {
        "type": "image_url",
        "image_url": "data:image/jpeg;base64,{image64}"
    }
    format_text = """Output your answer in the following format:

Document No.: [Extracted Document Number]
Sheet: [Extracted Sheet Information]
Rev: [Extracted Revision]
Title: [Extracted Title]
Project No.: [Extracted Project Number]

Ensure that you follow the same format as shown in the examples.
"""
    # The final human turn: instructions, the target image, then the format.
    request = HumanMessagePromptTemplate.from_template(
        [instruction_text, target_image, format_text]
    )
    messages = [
        create_system_message(),
        create_example_message(examples),
        request,
    ]
    return ChatPromptTemplate.from_messages(messages)

### Data Utils

In [None]:
def image_to_base64(image: Image.Image) -> str:
    """Encode ``image`` as JPEG and return the bytes as a base64 string.

    Parameters:
    - image: PIL Image object to encode.

    Returns:
    - str, base64 text of the JPEG-encoded image (no ``data:`` prefix).
    """
    # Modes the JPEG writer can store directly; anything else (e.g. RGBA or
    # palette images) is converted to RGB first so that save() does not fail.
    jpeg_modes = {"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"}
    if image.mode not in jpeg_modes:
        image = image.convert("RGB")
    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")

In [None]:
def create_image_message_content(image: Image) -> dict:
    """Wrap ``image`` in a base64 JPEG content block for use in a chat message.

    The image is encoded with ``image_to_base64``, placed in a
    ``Base64ContentBlock`` and passed through
    ``convert_to_openai_image_block``, whose result is returned.
    """
    return convert_to_openai_image_block(
        Base64ContentBlock(
            type="image",
            source_type="base64",
            data=image_to_base64(image),
            mime_type="image/jpeg",
        )
    )

In [None]:
def load_test_config(config_name: str = "TEST2.json") -> dict:
    """Load a test configuration JSON file from ``<doc_path>/TEST``.

    Parameters:
    - config_name: file name inside the ``TEST`` folder; defaults to the
      configuration used by this notebook.

    Returns:
    - dict, the parsed JSON content.
    """
    file_path = os.path.join(doc_path, "TEST", config_name)
    # JSON is read as UTF-8 explicitly so that the result does not depend on
    # the platform's default encoding.
    with open(file_path, 'r', encoding="utf-8") as file:
        return json.load(file)

In [None]:
def load_example_image(test_config: dict) -> Image:
    """Open the cropped example image named by ``test_config["example_doc"]``.

    The file is looked up in ``<doc_path>/IMG/CROP``.
    """
    path_parts = (doc_path, "IMG", "CROP", test_config["example_doc"])
    return Image.open(os.path.join(*path_parts))

In [None]:
def load_image_doc(test_config: dict, doc_no: int) -> Image:
    """Open the cropped image at index ``doc_no`` of ``test_config["docs"]``.

    The file is looked up in ``<doc_path>/IMG/CROP``.
    """
    path_parts = (doc_path, "IMG", "CROP", test_config["docs"][doc_no])
    return Image.open(os.path.join(*path_parts))

In [None]:
def load_examples(test_config: dict) -> list[dict]:
    """Build the single few-shot example described by ``test_config``.

    The example image is base64-encoded and combined with the expected
    answers stored under ``test_config["response"]``.

    Returns:
    - a one-element list whose dict has the keys ``image64``, ``title``,
      ``sheet_no``, ``doc_no``, ``revision`` and ``project_no``.
    """
    encoded_image = image_to_base64(load_example_image(test_config))
    expected = test_config["response"]
    answer_fields = ("title", "sheet_no", "doc_no", "revision", "project_no")
    example = {"image64": encoded_image}
    for field in answer_fields:
        example[field] = expected[field]
    return [example]

### Prompt Messages

In [None]:
# Load the test configuration JSON (see load_test_config).
test_config = load_test_config()
# test_config

In [None]:
# Build the few-shot example list from the configuration; the bare name on the
# last line displays it as the cell output.
examples = load_examples(test_config)
examples

In [None]:
# Inspect the image content block produced for the example image; the bare
# name on the last line displays it as the cell output.
image64_example = create_image_message_content(load_example_image(test_config))
image64_example

In [None]:
# Render the few-shot template into concrete messages; no extra variables are
# passed because the values come from the example dicts.
example_message_prompt = create_example_message(examples)
example_messages = example_message_prompt.format_messages()

In [None]:
# Print each rendered few-shot message for manual inspection.
for message in example_messages:
    message.pretty_print()

In [None]:
# Approximate token count of the few-shot messages (shown as the cell output).
count_tokens_approximately(example_messages)

In [None]:
# Build the full prompt and render it for the first document listed in the
# test configuration (index 0 of test_config["docs"]).
prompt = create_prompt(examples)
image64_doc = image_to_base64(load_image_doc(test_config, 0))
prompt_messages = prompt.format_messages(image64=image64_doc)

In [None]:
# Print every message of the rendered prompt for manual inspection.
for message in prompt_messages:
    message.pretty_print()

In [None]:
# Approximate token count of the full rendered prompt (shown as the cell output).
count_tokens_approximately(prompt_messages)

In [None]:
# Render the prompt through invoke(), using the same input as the chain below.
# NOTE: prompt_value is not used by the later cells visible here.
prompt_value = prompt.invoke({
    "image64": image64_doc
})

### LLM

In [None]:

# `model` is a project-local module; presumably importable because CODE_PATH
# was appended to sys.path in the import cell — confirm.
from model import get_ollama_chat

# Create the chat model used by the chain below.
llm = get_ollama_chat()

In [None]:
# Pipe the rendered prompt into the chat model.
chain = prompt | llm

In [None]:
# Per-call runtime configuration passed to the chain.
# NOTE(review): "configurable" keys only take effect if the model returned by
# get_ollama_chat exposes temperature as a configurable field — confirm.
config = {"configurable": {"temperature": 0.1}}
# Run the prompt + model chain on the first test document's image.
message = chain.invoke({
    "image64": image64_doc
}, config=config)

In [None]:
# Show the model's textual answer.
print(message.content)