In [None]:
import base64
import gc
import os
import tempfile
from io import BytesIO

import dspy
import pandas as pd
import torch
from olmocr.data.renderpdf import render_pdf_to_base64png
from PIL import Image
from pypdf import PdfReader
from tqdm import tqdm
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration

In [None]:
def encode_image_file(image_path):
    """Read the file at ``image_path`` and return its content as base64 text.

    Args:
        image_path: Path of the file to read (opened in binary mode).

    Returns:
        str: The file's bytes, base64-encoded and decoded as UTF-8.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def resize_encode_image(image_path, min_pixels, max_pixels):
    """Return an image as base64 text, downscaled to at most ``max_pixels``.

    Args:
        image_path: Filesystem path (``str`` or ``os.PathLike``), the raw
            encoded image bytes, or a binary file-like object with ``read()``.
        min_pixels: Lower pixel bound. Accepted for call compatibility only:
            this function never upscales, so the value is not used.
        max_pixels: Largest allowed ``width * height``. Must be positive.

    Returns:
        str: Base64 (UTF-8) text. An image already within ``max_pixels`` is
        returned as its original encoded bytes (original format); a larger
        image is resized with its aspect ratio preserved and re-encoded as PNG.

    Raises:
        ValueError: If ``max_pixels`` is not positive.
        TypeError: If ``image_path`` is none of the supported input kinds.
    """
    if max_pixels <= 0:
        raise ValueError(f"max_pixels must be positive, got {max_pixels}")

    # Load the encoded bytes exactly once. Working from memory avoids the
    # never-deleted temporary file the previous implementation left behind.
    if isinstance(image_path, (str, os.PathLike)):
        source_label = os.fspath(image_path)
        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
    elif hasattr(image_path, "read"):
        source_label = "<file-like object>"
        raw_bytes = image_path.read()
    elif isinstance(image_path, (bytes, bytearray, memoryview)):
        source_label = "<in-memory bytes>"
        raw_bytes = bytes(image_path)
    else:
        raise TypeError(
            f"image_path must be a path, bytes or a binary file object, got {type(image_path).__name__}"
        )

    # The context manager closes the decoder's handle once we are done with it.
    with Image.open(BytesIO(raw_bytes)) as img:
        width, height = img.size
        total_pixels = width * height

        # Already small enough: hand back the original bytes untouched.
        if total_pixels <= max_pixels:
            print(f"The image {source_label} is already within the range ({width}x{height}).")
            return base64.b64encode(raw_bytes).decode('utf-8')

        # Scale both sides by the same factor so the area shrinks to max_pixels.
        scale_factor = (max_pixels / total_pixels) ** 0.5
        # Flooring keeps the area under the limit; clamp so a very thin image
        # never collapses to a zero-sized side, which PIL rejects.
        new_width = max(1, int(width * scale_factor))
        new_height = max(1, int(height * scale_factor))
        resized_img = img.resize((new_width, new_height), Image.LANCZOS)

    # Serialize the resized copy to PNG in memory and encode it.
    img_bytes = BytesIO()
    resized_img.save(img_bytes, format="PNG")
    return base64.b64encode(img_bytes.getvalue()).decode('utf-8')

def encode_pdf(path: str, page_indexes: tuple = None):
    """Render pages of a PDF to base64-encoded PNG strings.

    Args:
        path: Filesystem path to the PDF (``str`` or ``os.PathLike``), or the
            raw PDF bytes. Bytes are written to a temporary file for rendering
            and the file is removed before returning.
        page_indexes: Page numbers to render. They are forwarded unchanged to
            ``render_pdf_to_base64png``, which numbers pages starting at 1.
            ``None`` (default) renders every page of the document.

    Returns:
        list: One base64 PNG string per requested page, in the order given.
    """
    temp_path = None
    try:
        # Raw bytes need a real file on disk because the renderer takes a path.
        if not isinstance(path, (str, os.PathLike)):
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
                temp_path = temp_pdf.name
                temp_pdf.write(path)
            path = temp_path
        else:
            path = os.fspath(path)

        # Load the PDF (also validates it) to learn the page count.
        num_pages = len(PdfReader(path).pages)

        # Default to every page; the renderer's page numbers start at 1, so the
        # range must run 1..num_pages rather than 0..num_pages-1.
        if page_indexes is None:
            page_indexes = range(1, num_pages + 1)

        # Render each selected page to a PNG and collect the base64 strings.
        return [
            render_pdf_to_base64png(path, page, target_longest_image_dim=1024)
            for page in tqdm(page_indexes)
        ]
    finally:
        # Remove the temporary copy created for bytes input, even on failure.
        if temp_path is not None:
            os.unlink(temp_path)

: 

In [None]:
# Initialize the model (bfloat16 weights, inference mode)
model = Qwen2VLForConditionalGeneration.from_pretrained(
    "allenai/olmOCR-7B-0225-preview", torch_dtype=torch.bfloat16
).eval()

# Pixel budget per image, expressed in multiples of 28x28
min_pixels = 256 * 28 * 28
max_pixels = 1280 * 28 * 28

# Initialize processor.
# The model class is Qwen2-VL, so the processor is loaded from the matching
# Qwen2-VL repository (the pairing shown on the olmOCR model card) instead of
# the Qwen2.5-VL one.
processor = AutoProcessor.from_pretrained(
    "Qwen/Qwen2-VL-7B-Instruct",
    min_pixels=min_pixels,
    max_pixels=max_pixels,
    use_fast=True
)

# Set device: GPU when available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print("Model loaded")


In [None]:
# Load and encode image (downscaled to the processor's pixel budget)
image_path = 'OCR_menu_example/alligalli_section2.png'
image_base64 = resize_encode_image(image_path, min_pixels=min_pixels, max_pixels=max_pixels)

# PDF alternative: uncomment to OCR a single rendered PDF page instead
#pdf_path = "OCR_menu_example/menu-gustoal129.pdf"
#image_base64 = encode_pdf(pdf_path, (3,))[0]

In [None]:
# Instruction sent to the model together with the menu image.
# NOTE(review): the prompt text is kept verbatim (including the "dectect"
# typo) so that model behaviour is unchanged.
menu_prompt = """Extract only the menu dishes from the image and provide it as a list
             with following fields: name, price, ingredients.
             Try to dectect the section of each dish among the following categories: Antipasti, Primi, Secondi, Contorni, Dolci, Bevande
            If any of these fields are not present use the value None
            DO NOT PROVIDE ANY OTHER INFORMATION
            Example of output: [
                ['Pizza Margherita', '10.0', 'Pomodoro, Mozzarella', "Secondi"]
             ["Pasta al pomodoro", '8.00', 'Pasta, pomodoro, basilico', "Primi"]
             ]"""
image_data_url = f"data:image/png;base64,{image_base64}"

# Construct input message: one user turn with the text first, then the image
user_content = [
    {"type": "text", "text": menu_prompt},
    {"type": "image_url", "image_url": {"url": image_data_url}},
]
messages = [{"role": "user", "content": user_content}]

# Prepare prompt: render the chat template to a string, ending with the generation prompt
text = processor.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)

# Decode the same base64 payload back into a PIL image
image_bytes = base64.b64decode(image_base64)
main_image = Image.open(BytesIO(image_bytes))

In [None]:
# Clean memory
# Drop tensors left over from a previous run. Each name is removed on its own,
# so a missing `output` no longer prevents `inputs` from being released, and no
# unrelated exception is silently swallowed.
for stale_name in ("output", "inputs"):
    globals().pop(stale_name, None)

# Collect garbage first so the tensors are actually freed, then hand the
# now-unused cached CUDA memory back to the driver.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

In [None]:
# Preprocess inputs: run the processor on the prompt and image (PyTorch
# tensors), then re-pack every entry into a plain dict on the target device
inputs = dict(
    (name, tensor.to(device))
    for name, tensor in processor(
        text=[text],
        images=[main_image],
        padding=True,
        return_tensors="pt",
    ).items()
)


In [None]:

# Sampling settings for generation (a single sampled sequence)
generation_options = dict(
    temperature=0.8,
    max_new_tokens=8000,
    num_return_sequences=1,
    do_sample=True,
)

# Run model inference
output = model.generate(**inputs, **generation_options)

In [None]:
# Decode the generated tokens: slice off the prompt (the first `prompt_length`
# positions of every sequence) so only the new tokens are decoded
prompt_length = inputs["input_ids"].size(1)
new_tokens = output[:, prompt_length:]
text_output = processor.tokenizer.batch_decode(new_tokens, skip_special_tokens=True)


In [None]:
# Show the first decoded sequence as plain text
print(text_output[0])

## Optimization

In [None]:
# Configure language models (select one of the following)
# Credentials are read from the environment so secrets are never stored in the
# notebook; an unset variable falls back to the previous empty-string default.
huggingface_token = os.environ.get("HF_TOKEN", "")
deep_seek_token = os.environ.get("DEEPSEEK_API_KEY", "")

lm_llama = dspy.LM("huggingface/meta-llama/Meta-Llama-3-8B-Instruct", api_key=huggingface_token, cache=False)
lm_deepseek = dspy.LM('openai/deepseek-chat', api_key=deep_seek_token, api_base="https://api.deepseek.com", cache=False, max_tokens=8000)
lm_ollama = dspy.LM('ollama_chat/qwen2.5:32b', api_base='http://localhost:11434', api_key='', max_tokens=8000, cache=False)


# Active model used by every DSPy module below
dspy.configure(lm=lm_deepseek)

In [None]:
# Define a DSPy Signature for parsing menu items.
# DSPy uses the class docstring as the task instructions given to the LM, so
# the docstring below is runtime text. It previously announced "three columns"
# while listing four; the count now matches the listed columns.
class BasicQA(dspy.Signature):
    """Format this menu into a list readable for pandas dataframe.
    The dataframe should have four columns: name, ingredients, price, section.
    If there is no section, assign one from these: Antipasti, Primi, Secondi, Contorni, Dolci, Bevande.
    The section MUST BE ONE OF THESE categories.
    Do not split a dish if it is on the same row.
    """
    # Input: the raw menu text to be structured
    question = dspy.InputField(desc="Raw menu input")
    # Output: declared as a list so the result can be passed to pandas
    answer: list = dspy.OutputField(desc="should be a list")

# Create reasoning chain to format menu
generate_formatted_menu = dspy.ChainOfThought(BasicQA)


In [None]:
# Run formatting logic on the first decoded sequence
raw_menu_text = text_output[0]
formatted_menu = generate_formatted_menu(question=raw_menu_text)

# %%
# Display results: the cell's last expression is rendered as a table
menu_rows = formatted_menu.answer
pd.DataFrame(menu_rows)