<a href="https://colab.research.google.com/github/sssangeetha/OutamationAI_OCR_RAG_Automation/blob/main/AnalyzePixcelsFileandConvertToRawJsonFormat.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
#!/usr/bin/env python3
import os, sys, json, math, argparse, re, io
from dataclasses import dataclass, asdict
from typing import List, Tuple, Dict, Any, Optional

!pip install pymupdf
%pip install pytesseract
%pip install dateparser
import fitz
import cv2
import numpy as np
from PIL import Image
import pytesseract
import regex as rxx
import dateparser

# -----------------------
# Helpers & data classes
# -----------------------

@dataclass
class Token:
    """A single OCR word with its confidence, bounding box and layout indices."""
    text: str  # word text; ocr_page stores it after clean_ocr_text
    conf: float  # confidence; ocr_page stores the Tesseract value divided by 100
    bbox: Tuple[int,int,int,int]  # x0,y0,x1,y1
    block_num: int  # "block_num" column of pytesseract.image_to_data
    par_num: int  # "par_num" column of pytesseract.image_to_data
    line_num: int  # "line_num" column of pytesseract.image_to_data
    word_num: int  # "word_num" column of pytesseract.image_to_data

@dataclass
class Field:
    """An extracted document field together with where it was found."""
    value: str  # extracted text (or the normalised hint, for the title)
    page: int  # page number; the finder functions in this file always pass 1
    bbox: Tuple[int,int,int,int]  # x0,y0,x1,y1, same layout as Token.bbox
    confidence: float  # mean token confidence or a fixed heuristic score

def clamp(v, lo, hi):
    """Limit ``v`` to the interval ``[lo, hi]``.

    The upper bound is applied first and the lower bound second, so when
    ``lo > hi`` the result is ``lo``.
    """
    upper_bounded = min(hi, v)
    return max(lo, upper_bounded)

def to_int_bbox(x, y, w, h):
    """Convert an ``(x, y, width, height)`` box to integer corner form.

    Returns ``(x0, y0, x1, y1)`` where the far corner is ``int(x + w)`` and
    ``int(y + h)``; ``int()`` truncates toward zero.
    """
    left, top = int(x), int(y)
    right, bottom = int(x + w), int(y + h)
    return (left, top, right, bottom)

def union_bbox(b1, b2):
    """Return the smallest ``(x0, y0, x1, y1)`` box that encloses both inputs."""
    left = min(b1[0], b2[0])
    top = min(b1[1], b2[1])
    right = max(b1[2], b2[2])
    bottom = max(b1[3], b2[3])
    return (left, top, right, bottom)

def bbox_area(b):
    """Return the area of an ``(x0, y0, x1, y1)`` box.

    A negative extent on either axis is treated as zero, so an inverted box
    has area 0.
    """
    width = b[2] - b[0]
    height = b[3] - b[1]
    return max(0, width) * max(0, height)

# Look-alike character pairs that OCR commonly confuses.
# Key: the character as read; value: the character clean_ocr_text substitutes
# when the key appears as a token on its own.
CONFUSION_MAP = {
    "0": "O",
    "O": "0",
    "1": "I",
    "I": "1",
    "l": "1",
    "5": "S",
    "S": "5",
    "8": "B",
    "B": "8",
}

def clean_ocr_text(s: str) -> str:
    """Normalise a piece of OCR text to single-spaced printable ASCII.

    Steps, in order:
      1. em dash and en dash become ``-``; a non-breaking space becomes a
         regular space;
      2. every character outside ``0x20``-``0x7E`` is removed (this drops
         control characters and also any remaining non-ASCII letters);
      3. the text is split on whitespace and each one-character token that is
         a key of ``CONFUSION_MAP`` is replaced by its mapped value;
      4. the tokens are re-joined with single spaces.

    NOTE(review): step 3 swaps unconditionally, so a correctly read isolated
    "1" becomes "I" (and vice versa) - confirm this is the intended behaviour.
    """
    normalized = s
    for fancy, plain in (('\u2014', '-'), ('\u2013', '-'), ('\u00A0', ' ')):
        normalized = normalized.replace(fancy, plain)

    printable = rxx.sub(r"[^\x20-\x7E]", "", normalized)

    words = [
        CONFUSION_MAP[w] if len(w) == 1 and w in CONFUSION_MAP else w
        for w in printable.split()
    ]
    return " ".join(words)

# ---------------
# Pre-processing
# ---------------
def preprocess_for_ocr(img_bgr: np.ndarray, max_deskew_angle: float=7.0) -> np.ndarray:
    """Denoise, binarise and deskew a page image ahead of OCR.

    Args:
        img_bgr: 3-channel page image; it is converted with ``COLOR_BGR2GRAY``.
        max_deskew_angle: largest rotation, in degrees, that is applied in
            either direction. A larger estimate is clamped to this limit.

    Returns:
        A single-channel image the same size as the input: adaptive-threshold
        output after a 2x2 morphological opening, rotated about the image
        centre by the estimated skew angle when edge pixels were found.
    """
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)

    # denoise
    den = cv2.fastNlMeansDenoising(gray, h=15, templateWindowSize=7, searchWindowSize=21)

    # adaptive threshold
    thr = cv2.adaptiveThreshold(den, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                cv2.THRESH_BINARY, 35, 15)

    # morphological opening to remove small noise
    kernel = np.ones((2,2), np.uint8)
    opened = cv2.morphologyEx(thr, cv2.MORPH_OPEN, kernel, iterations=1)

    # deskew (estimate skew via minAreaRect of edges)
    edges = cv2.Canny(opened, 50, 150)
    coords = np.column_stack(np.where(edges > 0))
    if coords.size == 0:
        # Blank page: nothing to estimate the skew from.
        return opened

    # np.where yields (row, col); minAreaRect expects (x, y) points.
    rect = cv2.minAreaRect(coords[:, ::-1])
    angle = float(rect[-1])

    # The reported angle is only defined modulo 90 degrees, and its range
    # depends on the OpenCV version ([-90, 0) in older releases, (0, 90] in
    # newer ones). Fold both conventions into [-45, 45] so that an upright
    # page yields ~0 instead of ~90, which would otherwise be clamped to
    # +max_deskew_angle and rotate every straight page.
    if angle < -45:
        angle += 90
    elif angle > 45:
        angle -= 90

    # limit extreme rotations
    angle = float(clamp(angle, -max_deskew_angle, max_deskew_angle))
    if angle == 0.0:
        return opened

    (h, w) = opened.shape[:2]
    M = cv2.getRotationMatrix2D((w//2, h//2), angle, 1.0)
    return cv2.warpAffine(opened, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)

# ---------------
# OCR
# ---------------
def ocr_page(img_bin: np.ndarray, psm:int=6) -> List[Token]:
    """Run Tesseract on a preprocessed page and return its words as tokens.

    Args:
        img_bin: page image passed to ``pytesseract.image_to_data``.
        psm: Tesseract page-segmentation mode, forwarded as ``--psm``.

    Returns:
        One ``Token`` per non-empty word, in the order Tesseract reports
        them. ``text`` is the word after ``clean_ocr_text``; ``conf`` is the
        reported confidence divided by 100, with negative or unparsable
        values stored as 0.0.
    """
    # Use image_to_data for tokens + bboxes
    config = f"--oem 3 --psm {psm}"
    data = pytesseract.image_to_data(img_bin, output_type=pytesseract.Output.DICT, config=config)
    tokens = []
    for i, txt in enumerate(data["text"]):
        if txt is None or txt.strip() == "":
            continue

        # Cleaning strips non-ASCII characters, so a word can end up empty;
        # an empty token would only pollute the joined text the finders search.
        cleaned = clean_ocr_text(txt)
        if not cleaned:
            continue

        # The confidence column may arrive as str, int or float depending on
        # the pytesseract version; -1 in any of those forms means "no value".
        try:
            conf = float(data["conf"][i])
        except (TypeError, ValueError):
            conf = 0.0
        conf = max(conf, 0.0)

        x, y, w, h = data["left"][i], data["top"][i], data["width"][i], data["height"][i]
        tokens.append(Token(
            text=cleaned,
            conf=conf/100.0,
            bbox=to_int_bbox(x,y,w,h),
            block_num=int(data["block_num"][i]),
            par_num=int(data["par_num"][i]),
            line_num=int(data["line_num"][i]),
            word_num=int(data["word_num"][i]),
        ))
    return tokens

# -----------------------------
# Extraction Heuristics/Regex
# -----------------------------
def load_patterns(cfg_path: str) -> Dict[str, Any]:
    """Load the extraction configuration (regexes and hint lists) from JSON.

    Args:
        cfg_path: path to the JSON configuration file.

    Returns:
        The parsed top-level JSON object.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default text encoding.
    with open(cfg_path, "r", encoding="utf-8") as f:
        return json.load(f)

def tokens_in_region(tokens: List[Token], x0,y0,x1,y1) -> List[Token]:
    """Return the tokens whose bbox lies entirely inside the given rectangle.

    The rectangle is ``(x0, y0, x1, y1)`` and its edges are inclusive. Input
    order is preserved; tokens that only partially overlap are excluded.
    """
    def fully_inside(box) -> bool:
        left, top, right, bottom = box
        return left >= x0 and top >= y0 and right <= x1 and bottom <= y1

    return [tok for tok in tokens if fully_inside(tok.bbox)]

def find_title(tokens: List[Token], page_w: int, page_h: int, hints: List[str]) -> Optional[Field]:
    """Find the document title among the tokens in the top quarter of a page.

    Args:
        tokens: OCR tokens of the page.
        page_w: page width in the same units as the token bboxes.
        page_h: page height in the same units as the token bboxes.
        hints: candidate titles, tried in order; the first one that occurs as
            a whole phrase (case-insensitive) wins. Blank hints are skipped.

    Returns:
        A ``Field`` whose value is the matching hint in title case, whose
        bbox encloses exactly the tokens covered by the match, with
        ``page=1`` and ``confidence=0.9``; ``None`` when no hint matches.
    """
    # Only tokens fully inside the top 25% of the page are considered.
    region = tokens_in_region(tokens, 0, 0, page_w, int(page_h*0.25))
    if not region:
        return None

    # Character offset of every token inside the space-joined text, so a
    # regex match can be mapped back to the tokens it covers.
    text = " ".join(t.text for t in region)
    starts = []
    pos = 0
    for t in region:
        starts.append(pos)
        pos += len(t.text) + 1  # +1 for the joining space

    for h in hints:
        if not h or not h.strip():
            # A blank hint would match anywhere and carries no title.
            continue
        m = re.search(rf"\b{re.escape(h)}\b", text, re.IGNORECASE)
        if not m:
            continue
        hit_tokens = [
            t for t, s in zip(region, starts)
            if s < m.end() and s + len(t.text) > m.start()
        ]
        if not hit_tokens:
            continue
        b = hit_tokens[0].bbox
        for t in hit_tokens[1:]:
            b = union_bbox(b, t.bbox)
        return Field(value=h.title(), page=1, bbox=b, confidence=0.9)
    return None

def nearest_value_after_keyword(tokens: List[Token], keyword_list: List[str], window_tokens:int=12) -> Optional[Field]:
    """Extract the name-like phrase that follows the first usable keyword.

    Tokens are scanned in order. When a token contains one of the keywords
    (case-insensitive substring test), the next ``window_tokens`` tokens are
    examined: leading tokens that do not look like a name are skipped, then
    the first unbroken run of name-like tokens is collected. A token is
    name-like when it is longer than one character and is either an
    upper-case word or a capitalised lower-case word (hyphens, periods,
    apostrophes and commas allowed after the first letter).

    Returns:
        A ``Field`` with the run joined by spaces, the enclosing bbox, the
        mean token confidence and ``page=1``; ``None`` if no keyword is
        followed by such a run.
    """
    name_shape = r"^[A-Z][A-Z\-\.',]*$|^[A-Z][a-z\-\.',]+$"

    def looks_like_name(tok) -> bool:
        return bool(re.match(name_shape, tok.text)) and len(tok.text) > 1

    for pos, anchor in enumerate(tokens):
        for kw in keyword_list:
            if kw.lower() not in anchor.text.lower():
                continue

            # Gather the first consecutive run of name-like tokens after the keyword.
            run = []
            for cand in tokens[pos + 1:pos + 1 + window_tokens]:
                if looks_like_name(cand):
                    run.append(cand)
                elif run:
                    break
            if not run:
                continue

            box = run[0].bbox
            for cand in run[1:]:
                box = union_bbox(box, cand.bbox)
            mean_conf = np.mean([cand.conf for cand in run]).item()
            phrase = " ".join([cand.text for cand in run])
            return Field(value=phrase, page=1, bbox=box, confidence=float(mean_conf))
    return None

def find_currency(tokens: List[Token], currency_regex: str, hints: List[str]) -> Optional[Field]:
    """Find the first currency amount on a page.

    The regex is first searched over the space-joined text of all tokens. If
    that yields nothing usable, a 15-token window starting at each token that
    contains a hint keyword (case-insensitive) is searched instead.

    Args:
        tokens: OCR tokens of the page, in reading order.
        currency_regex: pattern describing a currency amount.
        hints: keywords that mark where a loan amount is likely to appear.

    Returns:
        A ``Field`` with the matched text and the bbox of the tokens the
        match covers, ``page=1``. Confidence is the mean token confidence
        for a page-level match and 0.7 for a keyword-window match. ``None``
        when nothing matches.
    """
    def overlapping(seq, start, end):
        """Tokens of ``seq`` covering chars [start, end) of its joined text."""
        hits = []
        pos = 0
        for tok in seq:
            tok_end = pos + len(tok.text)
            if pos < end and tok_end > start:
                hits.append(tok)
            pos = tok_end + 1  # +1 for the joining space
        return hits

    def enclose(seq):
        """Union of the bboxes of a non-empty token list."""
        box = seq[0].bbox
        for tok in seq[1:]:
            box = union_bbox(box, tok.bbox)
        return box

    joined = " ".join([t.text for t in tokens])
    m = re.search(currency_regex, joined)
    if m:
        # Map the match back to tokens by character offset. Guessing the
        # first token from its text can latch onto an unrelated earlier
        # token (e.g. a lone "$") and lose a perfectly good match.
        used = overlapping(tokens, m.start(), m.end())
        if used:
            conf = np.mean([s.conf for s in used]).item()
            return Field(value=m.group(0), page=1, bbox=enclose(used), confidence=float(conf))

    # fallback: look near hint keywords
    for i,t in enumerate(tokens):
        for kw in hints:
            if kw.lower() in t.text.lower():
                seq = tokens[i:i+15]
                joined2 = " ".join([x.text for x in seq])
                m2 = re.search(currency_regex, joined2)
                if m2:
                    # Fall back to the whole window if the match is empty.
                    used = overlapping(seq, m2.start(), m2.end()) or seq
                    return Field(value=m2.group(0), page=1, bbox=enclose(used), confidence=0.7)
    return None

def find_address(tokens: List[Token], addr_regex: str, hints: List[str]) -> Optional[Field]:
    """Find the first postal address on a page.

    The regex is first searched (case-insensitive) over the space-joined
    text of all tokens. If that yields nothing usable, a 25-token window
    starting at each token that contains a hint keyword is searched instead.

    Args:
        tokens: OCR tokens of the page, in reading order.
        addr_regex: pattern describing an address.
        hints: keywords that mark where the address is likely to appear.

    Returns:
        A ``Field`` with the matched text and the bbox of the tokens the
        match covers, ``page=1``. Confidence is the mean token confidence
        for a page-level match and 0.65 for a keyword-window match. ``None``
        when nothing matches.
    """
    def overlapping(seq, start, end):
        """Tokens of ``seq`` covering chars [start, end) of its joined text."""
        hits = []
        pos = 0
        for tok in seq:
            tok_end = pos + len(tok.text)
            if pos < end and tok_end > start:
                hits.append(tok)
            pos = tok_end + 1  # +1 for the joining space
        return hits

    def enclose(seq):
        """Union of the bboxes of a non-empty token list."""
        box = seq[0].bbox
        for tok in seq[1:]:
            box = union_bbox(box, tok.bbox)
        return box

    joined = " ".join([t.text for t in tokens])
    m = re.search(addr_regex, joined, flags=re.IGNORECASE)
    if m:
        # Use only the tokens under the match. Collecting every token on the
        # page that shares a fragment with the address (a digit, a comma,
        # "St") inflates the bbox to most of the page.
        used = overlapping(tokens, m.start(), m.end())
        if used:
            conf = np.mean([u.conf for u in used]).item()
            return Field(value=m.group(0), page=1, bbox=enclose(used), confidence=float(conf))

    # keyword-based local search
    for i,t in enumerate(tokens):
        for kw in hints:
            if kw.lower() in t.text.lower():
                seq = tokens[i:i+25]
                txt = " ".join([x.text for x in seq])
                m2 = re.search(addr_regex, txt, flags=re.IGNORECASE)
                if m2:
                    # Fall back to the whole window if the match is empty.
                    used = overlapping(seq, m2.start(), m2.end()) or seq
                    return Field(value=m2.group(0), page=1, bbox=enclose(used), confidence=0.65)
    return None

def find_dates(tokens: List[Token], date_regex: str, hints: List[str]) -> Dict[str, Field]:
    """Find the loan date and the recording date on a page.

    Every regex match in the space-joined token text that ``dateparser`` can
    parse is a candidate. Each candidate is classified from the text of a
    window of up to ten tokens around it (five before, five from the match
    onward): if the window mentions RECORDED or RECORDING it is a recording
    date, otherwise it is a loan date. Only the first candidate of each kind
    is kept.

    Args:
        tokens: OCR tokens of the page, in reading order.
        date_regex: pattern describing a date.
        hints: accepted for interface compatibility; this implementation
            does not consult it.

    Returns:
        A dict with at most the keys ``"loan_date"`` and
        ``"recording_date"``. Each ``Field`` carries the raw matched text,
        the bbox of the surrounding token window, ``page=1`` and
        ``confidence=0.7``.
    """
    found = {}
    page_text = " ".join(tok.text for tok in tokens)

    for match in re.finditer(date_regex, page_text):
        raw = match.group(0)
        if not dateparser.parse(raw):
            continue

        # Tokens are joined by single spaces, so the number of spaces before
        # the match is the index of the token the match starts in.
        tok_idx = page_text.count(" ", 0, match.start())
        lo = max(0, tok_idx - 5)
        hi = min(len(tokens), tok_idx + 5)
        window = tokens[lo:hi]

        box = window[0].bbox
        for tok in window[1:]:
            box = union_bbox(box, tok.bbox)

        # Dates next to DATED/DATE and dates with no label at all both count
        # as the loan date; only a recording label changes the key.
        context = " ".join(tok.text for tok in window).upper()
        near_recording = "RECORDED" in context or "RECORDING" in context
        key = "recording_date" if near_recording else "loan_date"

        if key not in found:
            found[key] = Field(value=raw, page=1, bbox=box, confidence=0.7)
    return found

# -----------------------------
# Main pipeline
# -----------------------------
def pdf_to_images(pdf_path: str, dpi: int=300) -> List[np.ndarray]:
    """Render every page of a PDF to an OpenCV-style BGR image.

    Args:
        pdf_path: path of the PDF to open.
        dpi: render resolution; pages are scaled by ``dpi / 72``.

    Returns:
        One ``uint8`` array of shape ``(height, width, 3)`` per page, in
        page order, with channels in BGR order.
    """
    zoom = dpi / 72
    mat = fitz.Matrix(zoom, zoom)  # identical for every page, so build it once
    pages = []
    # The context manager closes the document even if rendering fails.
    with fitz.open(pdf_path) as doc:
        for page in doc:
            pix = page.get_pixmap(matrix=mat, annots=False)
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
            # PyMuPDF delivers RGB(A) samples while the rest of the pipeline
            # assumes OpenCV's BGR order, so swap channels (dropping alpha).
            # cvtColor also returns a fresh array that no longer aliases the
            # pixmap's buffer.
            if pix.n == 4:
                img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
            elif pix.n == 3:
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            pages.append(img)
    return pages

def aggregate_tokens(tokens: List[Token]) -> Dict[str, Any]:
    """Convert tokens to a JSON-serialisable ``{"tokens": [...]}`` mapping.

    Each entry holds the token text, its confidence as a percentage rounded
    to two decimals, the bbox as a list, and the block, line and word
    indices. The paragraph index is not included.
    """
    rows = []
    for tok in tokens:
        rows.append({
            "text": tok.text,
            "conf": round(tok.conf*100,2),
            "bbox": list(tok.bbox),
            "block_id": tok.block_num,
            "line_id": tok.line_num,
            "word_id": tok.word_num,
        })
    return {"tokens": rows}

def main(pdf_path: str, out_path: str = "out.json", patterns_path: str = "regex_patterns.json", save_artifacts: bool = False, psm: int = 6, max_deskew_angle: float = 7.0):
    """Run the OCR extraction pipeline on one PDF and write the result as JSON.

    Each page is rendered, preprocessed and OCR'd. The title is looked for on
    page 1 only; borrower, lender, loan amount, address and the two dates are
    looked for page by page until each one has been found. Every extracted
    field records the page it was found on.

    Args:
        pdf_path: PDF to process.
        out_path: where the JSON result is written.
        patterns_path: JSON file with the regexes and hint lists.
        save_artifacts: when true, also write the preprocessed page images
            and a per-page token CSV into ``./artifacts``.
        psm: Tesseract page-segmentation mode.
        max_deskew_angle: deskew limit in degrees, passed to preprocessing.
    """
    import csv  # only used for the optional per-page token dump

    def as_record(field, page_number):
        """Serialise a Field, stamping the page it was actually found on."""
        # The finder functions always report page 1, whichever page they
        # were given, so the real page number is applied here.
        record = asdict(field)
        record["page"] = page_number
        return record

    cfg = load_patterns(patterns_path)
    pages = pdf_to_images(pdf_path)

    results = {
        "document_title": None,
        "borrower_name": [],
        "lender_name": [],
        "loan_amount": None,
        "property_address": None,
        "dates": {},
        "pages": []
    }

    if save_artifacts:
        os.makedirs("artifacts", exist_ok=True)

    date_keys = ("loan_date", "recording_date")

    for pageno, bgr in enumerate(pages, start=1):
        bin_img = preprocess_for_ocr(bgr, max_deskew_angle=max_deskew_angle)
        tokens = ocr_page(bin_img, psm=psm)

        h, w = bin_img.shape[:2]

        if save_artifacts:
            cv2.imwrite(os.path.join("artifacts", f"page_{pageno:02d}_preprocessed.png"), bin_img)
            # save token table as CSV
            with open(os.path.join("artifacts", f"ocr_page_{pageno:02d}.csv"), "w", newline="", encoding="utf-8") as f:
                cw = csv.writer(f)
                cw.writerow(["text","conf","x0","y0","x1","y1","block","par","line","word"])
                for t in tokens:
                    cw.writerow([t.text, round(t.conf,3), *t.bbox, t.block_num, t.par_num, t.line_num, t.word_num])

        # title from top region of page 1 only
        if pageno == 1 and results["document_title"] is None:
            title = find_title(tokens, w, h, cfg["TITLE_HINTS"])
            if title:
                results["document_title"] = as_record(title, pageno)

        # borrower/lender (try each page until found at least once)
        if not results["borrower_name"]:
            borrower = nearest_value_after_keyword(tokens, cfg["BORROWER_HINTS"])
            if borrower:
                results["borrower_name"].append(as_record(borrower, pageno))
        if not results["lender_name"]:
            lender = nearest_value_after_keyword(tokens, cfg["LENDER_HINTS"])
            if lender:
                results["lender_name"].append(as_record(lender, pageno))

        # loan amount
        if results["loan_amount"] is None:
            la = find_currency(tokens, cfg["CURRENCY_REGEX"], cfg["LOAN_AMOUNT_HINTS"])
            if la:
                results["loan_amount"] = as_record(la, pageno)

        # address
        if results["property_address"] is None:
            addr = find_address(tokens, cfg["ADDRESS_REGEX"], cfg["ADDRESS_HINTS"])
            if addr:
                results["property_address"] = as_record(addr, pageno)

        # dates: keep searching until both kinds are known, so a recording
        # date on a later page is not lost just because an earlier page
        # already supplied the loan date (or vice versa)
        if any(k not in results["dates"] for k in date_keys):
            found = find_dates(tokens, cfg["DATE_REGEX"], cfg["DATE_HINTS"] + cfg["RECORDING_HINTS"])
            for k, v in found.items():
                if k not in results["dates"]:
                    results["dates"][k] = as_record(v, pageno)

        # store page tokens
        page_entry = {"page_number": pageno}
        page_entry.update(aggregate_tokens(tokens))
        results["pages"].append(page_entry)

    # Fields that were never found keep their initial None / [] / {} value.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"Saved {out_path}")
    if save_artifacts:
        print("Artifacts saved in ./artifacts")

# Call main with the path to your PDF file.
# Guarded so that importing this module does not start a full OCR run; in a
# notebook cell __name__ is "__main__", so the call still executes there.
if __name__ == "__main__":
    main(pdf_path="/content/MTG_10009588.pdf")

Saved out.json


In [9]:
import json
import re

# One-off maintenance cell: rewrite ADDRESS_REGEX inside the patterns file.
patterns_path = "/content/regex_patterns.json"
with open(patterns_path, "r", encoding="utf-8") as f:
    cfg = json.load(f)

original_address_regex = cfg.get("ADDRESS_REGEX", "")

# Replace `\-` with `\\-`, but only where the backslash is not itself already
# preceded by a backslash. A plain str.replace also matches the tail of an
# already rewritten `\\-`, so every re-run of this cell would add one more
# backslash to the stored pattern.
fixed_address_regex = re.sub(
    r"(?<!\\)\\-",
    lambda _m: "\\\\-",  # callable replacement: no second level of escaping
    original_address_regex,
)

# If the issue is within a character class like `[\- ]`, try converting to `[- ]`
# fixed_address_regex = original_address_regex.replace("[\\- ]", "[- ]")

# Never persist a pattern that no longer compiles: the extraction run would
# only fail later, after the working pattern has been overwritten.
try:
    re.compile(fixed_address_regex)
except re.error as err:
    raise ValueError(
        f"Rewritten ADDRESS_REGEX does not compile ({err}); {patterns_path} left unchanged."
    ) from err

cfg["ADDRESS_REGEX"] = fixed_address_regex

with open(patterns_path, "w", encoding="utf-8") as f:
    json.dump(cfg, f, indent=2)

print(f"Updated {patterns_path} with modified ADDRESS_REGEX.")

Updated /content/regex_patterns.json with modified ADDRESS_REGEX.
