<a href="https://colab.research.google.com/github/ppasyani/GID-OCR-Project/blob/main/%5BVer_3_2%5D_Receipt_OCR_to_Spreadsheet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🧾 OCR-to-Spreadsheet

This notebook trains and evaluates a **spaCy Named Entity Recognition (NER)** model to convert  
noisy OCR-style receipt text into structured tabular data with the following fields:

- **time**  
- **product**  
- **quantity**  
- **price_per_pax**  
- **total_price**   

The workflow supports both **synthetic datasets** and **real OCR exports**, normalizes the text into a consistent schema, trains an NER model, evaluates accuracy, and finally produces a clean spreadsheet (CSV/XLSX).  

---

## 📑 Notebook Sections

1. **Setup**  
   Install dependencies, set up folders, and configure paths.

2. **Dataset (Synthetic or OCR)**  

   <details>
   <summary>🔹 Option 1 — Synthetic Receipts</summary>

   Run this if you want to **simulate OCR data** by generating a dummy dataset (`receipts_dummy_en.csv / .xlsx`).  
   Useful for experimenting or if no OCR output is available yet.  

   </details>

   <details>
   <summary>🔹 Option 2A — OCR Upload + Normalize</summary>

   Run this if you already have an **OCR export file (CSV/XLSX)** with a `raw_text` column.  
   You will upload the file directly from your computer into Colab.  

   ⚠️ Make sure to run the **OCR Normalization Helpers** cell first, since it defines the regex and parsing functions used here.

   </details>

   <details>
   <summary>🔹 Option 2B — OCR from Google Drive + Normalize</summary>

   Run this if your OCR files are already stored in a Google Drive folder.  
   This option supports multiple CSV/XLSX files at once.  

   ⚠️ Make sure to run the **OCR Normalization Helpers** cell first, since it defines the regex and parsing functions used here.

   </details>

3. **Preprocessing → NER Training Examples**  
   Align entity spans (`TIME`, `PRODUCT`, `QTY`, `PRICE_UNIT`, `PRICE_TOTAL`) and handle OCR noise.  

4. **Train/Validation Split**  
   Split annotated examples into training and dev sets.  

5. **Model Training (spaCy NER)**  
   Train and save the model (`last` and `best` checkpoints).  

6. **Evaluation**  
   - Entity-level metrics: Precision / Recall / F1  
   - Spreadsheet-style metrics: per-field accuracy & strict row accuracy  

7. **Inference → Structured Spreadsheet**  
   Apply the model to new receipts, export structured results to **CSV/XLSX** (either in Colab or directly to Google Drive).  

8. **(Optional) Post-processing**  
   Regex fallbacks, math-based repairs (`qty * unit = total`), and product lexicon matching.  

---

> 💡 **Tip:** Do not run both **Option 1** and **Option 2** at the same time. Pick one source, prepare the dataset, then continue to next step

## 1) Setup

In [None]:
# Install classic spaCy baseline
!pip install -U --force-reinstall \
  "numpy==1.26.4" \
  "pandas==2.2.2" \
  "scikit-learn==1.4.2" \
  "spacy==3.7.4" \
  "thinc==8.2.5" \
  "spacy-lookups-data==1.0.5" \
  "tqdm==4.66.4" \
  "openpyxl==3.1.2"

!python -m spacy download en_core_web_sm

In [None]:
# Add project setup block
import os, json, random, numpy as np, pandas as pd
import spacy

CFG = {
    "seed": 42,
    "labels": ["TIME","PRODUCT","QTY","PRICE_UNIT","PRICE_TOTAL"],
    "paths": {
        "data_dir": "/content/data",
        "artifacts_dir": "/content/artifacts",
        "models": {
            "best": "/content/artifacts/model_receipts_ner_best",
            "last": "/content/artifacts/model_receipts_ner_last"
        }
    }
}

# Reproducibility + dirs
random.seed(CFG["seed"]); np.random.seed(CFG["seed"])
os.makedirs(CFG["paths"]["data_dir"], exist_ok=True)
os.makedirs(CFG["paths"]["artifacts_dir"], exist_ok=True)

print("Setup OK")

In [None]:
# (Optional) Mount Drive
from google.colab import drive
drive.mount('/content/drive')

##2) Generate Dataset

There are two main ways for preparing the dataset.
Choose **only one** depending on your use case:

1. **Option 1 — Synthetic Receipts**  
   Run this if you want to simulate OCR data by generating a dummy dataset (receipts_dummy_en.csv / .xlsx). Useful for training, experimenting, or when you don’t have OCR output yet.

2. **Option 2 — OCR Import + Normalize**  
   Run this if you already have an OCR export file (CSV/XLSX) with a raw_text column. You can choose between 2A to upload file directly from computer and 2B to upload file stored in your google drive folder. Both 2A and 2B require you to run the OCR Normalization Helpers cell first.
---

> ⚠️ Only run one option (1, 2A, or 2B).
Once your dataset is ready, continue to step 3

In [11]:
# Option 1 : Synthetic Dataset

import random, re, os
from datetime import datetime, timedelta
import pandas as pd

# Use project seed
random.seed(CFG["seed"])

# Catalogs
products = [
    ("Chicken Noodle","food"), ("Beef Bowl","food"), ("Veggie Burger","food"),
    ("Iced Coffee","drink"), ("Hot Latte","drink"), ("Mineral Water","drink"),
    ("French Fries","food"), ("Grilled Salmon","food"), ("Caesar Salad","food"),
    ("Orange Juice","drink"), ("Chocolate Cake","dessert"), ("Cheesecake","dessert"),
    ("Fried Rice","food"), ("Fried Noodles","food"), ("Chicken Soup","food"),
    ("Sweet Iced Tea","drink"), ("Iced Orange","drink"),
]
stores = ["Simple Diner","Nusantara Cafe","Relaxed Restaurant","Bistro 88","Food Hall","Tasty Depot","Morning Canteen"]

def rand_time(start_date=datetime(2025,3,1), days=180):
    return start_date + timedelta(days=random.randint(0,days),
                                  hours=random.randint(7,21),
                                  minutes=random.randint(0,59))

def ocr_noise(s: str) -> str:
    """Inject tiny OCR-like noise for realism (kept small)."""
    s = s.replace("  ", " ")
    if random.random()<0.03: s = re.sub(r"\b0\b", "O", s)
    if random.random()<0.02: s = s.replace("O", "0")
    if random.random()<0.02: s = s.replace("l", "1")
    if random.random()<0.02: s = s.replace("I", "1")
    return s

def price_for(name):
    base = {
        "Chicken Noodle": 32000, "Beef Bowl": 52000, "Veggie Burger": 40000,
        "Iced Coffee": 22000, "Hot Latte": 28000, "Mineral Water": 8000,
        "French Fries": 18000, "Grilled Salmon": 89000, "Caesar Salad": 45000,
        "Orange Juice": 24000, "Chocolate Cake": 35000, "Cheesecake": 38000,
        "Fried Rice": 32000, "Fried Noodles": 28000, "Chicken Soup": 27000,
        "Sweet Iced Tea": 10000, "Iced Orange": 14000
    }
    jitter = random.randint(-2000,2000)
    return max(5000, base.get(name,30000) + jitter)

def make_line():
    dt = rand_time()
    store = random.choice(stores)
    item, _ = random.choice(products)
    qty = random.randint(1,5)
    ppu = price_for(item)
    total = qty * ppu

    patterns = [
        f"{dt:%Y-%m-%d %H:%M} {store} - {qty}x {item} @ ${ppu} = {total}",
        f"{store} | {dt:%d/%m/%Y %H:%M} | {item} x{qty} ${ppu}/pax Total ${total}",
        f"{dt:%d-%b-%Y %H:%M} {store} :: {qty} {item} @ ${ppu} -> {total}",
        f"{store} {dt:%Y/%m/%d %H:%M} {item} qty:{qty} price:{ppu} total:{total}",
        f"{dt:%m/%d/%Y %H:%M} - {store} - {qty} pax {item} - {ppu} each - total {total}",
        f"{store} {dt:%d-%m-%Y %H:%M} {item} Qty:{qty} Price:{ppu} Total:{total}",
        f"Date {dt:%d/%m/%Y} {store} {item} x {qty} @ ${ppu} TOTAL {total}",
        f"{store} • {dt:%Y.%m.%d %H:%M} • {item} • qty {qty} • unit {ppu} • grand total {total}",
    ]
    text = ocr_noise(random.choice(patterns))

    return {
        "raw_text": text,
        "time": dt.strftime('%Y-%m-%d %H:%M'),
        "product": item,
        "quantity": qty,
        "price_per_pax": ppu,
        "total_price": total,
        "store": store
    }

# Generate and save
N_SAMPLES = 500
df = pd.DataFrame([make_line() for _ in range(N_SAMPLES)]).astype({"raw_text":"string"})

DATA_CSV  = os.path.join(CFG["paths"]["data_dir"], "receipts_dummy_en_v2.csv")
DATA_XLSX = os.path.join(CFG["paths"]["data_dir"], "receipts_dummy_en_v2.xlsx")
df.to_csv(DATA_CSV, index=False)
try:
    df.to_excel(DATA_XLSX, index=False)
except Exception as e:
    print("Excel save skipped:", e)

print("Saved:", DATA_CSV, "| shape:", df.shape)
df.head(5)

Saved: /content/data/receipts_dummy_en_v2.csv | shape: (500, 7)


Unnamed: 0,raw_text,time,product,quantity,price_per_pax,total_price,store
0,11-Aug-2025 08:01 Tasty Depot :: 2 Caesar Sala...,2025-08-11 08:01,Caesar Salad,2,43914,87828,Tasty Depot
1,2025-06-17 07:01 Simple Diner - 2x French Frie...,2025-06-17 07:01,French Fries,2,18069,36138,Simple Diner
2,Date 23/06/2025 Morning Canteen Chicken Noodle...,2025-06-23 16:17,Chicken Noodle,2,32859,65718,Morning Canteen
3,2025-03-24 13:06 Relaxed Restaurant - 5x Chees...,2025-03-24 13:06,Cheesecake,5,37083,185415,Relaxed Restaurant
4,Tasty Depot | 20/07/2025 11:53 | Cheesecake x5...,2025-07-20 11:53,Cheesecake,5,36787,183935,Tasty Depot


> ⚠️ Important: Both 2A and 2B require you to run the OCR Normalization Helpers cell first.

In [18]:
# --- Shared Normalization Helpers ---
import re, pandas as pd
from typing import List, Dict, Optional

# Regex patterns
QTY_PATTERNS = [r"\bx\s*(\d{1,3})\b", r"\b(\d{1,3})\s*x\b",
                r"\bqty\s*[:=]?\s*(\d{1,3})\b", r"\b(\d{1,3})\s*pax\b"]
UNIT_PATTERNS = [r"@\s*\$?\s*(\d{3,8})\b", r"\$\s*(\d{3,8})\b",
                 r"\bprice\s*[:=]?\s*(\d{3,8})\b", r"\bunit\s*[:=]?\s*(\d{3,8})\b",
                 r"\b(\d{3,8})\s*each\b", r"\b(\d{3,8})\s*/\s*pax\b"]
ITEM_TOTAL_PATTERNS = [r"(?:=|->)\s*(\d{3,10})\b",
                       r"\btotal\s*[:=]?\s*\$?\s*(\d{3,10})\b",
                       r"\bTOTAL\s*[:=]?\s*\$?\s*(\d{3,10})\b"]
RECEIPT_TOTAL_PATTERNS = [r"\bgrand\s+total\s*[:=]?\s*\$?\s*(\d{3,10})\b",
                          r"\btotal\s*[:=]?\s*\$?\s*(\d{3,10})\b"]
TIME_PATTERNS = [r"\b\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}\b",
                 r"\b\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}\b",
                 r"\b\d{2}-[A-Za-z]{3}-\d{4}\s+\d{2}:\d{2}\b",
                 r"\b\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}\b"]

# Shared utils
def _find_first(text, patterns):
    for p in patterns:
        m = re.search(p, text, flags=re.I)
        if m:
            return m.group(1) if m.lastindex else m.group(0)
    return None

def _num(s):
    if not s: return None
    digits = re.sub(r"[^\d]", "", s)
    return int(digits) if digits.isdigit() else None

def split_into_lines_block(text: str):
    if "\n" in text:
        return [p.strip() for p in text.split("\n") if p.strip()]
    return [p.strip() for p in re.split(r"[•;|]", text) if p.strip()]

def clean_product(text: str) -> str:
    t = re.sub(r"(qty|price|unit|total|grand total|pax|each|@|\$|=|->)", " ", text, flags=re.I)
    t = re.sub(r"\d+", " ", t)
    t = re.sub(r"\s{2,}", " ", t).strip()
    return t.title()

def extract_time_from_text(text: str) -> str:
    return _find_first(text, TIME_PATTERNS) or ""

def parse_items_from_text(text: str, known_products: Optional[List[str]] = None) -> List[Dict]:
    lines = split_into_lines_block(text)
    items = []
    kp = [p.lower() for p in (known_products or [])]

    for line in lines:
        qty   = _num(_find_first(line, QTY_PATTERNS))
        unit  = _num(_find_first(line, UNIT_PATTERNS))
        total = _num(_find_first(line, ITEM_TOTAL_PATTERNS))

        if qty and unit and (not total or qty*unit != total): total = qty*unit
        elif qty and total and not unit and total % qty == 0: unit = total // qty
        elif unit and total and not qty and total % unit == 0: qty = total // unit

        product = ""
        if kp:
            low = line.lower()
            cands = [p for p in kp if p in low]
            if cands: product = max(cands, key=len)
        if not product:
            product = clean_product(line)

        if product or any(v is not None for v in (qty, unit, total)):
            items.append({"product": product, "quantity": qty,
                          "price_per_pax": unit, "total_price": total})
    return items

def normalize_input_to_table(df: pd.DataFrame,
                             text_col="raw_text",
                             receipt_id_col: Optional[str] = None,
                             known_products: Optional[list] = None) -> pd.DataFrame:
    rows = []
    if receipt_id_col and receipt_id_col in df.columns:  # Case C
        for rid, g in df.groupby(receipt_id_col):
            block = "\n".join(g[text_col].astype(str).tolist())
            time_txt = extract_time_from_text(block)
            items = parse_items_from_text(block, known_products)
            for it in items or [{"product":"", "quantity":None, "price_per_pax":None, "total_price":None}]:
                rows.append({"raw_text": block, "time": time_txt, **it, "store": ""})
    else:  # Case A/B/D
        for _, r in df.iterrows():
            block = str(r.get(text_col, "") or "")
            time_txt = r.get("time", None) or extract_time_from_text(block)
            items = parse_items_from_text(block, known_products)
            for it in items or [{"product":"", "quantity":None, "price_per_pax":None, "total_price":None}]:
                rows.append({"raw_text": block, "time": time_txt, **it, "store": ""})
    return pd.DataFrame(rows, columns=["raw_text","time","product","quantity","price_per_pax","total_price","store"])

In [None]:
# Option 2A : Upload OCR from Local File

from google.colab import files, drive
import os

uploaded = files.upload()
fname = list(uploaded.keys())[0]
ext = os.path.splitext(fname)[1].lower()

if ext == ".csv":
    df_raw = pd.read_csv(fname)
elif ext in [".xlsx",".xls"]:
    df_raw = pd.read_excel(fname)
else:
    raise ValueError("Unsupported file type")

df = normalize_input_to_table(df_raw, text_col="raw_text")
print("Normalized:", df.shape)
df.head()

**Option 2B — OCR from Google Drive**

If you already have OCR CSV/XLSX files stored in **Google Drive**, you can use this option.  

Steps:
1. Put your OCR files (CSV/XLSX) in a Drive folder.  
2. Mount Drive in Colab.  
3. Change the `DRIVE_DIR` path below to point to your folder.
4. Type number of file you want to run on the folder

> Example: `/content/drive/MyDrive/OCR_Text/Input`  
> (If you have a shared folder link, right-click → *Add shortcut to Drive* → then use the shortcut path under `MyDrive`.)

In [19]:
# Option 2B : Load OCR from Google Drive (Interactive picker) + Normalize
from google.colab import drive
import os, glob, pandas as pd, re

# 0) REQUIREMENT: run the "Normalization Helpers" cell first (normalize_input_to_table)

# 1) Mount Drive
drive.mount('/content/drive')

# 2) Point to your OCR folder in Drive (EDIT THIS)
DRIVE_DIR = "/content/drive/MyDrive/OCR_Text/Dataset"  # <-- change to your folder

# 3) Discover candidate files
all_paths = sorted(
    glob.glob(os.path.join(DRIVE_DIR, "*.csv")) +
    glob.glob(os.path.join(DRIVE_DIR, "*.xlsx")) +
    glob.glob(os.path.join(DRIVE_DIR, "*.xls"))
)

if not all_paths:
    raise FileNotFoundError(f"No CSV/XLSX files found under: {DRIVE_DIR}")

print("Found files:")
for i, p in enumerate(all_paths):
    print(f"[{i:02d}] {os.path.basename(p)}")

# 4) Choose subset interactively (e.g., "0,2,4-6"). Leave blank = ALL
def _parse_indices(s, n):
    s = s.strip()
    if not s:
        return list(range(n))
    out = set()
    for part in s.split(","):
        part = part.strip()
        if re.match(r"^\d+$", part):
            idx = int(part)
            if 0 <= idx < n: out.add(idx)
        elif re.match(r"^\d+-\d+$", part):
            a, b = map(int, part.split("-"))
            for x in range(min(a,b), max(a,b)+1):
                if 0 <= x < n: out.add(x)
    return sorted(out)

choice = input("\nEnter indices to load (e.g., 0,2,4-6) or press Enter for ALL: ")
idxs = _parse_indices(choice, len(all_paths))
paths = [all_paths[i] for i in idxs]

print(f"\nSelected {len(paths)} file(s):")
for p in paths: print(" -", os.path.basename(p))

# 5) Read + normalize selected files
def _read_any(path: str) -> pd.DataFrame:
    ext = os.path.splitext(path)[1].lower()
    if ext == ".csv":
        return pd.read_csv(path)
    elif ext in [".xlsx", ".xls"]:
        return pd.read_excel(path)
    else:
        raise ValueError(f"Unsupported file type: {ext} ({path})")

dfs_norm = []
for p in paths:
    df_raw = _read_any(p)
    # Auto-detect text column if not exactly "raw_text"
    lower_cols = {c.lower(): c for c in df_raw.columns}
    text_col = lower_cols.get("raw_text") or next(
        (lower_cols[c] for c in ["text","ocr_text","content","ocr","receipt_text"] if c in lower_cols),
        df_raw.select_dtypes(include="object").columns.tolist()[0]
    )
    print(f"Normalizing: {os.path.basename(p)}  |  text_col='{text_col}'")
    df_norm = normalize_input_to_table(df_raw, text_col=text_col)
    dfs_norm.append(df_norm)

df = pd.concat(dfs_norm, ignore_index=True)

# Ensure required columns order
wanted = ["raw_text","time","product","quantity","price_per_pax","total_price","store"]
for c in wanted:
    if c not in df.columns:
        df[c] = pd.NA
df = df[wanted]

# Save canonical dataset alongside synthetic pathing
DATA_CSV = f'{CFG["paths"]["data_dir"]}/receipts_dummy_en_from_drive_selected.csv'
df.to_csv(DATA_CSV, index=False)
print("\nOCR dataset (Drive) normalized & saved:", df.shape, "->", DATA_CSV)
df.head(10)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Found files:
[00] ocr_case_A_expanded.csv
[01] ocr_case_A_single_block.csv
[02] ocr_case_B_block_multiline.csv
[03] ocr_case_B_expanded.csv
[04] ocr_case_C_expanded.csv
[05] ocr_case_C_line_items_with_receipt_id.csv
[06] ocr_case_D_expanded.xlsx
[07] ocr_case_D_partial_columns.xlsx

Enter indices to load (e.g., 0,2,4-6) or press Enter for ALL: 02

Selected 1 file(s):
 - ocr_case_B_block_multiline.csv
Normalizing: ocr_case_B_block_multiline.csv  |  text_col='raw_text'

OCR dataset (Drive) normalized & saved: (8, 7) -> /content/data/receipts_dummy_en_from_drive_selected.csv


Unnamed: 0,raw_text,time,product,quantity,price_per_pax,total_price,store
0,Relaxed Restaurant 2025-05-30 19:22\nChicken ...,2025-05-30 19:22,Relaxed Restaurant - - :,,,,
1,Relaxed Restaurant 2025-05-30 19:22\nChicken ...,2025-05-30 19:22,Chicken Soup X,2.0,27000.0,54000.0,
2,Relaxed Restaurant 2025-05-30 19:22\nChicken ...,2025-05-30 19:22,French Fries : : :,1.0,18000.0,18000.0,
3,Relaxed Restaurant 2025-05-30 19:22\nChicken ...,2025-05-30 19:22,:,,,72000.0,
4,Food Hall 02/06/2025 12:10\nVeggie Burger x1 ...,02/06/2025 12:10,Food Hall / / :,,,,
5,Food Hall 02/06/2025 12:10\nVeggie Burger x1 ...,02/06/2025 12:10,Veggie Burger X,1.0,40000.0,40000.0,
6,Food Hall 02/06/2025 12:10\nVeggie Burger x1 ...,02/06/2025 12:10,Iced Orange X,2.0,14000.0,28000.0,
7,Food Hall 02/06/2025 12:10\nVeggie Burger x1 ...,02/06/2025 12:10,,,,68000.0,


## 3) Load Data

In this step, we load the dataset produced in Step 2.

*   If you ran Option 1 (Synthetic) → your file will look like receipts_dummy_en.csv or .xlsx.
*   If you ran Option 2A (Upload + Normalize) → your file may be saved as something like receipts_dummy_en_v3.csv.
*   If you ran Option 2B (Drive + Normalize) → your file will likely be receipts_dummy_en_from_drive.csv.


> 🔑 Make sure to point DATA_PATH to the exact file name that Step 2 saved.

In [21]:
# auto-detect Step 2 output

import os, glob
import pandas as pd

DATA_DIR = CFG["paths"]["data_dir"]

# (Optional) set this to force a specific file; leave as None to auto-detect
MANUAL_DATA_PATH = None  # e.g., f'{DATA_DIR}/receipts_dummy_en_from_drive.csv'

def _pick_latest(patterns):
    files = []
    for pat in patterns:
        files.extend(glob.glob(os.path.join(DATA_DIR, pat)))
    if not files:
        raise FileNotFoundError(
            f"No dataset found in {DATA_DIR}. "
            "Run Step 2 (Option 1/2A/2B) to create a dataset first."
        )
    return max(files, key=os.path.getmtime)  # newest by modification time

if MANUAL_DATA_PATH:
    DATA_PATH = MANUAL_DATA_PATH
else:
    # Look for any of the Step-2 outputs, newest wins
    DATA_PATH = _pick_latest([
        "receipts_dummy_en*.csv",
        "receipts_dummy_en*.xlsx",
    ])

ext = os.path.splitext(DATA_PATH)[1].lower()
if ext == ".csv":
    df = pd.read_csv(DATA_PATH, dtype={"raw_text": "string"})
elif ext in [".xlsx", ".xls"]:
    df = pd.read_excel(DATA_PATH)
    # ensure dtypes are friendly
    if "raw_text" in df.columns:
        df["raw_text"] = df["raw_text"].astype("string")
else:
    raise ValueError(f"Unsupported dataset type: {ext}")

print("Loaded dataset:", df.shape, "from", DATA_PATH)
df.head(5)

Loaded dataset: (8, 7) from /content/data/receipts_dummy_en_from_drive_selected.csv


Unnamed: 0,raw_text,time,product,quantity,price_per_pax,total_price,store
0,Relaxed Restaurant 2025-05-30 19:22 Chicken S...,2025-05-30 19:22,Relaxed Restaurant - - :,,,,
1,Relaxed Restaurant 2025-05-30 19:22 Chicken S...,2025-05-30 19:22,Chicken Soup X,2.0,27000.0,54000.0,
2,Relaxed Restaurant 2025-05-30 19:22 Chicken S...,2025-05-30 19:22,French Fries : : :,1.0,18000.0,18000.0,
3,Relaxed Restaurant 2025-05-30 19:22 Chicken S...,2025-05-30 19:22,:,,,72000.0,
4,Food Hall 02/06/2025 12:10 Veggie Burger x1 @...,02/06/2025 12:10,Food Hall / / :,,,,


## 4) Preprocess → Build NER annotations from labels

In [22]:
import re
from tqdm.auto import tqdm
from spacy.training.iob_utils import offsets_to_biluo_tags

nlp_tmp = spacy.blank("en")  # tokenizer only

def first_span(text, needle):
    m = re.search(re.escape(str(needle)), text)
    return (m.start(), m.end()) if m else None

def search_patterns_digits(text, patterns, flags=re.IGNORECASE):
    for pat in patterns:
        m = re.search(pat, text, flags)
        if m and m.lastindex:
            return (m.start(1), m.end(1))
    return None

def overlaps(a, b):
    if a is None or b is None: return False
    return not (a[1] <= b[0] or b[1] <= a[0])

def token_snap(doc, start, end):
    tok_starts = {t.idx for t in doc}
    tok_ends   = {t.idx + len(t) for t in doc}
    s = max(0, min(start, len(doc.text))); e = max(0, min(end, len(doc.text)))
    while s not in tok_starts and s > 0: s -= 1
    while e not in tok_ends and e < len(doc.text): e += 1
    return (s, e) if (s in tok_starts and e in tok_ends and s < e) else None

def build_ner_examples(df):
    examples = []; kept = dropped_ent = dropped_ex = 0
    for _, r in tqdm(df.iterrows(), total=len(df)):
        text = r["raw_text"]; doc = nlp_tmp.make_doc(text); ents = []

        # TIME (exact)
        time_sp = first_span(text, r["time"])
        if time_sp: ents.append((*time_sp, "TIME"))

        # PRODUCT (exact)
        prod_sp = first_span(text, r["product"])
        if prod_sp and not overlaps(prod_sp, time_sp):
            ents.append((*prod_sp, "PRODUCT"))

        # QTY (digits-only)
        q = r["quantity"]
        qty_pats = [rf"\bx\s*({q})\b", rf"\b({q})\s*x\b", rf"\bqty\s*[:=]?\s*({q})\b", rf"\b({q})\s*pax\b"]
        qty_sp = search_patterns_digits(text, qty_pats)
        if qty_sp and not overlaps(qty_sp, time_sp) and not overlaps(qty_sp, prod_sp):
            ents.append((*qty_sp, "QTY"))

        # PRICE_UNIT (digits-only)
        u = r["price_per_pax"]
        unit_pats = [
            rf"@\s*\$?\s*({u})\b", rf"\$\s*({u})\b", rf"\bprice\s*[:=]?\s*({u})\b",
            rf"\bunit\s*[:=]?\s*({u})\b", rf"\b({u})\s*each\b", rf"\b({u})\s*/\s*pax\b"
        ]
        unit_sp = search_patterns_digits(text, unit_pats)
        if unit_sp and all(not overlaps(unit_sp, sp) for sp in [time_sp, prod_sp, qty_sp]):
            ents.append((*unit_sp, "PRICE_UNIT"))

        # PRICE_TOTAL (digits-only)
        t = r["total_price"]
        total_pats = [
            rf"(?:=|->)\s*({t})\b",
            rf"\btotal\s*[:=]?\s*\$?\s*({t})\b",
            rf"\bgrand\s+total\s*[:=]?\s*\$?\s*({t})\b",
            rf"\bTOTAL\s*[:=]?\s*\$?\s*({t})\b",
        ]
        total_sp = search_patterns_digits(text, total_pats)
        if total_sp and all(not overlaps(total_sp, sp) for sp in [time_sp, prod_sp, qty_sp, unit_sp]):
            ents.append((*total_sp, "PRICE_TOTAL"))

        # snap + validate
        snapped = []
        for (s,e,lbl) in ents:
            se = token_snap(doc, s, e)
            if se: snapped.append((se[0], se[1], lbl))
        if snapped:
            biluo = offsets_to_biluo_tags(doc, [(s,e,l) for (s,e,l) in snapped])
            if "-" in biluo:
                filtered = []
                for (s,e,l) in snapped:
                    b = offsets_to_biluo_tags(doc, [(s,e,l)])
                    if "-" not in b: filtered.append((s,e,l))
                snapped = filtered

        if snapped:
            examples.append((text, {"entities": snapped})); kept += 1
        else:
            dropped_ex += 1

    print(f"Built {kept} examples. Dropped empty examples: {dropped_ex}")
    return examples

# Build examples now
examples = build_ner_examples(df)
len(examples)

  0%|          | 0/8 [00:00<?, ?it/s]

Built 8 examples. Dropped empty examples: 0


8

## 5) Train/Validation Split

In [14]:
# Train & Split

from sklearn.model_selection import train_test_split
from collections import Counter

# Dedup by raw_text
uniq = {}
for text, ann in examples:
    if text not in uniq:
        uniq[text] = ann
examples = [(t, uniq[t]) for t in uniq.keys()]

# Drop sparse (keep at least 2 ents)
def ent_count(ann): return len(ann.get("entities", []))
examples = [(t, a) for (t, a) in examples if ent_count(a) >= 2]

print("Label coverage:",
      dict(Counter(lbl for _, ann in examples for *_, lbl in ann["entities"])))

RANDOM_SEED = CFG["seed"]
train_data, dev_data = train_test_split(examples, test_size=0.2, random_state=RANDOM_SEED, shuffle=True)
print("Train size:", len(train_data), "Dev size:", len(dev_data))

# Persist dev_texts for consistent evaluation later
dev_texts_path = f'{CFG["paths"]["data_dir"]}/dev_texts.txt'
with open(dev_texts_path, "w", encoding="utf-8") as f:
    for t, _ in dev_data:
        f.write(t + "\n")
print("Saved:", dev_texts_path)

Label coverage: {'PRODUCT': 490, 'PRICE_UNIT': 500, 'PRICE_TOTAL': 494, 'TIME': 61, 'QTY': 432}
Train size: 400 Dev size: 100
Saved: /content/data/dev_texts.txt


## 6) Train spaCy NER

In [15]:
# Model Training (spaCy NER)
from spacy.util import minibatch, compounding
from spacy.training.example import Example

best_dir = CFG["paths"]["models"]["best"]
last_dir = CFG["paths"]["models"]["last"]
os.makedirs(best_dir, exist_ok=True)

# Load base English pipeline
nlp = spacy.load("en_core_web_sm")

# Ensure NER exists and add labels
if "ner" in nlp.pipe_names:
    ner = nlp.get_pipe("ner")
else:
    ner = nlp.add_pipe("ner", last=True)

for label in CFG["labels"]:
    if label not in ner.labels:
        ner.add_label(label)

def to_examples(nlp, data):
    exs = []
    for text, ann in data:
        exs.append(Example.from_dict(nlp.make_doc(text), ann))
    return exs

train_examples = to_examples(nlp, train_data)
dev_examples   = to_examples(nlp, dev_data)

optimizer = nlp.initialize(get_examples=lambda: train_examples)

n_iter = 30
dropout = 0.2
best_f1 = -1.0

for epoch in range(1, n_iter+1):
    losses = {}
    random.shuffle(train_examples)
    batches = minibatch(train_examples, size=compounding(16.0, 64.0, 1.2))
    with nlp.select_pipes(enable=["ner"]):
        for batch in batches:
            nlp.update(batch, sgd=optimizer, drop=dropout, losses=losses)

    scores = nlp.evaluate(dev_examples)
    p = scores.get("ents_p",0.0); r = scores.get("ents_r",0.0); f = scores.get("ents_f",0.0)
    print(f"Epoch {epoch:02d}/{n_iter}  Loss: {losses.get('ner',0):.2f}  P:{p:.3f} R:{r:.3f} F1:{f:.3f}")

    # Save last
    nlp.to_disk(last_dir)
    # Save best
    if f > best_f1:
        best_f1 = f
        nlp.to_disk(best_dir)

# Model card
with open(f'{CFG["paths"]["artifacts_dir"]}/model_card.txt', "w") as f:
    f.write(f"Labels: {CFG['labels']}\nBest F1: {best_f1:.3f}\n")

print(f"Best F1: {best_f1:.3f}")
print("Saved BEST ->", best_dir)
print("Saved LAST ->", last_dir)

  matches = self.matcher(doc, allow_missing=True, as_spans=False)


Epoch 01/30  Loss: 5346.88  P:0.000 R:0.000 F1:0.000
Epoch 02/30  Loss: 2544.03  P:0.857 R:0.015 F1:0.030
Epoch 03/30  Loss: 1525.04  P:0.768 R:0.568 F1:0.653
Epoch 04/30  Loss: 1027.94  P:0.905 R:0.694 F1:0.786
Epoch 05/30  Loss: 776.77  P:0.953 R:0.720 F1:0.820
Epoch 06/30  Loss: 160.80  P:0.716 R:0.740 F1:0.728
Epoch 07/30  Loss: 78.62  P:0.729 R:0.740 F1:0.734
Epoch 08/30  Loss: 24.76  P:0.729 R:0.740 F1:0.734
Epoch 09/30  Loss: 5.16  P:0.733 R:0.740 F1:0.736
Epoch 10/30  Loss: 5.79  P:0.893 R:0.740 F1:0.809
Epoch 11/30  Loss: 1.18  P:0.942 R:0.740 F1:0.829
Epoch 12/30  Loss: 0.34  P:0.825 R:0.740 F1:0.780
Epoch 13/30  Loss: 2.06  P:0.749 R:0.740 F1:0.745
Epoch 14/30  Loss: 0.02  P:0.749 R:0.740 F1:0.745
Epoch 15/30  Loss: 0.03  P:0.792 R:0.740 F1:0.765
Epoch 16/30  Loss: 0.22  P:0.788 R:0.740 F1:0.763
Epoch 17/30  Loss: 0.01  P:0.835 R:0.740 F1:0.784
Epoch 18/30  Loss: 1.15  P:0.849 R:0.740 F1:0.791
Epoch 19/30  Loss: 0.00  P:0.818 R:0.740 F1:0.777
Epoch 20/30  Loss: 0.17  P:0.769

## 7) Evaluation

In [16]:
# Evaluation
import re
from difflib import SequenceMatcher
from spacy.training.example import Example

MODEL_DIR = best_dir if os.path.isdir(best_dir) else last_dir
nlp_eval = spacy.load(MODEL_DIR)

# Entity-level metrics
eval_examples = [Example.from_dict(nlp_eval.make_doc(t), ann) for t, ann in dev_data]
scores = nlp_eval.evaluate(eval_examples)
print("Entity P:", round(scores.get("ents_p",0.0),3),
      "R:", round(scores.get("ents_r",0.0),3),
      "F1:", round(scores.get("ents_f",0.0),3))

# Spreadsheet strict/relaxed
def pick_first(ents, label):
    for e in ents:
        if e.label_ == label: return e.text
    return ""

def normalize_num(s): return re.sub(r"[^\d]", "", s or "")

# Build prediction df for the dev split only
dev_texts = [t for t, _ in dev_data]
dev_gt = df[df["raw_text"].isin(dev_texts)].drop_duplicates("raw_text", keep="first").reset_index(drop=True)

pred_rows = []
for _, row in dev_gt.iterrows():
    doc = nlp_eval(row["raw_text"])
    pred_rows.append({
        "raw_text": row["raw_text"],
        "time_pred": pick_first(doc.ents, "TIME"),
        "product_pred": pick_first(doc.ents, "PRODUCT"),
        "quantity_pred": normalize_num(pick_first(doc.ents, "QTY")),
        "price_per_pax_pred": normalize_num(pick_first(doc.ents, "PRICE_UNIT")),
        "total_price_pred": normalize_num(pick_first(doc.ents, "PRICE_TOTAL")),
    })
pred_df = pd.DataFrame(pred_rows)

merged = dev_gt[["raw_text","time","product","quantity","price_per_pax","total_price"]].merge(
    pred_df, on="raw_text", how="left"
)

# strict field accuracy
strict_field = {
    "time":         (merged["time"].astype(str) == merged["time_pred"].astype(str)).mean(),
    "product":      (merged["product"].astype(str) == merged["product_pred"].astype(str)).mean(),
    "quantity":     (merged["quantity"].astype(str) == merged["quantity_pred"].astype(str)).mean(),
    "price_per_pax":(merged["price_per_pax"].astype(str) == merged["price_per_pax_pred"].astype(str)).mean(),
    "total_price":  (merged["total_price"].astype(str) == merged["total_price_pred"].astype(str)).mean(),
}

# relaxed (time/product relaxed, numbers strict)
def time_relaxed(a,b): a=str(a or ""); b=str(b or ""); return (a in b) or (b in a)
def product_relaxed(a,b,th=0.85):
    a=(a or "").lower().strip(); b=(b or "").lower().strip()
    if not a or not b: return False
    if a in b or b in a: return True
    return SequenceMatcher(None,a,b).ratio() >= th

relaxed_field = {
    "time": merged.apply(lambda r: time_relaxed(r["time"], r["time_pred"]), axis=1).mean(),
    "product": merged.apply(lambda r: product_relaxed(r["product"], r["product_pred"]), axis=1).mean(),
    "quantity": strict_field["quantity"],
    "price_per_pax": strict_field["price_per_pax"],
    "total_price": strict_field["total_price"],
}

# row strict
row_strict = (
    (merged["time"].astype(str) == merged["time_pred"].astype(str)) &
    (merged["product"].astype(str) == merged["product_pred"].astype(str)) &
    (merged["quantity"].astype(str) == merged["quantity_pred"].astype(str)) &
    (merged["price_per_pax"].astype(str) == merged["price_per_pax_pred"].astype(str)) &
    (merged["total_price"].astype(str) == merged["total_price_pred"].astype(str))
).mean()

# row relaxed (>=4/5)
def row_relaxed_ok(r):
    m = 0
    m += int(time_relaxed(r["time"], r["time_pred"]))
    m += int(product_relaxed(r["product"], r["product_pred"]))
    m += int(str(r["quantity"]) == str(r["quantity_pred"]))
    m += int(str(r["price_per_pax"]) == str(r["price_per_pax_pred"]))
    m += int(str(r["total_price"]) == str(r["total_price_pred"]))
    return m >= 4

row_relaxed = merged.apply(row_relaxed_ok, axis=1).mean()

print("\nStrict field accuracy:", {k: round(v,3) for k,v in strict_field.items()})
print("Relaxed field accuracy:", {k: round(v,3) for k,v in relaxed_field.items()})
print("Row exact (strict):", round(row_strict,3))
print("Row relaxed (>=4/5):", round(row_relaxed,3))

# Save artifacts
metrics = {
  "ents_p": round(scores.get("ents_p",0.0),3),
  "ents_r": round(scores.get("ents_r",0.0),3),
  "ents_f": round(scores.get("ents_f",0.0),3),
  "strict_field": {k: round(v,3) for k,v in strict_field.items()},
  "relaxed_field": {k: round(v,3) for k,v in relaxed_field.items()},
  "row_strict": round(row_strict,3),
  "row_relaxed": round(row_relaxed,3),
}
with open(f'{CFG["paths"]["artifacts_dir"]}/metrics.json', "w") as f:
    json.dump(metrics, f, indent=2)
print("\nSaved metrics.json")

Entity P: 0.942 R: 0.74 F1: 0.829


  matches = self.matcher(doc, allow_missing=True, as_spans=False)



Strict field accuracy: {'time': 0.0, 'product': 0.08, 'quantity': 0.87, 'price_per_pax': 0.88, 'total_price': 0.98}
Relaxed field accuracy: {'time': 1.0, 'product': 0.13, 'quantity': 0.87, 'price_per_pax': 0.88, 'total_price': 0.98}
Row exact (strict): 0.0
Row relaxed (>=4/5): 0.77

Saved metrics.json


## 8) Inference to Spreadsheet

In this step, we apply the trained NER model to every receipt text and export the structured predictions. We add to validation flags such as **math_ok** to check if the total_price are calculated right and **complete_row** to check if all fields are present.


> ⚠️ Before running, check the output filenames (predicted_receipts.csv / .xlsx) and change them if you don’t want to overwrite previous results.

In [None]:
# Inference + Structured Spreadsheet
import os, re

MODEL_DIR = CFG["paths"]["models"]["best"] if os.path.isdir(CFG["paths"]["models"]["best"]) else CFG["paths"]["models"]["last"]
nlp_pred = spacy.load(MODEL_DIR)
KNOWN_PRODUCTS = set(df["product"].unique())

def num_only(s: str) -> str: return re.sub(r"[^\d]", "", s or "")
def to_int(s: str): s=num_only(s); return int(s) if s.isdigit() else None
def pick_first(ents, label):
    for e in ents:
        if e.label_ == label: return e.text
    return ""
def fallback_find(text: str, kind: str) -> str:
    if kind == "TIME":
        for pat in [r"\b\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}\b", r"\b\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}\b",
                    r"\b\d{2}-[A-Za-z]{3}-\d{4}\s+\d{2}:\d{2}\b", r"\b\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}\b"]:
            m = re.search(pat, text);
            if m: return m.group(0)
    if kind == "QTY":
        for pat in [r"\bx\s*(\d{1,3})\b", r"\b(\d{1,3})\s*x\b", r"\bqty\s*[:=]?\s*(\d{1,3})\b", r"\b(\d{1,3})\s*pax\b"]:
            m = re.search(pat, text, flags=re.I);
            if m: return m.group(1)
    if kind == "PRICE_UNIT":
        for pat in [r"@\s*\$?\s*(\d{3,8})\b", r"\$\s*(\d{3,8})\b", r"\bprice\s*[:=]?\s*(\d{3,8})\b", r"\bunit\s*[:=]?\s*(\d{3,8})\b",
                    r"\b(\d{3,8})\s*/\s*pax\b", r"\b(\d{3,8})\s*each\b"]:
            m = re.search(pat, text, flags=re.I);
            if m: return m.group(1)
    if kind == "PRICE_TOTAL":
        for pat in [r"(?:=|->)\s*(\d{3,10})\b", r"\bgrand\s+total\s*[:=]?\s*\$?\s*(\d{3,10})\b",
                    r"\btotal\s*[:=]?\s*\$?\s*(\d{3,10})\b", r"\bTOTAL\s*[:=]?\s*\$?\s*(\d{3,10})\b"]:
            m = re.search(pat, text, flags=re.I);
            if m: return m.group(1)
    return ""

def fallback_product(text: str) -> str:
    t = text.lower()
    cands = [p for p in KNOWN_PRODUCTS if p and p.lower() in t]
    return max(cands, key=len) if cands else ""

def extract_row(text: str) -> dict:
    doc = nlp_pred(text)
    time_txt  = pick_first(doc.ents, "TIME")  or fallback_find(text, "TIME")
    prod_txt  = pick_first(doc.ents, "PRODUCT") or fallback_product(text)
    qty_txt   = pick_first(doc.ents, "QTY")   or fallback_find(text, "QTY")
    unit_txt  = pick_first(doc.ents, "PRICE_UNIT")  or fallback_find(text, "PRICE_UNIT")
    total_txt = pick_first(doc.ents, "PRICE_TOTAL") or fallback_find(text, "PRICE_TOTAL")

    qty, unit, total = to_int(qty_txt), to_int(unit_txt), to_int(total_txt)
    # math repairs
    if qty is not None and unit is not None and (total is None or qty*unit != total):
        total = qty * unit
    elif qty is not None and total is not None and unit is None and total % qty == 0:
        unit = total // qty
    elif unit is not None and total is not None and qty is None and unit>0 and total % unit == 0 and (total // unit) <= 999:
        qty = total // unit

    return {
        "raw_text": text, "time": time_txt or "", "product": prod_txt or "",
        "quantity": qty, "price_per_pax": unit, "total_price": total,
        "math_ok": int(qty is not None and unit is not None and total is not None and qty*unit==total),
        "complete_row": int(all([bool(time_txt), bool(prod_txt), qty is not None, unit is not None, total is not None]))
    }

source_texts = df["raw_text"].tolist()
pred_rows = [extract_row(t) for t in source_texts]
pred_df = pd.DataFrame(pred_rows)

OUTPUT_DIR = "/content/drive/MyDrive/OCR_Text/Result"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Use this if you want to save it locally
#pred_csv = f'{CFG["paths"]["artifacts_dir"]}/predicted_receipts.csv'
#pred_xlsx = f'{CFG["paths"]["artifacts_dir"]}/predicted_receipts.xlsx'

# Use this if you want to save it to google drive folder
pred_csv  = os.path.join(OUTPUT_DIR, "predicted_receipts.csv")
pred_xlsx = os.path.join(OUTPUT_DIR, "predicted_receipts.xlsx")

pred_df.to_csv(pred_csv, index=False)
pred_df.to_excel(pred_xlsx, index=False)

print("Saved:", pred_csv, "and", pred_xlsx)
pred_df.head(5)

In [17]:
# Inference + Structured Spreadsheet  (DROP-IN REPLACEMENT)
import os, re, spacy
import pandas as pd

MODEL_DIR = CFG["paths"]["models"]["best"] if os.path.isdir(CFG["paths"]["models"]["best"]) else CFG["paths"]["models"]["last"]
nlp_pred = spacy.load(MODEL_DIR)
KNOWN_PRODUCTS = sorted(set(str(p).strip().title() for p in df["product"].dropna()))

# ---------- helpers ----------
def num_only(s: str) -> str: return re.sub(r"[^\d]", "", s or "")
def to_int(s: str):
    s = num_only(s);
    return int(s) if s.isdigit() else None

def pick_first(ents, label):
    for e in ents:
        if e.label_ == label:
            return e.text
    return ""

def _find_first(text, patterns, flags=re.I):
    for p in patterns:
        m = re.search(p, text, flags)
        if m:
            return m.group(1) if m.lastindex else m.group(0)
    return ""

def _digits(s):
    s = num_only(s);
    return int(s) if s.isdigit() else None

def window(text, center, size=60):
    if not center:
        return text
    i = text.lower().find(center.lower())
    if i < 0:
        return text
    return text[max(0, i-size): i+len(center)+size]

# ---------- Patch 1: robust TIME extraction ----------
from datetime import datetime
try:
    from dateutil.parser import parse as du_parse
    HAS_DATEUTIL = True
except Exception:
    HAS_DATEUTIL = False

def _norm_time_text(t: str) -> str:
    s = t.replace("•", " ").replace("|", " ")
    s = re.sub(r"[.\u2219]+", "-", s)    # 2025.08.24 -> 2025-08-24
    s = re.sub(r"\s+", " ", s).strip()
    return s

TIME_PATS_RICH = [
    r"\b\d{4}-\d{2}-\d{2}\s+\d{1,2}:\d{2}(?:\s*[AP]M)?\b",
    r"\b\d{4}/\d{2}/\d{2}\s+\d{1,2}:\d{2}(?:\s*[AP]M)?\b",
    r"\b\d{2}/\d{2}/\d{4}\s+\d{1,2}:\d{2}(?:\s*[AP]M)?\b",
    r"\b\d{2}-[A-Za-z]{3}-\d{4}\s+\d{1,2}:\d{2}(?:\s*[AP]M)?\b",
    r"\b\d{1,2}\s+[A-Za-z]{3,9}\s+\d{4}\s+\d{1,2}:\d{2}(?:\s*[AP]M)?\b",
    r"\b\d{4}\.\d{2}\.\d{2}\s+\d{1,2}:\d{2}(?:\s*[AP]M)?\b",
]

def _std_fmt(dt: datetime) -> str:
    return dt.strftime("%Y-%m-%d %H:%M")

def robust_find_time(text: str, dayfirst=False) -> str:
    if not text:
        return ""
    t = _norm_time_text(text)
    for pat in TIME_PATS_RICH:
        m = re.search(pat, t, flags=re.I)
        if m:
            snippet = m.group(0)
            try:
                if HAS_DATEUTIL:
                    dt = du_parse(snippet, fuzzy=True, dayfirst=dayfirst)
                else:
                    nums = re.findall(r"\d+", snippet)
                    if len(nums) >= 5:
                        y, mo, d, hh, mm = map(int, nums[:5])
                        dt = datetime(y, mo, d, hh, mm)
                    else:
                        continue
                return _std_fmt(dt)
            except Exception:
                pass
    if HAS_DATEUTIL:
        try:
            dt = du_parse(t, fuzzy=True, dayfirst=dayfirst)
            return _std_fmt(dt)
        except Exception:
            return ""
    return ""

# ---------- Patch 2: product guardrails ----------
try:
    from rapidfuzz import process, fuzz
    HAS_FUZZY = True
except Exception:
    HAS_FUZZY = False

YEAR_RE     = re.compile(r"^\s*(19|20)\d{2}\s*$")
DATEY_RE    = re.compile(r"^\s*\d{1,2}[/\-.][A-Za-z0-9]{2,9}[/\-.]\d{2,4}\s*$")
JUST_NUM_RE = re.compile(r"^\s*\d+\s*$")

def looks_like_date_or_year(s: str) -> bool:
    if not s: return False
    return bool(YEAR_RE.fullmatch(s)) or bool(DATEY_RE.fullmatch(s)) or bool(JUST_NUM_RE.fullmatch(s))

def product_fallback(text: str) -> str:
    low = text.lower()
    # 1) exact containment from lexicon
    hits = [p for p in KNOWN_PRODUCTS if p and p.lower() in low]
    if hits:
        return max(hits, key=len)
    # 2) fuzzy to lexicon
    if HAS_FUZZY and KNOWN_PRODUCTS:
        cand, score, _ = process.extractOne(text, KNOWN_PRODUCTS, scorer=fuzz.token_set_ratio)
        if score >= 85:
            return cand
    # 3) cleaned phrase
    tmp = re.sub(r"(qty|price|unit|total|grand total|pax|each|@|\$|=|->|Rp|IDR)", " ", text, flags=re.I)
    tmp = re.sub(r"\d+", " ", tmp)
    tmp = re.sub(r"\s{2,}", " ", tmp).strip().title()
    return tmp

# ---------- Patch 3: updated extractor ----------
def extract_row(text: str) -> dict:
    doc = nlp_pred(text)

    # TIME: model then robust regex (set dayfirst=True if your data is mostly DD/MM/YYYY)
    time_txt = pick_first(doc.ents, "TIME") or robust_find_time(text, dayfirst=False)

    # PRODUCT: model then validate; if bad, fallback
    prod_txt = pick_first(doc.ents, "PRODUCT")
    if not prod_txt or looks_like_date_or_year(prod_txt):
        prod_txt = product_fallback(text)

    # search numbers near the product mention first (helps avoid picking unrelated numbers)
    zone = window(text, prod_txt, size=60) if prod_txt else text

    qty_txt   = pick_first(doc.ents, "QTY") or _find_first(zone, [
        r"\bx\s*(\d{1,3})\b", r"\b(\d{1,3})\s*x\b", r"\bqty\s*[:=]?\s*(\d{1,3})\b", r"\b(\d{1,3})\s*pax\b"
    ])
    unit_txt  = pick_first(doc.ents, "PRICE_UNIT") or _find_first(zone, [
        r"@\s*\$?\s*(\d{3,8})\b", r"(?:Rp|IDR|\$)\s*(\d{3,8})\b",
        r"\b(?:price|unit)\s*[:=]?\s*(\d{3,8})\b", r"\b(\d{3,8})\s*each\b", r"\b(\d{3,8})\s*/\s*pax\b"
    ])
    total_txt = pick_first(doc.ents, "PRICE_TOTAL") or _find_first(zone, [
        r"(?:=|->)\s*(\d{3,10})\b",
        r"\b(?:item\s*)?total\s*[:=]?\s*(?:Rp|IDR|\$)?\s*(\d{3,10})\b",
        r"\bTOTAL\s*[:=]?\s*(?:Rp|IDR|\$)?\s*(\d{3,10})\b"
    ])

    qty, unit, total = _digits(qty_txt), _digits(unit_txt), _digits(total_txt)

    # math repairs / derivations
    if qty is not None and unit is not None and (total is None or qty*unit != total):
        total = qty * unit
    elif qty is not None and total is not None and unit is None and qty > 0 and total % qty == 0:
        unit = total // qty
    elif unit is not None and total is not None and qty is None and unit > 0 and total % unit == 0 and (total // unit) < 1000:
        qty = total // unit

    return {
        "raw_text": text,
        "time": time_txt or "",
        "product": prod_txt or "",
        "quantity": qty,
        "price_per_pax": unit,
        "total_price": total,
        "math_ok": int(qty is not None and unit is not None and total is not None and qty*unit==total),
        "complete_row": int(all([bool(time_txt), bool(prod_txt), qty is not None, unit is not None, total is not None]))
    }

# ---------- run inference ----------
source_texts = df["raw_text"].tolist()
pred_rows = [extract_row(t) for t in source_texts]
pred_df = pd.DataFrame(pred_rows)

# Save to Drive (your path)
OUTPUT_DIR = "/content/drive/MyDrive/OCR_Text/Result"
os.makedirs(OUTPUT_DIR, exist_ok=True)
pred_csv  = os.path.join(OUTPUT_DIR, "predicted_receipts_3.csv")
pred_xlsx = os.path.join(OUTPUT_DIR, "predicted_receipts_3.xlsx")
pred_df.to_csv(pred_csv, index=False)
pred_df.to_excel(pred_xlsx, index=False)

print("Saved:", pred_csv, "and", pred_xlsx)
pred_df.head(5)

  matches = self.matcher(doc, allow_missing=True, as_spans=False)


Saved: /content/drive/MyDrive/OCR_Text/Result/predicted_receipts_3.csv and /content/drive/MyDrive/OCR_Text/Result/predicted_receipts_3.xlsx


Unnamed: 0,raw_text,time,product,quantity,price_per_pax,total_price,math_ok,complete_row
0,11-Aug-2025 08:01 Tasty Depot :: 2 Caesar Sala...,2025-08-11 08:01,Caesar Salad,2,43914,87828,1,1
1,2025-06-17 07:01 Simple Diner - 2x French Frie...,2025-06-17 07:01,French Fries,2,2025,4050,1,1
2,Date 23/06/2025 Morning Canteen Chicken Noodle...,,Chicken Noodle,2,32859,65718,1,0
3,2025-03-24 13:06 Relaxed Restaurant - 5x Chees...,2025-03-24 13:06,Cheesecake,5,2025,10125,1,1
4,Tasty Depot | 20/07/2025 11:53 | Cheesecake x5...,2025-07-20 11:53,Cheesecake,5,36787,183935,1,1


## 9) (Optional) Post-processing rules

In [None]:
# Quick stats: completeness & math consistency + which fields fail most often
report = {
    "complete_rows_pct": round(float(pred_df["complete_row"].mean()), 3),
    "math_ok_pct": round(float(pred_df["math_ok"].mean()), 3),
    "n_rows": int(len(pred_df))
}

# Field missing rates (for debugging)
missing_rates = {
    "time_missing_pct": round(float((pred_df["time"].astype(str)=="").mean()), 3),
    "product_missing_pct": round(float((pred_df["product"].astype(str)=="").mean()), 3),
    "qty_missing_pct": round(float(pred_df["quantity"].isna().mean()), 3),
    "unit_missing_pct": round(float(pred_df["price_per_pax"].isna().mean()), 3),
    "total_missing_pct": round(float(pred_df["total_price"].isna().mean()), 3),
}

print("Report:", report)
print("Missing rates:", missing_rates)

with open(f'{CFG["paths"]["artifacts_dir"]}/postprocess_report.json', "w") as f:
    json.dump({"report": report, "missing_rates": missing_rates}, f, indent=2)

# Show a few incomplete rows to inspect patterns
incomplete = pred_df[(pred_df["complete_row"]==0) | (pred_df["math_ok"]==0)].head(10)
incomplete