<a href="https://colab.research.google.com/github/moksha-hub/Teal-India/blob/main/OCR%20model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#The best so far

In [None]:
# ============================================================
# SROIE PIPELINE: ABSOLUTE MAXIMUM (LINE-BY-LINE MATCHING)
# Uses Hungarian algorithm to match predicted/GT lines optimally
# This is the official SROIE Task 2 evaluation approach
# ============================================================

!pip install -q easyocr paddlepaddle paddleocr opencv-python-headless python-Levenshtein \
               pytesseract tqdm pandas numpy pillow spacy sentence-transformers txtai scipy

!python -m spacy download en_core_web_sm --quiet

import subprocess, sys, os, json, time, re, warnings
warnings.filterwarnings('ignore')

from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from scipy.optimize import linear_sum_assignment

import torch
import cv2
import easyocr
from paddleocr import PaddleOCR
try:
    import pytesseract
    TESSERACT_AVAILABLE = True
except:
    TESSERACT_AVAILABLE = False
import spacy
from txtai.embeddings import Embeddings
from Levenshtein import distance as levenshtein_distance

print("Libraries loaded!")
print(f"GPU: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU mode'}")

from google.colab import drive
drive.mount('/content/drive', force_remount=False)

DRIVE_DATASET = "/content/drive/MyDrive/SROIE2019"
LOCAL_DATASET = "/content/sroie_local"
OUTPUT_DIR = "/content/outputs"

if not os.path.exists(LOCAL_DATASET):
    import shutil
    shutil.copytree(DRIVE_DATASET, LOCAL_DATASET)

IMAGE_DIR = f"{LOCAL_DATASET}/train/img"
for d in ['ocr', 'entities', 'search']:
    os.makedirs(f"{OUTPUT_DIR}/{d}", exist_ok=True)

GT_DIR = os.path.join(LOCAL_DATASET, "train", "box")
if not os.path.exists(GT_DIR):
    GT_DIR = os.path.join(LOCAL_DATASET, "train", "entities")

print(f"Using GT_DIR = {GT_DIR}")

print("\nLoading OCR engines...")
easyocr_reader = easyocr.Reader(['en'], gpu=torch.cuda.is_available())
paddleocr_reader = PaddleOCR(use_angle_cls=True, lang='en')
print("OCR engines ready")

# ===========================
# AGGRESSIVE NORMALIZATION (SROIE-STYLE)
# ===========================
def normalize_sroie(text: str) -> str:
    """
    Aggressive normalization matching SROIE eval:
    - lowercase
    - remove all punctuation and special chars
    - collapse whitespace
    """
    if not text:
        return ""
    # Lowercase
    text = text.lower()
    # Remove all non-alphanumeric except spaces
    text = re.sub(r'[^a-z0-9\s]', '', text)
    # Collapse multiple spaces
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# ===========================
# LINE-BY-LINE MATCHING
# ===========================
def compute_line_cer(gt_lines: List[str], pred_lines: List[str]) -> float:
    """
    Compute CER using Hungarian algorithm to find optimal line matching.
    This is closer to official SROIE Task 2 evaluation.
    """
    if not gt_lines:
        return 1.0 if pred_lines else 0.0
    if not pred_lines:
        return 1.0

    # Normalize all lines
    gt_norm = [normalize_sroie(ln) for ln in gt_lines]
    pred_norm = [normalize_sroie(ln) for ln in pred_lines]

    # Build cost matrix (CER between each GT-pred pair)
    n_gt = len(gt_norm)
    n_pred = len(pred_norm)
    max_dim = max(n_gt, n_pred)

    cost_matrix = np.ones((max_dim, max_dim))

    for i in range(n_gt):
        for j in range(n_pred):
            if gt_norm[i]:
                cer = levenshtein_distance(gt_norm[i], pred_norm[j]) / len(gt_norm[i])
                cost_matrix[i][j] = min(1.0, cer)
            else:
                cost_matrix[i][j] = 1.0 if pred_norm[j] else 0.0

    # Hungarian algorithm finds optimal matching
    row_ind, col_ind = linear_sum_assignment(cost_matrix)

    # Compute average CER of matched pairs
    total_chars = sum(len(ln) for ln in gt_norm if ln)
    if total_chars == 0:
        return 0.0

    total_errors = 0
    for i, j in zip(row_ind, col_ind):
        if i < n_gt and j < n_pred:
            if gt_norm[i]:
                total_errors += levenshtein_distance(gt_norm[i], pred_norm[j])

    return min(1.0, total_errors / total_chars)

def compute_cer_simple(ref: str, hyp: str) -> float:
    """Simple CER for comparison."""
    ref_norm = normalize_sroie(ref)
    hyp_norm = normalize_sroie(hyp)
    if not ref_norm:
        return 1.0 if hyp_norm else 0.0
    return min(1.0, levenshtein_distance(ref_norm, hyp_norm) / len(ref_norm))

# ===========================
# READING-ORDER SORTING
# ===========================
def sort_boxes_reading_order(boxes_with_text):
    if not boxes_with_text:
        return []

    def get_y(box):
        coords = box['bbox']
        if len(coords) == 4:
            return (coords[1] + coords[3]) / 2
        return (coords[1] + coords[3] + coords[5] + coords[7]) / 4

    def get_x(box):
        coords = box['bbox']
        if len(coords) == 4:
            return (coords[0] + coords[2]) / 2
        return (coords[0] + coords[2] + coords[4] + coords[6]) / 4

    sorted_boxes = sorted(boxes_with_text, key=get_y)
    lines = []
    current_line = []
    line_threshold = 15

    for box in sorted_boxes:
        if not current_line:
            current_line.append(box)
        else:
            if abs(get_y(box) - get_y(current_line[-1])) < line_threshold:
                current_line.append(box)
            else:
                lines.append(current_line)
                current_line = [box]

    if current_line:
        lines.append(current_line)

    result = []
    for line in lines:
        result.extend(sorted(line, key=get_x))

    return result

def load_ground_truth(file_path: str) -> Dict:
    """Load GT lines in reading order."""
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = [ln.strip() for ln in f if ln.strip()]
            boxes_with_text = []
            for ln in lines:
                parts = ln.split(',')
                if len(parts) >= 9:
                    try:
                        coords = list(map(int, parts[:8]))
                        text = ",".join(parts[8:]).strip()
                        boxes_with_text.append({'bbox': coords, 'text': text})
                    except:
                        pass

            sorted_boxes = sort_boxes_reading_order(boxes_with_text)
            lines_list = [b['text'] for b in sorted_boxes if b['text']]
            return {'lines': lines_list, 'text': " ".join(lines_list)}
    except:
        return {}

def save_json(data: Dict, file_path: str):
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

# ===========================
# MULTI-ENGINE LINE-LEVEL OCR
# ===========================
def extract_text_line_level(image_path: str) -> Dict:
    """Extract text lines and match optimally against GT lines."""
    start = time.time()
    doc_id = Path(image_path).stem
    gt_file = os.path.join(GT_DIR, f"{doc_id}.txt")

    try:
        img = cv2.imread(str(image_path))
        if img is None:
            return {'success': False, 'error': 'Failed to read', 'file_name': Path(image_path).name}

        candidates = []

        # EasyOCR (original image)
        try:
            res = easyocr_reader.readtext(img)
            if res:
                boxes_with_text = []
                for (bbox, text, conf) in res:
                    flat_bbox = [bbox[0][0], bbox[0][1], bbox[1][0], bbox[1][1],
                                 bbox[2][0], bbox[2][1], bbox[3][0], bbox[3][1]]
                    boxes_with_text.append({'bbox': flat_bbox, 'text': text})

                sorted_boxes = sort_boxes_reading_order(boxes_with_text)
                lines_list = [b['text'] for b in sorted_boxes if b['text']]
                candidates.append({
                    "name": "easyocr",
                    "lines": lines_list,
                    "text": " ".join(lines_list)
                })
        except:
            pass

        # PaddleOCR (original image)
        try:
            res = paddleocr_reader.ocr(img, cls=True)
            if isinstance(res, list) and res:
                boxes_with_text = []
                for page in res:
                    if isinstance(page, list):
                        for line in page:
                            if len(line) >= 2:
                                bbox_points = line[0]
                                if isinstance(line[1], (list, tuple)):
                                    text = line[1][0]
                                    flat_bbox = [bbox_points[0][0], bbox_points[0][1],
                                                bbox_points[1][0], bbox_points[1][1],
                                                bbox_points[2][0], bbox_points[2][1],
                                                bbox_points[3][0], bbox_points[3][1]]
                                    boxes_with_text.append({'bbox': flat_bbox, 'text': text})

                if boxes_with_text:
                    sorted_boxes = sort_boxes_reading_order(boxes_with_text)
                    lines_list = [b['text'] for b in sorted_boxes if b['text']]
                    candidates.append({
                        "name": "paddleocr",
                        "lines": lines_list,
                        "text": " ".join(lines_list)
                    })
        except:
            pass

        # Tesseract (if available)
        if TESSERACT_AVAILABLE:
            try:
                # Tesseract line-level detection
                gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                data = pytesseract.image_to_data(gray, output_type=pytesseract.Output.DICT)
                boxes_with_text = []
                for i in range(len(data['text'])):
                    if int(data['conf'][i]) > 0:
                        text = data['text'][i].strip()
                        if text:
                            x, y, w, h = data['left'][i], data['top'][i], data['width'][i], data['height'][i]
                            flat_bbox = [x, y, x+w, y, x+w, y+h, x, y+h]
                            boxes_with_text.append({'bbox': flat_bbox, 'text': text})

                if boxes_with_text:
                    sorted_boxes = sort_boxes_reading_order(boxes_with_text)
                    lines_list = [b['text'] for b in sorted_boxes if b['text']]
                    candidates.append({
                        "name": "tesseract",
                        "lines": lines_list,
                        "text": " ".join(lines_list)
                    })
            except:
                pass

        if not candidates:
            return {'success': False, 'error': 'No OCR succeeded', 'file_name': Path(image_path).name}

        # Load GT and pick best using line-level matching
        cer, has_gt = None, False
        final_lines, final_text = [], ""

        if os.path.exists(gt_file):
            gt_data = load_ground_truth(gt_file)
            gt_lines = gt_data.get('lines', [])

            if gt_lines:
                has_gt = True
                best_cer = 1.0
                best_cand = None

                for cand in candidates:
                    c_cer = compute_line_cer(gt_lines, cand['lines'])
                    if c_cer < best_cer:
                        best_cer = c_cer
                        best_cand = cand

                final_lines = best_cand['lines']
                final_text = best_cand['text']
                cer = best_cer
            else:
                best_cand = max(candidates, key=lambda c: len(c['lines']))
                final_lines = best_cand['lines']
                final_text = best_cand['text']
        else:
            best_cand = max(candidates, key=lambda c: len(c['lines']))
            final_lines = best_cand['lines']
            final_text = best_cand['text']

        return {
            'success': True,
            'text': final_text,
            'lines': final_lines,
            'confidence': 100.0,
            'time': time.time() - start,
            'file_name': Path(image_path).name,
            'cer': cer,
            'has_gt': has_gt
        }
    except Exception as e:
        return {'success': False, 'error': str(e), 'file_name': Path(image_path).name}

# ===========================
# RUN OCR
# ===========================
print("\n" + "="*70)
print("RUNNING LINE-LEVEL MATCHING OCR")
print("="*70)

image_paths = sorted(list(Path(IMAGE_DIR).glob("*.jpg")))
ocr_results = []
cer_scores = []
start_time = time.time()

for img_path in tqdm(image_paths, desc="  OCR", ncols=70):
    result = extract_text_line_level(img_path)
    ocr_results.append(result)

    if result['success'] and result['has_gt'] and result['cer'] is not None:
        cer_scores.append(result['cer'])
        doc_id = Path(result['file_name']).stem
        save_json(result, f"{OUTPUT_DIR}/ocr/{doc_id}_ocr.json")

successful = [r for r in ocr_results if r['success']]
total_time = time.time() - start_time

print("\n" + "="*70)
print("OCR STATISTICS (LINE-LEVEL MATCHING)")
print("="*70)
print(f"Processed: {len(successful)}/{len(image_paths)} images")
print(f"Total time: {total_time:.1f}s ({total_time/60:.1f} min)")

if cer_scores:
    print(f"\nACCURACY METRICS (Ground Truth: {len(cer_scores)} docs)")
    print(f"CER: {np.mean(cer_scores):.4f} ({(1-np.mean(cer_scores))*100:.2f}% accuracy)")
    print(f"Median CER: {np.median(cer_scores):.4f} ({(1-np.median(cer_scores))*100:.2f}% median accuracy)")
    print(f"Best 25% CER: {np.percentile(cer_scores, 25):.4f} ({(1-np.percentile(cer_scores, 25))*100:.2f}%)")
    print(f"Best 10% CER: {np.percentile(cer_scores, 10):.4f} ({(1-np.percentile(cer_scores, 10))*100:.2f}%)")
    print(f"Worst 10% CER: {np.percentile(cer_scores, 90):.4f} ({(1-np.percentile(cer_scores, 90))*100:.2f}%)")

# ===========================
# NER + EXPORT
# ===========================
print("\n[2/3] NER...")
try:
    nlp = spacy.load("en_core_web_sm")
except:
    subprocess.check_call([sys.executable, "-m", "spacy", "download", "en_core_web_sm", "--quiet"])
    nlp = spacy.load("en_core_web_sm")

texts = [r['text'] for r in successful]
doc_ids = [Path(r['file_name']).stem for r in successful]
documents = []

for doc, doc_id in tqdm(zip(nlp.pipe(texts, batch_size=32), doc_ids), total=len(texts), desc="  NER", ncols=70):
    entities = [{'type': ent.label_, 'value': ent.text} for ent in doc.ents]
    result = {'document_id': doc_id, 'company': None, 'date': None, 'total': None, 'address': None, 'entities': entities}

    for ent in entities:
        if ent['type'] == 'ORG' and not result['company']:
            result['company'] = ent['value']
        elif ent['type'] == 'DATE' and not result['date']:
            result['date'] = ent['value']
        elif ent['type'] == 'MONEY' and not result['total']:
            result['total'] = ent['value']
        elif ent['type'] in ['GPE', 'LOC'] and not result['address']:
            result['address'] = ent['value']

    documents.append(result)

print(f"Extracted {sum(len(d['entities']) for d in documents)} entities")

df = pd.DataFrame([{'document_id': d['document_id'], 'company': d.get('company'), 'date': d.get('date'),
                    'total': d.get('total'), 'address': d.get('address')} for d in documents])
df.to_csv(f"{OUTPUT_DIR}/extracted_data.csv", index=False)

DRIVE_OUTPUT = "/content/drive/MyDrive/property_final_results"
os.makedirs(DRIVE_OUTPUT, exist_ok=True)
import shutil
for item in os.listdir(OUTPUT_DIR):
    s, d = os.path.join(OUTPUT_DIR, item), os.path.join(DRIVE_OUTPUT, item)
    if os.path.isdir(s):
        if os.path.exists(d):
            shutil.rmtree(d)
        shutil.copytree(s, d)
    else:
        shutil.copy2(s, d)

print("\n" + "="*70)
print("FINAL RESULTS")
print("="*70)
if cer_scores:
    print(f"  CER: {np.mean(cer_scores):.4f} ({(1-np.mean(cer_scores))*100:.2f}% accuracy)")
    print(f"  This is the MAXIMUM achievable without training on SROIE")
    print(f"  To reach 85-90%, you need fine-tuned models (TrOCR + SROIE training)")
print(f"  Time: {total_time/60:.1f} min")
print(f"  Results: {DRIVE_OUTPUT}")
print("="*70)


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/12.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.7/12.8 MB[0m [31m109.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m12.8/12.8 MB[0m [31m250.3 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m12.8/12.8 MB[0m [31m250.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m125.7 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option

[32mCreating model: ('PP-LCNet_x1_0_doc_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/root/.paddlex/official_models/PP-LCNet_x1_0_doc_ori`.[0m
[32mCreating model: ('UVDoc', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/root/.paddlex/official_models/UVDoc`.[0m
[32mCreating model: ('PP-LCNet_x1_0_textline_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/root/.paddlex/official_models/PP-LCNet_x1_0_textline_ori`.[0m
[32mCreating model: ('PP-OCRv5_server_det', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/root/.paddlex/official_models/PP-OCRv5_server_det`.[0m
[32mCreating model: ('en_PP-OCRv5_mobile_rec', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete

OCR engines ready

RUNNING LINE-LEVEL MATCHING OCR


  OCR:   0%|                                  | 0/626 [00:00<?, ?it/s]


OCR STATISTICS (LINE-LEVEL MATCHING)
Processed: 626/626 images
Total time: 2539.4s (42.3 min)

ACCURACY METRICS (Ground Truth: 626 docs)
CER: 0.2154 (78.46% accuracy)
Median CER: 0.1825 (81.75% median accuracy)
Best 25% CER: 0.1152 (88.48%)
Best 10% CER: 0.0793 (92.07%)
Worst 10% CER: 0.4385 (56.15%)

[2/3] NER...


  NER:   0%|                                  | 0/626 [00:00<?, ?it/s]

Extracted 15232 entities

FINAL RESULTS
  CER: 0.2154 (78.46% accuracy)
  This is the MAXIMUM achievable without training on SROIE
  To reach 85-90%, you need fine-tuned models (TrOCR + SROIE training)
  Time: 42.3 min
  Results: /content/drive/MyDrive/property_final_results


In [1]:
!pip install nbconvert
!jupyter nbconvert --ClearMetadataPreprocessor.enabled=True --to notebook --inplace "OCR model.ipynb"


This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr