In [None]:
import base64
import httpx
import json
import pymupdf

from pathlib import Path
from typing import Dict, Any, Optional, List
import os
import io
from PIL import Image
from pathlib import Path

from dotenv import load_dotenv
load_dotenv()

True

In [2]:
SUMMARIZATION_SYSTEM_MESSAGE = """
Your mission is to summarize the following text in a short and concise way.
Always answer in a well-formatted JSON object containing a single string item called 'summary' 
"""

DESC_FIG_SYSTEM_MESSAGE = """
Your mission is to provide a brief and informative description of each image you will be shown.
Always answer in a well-formatted JSON object containing:
- type: a string describing the type of figure you see (plot, picture, diagram, etc.)
- description: the information you can derive from the figure
"""

In [3]:
def _encode_document_pages_to_base64(pdf_doc_path: str) -> List[str]:
    encoded_pages: List[str] = []
    doc = pymupdf.open(pdf_doc_path)
    for page in doc:
        page_bytes = page.get_pixmap().tobytes("jpeg")
        page_b64_encoded = base64.b64encode(page_bytes).decode("utf-8")
        encoded_pages.append(page_b64_encoded)
    return encoded_pages


def _encode_document_to_base64(document_path: str) -> str:
    with Path(document_path).open(mode="rb") as f_in:
        doc_encoded = base64.b64encode(f_in.read()).decode("utf-8")
        return doc_encoded

def _call_ocr_model(
    endpoint: str, api_key: str, base64_input_data: str
) -> Dict[str, Any]:
    endpoint_url = f"{endpoint}/v1/ocr"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": "mistral-ocr-2503",
        "document": {"type": "document_url", "document_url": base64_input_data},
        "include_image_base64": True,
    }
    with httpx.Client() as client:
        ocr_resp = client.post(
            url=endpoint_url, headers=headers, json=payload, timeout=60.0
        )
        ocr_resp.raise_for_status()
        return ocr_resp.json()
    
    
def _call_vlm_model(
    endpoint: str,
    api_key: str,
    user_message: Dict[str, Any],
    system_message: Dict[str, str],
) -> Dict[str, Any]:
    url = f"{endpoint}/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": "mistral-small-2503",
        "messages": [system_message, user_message],
        "temperature": 0,
        "response_format": {"type": "json_object"},
    }
    with httpx.Client() as client:
        resp = client.post(url=url, headers=headers, json=payload, timeout=60.0)
        resp.raise_for_status()
        return resp.json()
    
    

In [4]:

class Document:
    def __init__(self, source_file: str | Path | None = None):
        self.source_file: str | Path | None = source_file
        self.parsed_doc: str | None = None

    def parse(self):
        encoded_doc = _encode_document_to_base64(document_path=self.source_file)
        self.parsed_doc = _call_ocr_model(
            endpoint=os.getenv("AZURE_MISTRAL_OCR_ENDPOINT"),
            api_key=os.getenv("AZURE_MISTRAL_OCR_API_KEY"),
            base64_input_data=f"data:application/pdf;base64,{encoded_doc}",
        )

    def summarize(self) -> Dict[str, Any]:
        system_message = {"role": "system", "content": SUMMARIZATION_SYSTEM_MESSAGE}
        user_message_content: List[Dict[str, Any]] = []
        pages = self.parsed_doc["pages"]
        for page in pages:
            user_message_content.append({"type": "text", "text": page["markdown"]})
        user_message = {"role": "user", "content": user_message_content}
        vlm_resp = _call_vlm_model(
            endpoint=os.getenv("AZURE_MISTRAL_SMALL_ENDPOINT"),
            api_key=os.getenv("AZURE_MISTRAL_SMALL_API_KEY"),
            system_message=system_message,
            user_message=user_message,
        )
        return json.loads(vlm_resp["choices"][0]["message"]["content"])
    
    def describe_images(self, pages: Optional[List[int]] = None) -> Dict[str, Any]:
        system_message = {"role": "system", "content": DESC_FIG_SYSTEM_MESSAGE}
        images: List[Dict[str, Any]] = []
        for idx, page in enumerate(self.parsed_doc["pages"]):
            for img in page["images"]:
                user_message = {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": img["image_base64"]}}
                    ],
                }
                vlm_resp = _call_vlm_model(
                    endpoint=os.getenv("AZURE_MISTRAL_SMALL_ENDPOINT"),
                    api_key=os.getenv("AZURE_MISTRAL_SMALL_API_KEY"),
                    system_message=system_message,
                    user_message=user_message,
                )
                desc_dict = json.loads(vlm_resp["choices"][0]["message"]["content"])
                fig_desc = {"page": idx, "desc": desc_dict}
                images.append(fig_desc)
        return images
    
    def save_images(self, output_dir: Path):
        output_dir.mkdir(parents=True, exist_ok=True)
        for idx, page in enumerate(self.parsed_doc["pages"]):
            for img in page["images"]:
                img_b64 = img['image_base64']
                img_b64 = img_b64.split(',')[1]
                img_b64 = base64.b64decode(img_b64)
                img_obj = Image.open(io.BytesIO(img_b64))
                img_path = os.path.join(output_dir, img['id'] )
                img_obj.save(img_path)
                
    def save_markdown(self, output_dir: Path, output_file: str):
        output_dir.mkdir(parents=True, exist_ok=True)
        pages = self.parsed_doc["pages"]
        
        md = ""
        for idx, page in enumerate(pages, start=1):
            page_md = page['markdown']
            md += f"<page number=\"{idx}\">\n{page_md}\n</page>\n"
        
        md = "<pages>\n" + md + "\n</pages>"
        
        with open(output_dir / output_file, "w") as f:
            f.write(md)
                

In [None]:
from tqdm import tqdm
import traceback

DATA_DIR = Path("/home/ubuntu/environment/aiopt/spec/data")

SPEC_DIR = DATA_DIR / "specbook/specbook_pdf"
IMG_OUTPUT_DIR = DATA_DIR / "specbook/parsed_by_mistral/images"
MD_OUTPUT_DIR = DATA_DIR / "specbook/parsed_by_mistral/markdown"

IMG_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Get list of already processed files
processed_files = set()
for md_file in MD_OUTPUT_DIR.glob("*.txt"):
    processed_files.add(md_file.stem)

CONTINUE = True

spec_files = list(SPEC_DIR.glob("*.pdf"))
for idx, spec_file in enumerate(tqdm(spec_files, desc="Processing spec files"), start=1):
    fname = spec_file.stem
    
    # Skip if file already processed
    if fname in processed_files and CONTINUE:
        continue
        
    try:
        doc = Document(spec_file)
        doc.parse()
        
        # save images
        doc.save_images(IMG_OUTPUT_DIR / fname)
        
        # save markdown 
        doc.save_markdown(MD_OUTPUT_DIR, f"{fname}.txt")
    except Exception as e:
        print(f"Error processing {fname}:")
        print(traceback.format_exc())
        continue

Processing spec files:   0%|          | 0/670 [00:00<?, ?it/s]

Processing spec files:   8%|▊         | 53/670 [00:32<06:23,  1.61it/s]

Error processing VFDSXNEEP0011-VFe34s SPB_SYSTEM_SCHEMATICS:
Traceback (most recent call last):
  File "/tmp/ipykernel_45709/1004845501.py", line 33, in <module>
    doc.save_images(IMG_OUTPUT_DIR / fname)
  File "/tmp/ipykernel_45709/3289181672.py", line 53, in save_images
    for idx, page in enumerate(self.parsed_doc["pages"]):
                               ~~~~~~~~~~~~~~~^^^^^^^^^
TypeError: 'NoneType' object is not subscriptable



Processing spec files:  10%|▉         | 64/670 [01:04<11:33,  1.14s/it]

Error processing VFDSXXCVC2301_VF36_Appendix_3_ICE_VTS_Vehicle Technical Specifications_21MAy21_v1:
Traceback (most recent call last):
  File "/tmp/ipykernel_45709/1004845501.py", line 33, in <module>
    doc.save_images(IMG_OUTPUT_DIR / fname)
  File "/tmp/ipykernel_45709/3289181672.py", line 53, in save_images
    for idx, page in enumerate(self.parsed_doc["pages"]):
                               ~~~~~~~~~~~~~~~^^^^^^^^^
TypeError: 'NoneType' object is not subscriptable



Processing spec files:  14%|█▎        | 92/670 [01:43<12:06,  1.26s/it]

Error processing VFDSXVCVC1801_VF35+VFe35_Appendix_8_Color and Material_Interior_Ver 9:
Traceback (most recent call last):
  File "/tmp/ipykernel_45709/1004845501.py", line 33, in <module>
    doc.save_images(IMG_OUTPUT_DIR / fname)
  File "/tmp/ipykernel_45709/3289181672.py", line 53, in save_images
    for idx, page in enumerate(self.parsed_doc["pages"]):
                               ~~~~~~~~~~~~~~~^^^^^^^^^
TypeError: 'NoneType' object is not subscriptable



Processing spec files:  27%|██▋       | 182/670 [15:18<1:21:45, 10.05s/it]

Error processing VFDSXVEE0048_BMW_FlexRay_0082_TEC_MC_FlexRay_IK-VINCE_v1.2:
Traceback (most recent call last):
  File "/tmp/ipykernel_45709/1004845501.py", line 33, in <module>
    doc.save_images(IMG_OUTPUT_DIR / fname)
  File "/tmp/ipykernel_45709/3289181672.py", line 53, in save_images
    for idx, page in enumerate(self.parsed_doc["pages"]):
                               ~~~~~~~~~~~~~~~^^^^^^^^^
TypeError: 'NoneType' object is not subscriptable



Processing spec files:  31%|███       | 205/670 [18:23<1:51:13, 14.35s/it]

Error processing CSUV_BEV_VTS_TA Version:
Traceback (most recent call last):
  File "/tmp/ipykernel_45709/1004845501.py", line 33, in <module>
    doc.save_images(IMG_OUTPUT_DIR / fname)
  File "/tmp/ipykernel_45709/3289181672.py", line 53, in save_images
    for idx, page in enumerate(self.parsed_doc["pages"]):
                               ~~~~~~~~~~~~~~~^^^^^^^^^
TypeError: 'NoneType' object is not subscriptable



Processing spec files:  33%|███▎      | 222/670 [21:13<1:05:53,  8.82s/it]