<a href="https://colab.research.google.com/github/jagadeesh-usd/receipt-expense-tracker-cv/blob/jaga-dev/notebooks/06_Field_Extraction_Regex.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Automated Expense Extraction - Receipt Parsing Using YOLO and OCR
### Rule-Based Field Extraction (Regex)

### Objective
Apply **Regular Expressions (Regex)** and heuristic rules to the raw text generated by our Baseline OCR engines (EasyOCR & Tesseract). This step simulates a traditional "Template-Free" parsing approach to establish how well standard algorithms perform without Computer Vision localization.

### Methodology
1.  **Input:** JSON output files from Module 04 (EasyOCR) and Module 05 (Tesseract).
2.  **Extraction Logic:**
    * **Vendor:** Uses a naive heuristic (assuming the first non-generic line of text is the Vendor Name).
    * **Date:** scans for standard date patterns (e.g., `DD/MM/YYYY`, `YYYY-MM-DD`, `DD-Mon-YYYY`).
    * **Total Amount:** Searches for the largest number with two decimal places (standard currency regex).
3.  **Process:** Iterates through all test receipts and applies these rules to the unstructured text blob.
4.  **Output:** Structured CSV files (`processed/extracted/...`) ready for accuracy benchmarking.

#### Setup & Configuration

In [10]:
import json
import re
import csv
import os
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, List

In [11]:
# Check if running in Google Colab
if 'COLAB_GPU' in os.environ:
    # Mount Google Drive (for Colab)
    from google.colab import drive
    drive.mount('/content/drive')

    # Set DATA_PATH for Google Drive
    DATA_PATH = Path('/content/drive/MyDrive/data')
else:
    # Set DATA_PATH for local environment
    DATA_PATH = Path('../data')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [12]:
# CONFIGURATION - Choose which OCR engine(s) to process

# Set to True to process that OCR engine's results
PROCESS_EASYOCR = True      # Extract fields from EasyOCR results
PROCESS_TESSERACT = True    # Extract fields from Tesseract results

# Splits to process (train, test, or both)
SPLITS = ["train", "test"]

# Path Configuration

OCR_ENGINES = []
if PROCESS_EASYOCR:
    OCR_ENGINES.append({
        'name': 'EasyOCR',
        'input_dir': DATA_PATH / 'processed/ocr',
        'output_dir': DATA_PATH / 'processed/extracted/ocr'
    })

if PROCESS_TESSERACT:
    OCR_ENGINES.append({
        'name': 'Tesseract',
        'input_dir': DATA_PATH / 'processed/tesseract_ocr',
        'output_dir': DATA_PATH / 'processed/extracted/tesseract_ocr'
    })

# Verify configuration
print("\n" + "="*70)
print("FIELD EXTRACTION CONFIGURATION")
print("="*70)
print(f"Data Path: {DATA_PATH}")
print(f"\nOCR Engines to Process: {len(OCR_ENGINES)}")
for engine in OCR_ENGINES:
    print(f"  â€¢ {engine['name']}")
print(f"\nSplits: {', '.join(SPLITS)}")
print("="*70 + "\n")

if not OCR_ENGINES:
    print("WARNING: No OCR engines selected! Set PROCESS_EASYOCR or PROCESS_TESSERACT to True.")


FIELD EXTRACTION CONFIGURATION
Data Path: /content/drive/MyDrive/data

OCR Engines to Process: 2
  â€¢ EasyOCR
  â€¢ Tesseract

Splits: train, test



#### 1. Regex Patterns & Helper Functions

In [13]:
# REGEX PATTERNS

# Matches currency-like numbers (e.g., 1,234.56 or 12.50)
CURRENCY_NUM_RE = re.compile(r'([0-9]{1,3}(?:[,][0-9]{3})*(?:\.[0-9]{2})|[0-9]+(?:\.[0-9]{2}))')

# Gap-Tolerant Total Regex
# Matches "Total", "Amount", etc., followed by up to 25 chars of "noise", then the number
AMOUNT_LABEL_RE = re.compile(
    r'(?i)\b(total|amount|grand total|grand|balance|invoice total|amount due|nett|payable)'
    r'(?:[^0-9\n\-\+]{0,25})'  # Allow gap of up to 25 non-digit chars
    r'\s*([0-9,]+\.\d{2})'     # The Amount
)

# Blacklist for vendor guessing (common receipt words to ignore)
VENDOR_BLACKLIST = [
    "total", "subtotal", "amount", "gst", "tax", "invoice", "cash", "change",
    "tel", "fax", "receipt", "date", "time", "document", "table", "pax", "order",
    "thank", "welcome", "regards", "bill", "payment"
]

print("âœ“ Regex patterns loaded")

âœ“ Regex patterns loaded


In [14]:
# HELPER FUNCTIONS

def normalize_number_str(s: str) -> Optional[float]:
    """Turn common currency-like strings into float or None."""
    if not s:
        return None
    s = s.replace(',', '').strip()
    s = re.sub(r'[^0-9\.\-]', '', s)  # Remove currency symbols/letters
    try:
        return float(s)
    except Exception:
        return None


def normalize_date_ocr(date_str: str) -> str:
    """
    Heuristic cleanup for common OCR date errors before parsing.
    Example: 25/4212018 -> 25/04/2018 (separator glitch)
    """
    if not date_str:
        return ""
    s = date_str.strip()

    # Replace common letter-swaps for numbers
    s = s.replace('O', '0').replace('o', '0')
    s = s.replace('l', '1').replace('I', '1')

    # Broken separators in OCR (common in SROIE dataset)
    if "/42120" in s:
        s = s.replace("/42120", "/04/20")
    if "/420" in s:
        s = s.replace("/420", "/04/20")

    return s


def find_date_in_text(full_text: str) -> Optional[str]:
    """Finds date candidates and tries to parse them into YYYY-MM-DD."""
    if not full_text:
        return None

    # Pattern 1: Standard dates (25/04/2018, 2018-04-25, 25.04.18)
    date_pattern = re.compile(r'\b(\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4})\b')
    # Pattern 2: Corrupted OCR dates (e.g., 25/4212018 where separators failed)
    broken_pattern = re.compile(r'\b(\d{1,2}/\d{5,8})\b')

    # Gather all candidates
    candidates = date_pattern.findall(full_text) + broken_pattern.findall(full_text)

    for raw in candidates:
        clean = normalize_date_ocr(raw)
        # Try parsing various common receipt formats
        for fmt in ("%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y",
                    "%d/%m/%y", "%d-%m-%y", "%Y-%m-%d"):
            try:
                dt = datetime.strptime(clean, fmt)
                return dt.strftime("%Y-%m-%d")
            except ValueError:
                continue
    return None


def find_total_in_text(full_text: str) -> Optional[float]:
    """Finds total amount using Label-First strategy, then Fallback to last number."""
    if not full_text:
        return None

    # 1. Label-based search (Best accuracy)
    matches = list(AMOUNT_LABEL_RE.finditer(full_text))
    if matches:
        # Use the last labeled total found (usually the Grand Total at bottom)
        for m in reversed(matches):
            val = m.group(2)
            num = normalize_number_str(val)
            if num is not None:
                return num

    # 2. Fallback: Last currency-formatted number in text
    # (Useful if the word "Total" is missing or OCR failed on the label)
    nums = CURRENCY_NUM_RE.findall(full_text)
    if nums:
        val = nums[-1]
        return normalize_number_str(val)

    return None


def guess_vendor_from_lines(lines: List[str]) -> Optional[str]:
    """
    Generic heuristic to find vendor name.
    Returns the first line that looks like a name (has letters) and isn't blacklisted.
    """
    if not lines:
        return None

    # Check only the top header section (first 8 lines)
    for line in lines[:8]:
        line = line.strip()
        if len(line) < 3:
            continue  # Skip tiny noise

        low = line.lower()

        # Skip blacklisted generic words (Receipt, Tax Invoice, etc.)
        if any(b in low for b in VENDOR_BLACKLIST):
            continue

        # If line contains letters, it's likely the Vendor Name
        if any(c.isalpha() for c in line):
            return line

    return None


def extract_fields_from_ocr_result(ocr_result: dict) -> dict:
    """
    Main extraction function.
    Takes OCR result dict and returns extracted vendor, date, total.
    """
    full_text = (ocr_result.get("full_text") or "").strip()
    lines = ocr_result.get("lines") or []

    vendor = guess_vendor_from_lines(lines)
    date = find_date_in_text(full_text)
    total = find_total_in_text(full_text)

    return {
        "vendor": vendor,
        "date": date,
        "total": total,
    }


print("âœ“ Helper functions loaded")

âœ“ Helper functions loaded


#### 3. Field Extraction Pipeline

In [15]:
# EXTRACTION PIPELINE

def process_ocr_engine(engine_config: dict, splits: List[str]):
    """
    Process one OCR engine's results across multiple splits.

    Args:
        engine_config: Dict with 'name', 'input_dir', 'output_dir'
        splits: List of splits to process (e.g., ['train', 'test'])
    """
    engine_name = engine_config['name']
    input_dir = engine_config['input_dir']
    output_dir = engine_config['output_dir']

    print(f"\n{'='*70}")
    print(f"Processing: {engine_name}")
    print(f"{'='*70}")
    print(f"Input:  {input_dir}")
    print(f"Output: {output_dir}")

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    total_processed = 0

    for split in splits:
        src_dir = input_dir / split
        out_csv = output_dir / f"{split}_extracted.csv"

        if not src_dir.exists():
            print(f"\n {split}: Directory not found: {src_dir}")
            continue

        print(f"\nðŸ“‚ Processing {split}...")

        rows = []
        json_files = sorted(src_dir.glob("*.json"))

        if not json_files:
            print(f" No JSON files found in {src_dir}")
            continue

        # Process each OCR result
        for jfile in json_files:
            try:
                with open(jfile, "r", encoding="utf-8") as f:
                    ocr_result = json.load(f)
            except Exception as e:
                print(f" Failed to read {jfile.name}: {e}")
                continue

            # Extract fields
            extracted = extract_fields_from_ocr_result(ocr_result)

            rows.append({
                "image": jfile.stem,
                "vendor": extracted.get("vendor"),
                "date": extracted.get("date"),
                "total": extracted.get("total")
            })

        # Write to CSV
        with open(out_csv, "w", newline="", encoding="utf-8") as csvf:
            writer = csv.DictWriter(csvf, fieldnames=["image", "vendor", "date", "total"])
            writer.writeheader()
            for row in rows:
                writer.writerow(row)

        print(f"   âœ“ Saved: {out_csv}")
        print(f"   âœ“ Records: {len(rows)}")
        total_processed += len(rows)

    print(f"\n{'='*70}")
    print(f"{engine_name} Complete: {total_processed} total records processed")
    print(f"{'='*70}")


print("âœ“ Extraction pipeline ready")

âœ“ Extraction pipeline ready


#### 4. Run Extraction

In [16]:
# RUN EXTRACTION FOR ALL CONFIGURED ENGINES

if not OCR_ENGINES:
    print("No OCR engines configured. Please set PROCESS_EASYOCR or PROCESS_TESSERACT to True.")
else:
    print("\n" + "#"*70)
    print("# STARTING FIELD EXTRACTION")
    print("#"*70)

    for engine_config in OCR_ENGINES:
        try:
            process_ocr_engine(engine_config, SPLITS)
        except Exception as e:
            print(f"\nError processing {engine_config['name']}: {e}")
            import traceback
            traceback.print_exc()

    print("\n" + "#"*70)
    print("# EXTRACTION COMPLETE")
    print("#"*70)
    print(f"\n Processed {len(OCR_ENGINES)} OCR engine(s)")
    print(f" Output saved to: {DATA_PATH / 'processed/extracted'}")


######################################################################
# STARTING FIELD EXTRACTION
######################################################################

Processing: EasyOCR
Input:  /content/drive/MyDrive/data/processed/ocr
Output: /content/drive/MyDrive/data/processed/extracted/ocr

ðŸ“‚ Processing train...
   âœ“ Saved: /content/drive/MyDrive/data/processed/extracted/ocr/train_extracted.csv
   âœ“ Records: 626

ðŸ“‚ Processing test...
   âœ“ Saved: /content/drive/MyDrive/data/processed/extracted/ocr/test_extracted.csv
   âœ“ Records: 347

EasyOCR Complete: 973 total records processed

Processing: Tesseract
Input:  /content/drive/MyDrive/data/processed/tesseract_ocr
Output: /content/drive/MyDrive/data/processed/extracted/tesseract_ocr

ðŸ“‚ Processing train...
   âœ“ Saved: /content/drive/MyDrive/data/processed/extracted/tesseract_ocr/train_extracted.csv
   âœ“ Records: 626

ðŸ“‚ Processing test...
   âœ“ Saved: /content/drive/MyDrive/data/processed/extracted/tessera

#### 5. Summary & Verification

In [17]:
# SUMMARY

print("\n" + "="*70)
print("EXTRACTION SUMMARY")
print("="*70)

for engine_config in OCR_ENGINES:
    engine_name = engine_config['name']
    output_dir = engine_config['output_dir']

    print(f"\n{engine_name}:")

    for split in SPLITS:
        csv_file = output_dir / f"{split}_extracted.csv"

        if csv_file.exists():
            # Count rows
            with open(csv_file, 'r', encoding='utf-8') as f:
                row_count = sum(1 for line in f) - 1  # Subtract header

            print(f"  â€¢ {split}: {row_count} records â†’ {csv_file}")
        else:
            print(f"  â€¢ {split}: Not found")

print("\n" + "="*70)
print("\nField extraction complete!")


EXTRACTION SUMMARY

EasyOCR:
  â€¢ train: 626 records â†’ /content/drive/MyDrive/data/processed/extracted/ocr/train_extracted.csv
  â€¢ test: 347 records â†’ /content/drive/MyDrive/data/processed/extracted/ocr/test_extracted.csv

Tesseract:
  â€¢ train: 626 records â†’ /content/drive/MyDrive/data/processed/extracted/tesseract_ocr/train_extracted.csv
  â€¢ test: 347 records â†’ /content/drive/MyDrive/data/processed/extracted/tesseract_ocr/test_extracted.csv


Field extraction complete!


#### 6. Sample Output Preview  

In [19]:
# PREVIEW SAMPLE RESULTS

import pandas as pd

print("\n" + "="*70)
print("SAMPLE EXTRACTED FIELDS")
print("="*70)

for engine_config in OCR_ENGINES:
    engine_name = engine_config['name']
    output_dir = engine_config['output_dir']

    # Show sample from test set
    csv_file = output_dir / "test_extracted.csv"

    if csv_file.exists():
        print(f"\n{engine_name} - Test Set (first 5 records):")
        print("-" * 70)

        df = pd.read_csv(csv_file)
        print(df.head())

        # Statistics
        print(f"\nStatistics:")
        print(f"  Total records: {len(df)}")
        print(f"  Vendor extracted: {df['vendor'].notna().sum()} ({df['vendor'].notna().sum()/len(df)*100:.1f}%)")
        print(f"  Date extracted: {df['date'].notna().sum()} ({df['date'].notna().sum()/len(df)*100:.1f}%)")
        print(f"  Total extracted: {df['total'].notna().sum()} ({df['total'].notna().sum()/len(df)*100:.1f}%)")
    else:
        print(f"\n{engine_name}: No test results found")

print("\n" + "="*70)


SAMPLE EXTRACTED FIELDS

EasyOCR - Test Set (first 5 records):
----------------------------------------------------------------------
          image                vendor        date  total
0  X00016469670                   tan  2019-01-15  193.0
1  X00016469671                   tan  2019-01-02  170.0
2  X51005200931  PERNIAGAAN ZHENG HUI  2018-02-09  436.2
3  X51005230605  PEIROH BKT LANJAN SB  2018-02-01    6.0
4  X51005230616               4 psez.  2018-01-18   38.9

Statistics:
  Total records: 347
  Vendor extracted: 346 (99.7%)
  Date extracted: 233 (67.1%)
  Total extracted: 333 (96.0%)

Tesseract - Test Set (first 5 records):
----------------------------------------------------------------------
          image                               vendor        date   total
0  X00016469670                         tan chay yee         NaN  193.00
1  X00016469671                         tan chay yee  2019-01-02  170.00
2  X51005200931                               hore 2  2078-02-09 