In [3]:
import json
import os
import re
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

import cv2
import numpy as np
import torch
from huggingface_hub import login
from paddleocr import PaddleOCR
from PIL import Image
from transformers import LayoutLMv2Processor, LayoutLMv2ForTokenClassification

class InvoiceExtractor:
    """Extract structured fields from invoice images.

    Text and bounding boxes are read with PaddleOCR. Header fields are
    labelled by a LayoutLMv2 token-classification model when one can be
    loaded; when it cannot (or when it yields nothing) regex heuristics over
    the OCR text are used instead. Line items are recovered with a
    row-grouping heuristic over the OCR boxes.
    """

    # BIO tag set used to decode model predictions.
    # NOTE(review): this order must match the label ids the loaded checkpoint
    # was fine-tuned with - confirm against the checkpoint's config.
    LABEL2ID = {
        "O": 0,
        "B-INVOICE_NUMBER": 1,
        "I-INVOICE_NUMBER": 2,
        "B-INVOICE_DATE": 3,
        "I-INVOICE_DATE": 4,
        "B-SELLER_NAME": 5,
        "I-SELLER_NAME": 6,
        "B-SELLER_ADDRESS": 7,
        "I-SELLER_ADDRESS": 8,
        "B-TOTAL_AMOUNT": 9,
        "I-TOTAL_AMOUNT": 10,
        "B-LINE_ITEM": 11,
        "I-LINE_ITEM": 12,
    }

    # Regexes for the OCR-only path; group 2 of each pattern is the value.
    # The amount patterns are anchored with \b so "Subtotal" is not read as
    # "total", accept an optional colon, and require the value to start and
    # end with a digit (both "1,234.56" and "34,14" are accepted).
    # NOTE(review): seller_name is greedy over the space-joined OCR text and
    # may capture text that follows the seller's name.
    FALLBACK_PATTERNS = {
        'invoice_number': r'(invoice\s*no[:.]?\s*)([A-Za-z0-9-]+)',
        'invoice_date': r'(date\s*of\s*issue[:.]?\s*)(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
        'seller_name': r'(seller[:.]?\s*)([\w\s,.&-]+)',
        'total_amount': r'(\btotal\s*[:.]?\s*)\$?\s*(\d(?:[\d,.]*\d)?)',
        'vat_amount': r'(\bvat\s*[:.]?\s*)\$?\s*(\d(?:[\d,.]*\d)?)',
    }

    # File suffixes (lower-case) that batch_process treats as invoice images.
    IMAGE_SUFFIXES = frozenset({'.png', '.jpg', '.jpeg'})

    def __init__(self, hf_token=None,
                 layoutlm_model: str = "microsoft/layoutlmv2-finetuned-funsd"):
        """Create the OCR engine and try to load the LayoutLMv2 model.

        Args:
            hf_token: Optional Hugging Face access token; when given it is
                used to log in and is forwarded to the model downloads.
            layoutlm_model: Hub id or local path of the token-classification
                checkpoint. NOTE(review): the recorded run log of this file
                shows the default id failing to resolve on the Hub, so pass
                a checkpoint fine-tuned on the LABEL2ID tag set.

        Any failure while loading the model is reported and switches the
        instance to OCR-only mode (``use_layoutlm = False``).
        """
        self.ocr = PaddleOCR(use_angle_cls=True, lang='en', show_log=False)

        # Per-instance copies so callers can customise them safely.
        self.label2id = dict(self.LABEL2ID)
        self.id2label = {v: k for k, v in self.label2id.items()}
        self.fallback_patterns = dict(self.FALLBACK_PATTERNS)

        try:
            if hf_token:
                login(token=hf_token)

            # The "no_ocr" revision disables the processor's built-in OCR;
            # without it the processor refuses caller-supplied words/boxes.
            self.processor = LayoutLMv2Processor.from_pretrained(
                "microsoft/layoutlmv2-base-uncased",
                revision="no_ocr",
                use_auth_token=hf_token if hf_token else None
            )
            self.model = LayoutLMv2ForTokenClassification.from_pretrained(
                layoutlm_model,
                use_auth_token=hf_token if hf_token else None
            )

            # Predictions are decoded with id2label, so a checkpoint with a
            # different number of labels would produce mislabelled fields.
            if self.model.config.num_labels != len(self.label2id):
                raise ValueError(
                    f"{layoutlm_model} has {self.model.config.num_labels} labels, "
                    f"expected {len(self.label2id)}"
                )

            self.use_layoutlm = True
            print("LayoutLM model loaded successfully")
        except Exception as e:
            print(f"LayoutLM initialization failed: {str(e)}")
            print("Falling back to OCR-only mode with heuristic extraction")
            self.use_layoutlm = False

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """Return a binarised copy of the image for OCR.

        The image is read with OpenCV, converted to grayscale and passed
        through a Gaussian adaptive threshold (block size 11, constant 2).

        Raises:
            ValueError: if OpenCV cannot read ``image_path``.
        """
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image at {image_path}")

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        thresh = cv2.adaptiveThreshold(
            gray, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )
        return thresh

    @staticmethod
    def _parse_ocr_result(result) -> List[Dict]:
        """Flatten the first page of a PaddleOCR result into plain dicts."""
        if not result or not result[0]:
            return []

        parsed = []
        for line in result[0]:
            if not line or len(line) < 2:
                continue
            parsed.append({
                "text": line[1][0],
                "confidence": float(line[1][1]),
                "bbox": [[float(p[0]), float(p[1])] for p in line[0]]
            })
        return parsed

    def extract_text_with_paddleocr(self, image_path: str) -> List[Dict]:
        """Run PaddleOCR on the preprocessed image.

        Returns:
            One dict per detected text region with keys ``text``,
            ``confidence`` and ``bbox`` (list of ``[x, y]`` points). An empty
            list is returned, after printing the error, if anything fails.
        """
        temp_path = None
        try:
            processed_img = self.preprocess_image(image_path)

            # A unique temp file avoids clashes between concurrent runs and
            # does not depend on the working directory being writable.
            fd, temp_path = tempfile.mkstemp(suffix=".jpg")
            os.close(fd)
            if not cv2.imwrite(temp_path, processed_img):
                raise IOError(f"Could not write preprocessed image to {temp_path}")

            result = self.ocr.ocr(temp_path, cls=True)
            return self._parse_ocr_result(result)
        except Exception as e:
            print(f"Error in PaddleOCR: {str(e)}")
            return []
        finally:
            # Always remove the temp file, including when OCR raised.
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError:
                    pass

    @staticmethod
    def _to_layoutlm_box(quad, width, height) -> List[int]:
        """Convert an OCR polygon to an ``[x0, y0, x1, y1]`` box on a 0-1000 grid.

        LayoutLMv2 expects integer coordinates normalised to 0-1000 relative
        to the page size; values are clamped to that range.
        """
        xs = [p[0] for p in quad]
        ys = [p[1] for p in quad]
        scaled = (
            int(1000 * min(xs) / width),
            int(1000 * min(ys) / height),
            int(1000 * max(xs) / width),
            int(1000 * max(ys) / height),
        )
        return [max(0, min(1000, v)) for v in scaled]

    @staticmethod
    def _group_bio_entities(labels, words) -> Dict:
        """Merge per-word BIO labels into ``{FIELD: "joined text"}``.

        A ``B-X`` label starts field X, a following ``I-X`` extends it and
        anything else closes the open field. A later occurrence of the same
        field replaces an earlier one.
        """
        entities = {}
        current_label = None
        current_words = []

        for label, word in zip(labels, words):
            if label.startswith("B-"):
                if current_label:
                    entities[current_label] = " ".join(current_words).strip()
                current_label, current_words = label[2:], [word]
            elif label.startswith("I-") and current_label == label[2:]:
                current_words.append(word)
            elif current_label:
                entities[current_label] = " ".join(current_words).strip()
                current_label, current_words = None, []

        if current_label:
            entities[current_label] = " ".join(current_words).strip()
        return entities

    def process_with_layoutlm(self, image_path: str, ocr_results: List[Dict]) -> Dict:
        """Label OCR text with LayoutLMv2 and group it into fields.

        Returns:
            ``{FIELD: text}`` for every field found; an empty dict when
            LayoutLM is disabled, there is no OCR text, or processing fails
            (the error is printed).
        """
        if not self.use_layoutlm or not ocr_results:
            return {}

        try:
            image = Image.open(image_path).convert("RGB")
            width, height = image.size
            words = [res["text"] for res in ocr_results]
            boxes = [
                self._to_layoutlm_box(res["bbox"], width, height)
                for res in ocr_results
            ]

            encoding = self.processor(
                image, words, boxes=boxes,
                padding="max_length", truncation=True,
                max_length=512, return_tensors="pt"
            )

            with torch.no_grad():
                outputs = self.model(
                    input_ids=encoding["input_ids"],
                    bbox=encoding["bbox"],
                    attention_mask=encoding["attention_mask"],
                    token_type_ids=encoding["token_type_ids"],
                    image=encoding["image"]
                )

            token_predictions = outputs.logits.argmax(-1)[0].tolist()

            # Predictions are per token (special, sub-word and padding tokens
            # included); keep the first token's label for each OCR word.
            # Words lost to truncation stay "O".
            word_labels = ["O"] * len(words)
            seen = set()
            for token_idx, word_idx in enumerate(encoding.word_ids(batch_index=0)):
                if word_idx is None or word_idx in seen:
                    continue
                if token_idx >= len(token_predictions):
                    break
                seen.add(word_idx)
                word_labels[word_idx] = self.id2label.get(
                    token_predictions[token_idx], "O"
                )

            return self._group_bio_entities(word_labels, words)
        except Exception as e:
            print(f"Error in LayoutLM processing: {str(e)}")
            return {}

    def extract_with_fallback(self, text: str) -> Dict:
        """Extract fields from plain OCR text with ``self.fallback_patterns``.

        Matching is case-insensitive. Each result key is the upper-cased
        pattern name and each value is the stripped second capture group of
        the first match; fields with no match are omitted.
        """
        results = {}
        for field, pattern in self.fallback_patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                results[field.upper()] = match.group(2).strip()
        return results

    def extract_line_items(self, ocr_results: List[Dict],
                           y_tolerance: float = 10) -> List[Dict]:
        """Extract line items using a table-row heuristic.

        OCR regions whose vertical centres lie within ``y_tolerance`` pixels
        of a row's first region are grouped into that row. A row counts as a
        line item when it has at least four cells and its left-most cell is a
        number (dots ignored); cells are then read positionally as item
        number, description, quantity, unit price and (if present) total.

        Returns:
            The line items from top to bottom; an empty list, after printing
            the error, if the OCR data is malformed.
        """
        try:
            # Group by approximate y-coordinate (same line)
            line_groups = {}
            for res in ocr_results:
                points = res["bbox"]
                if not points:
                    continue
                y_center = sum(p[1] for p in points) / len(points)
                for y in line_groups:
                    if abs(y - y_center) < y_tolerance:
                        line_groups[y].append(res)
                        break
                else:
                    line_groups[y_center] = [res]

            line_items = []
            for y in sorted(line_groups):
                # Left-to-right by the first polygon point's x coordinate.
                line = sorted(line_groups[y], key=lambda cell: cell["bbox"][0][0])
                item_no = line[0]["text"].replace(".", "")
                if len(line) >= 4 and item_no.isdigit():
                    line_items.append({
                        "item_no": item_no,
                        "description": line[1]["text"],
                        "quantity": line[2]["text"],
                        "unit_price": line[3]["text"],
                        "total": line[4]["text"] if len(line) > 4 else ""
                    })

            return line_items
        except Exception as e:
            print(f"Error in line item extraction: {str(e)}")
            return []

    def process_invoice(self, image_path: str) -> Dict:
        """Process a single invoice image.

        Returns:
            The extracted fields, plus ``LINE_ITEMS`` when rows were found
            and ``OCR_QUALITY`` (average and lowest confidence) when OCR
            produced text. On an unexpected error the dict holds ``error``
            and ``file`` instead.
        """
        try:
            print(f"\nProcessing {os.path.basename(image_path)}...")
            ocr_results = self.extract_text_with_paddleocr(image_path)
            text = " ".join([res["text"] for res in ocr_results])

            layout_results = {}
            if self.use_layoutlm:
                print("Using LayoutLM for field extraction...")
                layout_results = self.process_with_layoutlm(image_path, ocr_results)
                if not layout_results:
                    # The model failed or labelled nothing: do not return an
                    # empty record when the regexes may still find fields.
                    print("LayoutLM returned no fields; using OCR-only fallback...")
                    layout_results = self.extract_with_fallback(text)
            else:
                print("Using OCR-only fallback for field extraction...")
                layout_results = self.extract_with_fallback(text)

            line_items = self.extract_line_items(ocr_results)
            if line_items:
                layout_results["LINE_ITEMS"] = line_items

            # Add OCR confidence summary
            if ocr_results:
                confidences = [res["confidence"] for res in ocr_results]
                layout_results["OCR_QUALITY"] = {
                    "average_confidence": sum(confidences)/len(confidences),
                    "lowest_confidence": min(confidences)
                }

            print(f"Completed processing {os.path.basename(image_path)}")
            return layout_results
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")
            return {"error": str(e), "file": os.path.basename(image_path)}

    def batch_process(self, image_dir: str, output_dir: str,
                      max_files: Optional[int] = None) -> None:
        """Process the images in a directory and write one JSON file per image.

        Args:
            image_dir: Directory searched (non-recursively) for files whose
                suffix, compared case-insensitively, is in IMAGE_SUFFIXES.
            output_dir: Directory for ``<image stem>.json``; created if
                missing.
            max_files: When given, only the first ``max_files`` images in
                sorted path order are processed.
        """
        image_dir = Path(image_dir)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Sorted so max_files selects a reproducible subset; the suffix test
        # is case-insensitive so files such as "scan.JPG" are not skipped.
        if image_dir.is_dir():
            image_paths = sorted(
                p for p in image_dir.iterdir()
                if p.is_file() and p.suffix.lower() in self.IMAGE_SUFFIXES
            )
        else:
            image_paths = []

        if max_files is not None:
            image_paths = image_paths[:max(max_files, 0)]

        print(f"Found {len(image_paths)} images to process in {image_dir}")

        for img_path in image_paths:
            try:
                result = self.process_invoice(str(img_path))
                output_path = output_dir / f'{img_path.stem}.json'

                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(result, f, indent=2, ensure_ascii=False)

                print(f'Saved results to {output_path}')
            except Exception as e:
                print(f'Failed to process {img_path.name}: {str(e)}')

if __name__ == "__main__":
    # Build the extractor first so its model-loading messages are printed
    # before the run banner.
    extractor = InvoiceExtractor()

    # Source images and the destination for the per-invoice JSON files.
    input_folder = os.path.expanduser("~/Desktop/invoice/batch1_1")
    output_folder = os.path.expanduser("~/Desktop/invoice/batch1_1_results")

    # Make sure the destination exists before anything is written to it.
    Path(output_folder).mkdir(parents=True, exist_ok=True)

    print(
        f"Starting processing of {input_folder}",
        f"Results will be saved to {output_folder}",
        sep="\n",
    )

    # Only the first ten images are processed in this run.
    extractor.batch_process(input_folder, output_folder, max_files=10)

    print("\nProcessing complete!")

LayoutLM initialization failed: microsoft/layoutlmv2-finetuned-funsd is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo with `use_auth_token` or log in with `huggingface-cli login` and pass `use_auth_token=True`.
Falling back to OCR-only mode with heuristic extraction
Starting processing of C:\Users\HP/Desktop/invoice/batch1_1
Results will be saved to C:\Users\HP/Desktop/invoice/batch1_1_results
Found 10 images to process in C:\Users\HP\Desktop\invoice\batch1_1

Processing batch1-0001.jpg...
Using OCR-only fallback for field extraction...
Completed processing batch1-0001.jpg
Saved results to C:\Users\HP\Desktop\invoice\batch1_1_results\batch1-0001.json

Processing batch1-0002.jpg...
Using OCR-only fallback for field extraction...
Completed processing batch1-0002.jpg
Saved results to C:\Users\HP\Desktop\invoice\batch1_1_results\batch1-0002.json

Proc