## tesseract - doctr setup

In [1]:
# Tesseract OCR
print("Tesseract OCR setup...")
!apt-get update > /dev/null
!apt-get install -y tesseract-ocr tesseract-ocr-tur > /dev/null
print("Tesseract setup done.")

print("Python libraries")
!pip install "python-doctr[torch]" pytesseract tqdm -q
print("Library set up done.")

# other required dependencies
!pip install opencv-python-headless pytesseract numpy matplotlib
!sudo apt update
!apt-get install -y tesseract-ocr tesseract-ocr-tur

Tesseract OCR setup...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Tesseract setup done.
Python libraries
[0mLibrary set up done.
Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading packa

# imports

In [2]:
import os
import pytesseract
import numpy as np
import pandas as pd
import json
import glob
import torch
import re
from sentence_transformers import SentenceTransformer, util
from typing import List, Dict
from pytesseract import image_to_data, Output
from tqdm.notebook import tqdm
from skimage.filters import threshold_sauvola
from scipy.ndimage import rotate
from difflib import get_close_matches
import matplotlib.pyplot as plt
from google.colab import files, drive  # For file uploads and Google Drive mounting
from google.colab.patches import cv2_imshow  # For displaying images in Colab
import cv2
# from skimage.filters import threshold_sauvola
from PIL import Image
# Import doctr for OCR
from doctr.io import DocumentFile
from doctr.models import ocr_predictor, db_resnet50, detection_predictor
import tempfile
import unicodedata
from collections import Counter
from sklearn.cluster import DBSCAN
print("Installed dependencies.")


Installed dependencies.


In [3]:
# Mount Google Drive under /content/drive.
# (`drive` is also imported in the imports cell; the import is repeated here.)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


#### **`load_doctr_model()`** loads the Doctr OCR model **(ocr_predictor)** with detection architecture **DB-ResNet50** and recognition architecture **CRNN-VGG16-BN**.
- The model is set to evaluation mode and returned for use in OCR tasks.

In [4]:
def load_doctr_model():
    """
    Build the pretrained Doctr OCR predictor and move it to the best available device.

    The predictor pairs the 'db_resnet50' detection architecture with the
    'crnn_vgg16_bn' recognition architecture and is switched to evaluation mode.

    Returns:
        The ready-to-use predictor, or None when anything fails while loading
        (the error is printed instead of being raised).
    """
    try:
        # Use the GPU whenever torch reports one, otherwise stay on the CPU.
        target_device = 'cpu'
        if torch.cuda.is_available():
            target_device = 'cuda'

        predictor = ocr_predictor(
            det_arch='db_resnet50',
            reco_arch='crnn_vgg16_bn',
            pretrained=True,
        )
        predictor = predictor.to(target_device)
        predictor.eval()  # inference only

        print(f"Doctr OCR model loaded on {target_device}.")
        return predictor
    except Exception as e:
        # Best effort: report the problem and let the caller deal with None.
        print(f"ERROR: Doctr model could not be loaded: {e}. OCR functionality will be limited.")
        return None

In [5]:
# Load the Doctr predictor once for the whole notebook.
# load_doctr_model() returns None when loading fails, so this may be None.
doctr_model = load_doctr_model()

Doctr OCR model loaded on cpu.


In [6]:
# Multilingual sentence-embedding model.
# NOTE(review): not referenced in the cells visible here — presumably used for
# semantic matching further down the notebook; confirm.
sem_model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-mpnet-base-v2")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [7]:
# Keywords that signal receipt content.
# - Passed to correct_orientation_with_skew (orientation scoring) by
#   is_receipt_image_from_path.
# - count_keywords upper-cases both text and keyword and counts substring
#   occurrences per entry, so overlapping entries (e.g. 'TUTAR' / 'TUTARI' /
#   'SATIŞ TUTARI') each add to the total.
# - Several words appear with and without Turkish diacritics ('İŞLEM' / 'ISLEM',
#   'TERMİNAL' / 'TERMINAL', 'İPTAL' / 'IPTAL').
# NOTE(review): 'YERMİNAL' looks like either a typo or a deliberate OCR-misread
# variant of 'TERMİNAL' — confirm before changing it.
keywords_for_detection = [
    'TOPLAM', 'TUTAR', 'KDV', 'TL', 'FATURA', 'TARİH', 'SAAT', 'İŞLEM', 'MÜŞTERİ', 'BAŞARILI',
    'SATIŞ', 'TUTARI', 'YERMİNAL', 'GARANTİ', 'ONAY', 'MERSIS', 'KART', 'BANKA', 'NO', 'ADET',
    'ISLEM', 'PARAMETRE', 'YUKLEME', 'TERMİNAL', 'ISYERİ', 'TERMINAL', 'BATCH', 'GARANTI BBVA',
    'DENIZBANK', 'TOPKDV', '1,00 TL', 'SATIŞ TUTARI', 'İŞLEM TUTARI','IPTAL','İPTAL','ISYERI'
]

### Orientation and deskew helpers

The functions below make sure each receipt image is upright and deskewed before OCR (Doctr). They first fix coarse rotation (0/90/180/270), then correct small tilt, and finally fine-tune the alignment by comparing a few nearby angles.

In [8]:
def rotate_image_precise(image, angle, background_color=(255, 255, 255)):
    """
    Rotate `image` by `angle` degrees on a canvas enlarged to hold the whole result.

    Why used:
      - Nothing is cropped, so text near the borders survives the rotation.
      - Shared by the coarse (0/90/180/270) and the fine skew-correction steps.

    Areas not covered by the rotated image are filled with `background_color`.
    An angle of 0 returns the input image itself, untouched.
    """
    if angle == 0:
        return image

    height, width = image.shape[:2]
    cx, cy = width // 2, height // 2
    matrix = cv2.getRotationMatrix2D((cx, cy), angle, 1.0)

    # |cos| and |sin| of the rotation give the size of the rotated bounding box.
    abs_cos = np.abs(matrix[0, 0])
    abs_sin = np.abs(matrix[0, 1])
    new_w = int((height * abs_sin) + (width * abs_cos))
    new_h = int((height * abs_cos) + (width * abs_sin))

    # Shift the transform so the old centre lands on the centre of the new canvas.
    matrix[0, 2] += (new_w / 2) - cx
    matrix[1, 2] += (new_h / 2) - cy

    return cv2.warpAffine(
        image,
        matrix,
        (new_w, new_h),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=background_color,
    )

def estimate_skew_hough(image):
    """
    Estimate the residual tilt (in degrees) of a BGR image from its line segments.

    Why used:
      - After the coarse rotation a receipt can still be slightly tilted.
      - Probabilistic Hough segments are collected and the median of their
        angles is returned, which is robust against a few stray lines.

    Only segments with an angle strictly between -45 and 45 degrees are used.
    Returns 0 when no usable segment is found.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    binary = cv2.adaptiveThreshold(
        cv2.bitwise_not(grayscale), 255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 15, 2,
    )
    thickened = cv2.dilate(binary, np.ones((3, 3), np.uint8), iterations=1)
    edge_map = cv2.Canny(thickened, 50, 150)
    segments = cv2.HoughLinesP(
        edge_map, 1, np.pi / 180, 150,
        minLineLength=100, maxLineGap=20,
    )

    if segments is None:
        return 0

    tilts = []
    for xa, ya, xb, yb in segments[:, 0]:
        if xa == xb:
            # Perfectly vertical segment: says nothing about text-line tilt.
            continue
        tilt = np.degrees(np.arctan2(yb - ya, xb - xa))
        if -45 < tilt < 45:
            tilts.append(tilt)

    if not tilts:
        return 0
    return np.median(tilts)

def projection_variance_score(img):
    """
    Score how well text rows line up horizontally (higher = better).

    Why used:
      - Cheap numeric proxy for "how straight" the text lines are.
      - Lets the caller compare nearby rotation angles during micro-refinement.

    The image is Otsu-binarised and inverted, each pixel row is summed, and
    the variance of those row sums is returned.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    binary = cv2.threshold(grayscale, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
    row_profile = np.sum(cv2.bitwise_not(binary), axis=1)
    return np.var(row_profile)

def compute_avg_conf(data):
    """
    Average the OCR confidence of the non-empty tokens in a Tesseract result.

    Why used:
      - Orientation selection needs an OCR quality signal.
      - Average token confidence is a stable metric to compare rotations.

    Args:
        data: mapping with parallel sequences under 'text' and 'conf'
              (as produced by pytesseract.image_to_data(..., output_type=Output.DICT)).

    Returns:
        Mean of all confidences strictly between 0 and 100 whose token has
        non-whitespace text; 0 when no token qualifies.
    """
    confs = []
    for txt, conf in zip(data['text'], data['conf']):
        try:
            val = float(conf)
            has_text = bool(txt.strip())
        except (TypeError, ValueError, AttributeError):
            # Unparseable confidence or non-string token: skip just this entry.
            # Deliberately narrow — a bare `except:` would also swallow
            # KeyboardInterrupt/SystemExit and make the cell impossible to stop.
            continue
        if has_text and 0 < val < 100:
            confs.append(val)
    return np.mean(confs) if confs else 0

def count_keywords(text, keywords):
    """
    Count how often the given keywords occur in `text` (case-insensitive via upper()).

    Why used:
      - Receipts contain domain-specific tokens (TOPLAM, ONAY, etc.).
      - Acts as a prior to break ties when OCR confidences are close.

    Args:
        text: text to search; None or empty yields 0.
        keywords: iterable of keyword strings; None or empty yields 0.
                  Empty keywords are ignored, because str.count('') would
                  otherwise add len(text) + 1 to the total.

    Returns:
        Sum of substring occurrences over all keywords. Overlapping keywords
        (e.g. 'TUTAR' and 'TUTARI') are each counted.
    """
    if not text or not keywords:
        return 0
    text = text.upper()
    return sum(text.count(k.upper()) for k in keywords if k)

def find_best_orientation(image, keywords, lang='tur'):
    """
    Pick the best of the four right-angle orientations of `image`.

    Why used:
      - Doctr is rotation-sensitive; choose best of 0/90/180/270 FIRST.
      - Each angle is OCR'd with Tesseract and scored as
        (keyword hits * 1000 + average confidence) * angle weight,
        where the weight is 1.0 for 0°, 0.85 for 90°/270° and 0.7 for 180°,
        so the unrotated image wins when the signals tie.

    Args:
        image: image array accepted by rotate_image_precise / pytesseract.
        keywords: iterable of receipt keywords; None is treated as "no keywords".
        lang: Tesseract language string.

    Returns:
        (best_image, best_angle) — the rotated image with the highest score and
        the angle (0, 90, 180 or 270) that produced it.
    """
    # The public entry point defaults keywords to None; score on confidence alone then.
    keywords = keywords or ()

    angle_penalty = {
        0: 1.0,
        90: 0.85,
        180: 0.7,
        270: 0.85
    }

    best_img = image
    best_angle = 0
    best_score = -1

    for angle in (0, 90, 180, 270):
        rotated = rotate_image_precise(image, angle)
        data = pytesseract.image_to_data(rotated, lang=lang, output_type=Output.DICT)
        conf = compute_avg_conf(data)
        kw_count = count_keywords(" ".join(data['text']), keywords)

        # Keyword hits dominate; confidence separates candidates with equal hits.
        final_score = (kw_count * 1000 + conf) * angle_penalty[angle]

        if final_score > best_score:
            best_score = final_score
            best_img = rotated
            best_angle = angle

    return best_img, best_angle

def correct_orientation_with_skew(image, keywords=None, lang='tur+eng',
                                  skew_range=5, step=1.0, top_n_angles=3):
    """
    Produce an upright, deskewed image for OCR together with the total rotation applied.

    Why used:
      - Single entry point to produce a Doctr-ready image.
      - Pipeline: coarse rotation → Hough skew estimate → micro-refinement
        around that estimate.

    Micro-refinement: every angle in [skew - skew_range, skew + skew_range]
    (spacing `step`) is ranked by projection variance; the `top_n_angles` best
    are OCR'd with Tesseract and one replaces the Hough-corrected image only
    if its average confidence is strictly higher.

    Returns:
        (image, angle) where angle = coarse angle + chosen skew angle.
    """
    def ocr_confidence(candidate):
        # Average Tesseract token confidence for one candidate image.
        ocr_data = pytesseract.image_to_data(candidate, lang=lang, output_type=Output.DICT)
        return compute_avg_conf(ocr_data)

    # Stage 1: coarse orientation (0°, 90°, 180°, 270°).
    upright_img, base_angle = find_best_orientation(image, keywords, lang=lang)

    # Stage 2: Hough-based skew estimate; this is the baseline candidate.
    skew_angle = estimate_skew_hough(upright_img)
    best_img = rotate_image_precise(upright_img, skew_angle)

    # Stage 3: rank neighbouring angles by projection variance (highest first).
    candidate_angles = np.arange(skew_angle - skew_range, skew_angle + skew_range + step, step)
    ranked = sorted(
        ((a, projection_variance_score(rotate_image_precise(upright_img, a)))
         for a in candidate_angles),
        key=lambda pair: -pair[1],
    )
    shortlist = [a for a, _ in ranked[:top_n_angles]]

    best_conf = ocr_confidence(best_img)
    best_angle = base_angle + skew_angle

    # Only the shortlisted angles pay for a full OCR pass.
    for a in shortlist:
        rotated = rotate_image_precise(upright_img, a)
        conf = ocr_confidence(rotated)
        if conf > best_conf:
            best_conf = conf
            best_img = rotated
            best_angle = base_angle + a

    return best_img, best_angle

### `apply_light_clahe(image_bgr)`
**Why used:** Enhance text visibility and contrast without over-saturating the image, especially in low-contrast or faded receipts.  
**How it works:**
1. **Convert to LAB:** Separates luminance (L) from color channels (A, B).  
2. **CLAHE on L-channel:** Applies *Contrast Limited Adaptive Histogram Equalization* with a low clip limit (≤3.0) to subtly boost local contrast.  
3. **Merge & Convert:** Combines adjusted luminance with original color and converts back to BGR.  

**Returns:** A contrast-enhanced BGR image, ready for further preprocessing or OCR.

In [9]:
def apply_light_clahe(image_bgr):
    """
    Gently boost local contrast of a BGR image.

    CLAHE (clipLimit=2.0, 8x8 tiles) is applied to the L channel of the LAB
    representation only; the A and B channels are passed through unchanged.
    Returns the enhanced image converted back to BGR.
    """
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(image_bgr, cv2.COLOR_BGR2LAB))

    # A clip limit below 3.0 keeps the enhancement "light".
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    boosted = equalizer.apply(lightness)

    recombined = cv2.merge((boosted, chan_a, chan_b))
    return cv2.cvtColor(recombined, cv2.COLOR_LAB2BGR)

### `cluster_receipts_by_centroids` — Purpose

This function groups detected OCR words into **clusters** (likely representing individual receipts)  
based on their **center coordinates** in the image.  
It uses **DBSCAN** (Density-Based Spatial Clustering of Applications with Noise) to find  
groups of nearby words without needing to predefine the number of clusters.

#### Why this is needed
- A single image can contain **multiple receipts**.
- We want to group words that **belong to the same receipt**.
- DBSCAN is ideal because it:
  - Clusters based on **spatial proximity** (word positions).
  - Can handle **noise/outliers** (unwanted words outside receipts).
  - Does not require specifying the number of receipts beforehand.

#### How it works
1. Takes the **center points** of all OCR-detected words.
2. Converts `eps_frac` into an actual pixel distance using the image diagonal.
3. Runs DBSCAN to find clusters.
4. Returns each cluster as a list of word dictionaries.


In [10]:
def cluster_receipts_by_centroids(words, image_shape, eps_frac=0.05, min_samples=5):
    """
    Group OCR words into spatial clusters with DBSCAN on their centre points.

    words: list of dicts, each with 'center':(cx,cy), and 'box':(x1,y1,x2,y2)
    image_shape: (h,w) of the image
    eps_frac: DBSCAN radius expressed as a fraction of the image diagonal
    min_samples: DBSCAN min_samples

    Returns: list of clusters in ascending label order, each a list of the
    original word dicts. Words DBSCAN labels as noise are dropped; an empty
    `words` gives [].
    """
    if not words:
        return []

    # One (cx, cy) row per word.
    centers = np.array([word['center'] for word in words], dtype=float)

    # Turn the fractional radius into pixels using the image diagonal.
    img_h, img_w = image_shape
    radius = np.hypot(img_w, img_h) * eps_frac

    labels = DBSCAN(eps=radius, min_samples=min_samples).fit(centers).labels_

    clusters = []
    for label in np.unique(labels):
        if label < 0:
            # Negative label = noise point, belongs to no cluster.
            continue
        members = np.flatnonzero(labels == label)
        clusters.append([words[i] for i in members])
    return clusters

### `compute_receipt_score` — Purpose

This function below calculates a **confidence score** to decide if a given cluster of OCR lines is likely to be a **receipt**.

#### Why this is needed
- After clustering OCR words into potential receipts, we still need to **validate** if a cluster is truly a receipt.
- A high keyword match alone may not be enough; receipts also have **amounts, dates, and times**.
- This scoring approach gives a **weighted evaluation** instead of a simple yes/no check.

#### How it works
1. **Combine all text** from the cluster into a single string.
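2. **Normalize text** for consistent matching (uppercasing, removing accents, collapsing whitespace).
3. **Count keyword matches** from `keywords_for_detection`.
4. **Detect patterns** using regex: amounts (e.g., `12,50 TL`), dates (e.g., `12.05.2024`), and times (e.g., `14:32`).
5. **Calculate weighted score**: `score = kw_weight * kw_hits + amt_weight * has_amount + date_weight * has_date + time_weight * has_time`.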
6. **Return results** with:
   - Score
   - Pattern presence booleans
   - Final `is_receipt` flag (score ≥ threshold).

In [31]:
# def compute_receipt_score(line_texts, keywords_for_detection, kw_weight=0.4,
#                           amt_weight=0.6, date_weight=0.5, time_weight=0.4,
#                           threshold=1.8):
#     """
#     line_texts: list[str] (one cluster's lines, already normalized if you want)
#     keywords_for_detection: list[str]
#     returns: dict with score, booleans, and is_receipt
#     """
#     # flatten text for keyword counting
#     full = " ".join(line_texts)
#     full_norm = normalize_ocr_text(full)

#     # keyword hits (count)
#     kw_hits = sum(full_norm.count(normalize_ocr_text(k)) for k in keywords_for_detection)

#     # regex signals
#     has_amount = bool(re.search(r'\d+[.,]\d{2}\s*(TL|TRY)?', full_norm))
#     has_date   = bool(re.search(r'\b\d{2}[./-]\d{2}[./-]\d{2,4}\b', full_norm))
#     has_time   = bool(re.search(r'\b\d{2}[:.]\d{2}(?::\d{2})?\b', full_norm))

#     # score (same scheme you used before)
#     score = (kw_weight * kw_hits +
#              amt_weight * has_amount +
#              date_weight * has_date +
#              time_weight * has_time)

#     return {
#         "score": round(score, 3),
#         "kw_hits": int(kw_hits),
#         "has_amount": bool(has_amount),
#         "has_date": bool(has_date),
#         "has_time": bool(has_time),
#         "is_receipt": score >= threshold,
#     }

In [35]:
def compute_receipt_score(
    line_texts,
    keywords_for_detection,
    kw_weight=0.4,
    amt_weight=0.6,
    date_weight=0.5,
    time_weight=0.4,
    threshold=1.8,
    return_debug=False,
    print_debug=False,
):
    """
    Score how receipt-like one cluster of OCR lines is.

    line_texts: list[str] (one cluster's lines); None is treated as empty
    keywords_for_detection: list[str]; None is treated as empty

    score = kw_weight * keyword occurrences
          + amt_weight / date_weight / time_weight for each pattern that is present.
    The cluster counts as a receipt when score >= threshold.

    Returns: dict with score (rounded to 3 decimals), kw_hits, has_amount,
    has_date, has_time, is_receipt, plus a "debug" entry when return_debug is
    set. With print_debug a one-line summary is printed as well.
    """
    import re

    # Flatten the cluster and normalize it once.
    full_norm = normalize_ocr_text(" ".join(line_texts or []))

    # Keywords: normalize, keep the distinct ones present, count their occurrences.
    present_kws = set()
    for raw_kw in (keywords_for_detection or []):
        kw = normalize_ocr_text(raw_kw)
        if kw and kw in full_norm:
            present_kws.add(kw)
    matched_kws = sorted(present_kws)
    kw_hits = sum(full_norm.count(kw) for kw in matched_kws)

    # Structural signals; the matches are kept so they can be shown as examples.
    amount_matches = re.findall(r'\b\d+[.,]\d{2}\s*(?:TL|TRY)?\b', full_norm)
    date_matches = re.findall(r'\b\d{2}[./-]\d{2}[./-]\d{2,4}\b', full_norm)
    time_matches = re.findall(r'\b\d{2}[:.]\d{2}(?::\d{2})?\b', full_norm)

    has_amount = len(amount_matches) > 0
    has_date = len(date_matches) > 0
    has_time = len(time_matches) > 0

    # Per-component contributions stay separate so debug output can show them.
    kw_contrib = kw_weight * kw_hits
    amt_contrib = amt_weight * int(has_amount)
    date_contrib = date_weight * int(has_date)
    time_contrib = time_weight * int(has_time)

    score = kw_contrib + amt_contrib + date_contrib + time_contrib
    is_receipt = score >= threshold

    result = {
        "score": round(score, 3),
        "kw_hits": int(kw_hits),
        "has_amount": has_amount,
        "has_date": has_date,
        "has_time": has_time,
        "is_receipt": is_receipt,
    }

    if return_debug:
        weights = {
            "kw_weight": kw_weight,
            "amt_weight": amt_weight,
            "date_weight": date_weight,
            "time_weight": time_weight,
            "threshold": threshold,
        }
        components = {
            "kw_contrib": round(kw_contrib, 3),
            "amt_contrib": round(amt_contrib, 3),
            "date_contrib": round(date_contrib, 3),
            "time_contrib": round(time_contrib, 3),
        }
        result["debug"] = {
            "text_preview": full_norm[:300],
            "matched_keywords": matched_kws,
            "amount_matches": amount_matches[:10],
            "date_matches": date_matches[:10],
            "time_matches": time_matches[:10],
            "weights": weights,
            "components": components,
        }

    if print_debug:
        summary = (
            f"[receipt_score] score={score:.3f} (kw={kw_contrib:.2f}, amt={amt_contrib:.2f}, "
            f"date={date_contrib:.2f}, time={time_contrib:.2f}) | "
            f"kw_hits={kw_hits} | amount={has_amount} date={has_date} time={has_time} | "
            f"thr={threshold} => is_receipt={is_receipt}"
        )
        print(summary)
        if matched_kws:
            print("  matched_keywords:", matched_kws)
        if amount_matches or date_matches or time_matches:
            samples = {
                "amount": amount_matches[:3],
                "date": date_matches[:3],
                "time": time_matches[:3],
            }
            print("  samples:", samples)

    return result


In [12]:
def adaptive_dbscan_params(words, H, W):
    """
    Derive DBSCAN settings (eps_frac, min_samples) from the word layout.

    words: list of dicts carrying a 'center' entry
    H, W: image height and width

    eps_frac is 1.5x the mean nearest-neighbour distance between word centres,
    expressed as a fraction of the image diagonal and clamped to [0.05, 0.15].
    min_samples is 3 below 30 words, 5 below 80 words, otherwise 8.
    With no words the fallback (0.08, 5) is returned.
    """
    diagonal = (H**2 + W**2) ** 0.5
    word_count = len(words)

    if word_count == 0:
        return 0.08, 5  # fallback

    # Typical spacing: mean distance from each centre to its closest neighbour.
    points = np.array([word["center"] for word in words])
    if len(points) > 1:
        from sklearn.neighbors import NearestNeighbors
        # n_neighbors=2 because each point's nearest hit is the point itself.
        nn_distances, _ = NearestNeighbors(n_neighbors=2).fit(points).kneighbors(points)
        typical_gap = np.mean(nn_distances[:, 1])
    else:
        typical_gap = diagonal * 0.05

    # eps fraction based on spacing (clamp between 0.05 and 0.15)
    eps_frac = min(0.15, max(0.05, typical_gap / diagonal * 1.5))

    # Denser pages demand more neighbours before a cluster is accepted.
    min_samples = 8
    for upper_bound, samples in ((30, 3), (80, 5)):
        if word_count < upper_bound:
            min_samples = samples
            break

    return eps_frac, min_samples


In [13]:
# Keyword phrases per receipt type; every entry is written in lower case and
# keeps its Turkish diacritics.
# An accent-stripped copy (RECEIPT_TYPE_KEYWORDS_NORM) is derived from this dict
# for use in score_cluster_type.
# Some phrases are listed under more than one type (e.g. "onay numarası",
# "kart sahibine aittir", "1,00tl", "i:", "t:" appear under both "satis" and "iptal").
RECEIPT_TYPE_KEYWORDS = {
    "gunsonu": [
        "günsonu raporu","günsonu işlemi","rapor başlangıcı","rapor sonu",
        "başarılı olarak tamamlanmıştır","gönderim raporu","genel toplam",
        "peşin iptal","peşin ipt","batch no"
    ],
    "satis": [
        "satış","satış tutarı","işlem tutarı","kredi kartı","onay numarası",
        "onay kodu","kart sahibine aittir","topkdv","toplam","1,00","1,00tl","i:","t:"
    ],
    "iptal": [
        "satış iptal","satış iptal tutarı","iptal","ref no","onay numarası",
        "tarih","1,00tl","kart sahibine aittir","i:","t:"
    ],
    "parametre": [
        "parametre yükleme","parametre","host","tid","mid","os ver",
        "başlangıç","bitiş","pos aktivasyon başarılı","key exchange başarılı"
    ],
    "detay_listesi": [
        "detay işlemler listesi","grup 1 işlem sayısı","grup başarılı"
    ]
}

# Shared structural patterns for amounts, dates and times.
# score_cluster_type applies them to lower-cased text, so the currency suffix
# must match case-insensitively: without IGNORECASE an amount glued to its
# suffix ("25,00tl") fails at the trailing \b and is not recognised at all.
AMOUNT_RE = re.compile(r'\b\d{1,4}[.,]\d{2}\s*(?:TL|TRY)?\b', re.IGNORECASE)
DATE_RE   = re.compile(r'\b\d{2}[./-]\d{2}[./-]\d{2,4}\b')
TIME_RE   = re.compile(r'\b\d{2}[:.]\d{2}(?::\d{2})?\b')

### `score_cluster_type` — Purpose

This function determines the **most likely receipt type** for a given cluster of OCR-extracted lines by combining **keyword-based** and **structural** pattern analysis.

#### Why this is needed
- A single image can contain **different receipt formats** (e.g., *satış*, *iptal*, *günsonu*, *parametre*).
- We need a **type classification step** so downstream processing can apply type-specific extraction rules.
- This approach uses **both text keywords** and **receipt-structure patterns** for higher accuracy.

#### How it works
1. **Text Normalization**
   - Converts all text to lowercase, removes accents, and collapses spaces for consistent matching.

2. **Keyword Hit Counting**
   - For each known receipt type (from `RECEIPT_TYPE_KEYWORDS_NORM`), counts how many of its normalized keywords appear in the cluster.

3. **Structural Pattern Boosts**
   - Checks for additional receipt-specific patterns:
     - Amounts (e.g., `1,00 TL`)
     - Dates
     - Approval codes ("Onay numarası")
     - Reference numbers
     - Special tags like `I:` or `T:` with digits.
   - Adds weighted boosts to *satış* and *iptal* types based on these patterns.

4. **Conflict Resolution Rules**
   - Penalizes *satış* score if "iptal" appears.
   - Penalizes *satış*, *iptal*, and *parametre* if *günsonu* signals dominate.
   - Penalizes *satış*, *iptal*, and *günsonu* if *parametre* signals dominate.

5. **Final Decision**
   - Selects the highest-scoring type as `best_type`.
   - Also lists all plausible `tags` within a score delta (`delta = 0.6`).
   - Returns:
     - `type` → most probable receipt type
     - `scores` → score per type
     - `confidence` → normalized confidence (0-1)
     - `tags` → all candidate types above the threshold

In [16]:
def normalize_ocr_text(text: str) -> str:
    """
    Canonical form of OCR text for matching.

    The text is upper-cased, NFKD-decomposed with all combining marks dropped
    (İ -> I, Ş -> S, ...), runs of whitespace are collapsed to a single space,
    and leading/trailing whitespace is removed.
    """
    decomposed = unicodedata.normalize("NFKD", text.upper())
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return re.sub(r"\s+", " ", without_marks).strip()

In [17]:
def _norm(s: str) -> str:
    """Lower-cased output of normalize_ocr_text (accents stripped, whitespace collapsed)."""
    return normalize_ocr_text(s).lower()

# Accent-stripped, lower-cased copy of RECEIPT_TYPE_KEYWORDS, built once when
# this cell runs, so score_cluster_type can do plain substring tests against
# text that went through _norm().
RECEIPT_TYPE_KEYWORDS_NORM = {
    t: [_norm(kw) for kw in kws]
    for t, kws in RECEIPT_TYPE_KEYWORDS.items()
}

# Structural markers, written in lower case because score_cluster_type matches
# them against _norm() output. The digit alternatives ([i1], [t7]) accept
# "1:" in place of "i:" and "7:" in place of "t:".
I_COLON_RE = re.compile(r'\b[i1]\s*:\s*\d+')   # I:/1: followed by digits
T_COLON_RE = re.compile(r'\b[t7]\s*:\s*\d+')   # T:/7: followed by digits
REFNO_RE   = re.compile(r'\bref(?:erans)?\s*no\b')  # avoid bare 'ref'
# NOTE(review): `tl?` means "t" with an optional "l", so a bare "1,00" is NOT
# matched here while "1,00 t" is — confirm whether `(?:tl)?` was intended.
ONE_LIRA_RE = re.compile(r'\b1[.,]00\s*tl?\b')      # 1,00 / 1.00 TL

In [18]:
def score_cluster_type(line_texts):
    """
    Classify one cluster of OCR lines into a receipt type.

    Each type scores one point per distinct keyword (from
    RECEIPT_TYPE_KEYWORDS_NORM) found in the normalized text. "satis" and
    "iptal" additionally get a structural boost for amount, date, time,
    approval ("onay"), reference number and I:/T: tags. Conflict rules then
    subtract penalties when another type clearly dominates.

    Returns a dict with:
      type       - highest-scoring type, or None if its score is not positive
      scores     - score per type
      confidence - best score / 4, clamped to [0, 1], rounded to 3 decimals
      tags       - types with a positive score within 0.6 of the best, best first
    """
    text = _norm(" ".join(line_texts))

    # 1) Keyword evidence: each keyword counts at most once per type.
    per_type_hits = {
        rtype: sum(1 for kw in kws if kw in text)
        for rtype, kws in RECEIPT_TYPE_KEYWORDS_NORM.items()
    }
    scores = {rtype: float(hits) for rtype, hits in per_type_hits.items()}

    # 2) Structural evidence as (weight, present) pairs.
    #    I:/T: tags only count when they actually carry a number.
    weighted_signals = (
        (0.7, bool(AMOUNT_RE.search(text)) or bool(ONE_LIRA_RE.search(text))),
        (0.4, bool(DATE_RE.search(text))),
        (0.3, bool(TIME_RE.search(text))),
        (0.5, "onay" in text),  # also covers "onay numarasi" / "onay kodu"
        (0.4, bool(REFNO_RE.search(text))),
        (0.25, bool(I_COLON_RE.search(text)) or bool(T_COLON_RE.search(text))),
    )
    struct_score = sum(weight * present for weight, present in weighted_signals)

    # 3) Only sale-like types benefit from the structural boost.
    for rtype in ("satis", "iptal"):
        scores[rtype] += struct_score

    # 4) Conflict rules.
    if "iptal" in text:  # also covers "satis iptal"
        scores["satis"] -= 0.8

    if per_type_hits.get("gunsonu", 0) >= 2:
        for rtype, penalty in (("satis", 0.5), ("iptal", 0.5), ("parametre", 0.3)):
            scores[rtype] -= penalty

    if per_type_hits.get("parametre", 0) >= 2:
        for rtype, penalty in (("satis", 0.5), ("iptal", 0.5), ("gunsonu", 0.3)):
            scores[rtype] -= penalty

    # 5) Decide.
    best_type = max(scores, key=scores.get)
    best_score = scores[best_type]
    delta = 0.6
    tags = [rtype for rtype, sc in scores.items() if sc >= best_score - delta and sc > 0]
    tags.sort(key=lambda rtype: -scores[rtype])

    confidence = max(0.0, min(1.0, best_score / 4.0))

    winner = best_type if best_score > 0 else None
    return {
        "type": winner,
        "scores": scores,
        "confidence": round(confidence, 3),
        "tags": tags,
    }

In [19]:
def group_boxes_into_lines(boxes, y_threshold=20):
    """
    Group word boxes into text lines by vertical proximity.

    Boxes are ordered by the y coordinate of their 'center'. A box joins the
    current line when its centre is within `y_threshold` of the previously
    added box; otherwise it opens a new line. Returns a list of lines (each a
    list of boxes), or [] for empty input.
    """
    if not boxes:
        return []

    ordered = sorted(boxes, key=lambda box: box["center"][1])

    rows = []
    for box in ordered:
        # Compare against the last box added to the most recent row.
        if rows and abs(box["center"][1] - rows[-1][-1]["center"][1]) <= y_threshold:
            rows[-1].append(box)
        else:
            rows.append([box])
    return rows

In [20]:
def _first_match(text: str, patterns):
    for p in patterns:
        m = re.search(p, text)
        if m:
            return m
    return None

def extract_fields_from_text_lines(lines, image_path):
    """
    Pull structured fields out of the text lines of one receipt.

    Args:
        lines: iterable of raw text lines (None is treated as empty). Each line
               is passed through normalize_ocr_text before matching.
        image_path: path of the source image; only its file name is used.

    Returns:
        dict that always contains "receipt_id" (first number in the file name,
        or the file name without extension) and, when found, any of:
        "tarih", "saat", "isyeri_no", "terminal_no", "mersis_no", "vkn",
        "ref_no", "onay_numarasi", "amount_tl", "bank", "mali_id".
        For every field except "mali_id" the first line that matches wins;
        "mali_id" is the longest candidate found across all lines.
    """
    results = {}
    filename = os.path.basename(image_path)
    m = re.search(r"(\d+)", filename)  # first number in filename
    results["receipt_id"] = m.group(1) if m else os.path.splitext(filename)[0]

    bank_keywords = ["GARANTI BBVA","ISBANK","ZIRAAT","YAPI KREDI",
                     "AKBANK","HALKBANK","QNB","TEB","DENIZBANK"]

    date_pattern = r"\b(\d{2}[./-]\d{2}[./-]\d{2,4})\b"
    time_pattern = r"\b(\d{2}[:.]\d{2}(?::\d{2})?)\b"

    # collect potential Mali IDs (there can be more than one)
    mali_ids = []

    for raw in (lines or []):
        upper = normalize_ocr_text(raw)

        # --- Date ---
        m = re.search(date_pattern, upper)
        if m: results.setdefault("tarih", m.group(1))

        # --- Time ---
        # The time pattern accepts "." as separator, so the leading "12.05" of a
        # date such as "12.05.2024" would be taken as the time, and setdefault
        # would then block the real time. Blank out dates before searching.
        without_dates = re.sub(date_pattern, " ", upper)
        m = re.search(time_pattern, without_dates)
        if m: results.setdefault("saat", m.group(1))

        # --- İşyeri No (I:, ISYERI NO, ISYERI:) ---
        m = _first_match(upper, [
            r"\bISYERI\s*NO[:\s\-]*([0-9]{5,})\b",
            r"\bISYERI[:\s\-]*([0-9]{5,})\b",
            r"\bI\s*[:\-]\s*([0-9]{5,})\b",
        ])
        if m: results.setdefault("isyeri_no", m.group(1))

        # --- Terminal No (T:, TERMINAL NO, TERMINAL:) ---
        m = _first_match(upper, [
            r"\bTERMINAL\s*NO[:\s\-]*([0-9]{4,})\b",
            r"\bTERMINAL[:\s\-]*([0-9]{4,})\b",
            r"\bT\s*[:\-]\s*([0-9]{4,})\b",
        ])
        if m: results.setdefault("terminal_no", m.group(1))

        # --- MERSIS (digits may be split by spaces; spaces are removed) ---
        m = re.search(r"\bMERSIS\s*NO[:\s]*([0-9\s]{10,})\b", upper)
        if m: results.setdefault("mersis_no", m.group(1).replace(" ",""))

        # --- VKN ---
        m = re.search(r"\bVKN[:\s]*([0-9]{5,})\b", upper)
        if m: results.setdefault("vkn", m.group(1))

        # --- Ref No ---
        m = re.search(r"\b(REF|REFERANS)\s*(NO)?[:\s]*([0-9]{5,})\b", upper)
        if m: results.setdefault("ref_no", m.group(3))

        # --- Onay Numarasi / Code ---
        m = _first_match(upper, [
            r"\bONAY\s*NUMARASI[:\s]*([0-9]{5,})\b",
            r"\bONAY\s*KODU[:\s]*([0-9]{5,})\b",
        ])
        if m: results.setdefault("onay_numarasi", m.group(1))

        # --- Amount (only when followed by TL/TRY) ---
        m = re.search(r"\b(\d{1,4}[.,]\d{2})\s*(TL|TRY)\b", upper)
        if m: results.setdefault("amount_tl", m.group(1))

        # --- Mali ID: AS / AT / AV + digits (allow spaces or hyphens) ---
        for mi in re.finditer(r"\bA[STV][\s\-]*\d{6,}\b", upper):
            token = mi.group(0).replace(" ", "").replace("-", "")
            mali_ids.append(token)

        # --- Bank name (first listed bank found in the line) ---
        for bank in bank_keywords:
            if bank in upper:
                results.setdefault("bank", bank)
                break

    if mali_ids:
        # Longest candidate wins; on ties max() keeps the earliest one.
        results["mali_id"] = max(mali_ids, key=len)

    return results

In [21]:
def detect_receipt_types(text, keyword_dict, threshold=1):
    """
    Return the receipt types whose keywords occur in `text`.

    Args:
        text: text to inspect.
        keyword_dict: mapping of receipt type -> iterable of keyword strings.
        threshold: minimum number of distinct keywords that must be present.

    Returns:
        List of matching types, in the iteration order of `keyword_dict`.

    Matching ignores case AND accents. A plain lower() is not enough for
    Turkish: "SATIŞ".lower() is "satiş", which does not contain the keyword
    "satış" (dotless ı). Text and keywords are therefore both upper-cased,
    stripped of combining marks and lower-cased — the same folding that
    normalize_ocr_text followed by lower() performs.
    """
    def _fold(value):
        # Upper-casing first maps both "i" and dotless "ı" to "I";
        # NFKD + mark stripping then turns "İ", "Ş", "Ü", ... into plain letters.
        decomposed = unicodedata.normalize("NFKD", value.upper())
        return "".join(ch for ch in decomposed if not unicodedata.combining(ch)).lower()

    folded_text = _fold(text)
    matched_types = []

    for receipt_type, keywords in keyword_dict.items():
        count = sum(1 for kw in keywords if _fold(kw) in folded_text)
        if count >= threshold:
            matched_types.append(receipt_type)

    return matched_types

### `is_receipt_image_from_path` — Purpose

This is the **main OCR processing pipeline** for detecting and analyzing receipts from a given image file path.

#### Why this is needed
- It combines **image preprocessing**, **OCR text extraction**, **clustering**, **receipt scoring**, and **field/type detection** into one unified function.
- Ensures the image is **correctly rotated** before OCR (important for Doctr accuracy).
- Groups detected words into logical **receipt regions** for multi-receipt images.

#### How it works
1. **Image Loading & Preprocessing**
   - Reads the image from disk.
   - Enhances contrast using `apply_light_clahe`.
   - Corrects rotation/skew using `correct_orientation_with_skew` based on keywords.

2. **OCR with Doctr**
   - Converts image to RGB and feeds it into the `doctr_model`.
   - Extracts each word’s:
     - Original text
     - Normalized text
     - Bounding box coordinates (absolute pixel values)
     - Word center
     - Confidence score

3. **Adaptive Clustering**
   - Uses `adaptive_dbscan_params` to dynamically calculate DBSCAN parameters (`eps_frac`, `min_samples`) based on word density and image size.
   - Clusters words into potential receipts with `cluster_receipts_by_centroids`.

4. **Per-Cluster Analysis**
   - For each cluster:
     - Computes bounding box (with padding).
     - Groups words into lines (`group_boxes_into_lines`).
     - Creates `line_texts` list for processing.
     - Scores cluster using `compute_receipt_score` (keywords, amount, date, time).
     - Extracts fields with `extract_fields_hybrid`.
     - Detects receipt types with `detect_receipt_types`.
     - Draws bounding boxes for visualization.

5. **Final Decision**
   - Marks image as containing a receipt if **any** cluster meets the `is_receipt` threshold.
   - Returns:
     - `receipts` → List of detected receipt regions with metadata
     - `rotated_image` → Visualization with bounding boxes (optional)
     - `words` → All OCR word objects
     - `clusters` → Grouped words per receipt
     - `is_receipt_image` → Boolean overall decision


In [34]:
def is_receipt_image_from_path(image_path, doctr_model, return_rotated=True,
                               eps_frac=None, min_samples=None):
    """
    Run the full receipt-detection pipeline on one image file.

    Steps: load the image, preprocess it (`apply_light_clahe`, then
    `correct_orientation_with_skew`), OCR it with `doctr_model`, cluster the
    recognized words into candidate receipt regions, and score / extract
    fields / tag types for every cluster.

    Parameters
    ----------
    image_path : str
        Path of the image to read with `cv2.imread`.
    doctr_model : callable
        Called as `doctr_model([rgb_image])`; the result must expose
        `.pages[0].blocks[*].lines[*].words[*]`.
    return_rotated : bool
        When True the annotated image is returned under "rotated_image";
        otherwise that entry is None.
    eps_frac, min_samples : optional
        Clustering parameters forwarded to `cluster_receipts_by_centroids`.
        Any of the two left as None is filled from `adaptive_dbscan_params`;
        a value supplied by the caller is always kept.

    Returns
    -------
    dict
        "receipts"         -> list of per-cluster dicts (region, fields, words,
                              lines, score details, receipt_types)
        "rotated_image"    -> annotated image or None
        "words"            -> all OCR word dicts
        "clusters"         -> the word clusters
        "is_receipt_image" -> True if any cluster has is_receipt set

    Raises
    ------
    ValueError
        If `cv2.imread` cannot load `image_path`.
    """
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Could not load image: {image_path}")
    img_clahe = apply_light_clahe(img)
    rotated, _ = correct_orientation_with_skew(img_clahe, keywords=keywords_for_detection, lang='tur')
    H, W = rotated.shape[:2]

    # OCR with Doctr (no gradients needed for inference)
    rgb = cv2.cvtColor(rotated, cv2.COLOR_BGR2RGB)
    with torch.no_grad():
        result = doctr_model([rgb])
    page = result.pages[0]

    # collect words
    words = []
    for block in page.blocks:
        for line in block.lines:
            for w in line.words:
                txt = w.value.strip()
                norm = normalize_ocr_text(txt)
                if not norm:
                    # nothing usable left after normalization
                    continue
                # NOTE(review): assumes w.geometry is ((x1, y1), (x2, y2)) in
                # relative coordinates (two corner points) -- confirm the
                # predictor is not configured to return 4-point polygons.
                (x1n, y1n), (x2n, y2n) = w.geometry
                x1, y1 = int(x1n * W), int(y1n * H)
                x2, y2 = int(x2n * W), int(y2n * H)
                words.append({
                    "text": txt,
                    "normalized_text": norm,
                    "box": [x1, y1, x2, y2],
                    "center": [(x1 + x2) // 2, (y1 + y2) // 2],
                    # -1 marks "no confidence attribute on the word object"
                    "confidence": getattr(w, "confidence", -1),
                })

    # --- Adaptive DBSCAN parameters ---
    # Fill in only what the caller left unspecified, so an explicit eps_frac
    # (or min_samples) passed on its own is not silently overwritten.
    if eps_frac is None or min_samples is None:
        auto_eps_frac, auto_min_samples = adaptive_dbscan_params(words, H, W)
        if eps_frac is None:
            eps_frac = auto_eps_frac
        if min_samples is None:
            min_samples = auto_min_samples

    # cluster into receipts
    clusters = cluster_receipts_by_centroids(words, (H, W), eps_frac=eps_frac, min_samples=min_samples)

    results = []
    vis = rotated.copy()
    for cluster in clusters:
        # tight bounding box around all words of the cluster
        xs1 = [wd["box"][0] for wd in cluster]
        ys1 = [wd["box"][1] for wd in cluster]
        xs2 = [wd["box"][2] for wd in cluster]
        ys2 = [wd["box"][3] for wd in cluster]
        x1, y1, x2, y2 = min(xs1), min(ys1), max(xs2), max(ys2)

        # 5% padding on each side, clamped to the image
        pad_x = int(0.05 * (x2 - x1))
        pad_y = int(0.05 * (y2 - y1))
        x1p, y1p = max(0, x1 - pad_x), max(0, y1 - pad_y)
        x2p, y2p = min(W, x2 + pad_x), min(H, y2 + pad_y)

        # lines for THIS cluster
        lines = group_boxes_into_lines(cluster)
        line_texts = [" ".join(wd["normalized_text"] for wd in line) for line in lines]

        # score cluster as receipt / not receipt
        score_info = compute_receipt_score(line_texts, keywords_for_detection, return_debug=True, print_debug=True)

        # regex extraction first, semantic extraction fills the gaps
        fields = extract_fields_hybrid(line_texts, image_path)

        cluster_text = " ".join(line_texts).lower()
        type_tags = detect_receipt_types(cluster_text, RECEIPT_TYPE_KEYWORDS, threshold=1)

        # draw: one colour for clusters flagged as receipts, another otherwise
        color = (0, 255, 0) if score_info["is_receipt"] else (0, 165, 255)
        cv2.rectangle(vis, (x1p, y1p), (x2p, y2p), color, 2)

        results.append({
            "region": (x1p, y1p, x2p, y2p),
            "fields": fields,
            "words": cluster,
            "lines": line_texts,
            "score": score_info["score"],
            "kw_hits": score_info["kw_hits"],
            "has_amount": score_info["has_amount"],
            "has_date": score_info["has_date"],
            "has_time": score_info["has_time"],
            "is_receipt": score_info["is_receipt"],
            "receipt_types": type_tags,
        })

    image_is_receipt = any(r["is_receipt"] for r in results)

    return {
        "receipts": results,
        "rotated_image": vis if return_rotated else None,
        "words": words,
        "clusters": clusters,
        "is_receipt_image": image_is_receipt
    }


In [29]:
# Bank names looked up (after normalization) in the receipt lines.
BANK_NAMES = [
    "Garanti BBVA",
    "İş Bankası",
    "Ziraat Bankası",
    "Akbank",
    "Halkbank",
    "Yapı Kredi",
    "VakıfBank",
    "QNB Finansbank",
    "DenizBank",
    "TEB",
    "ING Bank",
    "HSBC",
]

# One regex per field; group 1 is always the extracted value.
# The label-anchored patterns are written as  <label> <separator> <value>.
DIGIT_PATTERNS = {
    # "ISYERI NO", "ISYERI", or a lone "I"/"1" followed by ":" or "-"; 5+ digits
    "isyeri_no": (
        r"(?:\bISYERI\s*NO\b|\bISYERI\b|(?:\b[I1]\b)\s*[:\-])"
        r"\s*[:\-]?\s*"
        r"([0-9]{5,})"
    ),
    # "TERMINAL NO", "TERMINAL", or a lone "T"/"7" followed by ":" or "-"; 4+ digits
    "terminal_no": (
        r"(?:\bTERMINAL\s*NO\b|\bTERMINAL\b|(?:\b[T7]\b)\s*[:\-])"
        r"\s*[:\-]?\s*"
        r"([0-9]{4,})"
    ),
    # "ONAY" optionally followed by NO / NUMARASI / KODU; 5+ digits
    "onay_numarasi": (
        r"(?:\bONAY(?:\s*NO|\s*NUMARASI|\s*KODU)?\b)"
        r"\s*[:\-]?\s*"
        r"([0-9]{5,})"
    ),
    # "REF" / "REFERANS" optionally followed by N or NO; 5+ digits
    "ref_no": (
        r"(?:\bREF(?:ERANS)?\s*NO?\b)"
        r"\s*[:\-]?\s*"
        r"([0-9]{5,})"
    ),
    # dd.mm.yy(yy) with ".", "/" or "-" separators (no label required)
    "tarih": r"\b(\d{2}[./-]\d{2}[./-]\d{2,4})\b",
    # hh:mm or hh.mm, optional :ss (no label required)
    "saat": r"\b(\d{2}[:.]\d{2}(?::\d{2})?)\b",
    # 1-4 digits, "." or ",", 2 digits, optional TL/TRY suffix
    "amount_tl": r"\b(\d{1,4}[.,]\d{2})\s*(?:TL|TRY)?\b",
    # "AS"/"AT"/"AV" prefix, optional spaces or dashes, 6+ digits
    "mali_id": r"\b(A[STV][\s\-]*\d{6,})\b",
}

# (optional) synonyms used only to decide neighbors when labels are split across lines
FIELD_QUERIES = {
    "isyeri_no": ["İşyeri", "ISYERI", "I:"],
    "terminal_no": ["Terminal", "TERMINAL", "T:"],
    "onay_numarasi": ["Onay numarası", "Onay kodu", "ONAY"],
    "ref_no": ["Ref no", "Referans no", "REF", "REFERANS"],
    "tarih": ["Tarih", "Date"],
    "saat": ["Saat", "Time"],
    "amount_tl": ["Tutar", "Toplam", "Amount", "İşlem tutarı"],
    "mali_id": ["Mali id", "AV", "AS", "AT"],
    "bank": ["Banka", "Bank"],
}

In [30]:
def _norm_line(s: str) -> str:
    # same normalization you use elsewhere
    s = s.upper()
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"\s+", " ", s)
    return s.strip()

def _search_with_anchors(field, lines_norm, start_idx):
    """Try the best line, then ±2 neighbors, then all lines using the field's anchored regex."""
    pat = re.compile(DIGIT_PATTERNS[field], re.IGNORECASE)
    # 1) best line
    if 0 <= start_idx < len(lines_norm):
        m = pat.search(lines_norm[start_idx])
        if m:
            return m.group(1)

    # 2) neighborhood
    for idx in range(max(0, start_idx-2), min(len(lines_norm), start_idx+3)):
        if idx == start_idx:
            continue
        m = pat.search(lines_norm[idx])
        if m:
            return m.group(1)

    # 3) global (still anchored to label, so safe)
    for idx in range(len(lines_norm)):
        m = pat.search(lines_norm[idx])
        if m:
            return m.group(1)

    return None
def semantic_pick_best_line(lines, queries, model, min_score=0.35):
    """
    Pick the line that is most similar to any of the query phrases.

    Both `lines` and `queries` are embedded with `model.encode`
    (normalized tensors); each line is scored by its highest cosine
    similarity over all queries.

    Returns (index, score) of the best line, or (None, 0.0) when `lines`
    is empty or the best score is below `min_score`.
    """
    if not lines:
        return None, 0.0

    encode_opts = {"convert_to_tensor": True, "normalize_embeddings": True}
    line_vectors = model.encode(lines, **encode_opts)
    query_vectors = model.encode(queries, **encode_opts)

    # rows = queries, columns = lines -> best query match per line
    similarity = util.cos_sim(query_vectors, line_vectors)
    per_line_best = similarity.max(dim=0).values

    winner = int(torch.argmax(per_line_best))
    winner_score = float(per_line_best[winner])
    return (None, 0.0) if winner_score < min_score else (winner, winner_score)


def extract_with_semantics(line_texts):
    """
    Extract fields from the lines of ONE receipt (cluster).

    The bank is found by a normalized substring scan over BANK_NAMES.
    Every other field in FIELD_QUERIES is located in two steps: the
    semantic model picks the most likely line, then the field's regex is
    applied starting from that line (`_search_with_anchors`). This keeps a
    single line such as 'ISYERI NO' from filling every field.

    Returns a dict with `<field>` and `<field>_score` entries (plus "bank"
    when a bank name is found); empty when `line_texts` is empty.
    """
    found = {}
    if not line_texts:
        return found

    normalized = [_norm_line(text) for text in line_texts]

    # Bank: first line (top to bottom) containing any known bank name,
    # names tried in BANK_NAMES order within that line.
    bank_match = next(
        (bank for line in normalized for bank in BANK_NAMES if _norm_line(bank) in line),
        None,
    )
    if bank_match is not None:
        found["bank"] = bank_match

    for field, queries in FIELD_QUERIES.items():
        if field == "bank":
            continue

        # semantic step: which line most likely carries this field?
        line_idx, similarity = semantic_pick_best_line(line_texts, queries, sem_model, min_score=0.35)
        if line_idx is None:
            continue

        # regex step, searched outward from the chosen line
        value = _search_with_anchors(field, normalized, line_idx)
        if not value:
            continue

        if field == "mali_id":
            # drop the spaces / dashes allowed between prefix and digits
            value = value.replace(" ", "").replace("-", "")
        elif field == "terminal_no" and not 4 <= len(value) <= 8:
            # sanity: terminal numbers are usually short (4-8)
            continue

        found[field] = value
        found[field + "_score"] = round(similarity, 3)

    return found

In [24]:
def extract_fields_hybrid(line_texts, image_path):
    """
    Combine both extractors: regex results take priority, and the semantic
    results only fill fields that are missing or empty. Each filled field
    also gets a `<field>_score` entry (None when the semantic extractor
    reported no score for it).
    """
    regex_fields = extract_fields_from_text_lines(line_texts, image_path)
    semantic_fields = extract_with_semantics(line_texts)

    merged = regex_fields.copy()
    value_keys = [key for key in semantic_fields if not key.endswith("_score")]
    for key in value_keys:
        if merged.get(key):
            # regex already produced a usable value
            continue
        merged[key] = semantic_fields[key]
        merged[key + "_score"] = semantic_fields.get(key + "_score", None)
    return merged


In [28]:
# Fields expected to have a single value per image; the value is chosen by
# consensus across clusters.
UNIQUE_FIELDS = [
    "isyeri_no",
    "terminal_no",
    "onay_numarasi",
    "ref_no",
    "tarih",
    "saat",
    "bank",
    "vkn",
    "mersis_no",
    "mali_id",
]

# Key under which the extractor stores a cluster's amount.
AMOUNT_FIELD = "amount_tl"

In [38]:
# Batch-run configuration: folder scanned for images, name of the CSV that
# is written, and the glob patterns accepted as image files.
IMAGE_FOLDER = "/content/drive/MyDrive/denem1"
OUTPUT_CSV = "receipts_output.csv"
ALLOWED_EXTS = (
    "*.jpg",
    "*.jpeg",
    "*.png",
)

In [39]:
def iter_image_paths(folder, exts=ALLOWED_EXTS):
    """
    Return the sorted list of image paths in `folder` matching `exts`.

    Parameters
    ----------
    folder : str
        Directory to scan (not recursive).
    exts : iterable of str
        Glob patterns such as "*.jpg".

    Matching is done twice: once with plain `glob` (case handling then
    follows the platform), and once case-insensitively on the file name,
    so that files like "IMG_001.JPG" or "scan.PNG" are not skipped on
    case-sensitive file systems. Each path appears once in the result.
    A missing or empty folder yields an empty list.
    """
    import fnmatch  # local import: only this helper needs it

    patterns = list(exts)
    matched = set()

    # pass 1: plain glob, one pattern at a time
    for pattern in patterns:
        matched.update(glob.glob(os.path.join(folder, pattern)))

    # pass 2: compare lower-cased file names against lower-cased patterns
    lowered_patterns = [pattern.lower() for pattern in patterns]
    for path in glob.glob(os.path.join(folder, "*")):
        file_name = os.path.basename(path).lower()
        if any(fnmatch.fnmatchcase(file_name, pattern) for pattern in lowered_patterns):
            matched.add(path)

    return sorted(matched)

def _normalize_token(s):
    if s is None:
        return None
    s = str(s).strip()
    # collapse spaces & uppercase (helps majority vote)
    s = re.sub(r"\s+", " ", s).upper()
    return s

def _choose_by_majority(values, scores=None):
    """
    values: list of strings (may include None or '')
    scores: optional list of numeric scores (same length) — e.g., cluster scores
    Rule:
      1) Majority vote on normalized token
      2) Tie-break by best score (if provided) else first occurrence
    Returns the original (un-normalized) value chosen.
    """
    # map normalized -> list of (orig_value, idx)
    buckets = {}
    for i, v in enumerate(values):
        if not v:
            continue
        norm = _normalize_token(v)
        if not norm:
            continue
        buckets.setdefault(norm, []).append((v, i))

    if not buckets:
        return None

    # majority size
    counts = {k: len(vs) for k, vs in buckets.items()}
    best_norm = max(counts.keys(), key=lambda k: counts[k])
    tied_norms = [k for k,c in counts.items() if c == counts[best_norm]]

    if len(tied_norms) == 1 or not scores:
        # single winner or no scores -> take first original in that bucket
        return buckets[tied_norms[0]][0][0]

    # tie-break with scores (higher better)
    best = None
    best_score = float("-inf")
    for norm in tied_norms:
        for (orig, idx) in buckets[norm]:
            sc = scores[idx] if idx < len(scores) and scores[idx] is not None else 0.0
            if sc > best_score:
                best_score = sc
                best = orig
    return best

def _uniq_preserve_order(seq):
    seen = set()
    out = []
    for x in seq:
        if x in seen:
            continue
        seen.add(x)
        out.append(x)
    return out

# Amounts as they appear on receipt lines: a thousands-grouped number
# ("1.234,56" / "1,234.56") or a plain one ("123,45"), optionally followed
# by TL/TRY. The look-behind / look-ahead reject pieces of dotted dates and
# times, e.g. the "12.08" inside "12.08.2024".
_AMOUNT_SCAN_RE = re.compile(
    r"(?<![.,])\b(\d{1,3}(?:[.,]\d{3})+[.,]\d{2}|\d{1,4}[.,]\d{2})(?![.,]?\d)\s*(?:TL|TRY)?\b"
)


def consolidate_receipt_fields(receipts):
    """
    Merge the per-cluster results of one image into a single record.

    Parameters
    ----------
    receipts : list of dict
        Cluster dicts as produced by is_receipt_image_from_path; the keys
        read here are "fields", "score" and "lines".

    Returns
    -------
    dict with
      - one entry per name in UNIQUE_FIELDS, chosen by `_choose_by_majority`
        across clusters (cluster scores break ties),
      - "amounts_all": ';'-joined distinct amount strings (extractor value
        first, then amounts found by scanning the lines),
      - "amount_max" / "amount_sum": largest value and rounded sum of the
        amounts that could be parsed, or None when there are none.
    """
    # gather cluster scores for tie-breaking
    cluster_scores = [r.get("score") for r in receipts]

    consolidated = {}

    # 1) unique fields: majority vote across clusters
    for field in UNIQUE_FIELDS:
        vals = [(r.get("fields") or {}).get(field) for r in receipts]
        consolidated[field] = _choose_by_majority(vals, scores=cluster_scores)

    # 2) amounts: collect all amounts we found across clusters (dedup)
    all_amounts = []
    for r in receipts:
        # amount the extractor stored for this cluster, if any
        extracted = (r.get("fields") or {}).get(AMOUNT_FIELD)
        if extracted:
            all_amounts.append(str(extracted))

        # scan the lines to catch amounts missed by the extractor
        for ln in r.get("lines") or []:
            all_amounts.extend(m.group(1) for m in _AMOUNT_SCAN_RE.finditer(ln.upper()))

    amounts_uniq = _uniq_preserve_order([a.replace(" ", "") for a in all_amounts])
    consolidated["amounts_all"] = ";".join(amounts_uniq)

    def _parse_amount(a):
        """Convert '1.234,56', '1,234.56', '1234,56' or '1234.56' to float; None if unparsable."""
        a = a.replace(" ", "")
        has_comma, has_dot = "," in a, "." in a
        if has_comma and has_dot:
            # whichever separator comes last is the decimal mark,
            # the other one only groups thousands
            if a.rfind(",") > a.rfind("."):
                a = a.replace(".", "").replace(",", ".")
            else:
                a = a.replace(",", "")
        elif a.count(",") == 1:
            a = a.replace(",", ".")
        try:
            return float(a)
        except ValueError:
            return None

    # best-effort numeric summary of the distinct amounts
    nums = [value for value in map(_parse_amount, amounts_uniq) if value is not None]
    consolidated["amount_max"] = max(nums) if nums else None
    consolidated["amount_sum"] = round(sum(nums), 2) if nums else None

    return consolidated

# ================= RUN OVER A FOLDER =================
# Processes every image in IMAGE_FOLDER and writes one CSV row per image.
# A failing image is logged and recorded as an empty row so the batch keeps
# going. The CSV is written in a `finally` block, so rows collected before an
# interruption (e.g. stopping the cell) are saved instead of being lost.

def _empty_consolidated():
    """Consolidated fields for an image with no clusters or a failed run."""
    blank = {k: None for k in UNIQUE_FIELDS}
    blank.update({"amounts_all": "", "amount_max": None, "amount_sum": None})
    return blank


rows = []

try:
    for img_path in iter_image_paths(IMAGE_FOLDER):
        try:
            out = is_receipt_image_from_path(img_path, doctr_model, return_rotated=False)

            receipts = out.get("receipts", [])
            image_is_receipt = any(r.get("is_receipt") for r in receipts)

            # no clusters -> all fields empty
            consolidated = consolidate_receipt_fields(receipts) if receipts else _empty_consolidated()

            # union of the receipt types found in any cluster of this image
            types_union = sorted({t for r in receipts for t in r.get("receipt_types", [])})
            row = {
                "image_path": img_path,
                "num_clusters": len(receipts),
                "is_receipt_image": image_is_receipt,
                "receipt_types": ";".join(types_union) if types_union else "",
            }
            # add consolidated unique fields + amounts
            row.update(consolidated)

            rows.append(row)

        except Exception as e:
            # best effort: keep the batch running and record an empty row
            print(f"Error processing {img_path}: {e}")
            rows.append({
                "image_path": img_path,
                "num_clusters": 0,
                "is_receipt_image": False,
                "receipt_types": "",
                **_empty_consolidated(),
            })
finally:
    # runs on normal completion and on KeyboardInterrupt alike
    df = pd.DataFrame(rows)
    df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
    print(f"Saved {len(df)} image rows to {OUTPUT_CSV}")


KeyboardInterrupt: 