In [0]:
%pip install --upgrade "openai==1.105.0" "pymupdf>=1.24.0" pillow "pytesseract>=0.3.10" "jsonschema>=4.22.0"

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython()

In [0]:
# ==== CONFIG ====
# Notebook-wide defaults. They are bound as default argument values of the
# functions defined below, so change them here before running those definitions.
USE_OCR  = True           # Use OCR for scanned/blurry PDFs
OCR_DPI  = 600            # High DPI improves OCR quality
OCR_LANG = "eng"          # Add languages if needed, e.g. "eng+ara"
MODEL    = "gpt-4o-2024-08-06"  # Model name passed to client.responses.create

import os, json, re, io, base64
from typing import List, Dict, Optional

from openai import OpenAI
from jsonschema import Draft202012Validator

# --- API KEY ---
# Resolution order:
#   1. Secret scope "kv" / key "openai-api-key", exported to the environment.
#   2. An OPENAI_API_KEY value that is already present in the environment.
# Do not paste a literal key into this notebook: it would be saved with the file.
try:
    # Enterprise workspace with Secrets
    OPENAI_API_KEY = dbutils.secrets.get(scope="kv", key="openai-api-key")
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
except Exception as secret_err:
    # Secret lookup is best-effort (presumably unavailable on Community Edition);
    # fall back to whatever the environment already provides.
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    if not OPENAI_API_KEY:
        # Fail here with an actionable message instead of swallowing the cause
        # and letting the client constructor fail with a less specific error.
        raise RuntimeError(
            "No OpenAI API key available: the secret lookup failed "
            f"({secret_err!r}) and OPENAI_API_KEY is not set in the environment."
        ) from secret_err

client = OpenAI()

# ==== TARGET JSON SCHEMA (your required output) ====
# JSON Schema describing the extraction result: a top-level object holding a
# "sections" array. Every object level forbids extra keys and lists all of its
# properties as required; apart from "section", scalar fields are string-or-null.
# The same dict is serialised into the system prompt and used to build `validator`.
structured_schema = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "sections": {
            "type": "array",
            "items": {
                "type": "object",
                "additionalProperties": False,
                "properties": {
                    "section": {"type": "string"},  # Statement number e.g. "211"
                    "OpeningBalance": {"type": ["string","null"]},
                    "ClosingBalance": {"type": ["string","null"]},
                    "EndOfDayInformation": {"type": ["string","null"]},
                    # One entry per transaction row; all four keys must be present
                    # (their values may be null).
                    "Transactions": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "additionalProperties": False,
                            "properties": {
                                "ValueDate":    {"type": ["string","null"]},
                                "BookedDate":   {"type": ["string","null"]},
                                "CounterParty": {"type": ["string","null"]},
                                "Amount":       {"type": ["string","null"]}
                            },
                            "required": ["ValueDate","BookedDate","CounterParty","Amount"]
                        }
                    },
                    "BankReference":   {"type": ["string","null"]},
                    "TransactionType": {"type": ["string","null"]}
                },
                "required": [
                    "section",
                    "OpeningBalance",
                    "ClosingBalance",
                    "EndOfDayInformation",
                    "Transactions",
                    "BankReference",
                    "TransactionType"
                ]
            }
        }
    },
    "required": ["sections"]
}
# Draft 2020-12 validator built once from the schema above and reused by _validate_json.
validator = Draft202012Validator(structured_schema)

# ==== PDF TEXT & OCR ====
import fitz  # PyMuPDF
from PIL import Image, ImageEnhance

def _norm_spaces(s: str) -> str:
    s = (s or "")
    s = s.replace("\u00A0"," ").replace("\u2009"," ")
    s = s.replace("\u2013","-").replace("\u2014","-").replace("\u2212","-")
    s = re.sub(r"[ \t]+", " ", s)
    s = re.sub(r"\r\n?","\n", s)
    return s.strip()

def get_text_native(path: str, max_pages=300) -> str:
    """Extract the embedded (non-OCR) text layer of a PDF.

    Args:
        path: Filesystem path of the PDF, opened with PyMuPDF.
        max_pages: Upper bound on the number of pages read, counted from the first.

    Returns:
        The text of the processed pages joined with newlines and passed through
        `_norm_spaces`.
    """
    parts = []
    # The context manager closes the document even if a page fails to extract;
    # the previous explicit close() was skipped whenever get_text raised.
    with fitz.open(path) as doc:
        for i, page in enumerate(doc):
            if i >= max_pages:
                break
            parts.append(page.get_text("text", sort=True) or "")
    return _norm_spaces("\n".join(parts))

def get_text_ocr(path: str, dpi=OCR_DPI, max_pages=300, lang=OCR_LANG) -> str:
    """OCR the pages of a PDF and return the recognised text.

    Each page is rasterised at `dpi` (scale factor dpi/72), converted to
    grayscale, contrast- and sharpness-enhanced, then handed to Tesseract
    through pytesseract.

    Args:
        path: Filesystem path of the PDF, opened with PyMuPDF.
        dpi: Rendering resolution for the page images.
        max_pages: Upper bound on the number of pages processed.
        lang: Language string passed to pytesseract (e.g. "eng", "eng+ara").

    Returns:
        The per-page OCR text joined with newlines and passed through
        `_norm_spaces`.
    """
    import pytesseract  # imported lazily so the module is only needed for the OCR path

    parts = []
    scale = dpi / 72.0
    matrix = fitz.Matrix(scale, scale)
    # The context manager closes the document even if rendering or OCR raises;
    # the previous explicit close() was skipped on any exception.
    with fitz.open(path) as doc:
        for i, page in enumerate(doc):
            if i >= max_pages:
                break
            pix = page.get_pixmap(matrix=matrix, alpha=False)
            png = pix.tobytes("png")
            img = Image.open(io.BytesIO(png)).convert("L")
            img = ImageEnhance.Contrast(img).enhance(2.0)
            img = ImageEnhance.Sharpness(img).enhance(1.6)
            parts.append(pytesseract.image_to_string(img, lang=lang) or "")
    return _norm_spaces("\n".join(parts))

def render_pages_as_data_urls(path: str, dpi=OCR_DPI, max_pages=60) -> List[str]:
    """Render PDF pages to PNG and return them as base64 `data:` URLs.

    Args:
        path: Filesystem path of the PDF, opened with PyMuPDF.
        dpi: Rendering resolution (scale factor dpi/72).
        max_pages: Upper bound on the number of pages rendered.

    Returns:
        One "data:image/png;base64,..." string per rendered page, in page order.
    """
    urls = []
    scale = dpi / 72.0
    matrix = fitz.Matrix(scale, scale)
    # The context manager closes the document even if rendering a page raises;
    # the previous explicit close() was skipped on any exception.
    with fitz.open(path) as doc:
        for i, page in enumerate(doc):
            if i >= max_pages:
                break
            pix = page.get_pixmap(matrix=matrix, alpha=False)
            png = pix.tobytes("png")
            b64 = base64.b64encode(png).decode("utf-8")
            urls.append("data:image/png;base64," + b64)
    return urls

# ==== Minimal seed for model context ====
def build_seed(_: str) -> dict:
    """Return the minimal starting document given to the model as context.

    The argument is accepted but ignored. A new dict with an empty "sections"
    list is created on every call; the final structure is imposed by the
    system prompt rather than by this seed.
    """
    seed: dict = dict(sections=[])
    return seed

# ==== Post-processing to enforce CounterParty = null when blank ====
NULLISH_TOKENS = {"", "-", "â€”", "n/a", "na", "none", "null", "<undefined>", "<unknown>"}

def _null_if_blank(x):
    if x is None:
        return None
    s = str(x).strip()
    return None if s.lower() in NULLISH_TOKENS else (s if s else None)

def enforce_counterparty_none(doc: dict) -> dict:
    """Replace blank/placeholder transaction fields with None, in place.

    For every transaction of every section, the ValueDate, BookedDate,
    CounterParty and Amount values are passed through `_null_if_blank`
    (a missing key is created with value None).

    Args:
        doc: Parsed extraction result. Anything that is not a dict is returned
             untouched.

    Returns:
        The same `doc` object, mutated in place.
    """
    if not isinstance(doc, dict):
        return doc
    # `or []` tolerates an explicit null where a list is expected; non-dict
    # entries are skipped rather than raising AttributeError.
    for sec in doc.get("sections") or []:
        if not isinstance(sec, dict):
            continue
        for txn in sec.get("Transactions") or []:
            if not isinstance(txn, dict):
                continue
            for key in ("ValueDate", "BookedDate", "CounterParty", "Amount"):
                txn[key] = _null_if_blank(txn.get(key))
    return doc

# ==== Strong system prompt: exact structure, keep duplicates ====
# System prompt sent with every extraction request. It embeds the serialised
# `structured_schema` and instructs the model to return JSON only, to use null
# for absent values, and to keep repeated sections/rows instead of merging them.
JSON_ONLY_SYSTEM = (
    "You are an OCR + bank statement extraction agent. "
    "You MUST NOT invent values. If a value is not present, set it to null. "
    "Return ONLY one minified JSON object that VALIDATES against this schema: "
    + json.dumps(structured_schema) + " "
    # IMPORTANT CHANGE: keep duplicates and multiple occurrences
    "Do NOT deduplicate or merge. If the same Statement Number (section) appears multiple times, "
    "create multiple section objects with the SAME 'section' value, in the exact order they appear in the PDF. "
    "If any label/value pair or transaction row repeats, include each instance as-is (no collapsing). "
    "Interpret 'section' as the Statement Number (e.g., '211', '210'). "
    "For each occurrence of a statement number, produce ONE object with keys: "
    "section, OpeningBalance, ClosingBalance, EndOfDayInformation, "
    "Transactions (array of {ValueDate,BookedDate,CounterParty,Amount}), "
    "BankReference, TransactionType. "
    "Use exactly the key names from the schema and do not include extra keys. "
    "If CounterParty is missing for a transaction, set CounterParty to null. "
    "Output JSON only (no commentary, no code fences)."
)

def _validate_json(s: str) -> dict:
    """Parse model output as JSON and check it against the module-level validator.

    A leading ``` or ```json fence and a trailing ``` fence are removed from
    the stripped text before parsing.

    Args:
        s: Raw text returned by the model.

    Returns:
        The parsed object once `validator.validate` has accepted it.

    Raises:
        json.JSONDecodeError: the text is not valid JSON.
        Whatever `validator.validate` raises when the object does not match
        the schema.
    """
    fence_pattern = r"^```(?:json)?\s*|\s*```$"
    unfenced = re.sub(fence_pattern, "", s.strip(), flags=re.I | re.DOTALL)
    parsed = json.loads(unfenced)
    validator.validate(parsed)
    return parsed

def gpt4o_extract(pdf_path: str, ocr_text: str, seed_doc: dict) -> dict:
    """Ask the model to turn a statement PDF into schema-conforming JSON.

    The request carries the system prompt, the OCR/native text, the seed JSON
    and one image per rendered page. The response format is constrained to
    `structured_schema` via strict Structured Outputs, so conformance no longer
    depends on the prompt wording alone; the result is still re-validated
    locally by `_validate_json`.

    Args:
        pdf_path: Path of the PDF; its pages are rendered at OCR_DPI.
        ocr_text: Text previously extracted from the PDF (None is treated as "").
        seed_doc: JSON-serialisable seed document included as context.

    Returns:
        The parsed and validated extraction result.

    Raises:
        json.JSONDecodeError: the model output is not valid JSON.
        Whatever `validator.validate` raises on a schema mismatch, plus any
        error raised by the OpenAI client call itself.
    """
    # NOTE(review): pages are always rendered at OCR_DPI here, independent of
    # any dpi value a caller used for the OCR pass.
    images = render_pages_as_data_urls(pdf_path, dpi=OCR_DPI)
    # Cap both text payloads at 120000 characters to bound the request size.
    ocr_chunk = (ocr_text or "")[:120000]
    seed_chunk = json.dumps(seed_doc)[:120000]

    user_content = (
        [{"type": "input_text", "text": "Below is noisy OCR/native text:\n" + ocr_chunk}] +
        [{"type": "input_text", "text": "Here is a minimal seed JSON:\n" + seed_chunk}] +
        [{"type": "input_image", "image_url": u} for u in images]
    )
    messages = [
        {"role": "system", "content": JSON_ONLY_SYSTEM},
        {"role": "user", "content": user_content},
    ]

    resp = client.responses.create(
        model=MODEL,
        input=messages,
        temperature=0,
        # Strict json_schema output: the schema already meets the strict-mode
        # rules (every property required, additionalProperties false).
        text={
            "format": {
                "type": "json_schema",
                "name": "bank_statement_sections",
                "schema": structured_schema,
                "strict": True,
            }
        },
    )
    return _validate_json(resp.output_text)

# ==== Orchestrator (AI ONLY) ====
def extract_structured_pdf(pdf_path: str, use_ocr=USE_OCR, dpi=OCR_DPI, lang=OCR_LANG) -> dict:
    """Run the full pipeline for one PDF: text extraction, model call, clean-up.

    Args:
        pdf_path: Path of the statement PDF.
        use_ocr: When true, text comes from `get_text_ocr`; otherwise from
                 `get_text_native`.
        dpi: Resolution forwarded to the OCR pass (unused on the native path).
        lang: OCR language string forwarded to the OCR pass.

    Returns:
        The model's extraction result after `enforce_counterparty_none` has
        replaced blank transaction fields with None.
    """
    if use_ocr:
        page_text = get_text_ocr(pdf_path, dpi=dpi, lang=lang)
    else:
        page_text = get_text_native(pdf_path)

    seed_doc = build_seed(page_text)
    extracted = gpt4o_extract(pdf_path, page_text, seed_doc)
    # Blank / placeholder transaction values become None.
    return enforce_counterparty_none(extracted)

# ==== Example run (update path) ====
# pdf_path = "/mnt/data/YourStatement.pdf"
# result = extract_structured_pdf(pdf_path, use_ocr=True, dpi=600)
# print(json.dumps(result, indent=2, ensure_ascii=False))


In [0]:
# Location of the statement PDF to process.
pdf_path = "/Volumes/workspace/default/pdfvolume/IngStmtPdfWork.pdf"  # change if needed

# ---------- Extract the structured data using the pipeline ------------------
doc = extract_structured_pdf(pdf_path, use_ocr=True, dpi=600, lang="eng")
display(doc)
#print(json.dumps(doc, indent=2))

{'sections': [{'section': '211',
   'OpeningBalance': '30/07/25 0,00 AED',
   'ClosingBalance': '30/07/25 0,00 AED',
   'EndOfDayInformation': 'Closing available balance 0,00 AED',
   'Transactions': [{'ValueDate': None,
     'BookedDate': None,
     'CounterParty': None,
     'Amount': '0,00 AED'}],
   'BankReference': '303000 / 3013086001869',
   'TransactionType': '30 Miscellaneous transactions'},
  {'section': '209',
   'OpeningBalance': '28/07/25 0,00 AED',
   'ClosingBalance': '28/07/25 0,00 AED',
   'EndOfDayInformation': 'Closing available balance 0,00 AED',
   'Transactions': [{'ValueDate': '28/07/25',
     'BookedDate': '28/07/25',
     'CounterParty': None,
     'Amount': '0,00 AED'}],
   'BankReference': '30390 / 30129860058',
   'TransactionType': '30 Miscellaneous transactions'},
  {'section': '208',
   'OpeningBalance': '27/07/25 0,00 AED',
   'ClosingBalance': '27/07/25 0,00 AED',
   'EndOfDayInformation': 'Closing available balance 0,00 AED',
   'Transactions': [{'Valu