In [1]:
import pdfplumber
import pandas as pd
import os

def extract_all_tables(pdf_path, output_dir, page_num=0, settings=None):
    """
    Extracts all tables from a PDF page using pdfplumber and exports each to a CSV file.
    Args:
        pdf_path (str): Path to the PDF file.
        output_dir (str): Directory to save CSV files.
        page_num (int): Page number to extract tables from (0-indexed).
        settings (dict): pdfplumber table extraction settings.
    """
    if settings is None:
        settings = {
            "vertical_strategy": "lines",
            "horizontal_strategy": "lines",
            "snap_tolerance": 7,
            "join_tolerance": 7,
            "edge_min_length": 30,
            "text_x_tolerance": 2,
            "text_y_tolerance": 7,
            "intersection_tolerance": 5,
        }
    os.makedirs(output_dir, exist_ok=True)
    with pdfplumber.open(pdf_path) as pdf:
        page = pdf.pages[page_num]
        tables = page.extract_tables(settings)
        for idx, table in enumerate(tables):
            if not table or not table[0]:
                continue
            df = pd.DataFrame(table[1:], columns=table[0]) if len(table) > 1 else pd.DataFrame(table)
            csv_path = os.path.join(output_dir, f"table_{idx+1}.csv")
            df.to_csv(csv_path, index=False)
            print(f"Exported table {idx+1} to {csv_path}")



# Simple Text Extraction

We are now going to build a simple text extractor for our PDF Parser so that our layout model (LayoutLMv3) and make sense of it from an input perspective.

Here is our strategy:

(From the Data_load):

- Fetch the 10K filings from `SEC-EDGAR` library and `SEC-API` to generate raw data as txt to convert it to PDFs.

(In this file):

- Use `pdfplumber` library for thorough text extraction and `Tesseract` library for the OCR fallback. 

## Library Imports

In [2]:
import os
import glob
import json
from pathlib import Path
import pdfplumber
import pytesseract
from PIL import Image
import io
from datetime import datetime
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Optional
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from dataclasses import dataclass, asdict

## PDF Processing Loop

- We begin by iterating through the documents in `data/raw`.
- We also provide folder paths for the raw data and parsed data, implement checks using os if there are no directories available at the designated paths.

In [3]:
# Define paths
raw_pdf_dir = "../data/raw/MSFT/10-K/PDFs"
parsed_output_dir = "../data/parsed/MSFT"

In [4]:
# Create output directory if it doesn't exist
os.makedirs(parsed_output_dir, exist_ok=True)

### Classes

In [4]:
@dataclass
class WordBox:
    """Data class for word box information with comprehensive positioning data"""
    text: str
    x0: float
    y0: float
    x1: float
    y1: float
    width: float
    height: float
    fontname: Optional[str] = None
    fontsize: Optional[float] = None
    fontcolor: Optional[str] = None
    confidence: Optional[float] = None
    page_number: int = 0
    word_index: int = 0
    doctop: Optional[float] = None  # Document-level top position (page offset)
    upright: Optional[bool] = None  # Text orientation (True = normal, False = rotated)
    top: Optional[float] = None     # Page-relative top position
    bottom: Optional[float] = None  # Page-relative bottom position
    left: Optional[float] = None    # Page-relative left position  
    right: Optional[float] = None   # Page-relative right position
    
    def to_dict(self):
        return asdict(self)
    
    @property
    def center_x(self):
        return (self.x0 + self.x1) / 2
    
    @property
    def center_y(self):
        return (self.y0 + self.y1) / 2
    
    @property
    def area(self):
        return self.width * self.height
    
    @property
    def is_rotated(self):
        """Check if text is rotated (not upright)"""
        return self.upright is False
    
    @property
    def document_position(self):
        """Get document-level position including page offset"""
        if self.doctop is not None:
            return {
                'document_top': self.doctop,
                'page_relative_top': self.top,
                'page_number': self.page_number
            }
        return None

In [5]:
@dataclass
class PageLayout:
    """Data class for page layout information with comprehensive analysis"""
    page_number: int
    page_width: float
    page_height: float
    word_boxes: List[WordBox]
    text_blocks: List[Dict]
    reading_order: List[int]
    layout_analysis: Dict
    
    def to_dict(self):
        """Convert to dictionary for JSON serialization"""
        return {
            'page_number': self.page_number,
            'page_width': self.page_width,
            'page_height': self.page_height,
            'word_boxes': [box.to_dict() for box in self.word_boxes],
            'text_blocks': self.text_blocks,
            'reading_order': self.reading_order,
            'layout_analysis': self.layout_analysis
        }
    
    @property
    def total_words(self):
        """Total number of words on the page"""
        return len(self.word_boxes)
    
    @property
    def text_density(self):
        """Text density as percentage of page area covered by text"""
        if not self.word_boxes:
            return 0.0
        
        total_text_area = sum(box.area for box in self.word_boxes)
        page_area = self.page_width * self.page_height
        return (total_text_area / page_area) * 100 if page_area > 0 else 0.0
    
    @property
    def average_font_size(self):
        """Average font size across all words"""
        font_sizes = [box.fontsize for box in self.word_boxes if box.fontsize is not None]
        return np.mean(font_sizes) if font_sizes else 0.0
    
    @property
    def layout_type(self):
        """Get the classified layout type"""
        return self.layout_analysis.get('layout_type', 'unknown')
    
    @property
    def estimated_columns(self):
        """Get the estimated number of columns"""
        return self.layout_analysis.get('columns', 1)
    
    def get_words_by_column(self, column_index):
        """Get words belonging to a specific column"""
        if self.estimated_columns <= 1:
            return self.word_boxes
        
        column_width = self.page_width / self.estimated_columns
        column_start = column_index * column_width
        column_end = (column_index + 1) * column_width
        
        return [box for box in self.word_boxes 
                if column_start <= box.center_x < column_end]
    
    def get_words_by_line(self, line_tolerance=10):
        """Group words into lines based on Y-coordinate"""
        if not self.word_boxes:
            return []
        
        lines = []
        current_line = []
        
        for box in sorted(self.word_boxes, key=lambda b: b.y0):
            if not current_line or not any(abs(box.y0 - line_box.y0) < line_tolerance for line_box in current_line):
                if current_line:
                    lines.append(sorted(current_line, key=lambda b: b.x0))
                current_line = [box]
            else:
                current_line.append(box)
        
        if current_line:
            lines.append(sorted(current_line, key=lambda b: b.x0))
        
        return lines
    
    def get_rotated_words(self):
        """Get all rotated words on the page"""
        return [box for box in self.word_boxes if box.is_rotated]
    
    def get_high_confidence_words(self, min_confidence=80):
        """Get words with confidence above threshold (for OCR results)"""
        return [box for box in self.word_boxes 
                if box.confidence is not None and box.confidence >= min_confidence]
    
    def analyze_text_flow(self):
        """Analyze the text flow pattern on the page"""
        if not self.word_boxes:
            return {'flow_type': 'empty', 'flow_score': 0}
        
        # Analyze reading order consistency
        reading_order_boxes = [self.word_boxes[i] for i in self.reading_order if i < len(self.word_boxes)]
        
        # Check if reading order follows logical flow
        flow_violations = 0
        for i in range(len(reading_order_boxes) - 1):
            current = reading_order_boxes[i]
            next_box = reading_order_boxes[i + 1]
            
            # Check for major violations (next word is significantly above or to the right)
            if next_box.y0 < current.y0 - 20:  # Next word is much higher
                flow_violations += 1
            elif next_box.x0 > current.x1 + 100:  # Next word is far to the right
                flow_violations += 1
        
        flow_score = max(0, 100 - (flow_violations / len(reading_order_boxes)) * 100)
        
        if flow_score > 90:
            flow_type = 'excellent'
        elif flow_score > 75:
            flow_type = 'good'
        elif flow_score > 50:
            flow_type = 'fair'
        else:
            flow_type = 'poor'
        
        return {
            'flow_type': flow_type,
            'flow_score': flow_score,
            'violations': flow_violations,
            'total_transitions': len(reading_order_boxes) - 1
        }

### Remaining Code

In [6]:
def process_all_pdfs():
    """Main function to process all MSFT 10-K PDFs with word box extraction and layout analysis"""
    
    # Get all PDF files
    pdf_files = glob.glob(os.path.join(raw_pdf_dir, "*.pdf"))
    print(f"🔍 Found {len(pdf_files)} PDF files to process")
    
    processing_stats = {
        'total_files': len(pdf_files),
        'processed_files': 0,
        'failed_files': 0,
        'total_pages': 0,
        'total_word_boxes': 0,
        'pdfplumber_pages': 0,
        'ocr_pages': 0,
        'poor_quality_pages': 0
    }
    
    for pdf_path in pdf_files:
        filename = os.path.basename(pdf_path)
        year = filename.split('_')[2][:4]
        
        print(f"\n📄 Processing: {filename}")
        print(f"📅 Year: {year}")
        
        # Create year-specific output directory
        year_output_dir = os.path.join(parsed_output_dir, year)
        os.makedirs(year_output_dir, exist_ok=True)
        
        table_settings = {
            "vertical_strategy": "lines",
            "horizontal_strategy": "lines",
            "snap_x_tolerance": 10,
            "snap_y_tolerance": 10,
            "join_tolerance": 3,
            "edge_min_length": 3,
            "min_words_vertical": 3,
            "min_words_horizontal": 1,
            "intersection_tolerance": 3,
            "text_tolerance": 3,
            "text_x_tolerance": 3,
            "text_y_tolerance": 3
        }
        
        # Process the PDF
        file_stats = process_single_pdf(pdf_path, year_output_dir, filename, table_settings)
        
        # Update overall statistics with safe key access
        processing_stats['processed_files'] += 1
        processing_stats['total_pages'] += file_stats.get('total_pages', 0)
        processing_stats['total_word_boxes'] += file_stats.get('total_word_boxes', 0)
        processing_stats['pdfplumber_pages'] += file_stats.get('pdfplumber_pages', 0)
        processing_stats['ocr_pages'] += file_stats.get('ocr_pages', 0)
        processing_stats['poor_quality_pages'] += file_stats.get('poor_quality_pages', 0)
    
    # Print final statistics
    print_processing_summary(processing_stats)

In [7]:
def extract_word_boxes_with_layout(page, page_num, method='pdfplumber'):
    """Extract word boxes with comprehensive layout analysis including document positioning"""
    
    word_boxes = []
    text_blocks = []
    
    try:
        if method == 'pdfplumber':
            # First, let's check what's available in the page
            try:
                # Try basic word extraction first
                words = page.extract_words(
                    x_tolerance=3,
                    y_tolerance=3,
                    keep_blank_chars=False
                )
                
                # Debug: Check what attributes are available
                if words:
                    first_word = words[0]
                    available_attrs = list(first_word.keys())
                    print(f"      🔍 Page {page_num} - Available word attributes: {available_attrs}")
                    
                    # Check for required attributes
                    required_attrs = ['text', 'x0', 'y0', 'x1', 'y1']
                    missing_attrs = [attr for attr in required_attrs if attr not in available_attrs]
                    
                    if missing_attrs:
                        print(f"      ⚠️  Page {page_num} - Missing required attributes: {missing_attrs}")
                        # Try alternative extraction methods
                        return extract_word_boxes_alternative(page, page_num)
                
            except Exception as e:
                print(f"      ❌ Basic word extraction failed for page {page_num}: {e}")
                return extract_word_boxes_alternative(page, page_num)
            
            # Process each word with comprehensive error handling
            for idx, word in enumerate(words):
                try:
                    # Validate required attributes exist
                    if not all(attr in word for attr in ['text', 'x0', 'y0', 'x1', 'y1']):
                        print(f"      ⚠️  Page {page_num}, Word {idx}: Missing required coordinates")
                        continue
                    
                    # Create word box with safe attribute access
                    word_box = WordBox(
                        text=word.get('text', ''),
                        x0=float(word.get('x0', 0)),
                        y0=float(word.get('y0', 0)),
                        x1=float(word.get('x1', 0)),
                        y1=float(word.get('y1', 0)),
                        width=float(word.get('x1', 0)) - float(word.get('x0', 0)),
                        height=float(word.get('y1', 0)) - float(word.get('y0', 0)),
                        fontname=word.get('fontname'),
                        fontsize=word.get('size'),
                        fontcolor=word.get('fontcolor'),
                        doctop=word.get('doctop'),
                        upright=word.get('upright'),
                        top=word.get('top'),
                        bottom=word.get('bottom'),
                        left=word.get('left'),
                        right=word.get('right'),
                        page_number=page_num,
                        word_index=idx
                    )
                    word_boxes.append(word_box)
                    
                except Exception as e:
                    print(f"      ⚠️  Page {page_num}, Word {idx}: Failed to create word box: {e}")
                    continue
            
            # Extract text blocks for layout analysis
            try:
                text_blocks = page.extract_text_simple()
            except:
                text_blocks = []
            
        else:  # OCR method
            word_boxes = extract_word_boxes_ocr(page, page_num)
    
    except Exception as e:
        print(f"      ❌ Word box extraction failed for page {page_num}: {e}")
        return extract_word_boxes_alternative(page, page_num)
    
    print(f"      ✅ Page {page_num}: Extracted {len(word_boxes)} word boxes")
    return word_boxes, text_blocks

In [8]:
def extract_word_boxes_alternative(page, page_num):
    """Alternative word box extraction when standard method fails"""
    
    word_boxes = []
    text_blocks = []
    
    try:
        # Method 1: Try extracting characters and grouping them
        print(f"      🔄 Page {page_num}: Trying character-based extraction...")
        
        try:
            chars = page.chars
            if chars:
                # Group characters into words
                words = group_chars_into_words(chars, page_num)
                word_boxes.extend(words)
                print(f"      ✅ Page {page_num}: Character-based extraction found {len(words)} words")
            else:
                print(f"      ⚠️  Page {page_num}: No characters found")
        except Exception as e:
            print(f"      ❌ Character-based extraction failed: {e}")
        
        # Method 2: If character extraction fails, try text-based estimation
        if not word_boxes:
            print(f"      🔄 Page {page_num}: Trying text-based estimation...")
            word_boxes = extract_word_boxes_from_text(page, page_num)
        
        # Extract text blocks
        try:
            text_blocks = page.extract_text_simple()
        except:
            text_blocks = []
    
    except Exception as e:
        print(f"      ❌ Alternative word box extraction failed for page {page_num}: {e}")
        return [], []
    
    return word_boxes, text_blocks

def group_chars_into_words(chars, page_num):
    """Group characters into words with bounding boxes"""
    
    if not chars:
        return []
    
    words = []
    current_word = []
    word_index = 0
    
    # Sort characters by position (top to bottom, left to right)
    sorted_chars = sorted(chars, key=lambda c: (c.get('top', 0), c.get('x0', 0)))
    
    for char in sorted_chars:
        if not char.get('text', '').strip():
            continue
        
        # If this is a space or significant gap, end current word
        if (char.get('text', '') == ' ' or 
            (current_word and 
             abs(char.get('x0', 0) - current_word[-1].get('x1', 0)) > 5)):
            
            if current_word:
                # Create word box from current word
                word_box = create_word_box_from_chars(current_word, page_num, word_index)
                if word_box:
                    words.append(word_box)
                    word_index += 1
                current_word = []
        
        # Add character to current word
        if char.get('text', '').strip():
            current_word.append(char)
    
    # Handle last word
    if current_word:
        word_box = create_word_box_from_chars(current_word, page_num, word_index)
        if word_box:
            words.append(word_box)
    
    return words

def create_word_box_from_chars(chars, page_num, word_index):
    """Create a word box from a list of characters"""
    
    if not chars:
        return None
    
    # Get bounding box from characters
    x0 = min(char.get('x0', 0) for char in chars)
    y0 = min(char.get('top', 0) for char in chars)
    x1 = max(char.get('x1', 0) for char in chars)
    y1 = max(char.get('bottom', 0) for char in chars)
    
    # Combine text
    text = ''.join(char.get('text', '') for char in chars)
    
    if not text.strip():
        return None
    
    # Get font info from first character
    first_char = chars[0]
    
    return WordBox(
        text=text.strip(),
        x0=float(x0),
        y0=float(y0),
        x1=float(x1),
        y1=float(y1),
        width=float(x1 - x0),
        height=float(y1 - y0),
        fontname=first_char.get('fontname'),
        fontsize=first_char.get('size'),
        fontcolor=first_char.get('fontcolor'),
        page_number=page_num,
        word_index=word_index
    )

def extract_word_boxes_from_text(page, page_num):
    """Extract word boxes by estimating positions from text"""
    
    word_boxes = []
    
    try:
        # Get page text
        text = page.extract_text()
        if not text:
            return []
        
        # Get page dimensions
        page_width = page.width if hasattr(page, 'width') else 612
        page_height = page.height if hasattr(page, 'height') else 792
        
        # Split into lines and words
        lines = text.split('\n')
        word_index = 0
        
        for line_num, line in enumerate(lines):
            if not line.strip():
                continue
            
            words = line.split()
            if not words:
                continue
            
            # Estimate line position
            line_y = line_num * 15  # Rough estimate: 15 points per line
            
            # Distribute words across line width
            char_width = page_width / 80  # Rough estimate: 80 chars per line
            word_width = page_width / len(words) if words else char_width * 5
            
            for word_num, word in enumerate(words):
                if not word.strip():
                    continue
                
                # Estimate word position
                word_x = word_num * word_width
                word_width_actual = len(word) * char_width
                
                word_box = WordBox(
                    text=word.strip(),
                    x0=float(word_x),
                    y0=float(line_y),
                    x1=float(word_x + word_width_actual),
                    y1=float(line_y + 12),  # Rough estimate: 12 points height
                    width=float(word_width_actual),
                    height=12.0,
                    page_number=page_num,
                    word_index=word_index
                )
                word_boxes.append(word_box)
                word_index += 1
    
    except Exception as e:
        print(f"      ❌ Text-based word box extraction failed: {e}")
    
    return word_boxes

In [None]:
# ENHANCED VERSION: process_single_pdf with LayoutLMv3 integration
def process_single_pdf_enhanced(pdf_path, output_dir, filename, table_settings, generate_layoutlmv3=True):
    """Process a single PDF file with comprehensive quality assessment and word box extraction
    
    Args:
        pdf_path (str): Path to the PDF file to process
        output_dir (str): Directory to save output files
        filename (str): Name of the PDF file
        table_settings (dict): Settings for table extraction
        generate_layoutlmv3 (bool, optional): Whether to generate LayoutLMv3-compatible output. Defaults to True.
    """
    # Create output directory for tables
    tables_dir = os.path.join(output_dir, "tables")
    os.makedirs(tables_dir, exist_ok=True)
    
    # (Optional) Extract tables from all pages
    with pdfplumber.open(pdf_path) as pdf:
        for page_num in range(len(pdf.pages)):
            extract_and_integrate_tables(
                pdf_path=pdf_path,
                output_dir=tables_dir,
                page_num=page_num,
                settings=table_settings
            )
    base_name = filename.replace('.pdf', '')
    output_file = os.path.join(output_dir, f"{base_name}_extracted.txt")
    metadata_file = os.path.join(output_dir, f"{base_name}_metadata.json")
    wordboxes_file = os.path.join(output_dir, f"{base_name}_wordboxes.json")
    layout_file = os.path.join(output_dir, f"{base_name}_layout.json")
    layoutlmv3_file = os.path.join(output_dir, f"{base_name}_layoutlmv3.json")  # NEW!
    
    # Skip if already processed (including LayoutLMv3 file if requested)
    files_to_check = [output_file, metadata_file, wordboxes_file, layout_file]
    if generate_layoutlmv3:
        files_to_check.append(layoutlmv3_file)
    
    if all(os.path.exists(f) for f in files_to_check):
        print(f"  ⏩ Already processed{' (including LayoutLMv3)' if generate_layoutlmv3 else ''}: {base_name}")
        return load_existing_stats(metadata_file)
    
    file_stats = {
        'filename': filename,
        'total_pages': 0,
        'pdfplumber_pages': 0,
        'ocr_pages': 0,
        'poor_quality_pages': 0,
        'total_word_boxes': 0,
        'processing_time': 0,
        'page_details': []
    }
    
    start_time = datetime.now()
    
    try:
        with pdfplumber.open(pdf_path) as pdf:
            print(f"  📄 Total pages: {len(pdf.pages)}")
            file_stats['total_pages'] = len(pdf.pages)
            
            extracted_pages = []
            all_word_boxes = []
            all_page_layouts = []
            
            # Process each page with quality assessment and word box extraction
            for page_num, page in enumerate(pdf.pages, 1):
                page_result = extract_page_with_quality_check(page, page_num)
                extracted_pages.append(page_result)
                all_word_boxes.extend(page_result['word_boxes'])
                all_page_layouts.append(page_result['page_layout'])
                file_stats['page_details'].append(page_result['metadata'])
                
                # Update statistics
                if page_result['metadata']['method'] == 'pdfplumber':
                    file_stats['pdfplumber_pages'] += 1
                else:
                    file_stats['ocr_pages'] += 1
                
                if page_result['metadata']['quality_flag']:
                    file_stats['poor_quality_pages'] += 1
                
                file_stats['total_word_boxes'] += len(page_result['word_boxes'])
                
                # Progress indicator
                if page_num % 20 == 0:
                    print(f"    📖 Processed {page_num} pages...")
            
            # Save extracted content, word boxes, and layout data
            save_extracted_content(extracted_pages, output_file, metadata_file, file_stats)
            save_word_boxes_and_layout(all_word_boxes, all_page_layouts, wordboxes_file, layout_file, file_stats)
            
            # 🆕 AUTOMATICALLY GENERATE LAYOUTLMV3-COMPATIBLE OUTPUT (if requested)
            if generate_layoutlmv3:
                try:
                    print(f"  🤖 Generating LayoutLMv3 output...")
                    layoutlmv3_data = end(output_filter="layout", input_data=layout_file, output_file=layoutlmv3_file)
                    print(f"  ✅ Generated LayoutLMv3 output: {os.path.basename(layoutlmv3_file)}")
                    
                    # Add LayoutLMv3 stats to file_stats
                    total_layoutlmv3_words = sum(len(page['words']) for page in layoutlmv3_data['page_layouts'])
                    file_stats['layoutlmv3_words'] = total_layoutlmv3_words
                    print(f"  📊 LayoutLMv3: {total_layoutlmv3_words} words across {len(layoutlmv3_data['page_layouts'])} pages")
                    
                except Exception as e:
                    print(f"  ⚠️  Failed to generate LayoutLMv3 output: {e}")
                    file_stats['layoutlmv3_error'] = str(e)
            else:
                print(f"  ⏭️  Skipping LayoutLMv3 generation (generate_layoutlmv3=False)")
            
    except Exception as e:
        print(f"  ❌ Error processing {filename}: {e}")
        file_stats['error'] = str(e)
    
    file_stats['processing_time'] = (datetime.now() - start_time).total_seconds()
    return file_stats

print("✅ Enhanced process_single_pdf_enhanced function defined!")
print("🔧 This function includes the optional generate_layoutlmv3 parameter")


In [None]:
# ENHANCED VERSION: process_all_pdfs with LayoutLMv3 integration
def process_all_pdfs_enhanced(generate_layoutlmv3=True):
    """Enhanced version of process_all_pdfs that includes optional LayoutLMv3 generation
    
    Args:
        generate_layoutlmv3 (bool, optional): Whether to generate LayoutLMv3-compatible output for all files. Defaults to True.
    """
    # Get all PDF files
    pdf_files = glob.glob(os.path.join(raw_pdf_dir, "*.pdf"))
    print(f"🔍 Found {len(pdf_files)} PDF files to process{' (with LayoutLMv3 generation)' if generate_layoutlmv3 else ''}")
    
    processing_stats = {
        'total_files': len(pdf_files),
        'processed_files': 0,
        'failed_files': 0,
        'total_pages': 0,
        'total_word_boxes': 0,
        'total_layoutlmv3_words': 0,  # NEW!
        'pdfplumber_pages': 0,
        'ocr_pages': 0,
        'poor_quality_pages': 0,
        'layoutlmv3_success': 0,  # NEW!
        'layoutlmv3_failures': 0  # NEW!
    }
    
    for pdf_path in pdf_files:
        filename = os.path.basename(pdf_path)
        year = filename.split('_')[2][:4]
        
        print(f"\n📄 Processing: {filename}")
        print(f"📅 Year: {year}")
        
        # Create year-specific output directory
        year_output_dir = os.path.join(parsed_output_dir, year)
        os.makedirs(year_output_dir, exist_ok=True)
        
        table_settings = {
            "vertical_strategy": "lines",
            "horizontal_strategy": "lines",
            "snap_x_tolerance": 10,
            "snap_y_tolerance": 10,
            "join_tolerance": 3,
            "edge_min_length": 3,
            "min_words_vertical": 3,
            "min_words_horizontal": 1,
            "intersection_tolerance": 3,
            "text_tolerance": 3,
            "text_x_tolerance": 3,
            "text_y_tolerance": 3
        }
        
        # Process the PDF with optional LayoutLMv3 generation
        file_stats = process_single_pdf_enhanced(
            pdf_path, 
            year_output_dir, 
            filename, 
            table_settings, 
            generate_layoutlmv3=generate_layoutlmv3  # Pass the parameter
        )
        
        # Update overall statistics with safe key access
        processing_stats['processed_files'] += 1
        processing_stats['total_pages'] += file_stats.get('total_pages', 0)
        processing_stats['total_word_boxes'] += file_stats.get('total_word_boxes', 0)
        processing_stats['pdfplumber_pages'] += file_stats.get('pdfplumber_pages', 0)
        processing_stats['ocr_pages'] += file_stats.get('ocr_pages', 0)
        processing_stats['poor_quality_pages'] += file_stats.get('poor_quality_pages', 0)
        
        # NEW: LayoutLMv3 statistics (only if requested)
        if generate_layoutlmv3:
            if 'layoutlmv3_words' in file_stats:
                processing_stats['total_layoutlmv3_words'] += file_stats['layoutlmv3_words']
                processing_stats['layoutlmv3_success'] += 1
            else:
                processing_stats['layoutlmv3_failures'] += 1
    
    # Print enhanced final statistics
    print_processing_summary_enhanced(processing_stats, generate_layoutlmv3)

def print_processing_summary_enhanced(processing_stats, include_layoutlmv3=True):
    """Enhanced processing summary that optionally includes LayoutLMv3 statistics"""
    
    print(f"\n{'='*80}")
    print(f"📊 PDF PROCESSING SUMMARY{' (WITH LAYOUTLMV3)' if include_layoutlmv3 else ''}")
    print(f"{'='*80}")
    
    # File statistics
    print(f"📁 Files Processed:")
    print(f"  Total files: {processing_stats['total_files']}")
    print(f"  Successfully processed: {processing_stats['processed_files']}")
    print(f"  Failed: {processing_stats['failed_files']}")
    print(f"  Success rate: {(processing_stats['processed_files']/processing_stats['total_files'])*100:.1f}%")
    
    # Page statistics
    print(f"\n📄 Page Statistics:")
    print(f"  Total pages processed: {processing_stats['total_pages']}")
    print(f"  PDFplumber extractions: {processing_stats['pdfplumber_pages']}")
    print(f"  OCR extractions: {processing_stats['ocr_pages']}")
    print(f"  Poor quality pages: {processing_stats['poor_quality_pages']}")
    
    # Word box statistics
    print(f"\n📦 Word Box Statistics:")
    print(f"  Total word boxes extracted: {processing_stats['total_word_boxes']}")
    print(f"  Average word boxes per page: {processing_stats['total_word_boxes']/processing_stats['total_pages']:.1f}")
    
    # LayoutLMv3 statistics (only if included)
    if include_layoutlmv3:
        print(f"\n🤖 LayoutLMv3 Statistics:")
        print(f"  Files with LayoutLMv3 output: {processing_stats['layoutlmv3_success']}")
        print(f"  LayoutLMv3 generation failures: {processing_stats['layoutlmv3_failures']}")
        print(f"  Total LayoutLMv3 words: {processing_stats['total_layoutlmv3_words']}")
        if processing_stats['processed_files'] > 0:
            print(f"  LayoutLMv3 success rate: {(processing_stats['layoutlmv3_success']/processing_stats['processed_files'])*100:.1f}%")
        if processing_stats['layoutlmv3_success'] > 0:
            print(f"  Average LayoutLMv3 words per file: {processing_stats['total_layoutlmv3_words']/processing_stats['layoutlmv3_success']:.1f}")
    
    # Quality metrics
    if processing_stats['total_pages'] > 0:
        pdfplumber_rate = (processing_stats['pdfplumber_pages'] / processing_stats['total_pages']) * 100
        ocr_rate = (processing_stats['ocr_pages'] / processing_stats['total_pages']) * 100
        poor_quality_rate = (processing_stats['poor_quality_pages'] / processing_stats['total_pages']) * 100
        
        print(f"\n⭐ Quality Metrics:")
        print(f"  PDFplumber success rate: {pdfplumber_rate:.1f}%")
        print(f"  OCR fallback rate: {ocr_rate:.1f}%")
        print(f"  Poor quality rate: {poor_quality_rate:.1f}%")
    
    # Performance metrics
    print(f"\n⚡ Performance:")
    print(f"  Average pages per file: {processing_stats['total_pages']/processing_stats['processed_files']:.1f}")
    print(f"  Average word boxes per file: {processing_stats['total_word_boxes']/processing_stats['processed_files']:.1f}")
    
    print(f"\n{'='*80}")
    print(f"🎉 Processing completed successfully!")
    print(f"📦 Word boxes and layout data saved")
    if include_layoutlmv3:
        print(f"🤖 LayoutLMv3 files ready for model input")
    print(f"{'='*80}")

print("✅ Enhanced processing functions defined!")
print("📝 Usage examples:")
print("  - process_all_pdfs_enhanced()  # With LayoutLMv3 (default)")
print("  - process_all_pdfs_enhanced(generate_layoutlmv3=False)  # Skip LayoutLMv3")
print("  - process_single_pdf_enhanced(pdf_path, output_dir, filename, settings)  # Single file with LayoutLMv3")
print("  - process_single_pdf_enhanced(pdf_path, output_dir, filename, settings, generate_layoutlmv3=False)  # Single file without LayoutLMv3")


In [9]:
def analyze_page_layout(word_boxes, page_width, page_height):
    """Analyze page layout and determine reading order with comprehensive metrics"""
    
    if not word_boxes:
        return {
            'columns': 0,
            'rows': 0,
            'text_density': 0,
            'layout_type': 'empty',
            'reading_order': [],
            'avg_font_size': 0,
            'font_size_variance': 0,
            'aspect_ratio': 0,
            'text_flow_analysis': {}
        }
    
    # Calculate text density
    total_text_area = sum(box.area for box in word_boxes)
    page_area = page_width * page_height
    text_density = total_text_area / page_area if page_area > 0 else 0
    
    # Determine layout type based on word distribution
    x_positions = [box.center_x for box in word_boxes]
    y_positions = [box.center_y for box in word_boxes]
    
    # Simple column detection
    x_sorted = sorted(set(x_positions))
    column_gaps = [x_sorted[i+1] - x_sorted[i] for i in range(len(x_sorted)-1)]
    avg_gap = sum(column_gaps) / len(column_gaps) if column_gaps else 0
    
    # Estimate number of columns
    estimated_columns = max(1, int(page_width / (avg_gap + 50)) if avg_gap > 0 else 1)
    
    # Determine reading order
    reading_order = determine_reading_order(word_boxes, estimated_columns)
    
    # Classify layout type
    layout_type = classify_layout_type(word_boxes, estimated_columns, text_density)
    
    # Calculate additional metrics
    font_sizes = [box.fontsize for box in word_boxes if box.fontsize is not None]
    avg_font_size = np.mean(font_sizes) if font_sizes else 0
    font_size_variance = np.var(font_sizes) if len(font_sizes) > 1 else 0
    
    x_spread = max(x_positions) - min(x_positions) if x_positions else 0
    y_spread = max(y_positions) - min(y_positions) if y_positions else 0
    aspect_ratio = x_spread / y_spread if y_spread > 0 else 0
    
    return {
        'columns': estimated_columns,
        'rows': len(set(y_positions)),
        'text_density': text_density,
        'layout_type': layout_type,
        'reading_order': reading_order,
        'avg_font_size': avg_font_size,
        'font_size_variance': font_size_variance,
        'aspect_ratio': aspect_ratio,
        'text_flow_analysis': {
            'x_spread': x_spread,
            'y_spread': y_spread,
            'word_count': len(word_boxes),
            'unique_x_positions': len(set(x_positions)),
            'unique_y_positions': len(set(y_positions))
        }
    }

In [10]:
def analyze_document_positioning(word_boxes):
    """Analyze document-level positioning and text orientation"""
    
    if not word_boxes:
        return {
            'total_document_height': 0,
            'rotated_text_count': 0,
            'normal_text_count': 0,
            'position_analysis': {}
        }
    
    # Analyze document positioning
    doctop_values = [box.doctop for box in word_boxes if box.doctop is not None]
    upright_values = [box.upright for box in word_boxes if box.upright is not None]
    
    analysis = {
        'total_document_height': max(doctop_values) if doctop_values else 0,
        'rotated_text_count': sum(1 for upright in upright_values if upright is False),
        'normal_text_count': sum(1 for upright in upright_values if upright is True),
        'position_analysis': {
            'min_doctop': min(doctop_values) if doctop_values else 0,
            'max_doctop': max(doctop_values) if doctop_values else 0,
            'avg_doctop': np.mean(doctop_values) if doctop_values else 0,
            'rotation_rate': sum(1 for upright in upright_values if upright is False) / len(upright_values) if upright_values else 0
        }
    }
    
    return analysis

In [11]:
def get_rotated_text_boxes(word_boxes):
    """Extract and analyze rotated text boxes"""
    
    rotated_boxes = [box for box in word_boxes if box.upright is False]
    
    if not rotated_boxes:
        return {
            'rotated_boxes': [],
            'rotation_analysis': {
                'count': 0,
                'percentage': 0,
                'common_rotations': []
            }
        }
    
    # Analyze rotation patterns
    rotation_analysis = {
        'count': len(rotated_boxes),
        'percentage': (len(rotated_boxes) / len(word_boxes)) * 100,
        'common_rotations': []  # Could be extended to detect rotation angles
    }
    
    return {
        'rotated_boxes': rotated_boxes,
        'rotation_analysis': rotation_analysis
    }

In [12]:
def extract_word_boxes_ocr(page, page_num):
    """Extract word boxes using OCR with bounding box information"""
    
    word_boxes = []
    
    try:
        # Convert page to image with high resolution for better OCR
        page_image = page.to_image(resolution=300)
        pil_image = page_image.original
        
        # Get detailed OCR data with bounding boxes
        ocr_data = pytesseract.image_to_data(
            pil_image, 
            lang='eng',
            output_type=pytesseract.Output.DICT
        )
        
        # Process OCR results
        for idx, (text, conf, x, y, w, h) in enumerate(zip(
            ocr_data['text'], 
            ocr_data['conf'], 
            ocr_data['left'], 
            ocr_data['top'], 
            ocr_data['width'], 
            ocr_data['height']
        )):
            # Filter low confidence and empty text
            if int(conf) > 30 and text.strip():
                word_box = WordBox(
                    text=text.strip(),
                    x0=float(x),
                    y0=float(y),
                    x1=float(x + w),
                    y1=float(y + h),
                    width=float(w),
                    height=float(h),
                    confidence=float(conf),
                    page_number=page_num,
                    word_index=idx
                )
                word_boxes.append(word_box)
        
        print(f"      📝 OCR extracted {len(word_boxes)} word boxes from page {page_num}")
    
    except Exception as e:
        print(f"      ❌ OCR word box extraction failed for page {page_num}: {e}")
    
    return word_boxes

In [13]:
def determine_reading_order(word_boxes, estimated_columns):
    """Determine reading order of words (top-to-bottom, left-to-right)"""
    
    if not word_boxes:
        return []
    
    # Method 1: Simple top-to-bottom, left-to-right sorting
    def simple_reading_order():
        # Sort by Y position first (top to bottom), then by X position (left to right)
        sorted_boxes = sorted(word_boxes, key=lambda box: (box.y0, box.x0))
        return [box.word_index for box in sorted_boxes]
    
    # Method 2: Column-aware reading order
    def column_aware_reading_order():
        if estimated_columns <= 1:
            return simple_reading_order()
        
        # Group words by estimated columns
        page_width = max(box.x1 for box in word_boxes) if word_boxes else 0
        column_width = page_width / estimated_columns
        
        # Assign words to columns
        column_groups = [[] for _ in range(estimated_columns)]
        
        for box in word_boxes:
            column_idx = min(int(box.center_x / column_width), estimated_columns - 1)
            column_groups[column_idx].append(box)
        
        # Sort each column and combine
        reading_order = []
        for column in column_groups:
            column_sorted = sorted(column, key=lambda box: box.y0)
            reading_order.extend([box.word_index for box in column_sorted])
        
        return reading_order
    
    # Method 3: Advanced reading order with line detection
    def advanced_reading_order():
        if len(word_boxes) < 10:  # Too few words for advanced analysis
            return simple_reading_order()
        
        # Group words into lines based on Y-coordinate clustering
        y_positions = [box.y0 for box in word_boxes]
        y_sorted = sorted(set(y_positions))
        
        # Detect line breaks (significant Y gaps)
        line_breaks = []
        for i in range(len(y_sorted) - 1):
            if y_sorted[i+1] - y_sorted[i] > 15:  # Threshold for line break
                line_breaks.append((y_sorted[i] + y_sorted[i+1]) / 2)
        
        # Group words into lines
        lines = []
        current_line = []
        
        for box in sorted(word_boxes, key=lambda b: b.y0):
            if not current_line or not any(abs(box.y0 - line_box.y0) < 10 for line_box in current_line):
                if current_line:
                    lines.append(current_line)
                current_line = [box]
            else:
                current_line.append(box)
        
        if current_line:
            lines.append(current_line)
        
        # Sort each line left-to-right, then combine lines
        reading_order = []
        for line in lines:
            line_sorted = sorted(line, key=lambda box: box.x0)
            reading_order.extend([box.word_index for box in line_sorted])
        
        return reading_order
    
    # Choose method based on complexity
    if estimated_columns > 2 and len(word_boxes) > 50:
        return advanced_reading_order()
    elif estimated_columns > 1:
        return column_aware_reading_order()
    else:
        return simple_reading_order()

In [14]:
def classify_layout_type(word_boxes, estimated_columns, text_density):
    """Classify the layout type based on word distribution and density"""
    
    if not word_boxes:
        return 'empty'
    
    # Analyze word distribution
    x_positions = [box.center_x for box in word_boxes]
    y_positions = [box.center_y for box in word_boxes]
    
    # Calculate spreads and statistics
    x_spread = max(x_positions) - min(x_positions) if x_positions else 0
    y_spread = max(y_positions) - min(y_positions) if y_positions else 0
    aspect_ratio = x_spread / y_spread if y_spread > 0 else 0
    
    # Analyze font size distribution
    font_sizes = [box.fontsize for box in word_boxes if box.fontsize is not None]
    avg_font_size = np.mean(font_sizes) if font_sizes else 12
    font_size_variance = np.var(font_sizes) if len(font_sizes) > 1 else 0
    
    # Analyze text density patterns
    density_thresholds = {
        'very_sparse': 0.05,
        'sparse': 0.15,
        'normal': 0.35,
        'dense': 0.55,
        'very_dense': 0.75
    }
    
    # Classify based on multiple criteria
    if text_density < density_thresholds['very_sparse']:
        return 'very_sparse'
    elif text_density < density_thresholds['sparse']:
        return 'sparse'
    elif estimated_columns == 1:
        if aspect_ratio < 0.3:
            return 'narrow_single_column'
        elif font_size_variance > 50:  # High variance in font sizes
            return 'mixed_formatting_single_column'
        else:
            return 'single_column'
    elif estimated_columns == 2:
        if font_size_variance > 50:
            return 'mixed_formatting_two_column'
        else:
            return 'two_column'
    elif estimated_columns >= 3:
        if text_density > density_thresholds['dense']:
            return 'dense_multi_column'
        else:
            return 'multi_column'
    elif x_spread < y_spread * 0.4:
        return 'narrow_column'
    elif font_size_variance > 100:  # Very high variance
        return 'complex_mixed_layout'
    elif text_density > density_thresholds['very_dense']:
        return 'very_dense_layout'
    else:
        return 'mixed_layout'

In [15]:
def extract_and_integrate_tables(pdf_path, output_dir, page_num=0, settings=None):
    if settings is None:
        settings = {
            "vertical_strategy": "lines",
            "horizontal_strategy": "lines",
            "snap_x_tolerance": 10,
            "snap_y_tolerance": 10,
            "join_tolerance": 3,
            "edge_min_length": 3,
            "min_words_vertical": 3,
            "min_words_horizontal": 1,
            "intersection_tolerance": 3,
            "text_tolerance": 3,
            "text_x_tolerance": 3,
            "text_y_tolerance": 3
        }
    os.makedirs(output_dir, exist_ok=True)
    with pdfplumber.open(pdf_path) as pdf:
        page = pdf.pages[page_num]
        tables = page.extract_tables(settings)
        
        # Group tables by their header (first row)
        table_groups = {}
        for table in tables:
            if not table or not table[0]:
                continue
            header = tuple(table[0])
            rows = table[1:]
            if header not in table_groups:
                table_groups[header] = []
            table_groups[header].extend(rows)
        
        # Export each group as a single CSV
        for idx, (header, rows) in enumerate(table_groups.items()):
            df = pd.DataFrame(rows)
            csv_path = os.path.join(output_dir, f"integrated_table_{idx+1}.csv")
            df.to_csv(csv_path, index=False)
            print(f"Exported integrated table {idx+1} to {csv_path}")


In [16]:
def process_single_pdf(pdf_path, output_dir, filename, table_settings):
    """Process a single PDF file with comprehensive quality assessment and word box extraction"""
     # Create output directory for tables
    tables_dir = os.path.join(output_dir, "tables")
    os.makedirs(tables_dir, exist_ok=True)
    
    # (Optional) Extract tables from all pages
    with pdfplumber.open(pdf_path) as pdf:
        for page_num in range(len(pdf.pages)):
            extract_and_integrate_tables(
                pdf_path=pdf_path,
                output_dir=tables_dir,
                page_num=page_num,
                settings=table_settings
            )
    base_name = filename.replace('.pdf', '')
    output_file = os.path.join(output_dir, f"{base_name}_extracted.txt")
    metadata_file = os.path.join(output_dir, f"{base_name}_metadata.json")
    wordboxes_file = os.path.join(output_dir, f"{base_name}_wordboxes.json")
    layout_file = os.path.join(output_dir, f"{base_name}_layout.json")
    
    # Skip if already processed
    if (os.path.exists(output_file) and os.path.exists(metadata_file) and 
        os.path.exists(wordboxes_file) and os.path.exists(layout_file)):
        print(f"  ⏩ Already processed: {base_name}")
        return load_existing_stats(metadata_file)
    
    file_stats = {
        'filename': filename,
        'total_pages': 0,
        'pdfplumber_pages': 0,
        'ocr_pages': 0,
        'poor_quality_pages': 0,
        'total_word_boxes': 0,
        'processing_time': 0,
        'page_details': []
    }
    
    start_time = datetime.now()
    
    try:
        with pdfplumber.open(pdf_path) as pdf:
            print(f"  📄 Total pages: {len(pdf.pages)}")
            file_stats['total_pages'] = len(pdf.pages)
            
            extracted_pages = []
            all_word_boxes = []
            all_page_layouts = []
            
            # Process each page with quality assessment and word box extraction
            for page_num, page in enumerate(pdf.pages, 1):
                page_result = extract_page_with_quality_check(page, page_num)
                extracted_pages.append(page_result)
                all_word_boxes.extend(page_result['word_boxes'])
                all_page_layouts.append(page_result['page_layout'])
                file_stats['page_details'].append(page_result['metadata'])
                
                # Update statistics
                if page_result['metadata']['method'] == 'pdfplumber':
                    file_stats['pdfplumber_pages'] += 1
                else:
                    file_stats['ocr_pages'] += 1
                
                if page_result['metadata']['quality_flag']:
                    file_stats['poor_quality_pages'] += 1
                
                file_stats['total_word_boxes'] += len(page_result['word_boxes'])
                
                # Progress indicator
                if page_num % 20 == 0:
                    print(f"    📖 Processed {page_num} pages...")
            
            # Save extracted content, word boxes, and layout data
            save_extracted_content(extracted_pages, output_file, metadata_file, file_stats)
            save_word_boxes_and_layout(all_word_boxes, all_page_layouts, wordboxes_file, layout_file, file_stats)
            
    except Exception as e:
        print(f"  ❌ Error processing {filename}: {e}")
        file_stats['error'] = str(e)
    
    file_stats['processing_time'] = (datetime.now() - start_time).total_seconds()
    return file_stats

In [17]:
def extract_page_with_quality_check(page, page_num):
    """Extract text and word boxes from a single page with comprehensive quality assessment"""
    
    # Try pdfplumber first
    text = page.extract_text()
    
    # Quality assessment metrics
    char_count = len(text) if text else 0
    word_count = len(text.split()) if text else 0
    line_count = len(text.split('\n')) if text else 0
    
    # Quality thresholds (adjust based on your needs)
    min_chars_per_page = 100
    max_chars_per_page = 10000
    min_words_per_page = 20
    
    quality_metrics = {
        'char_count': char_count,
        'word_count': word_count,
        'line_count': line_count,
        'char_density': char_count / (page.width * page.height) if hasattr(page, 'width') else 0,
        'quality_score': 0
    }
    
    # Determine quality and extraction method
    if not text or char_count < min_chars_per_page:
        # Poor extraction, try OCR
        print(f"    ⚠️  Page {page_num}: Poor pdfplumber extraction ({char_count} chars), trying OCR...")
        text = extract_with_ocr(page)
        method = 'tesseract'
        quality_flag = True
        
        # Recalculate metrics for OCR text
        quality_metrics.update({
            'char_count': len(text) if text else 0,
            'word_count': len(text.split()) if text else 0,
            'line_count': len(text.split('\n')) if text else 0
        })
        
    elif char_count > max_chars_per_page:
        # Suspiciously long text (might be garbled)
        print(f"    ⚠️  Page {page_num}: Suspiciously long text ({char_count} chars)")
        method = 'pdfplumber'
        quality_flag = True
        
    else:
        # Good extraction
        method = 'pdfplumber'
        quality_flag = False
    
    # Calculate quality score (0-100)
    quality_score = calculate_quality_score(quality_metrics, method)
    quality_metrics['quality_score'] = quality_score
    
    # Extract word boxes and perform layout analysis
    word_boxes, text_blocks = extract_word_boxes_with_layout(page, page_num, method)
    
    # Analyze page layout
    layout_analysis = analyze_page_layout(word_boxes, page.width, page.height)
    
    # Create PageLayout object
    page_layout = PageLayout(
        page_number=page_num,
        page_width=page.width,
        page_height=page.height,
        word_boxes=word_boxes,
        text_blocks=text_blocks,
        reading_order=layout_analysis['reading_order'],
        layout_analysis=layout_analysis
    )
    
    # Create metadata
    metadata = {
        'page_number': page_num,
        'method': method,
        'quality_flag': quality_flag,
        'quality_score': quality_score,
        'extraction_timestamp': datetime.now().isoformat(),
        'word_count': len(word_boxes),
        'layout_type': layout_analysis['layout_type'],
        'estimated_columns': layout_analysis['columns'],
        'text_density': layout_analysis['text_density'],
        **quality_metrics
    }
    
    return {
        'text': text or "",
        'word_boxes': word_boxes,
        'page_layout': page_layout,
        'metadata': metadata
    }

In [18]:
def save_word_boxes_and_layout(all_word_boxes, all_page_layouts, wordboxes_file, layout_file, file_stats):
    """Save word boxes and layout data as JSON files"""
    
    # Save word boxes data
    word_boxes_data = {
        'file_info': {
            'filename': file_stats['filename'],
            'processing_date': datetime.now().isoformat(),
            'total_word_boxes': len(all_word_boxes),
            'total_pages': file_stats['total_pages']
        },
        'word_boxes': [box.to_dict() for box in all_word_boxes],
        'statistics': {
            'total_word_boxes': len(all_word_boxes),
            'avg_word_boxes_per_page': len(all_word_boxes) / file_stats['total_pages'] if file_stats['total_pages'] > 0 else 0,
            'pages_with_word_boxes': len([layout for layout in all_page_layouts if layout.word_boxes])
        }
    }
    
    with open(wordboxes_file, 'w', encoding='utf-8') as f:
        json.dump(word_boxes_data, f, indent=2, ensure_ascii=False)
    
    # Save layout data
    layout_data = {
        'file_info': {
            'filename': file_stats['filename'],
            'processing_date': datetime.now().isoformat(),
            'total_pages': file_stats['total_pages']
        },
        'page_layouts': [layout.to_dict() for layout in all_page_layouts],
        'document_analysis': analyze_document_layout(all_page_layouts)
    }
    
    with open(layout_file, 'w', encoding='utf-8') as f:
        json.dump(layout_data, f, indent=2, ensure_ascii=False)
    
    print(f"  📦 Saved word boxes: {os.path.basename(wordboxes_file)}")
    print(f"  📐 Saved layout data: {os.path.basename(layout_file)}")
    print(f"  📊 Total word boxes: {len(all_word_boxes)}")

In [19]:
def analyze_document_layout(page_layouts):
    """Analyze overall document layout across all pages"""
    
    if not page_layouts:
        return {'document_type': 'empty', 'analysis': {}}
    
    # Collect statistics across all pages
    layout_types = [layout.layout_type for layout in page_layouts]
    column_counts = [layout.estimated_columns for layout in page_layouts]
    text_densities = [layout.text_density for layout in page_layouts]
    font_sizes = [layout.average_font_size for layout in page_layouts if layout.average_font_size > 0]
    
    # Analyze document characteristics
    most_common_layout = max(set(layout_types), key=layout_types.count) if layout_types else 'unknown'
    avg_columns = np.mean(column_counts) if column_counts else 1
    avg_text_density = np.mean(text_densities) if text_densities else 0
    avg_font_size = np.mean(font_sizes) if font_sizes else 0
    
    # Determine document type
    if most_common_layout in ['single_column', 'narrow_single_column']:
        document_type = 'single_column_document'
    elif most_common_layout in ['two_column', 'mixed_formatting_two_column']:
        document_type = 'two_column_document'
    elif most_common_layout in ['multi_column', 'dense_multi_column']:
        document_type = 'multi_column_document'
    else:
        document_type = 'mixed_layout_document'
    
    return {
        'document_type': document_type,
        'most_common_layout': most_common_layout,
        'average_columns': avg_columns,
        'average_text_density': avg_text_density,
        'average_font_size': avg_font_size,
        'layout_distribution': {layout: layout_types.count(layout) for layout in set(layout_types)},
        'column_distribution': {cols: column_counts.count(cols) for cols in set(column_counts)},
        'total_pages': len(page_layouts),
        'pages_with_content': len([layout for layout in page_layouts if layout.word_boxes])
    }

In [20]:
def extract_with_ocr(page):
    """Extract text using Tesseract OCR as fallback"""
    
    try:
        # Convert page to image
        page_image = page.to_image(resolution=300)  # High resolution for better OCR
        
        # Convert to PIL Image
        pil_image = page_image.original
        
        # Use pytesseract for OCR
        text = pytesseract.image_to_string(pil_image, lang='eng')
        
        return text.strip()
        
    except Exception as e:
        print(f"      ❌ OCR failed: {e}")
        return ""

In [21]:
def save_extracted_content(extracted_pages, output_file, metadata_file, file_stats):
    """Save extracted text and metadata with page-level granularity"""
    
    # Save extracted text with page separators
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(f"# Extracted Text from {file_stats['filename']}\n")
        f.write(f"# Processing Date: {datetime.now().isoformat()}\n")
        f.write(f"# Total Pages: {file_stats['total_pages']}\n")
        f.write(f"# PDFplumber Pages: {file_stats['pdfplumber_pages']}\n")
        f.write(f"# OCR Pages: {file_stats['ocr_pages']}\n")
        f.write(f"# Poor Quality Pages: {file_stats['poor_quality_pages']}\n\n")
        
        for page_data in extracted_pages:
            page_num = page_data['metadata']['page_number']
            method = page_data['metadata']['method']
            quality_score = page_data['metadata']['quality_score']
            
            f.write(f"\n{'='*80}\n")
            f.write(f"PAGE {page_num} | Method: {method} | Quality Score: {quality_score}\n")
            f.write(f"{'='*80}\n\n")
            f.write(page_data['text'])
            f.write(f"\n\n")
    
    # Save detailed metadata as JSON
    metadata = {
        'file_info': {
            'filename': file_stats['filename'],
            'processing_date': datetime.now().isoformat(),
            'processing_time_seconds': file_stats['processing_time']
        },
        'statistics': {
            'total_pages': file_stats['total_pages'],
            'pdfplumber_pages': file_stats['pdfplumber_pages'],
            'ocr_pages': file_stats['ocr_pages'],
            'poor_quality_pages': file_stats['poor_quality_pages'],
            'success_rate': (file_stats['pdfplumber_pages'] / file_stats['total_pages']) * 100
        },
        'page_details': file_stats['page_details']
    }
    
    with open(metadata_file, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)
    
    print(f"  ✅ Saved: {os.path.basename(output_file)}")
    print(f"  �� Quality: {file_stats['pdfplumber_pages']}/{file_stats['total_pages']} pages via pdfplumber")

In [22]:
def calculate_quality_score(metrics, method):
    """Calculate a quality score (0-100) for the extracted text"""
    
    score = 0
    
    # Character count scoring (0-30 points)
    char_count = metrics['char_count']
    if 200 <= char_count <= 5000:
        score += 30
    elif 100 <= char_count < 200 or 5000 < char_count <= 8000:
        score += 20
    elif 50 <= char_count < 100 or 8000 < char_count <= 10000:
        score += 10
    
    # Word count scoring (0-25 points)
    word_count = metrics['word_count']
    if 50 <= word_count <= 1000:
        score += 25
    elif 25 <= word_count < 50 or 1000 < word_count <= 1500:
        score += 15
    elif 10 <= word_count < 25:
        score += 10
    
    # Line count scoring (0-20 points)
    line_count = metrics['line_count']
    if 10 <= line_count <= 100:
        score += 20
    elif 5 <= line_count < 10 or 100 < line_count <= 150:
        score += 15
    elif 2 <= line_count < 5:
        score += 10
    
    # Method bonus (0-25 points)
    if method == 'pdfplumber':
        score += 25
    else:  # tesseract
        score += 15
    
    return min(score, 100)

In [23]:
def print_processing_summary(processing_stats):
    """Print comprehensive processing summary statistics including word box data"""
    
    print(f"\n{'='*80}")
    print(f"📊 PDF PROCESSING SUMMARY")
    print(f"{'='*80}")
    
    # File statistics
    print(f"📁 Files Processed:")
    print(f"  Total files: {processing_stats['total_files']}")
    print(f"  Successfully processed: {processing_stats['processed_files']}")
    print(f"  Failed: {processing_stats['failed_files']}")
    print(f"  Success rate: {(processing_stats['processed_files']/processing_stats['total_files'])*100:.1f}%")
    
    # Page statistics
    print(f"\n📄 Page Statistics:")
    print(f"  Total pages processed: {processing_stats['total_pages']}")
    print(f"  PDFplumber extractions: {processing_stats['pdfplumber_pages']}")
    print(f"  OCR extractions: {processing_stats['ocr_pages']}")
    print(f"  Poor quality pages: {processing_stats['poor_quality_pages']}")
    
    # Word box statistics
    print(f"\n📦 Word Box Statistics:")
    print(f"  Total word boxes extracted: {processing_stats['total_word_boxes']}")
    print(f"  Average word boxes per page: {processing_stats['total_word_boxes']/processing_stats['total_pages']:.1f}")
    
    # Quality metrics
    if processing_stats['total_pages'] > 0:
        pdfplumber_rate = (processing_stats['pdfplumber_pages'] / processing_stats['total_pages']) * 100
        ocr_rate = (processing_stats['ocr_pages'] / processing_stats['total_pages']) * 100
        poor_quality_rate = (processing_stats['poor_quality_pages'] / processing_stats['total_pages']) * 100
        
        print(f"\n�� Quality Metrics:")
        print(f"  PDFplumber success rate: {pdfplumber_rate:.1f}%")
        print(f"  OCR fallback rate: {ocr_rate:.1f}%")
        print(f"  Poor quality rate: {poor_quality_rate:.1f}%")
    
    # Performance metrics
    print(f"\n⚡ Performance:")
    print(f"  Average pages per file: {processing_stats['total_pages']/processing_stats['processed_files']:.1f}")
    print(f"  Average word boxes per file: {processing_stats['total_word_boxes']/processing_stats['processed_files']:.1f}")
    
    # Recommendations
    print(f"\n💡 Recommendations:")
    if processing_stats['ocr_pages'] > processing_stats['total_pages'] * 0.3:
        print(f"  ⚠️  High OCR usage ({ocr_rate:.1f}%) - consider improving PDF quality")
    if processing_stats['poor_quality_pages'] > processing_stats['total_pages'] * 0.1:
        print(f"  ⚠️  High poor quality rate ({poor_quality_rate:.1f}%) - review extraction parameters")
    if processing_stats['pdfplumber_pages'] > processing_stats['total_pages'] * 0.8:
        print(f"  ✅ Good extraction quality - most pages processed with PDFplumber")
    
    print(f"\n{'='*80}")
    print(f"🎉 Processing completed successfully!")
    print(f"📦 Word boxes and layout data saved for LayoutLMv3 analysis")
    print(f"{'='*80}")

In [24]:
def load_existing_stats(metadata_file):
    """Load existing statistics from metadata file with comprehensive backward compatibility"""
    try:
        with open(metadata_file, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
            
            # Get statistics with fallback values for missing keys
            stats = metadata.get('statistics', {})
            
            # Calculate total_word_boxes from page_details if not present
            total_word_boxes = stats.get('total_word_boxes', 0)
            if total_word_boxes == 0:
                # Try to calculate from page_details if available
                page_details = metadata.get('page_details', [])
                if page_details:
                    total_word_boxes = sum(page.get('word_count', 0) for page in page_details)
            
            return {
                'total_pages': stats.get('total_pages', 0),
                'pdfplumber_pages': stats.get('pdfplumber_pages', 0),
                'ocr_pages': stats.get('ocr_pages', 0),
                'poor_quality_pages': stats.get('poor_quality_pages', 0),
                'total_word_boxes': total_word_boxes
            }
    except FileNotFoundError:
        print(f"  ⚠️  Metadata file not found: {metadata_file}")
        return get_default_stats()
    except json.JSONDecodeError as e:
        print(f"  ⚠️  Invalid JSON in metadata file: {e}")
        return get_default_stats()
    except Exception as e:
        print(f"  ⚠️  Could not load existing stats: {e}")
        return get_default_stats()

def get_default_stats():
    """Return default statistics structure"""
    return {
        'total_pages': 0,
        'pdfplumber_pages': 0,
        'ocr_pages': 0,
        'poor_quality_pages': 0,
        'total_word_boxes': 0
    }

Call

In [None]:
def end(output_filter=None, input_data=None, output_file=None):
    """
    End function that filters and formats the extracted data to match LayoutLMv3 input schema.
    
    Args:
        output_filter (str, optional): Filter type. If "layout", output is filtered to match LayoutLMv3 schema.
        input_data (dict or str, optional): Input data - can be layout file path or loaded data dict.
                                          If None, uses the most recent layout file from parsed MSFT data.
        output_file (str, optional): Output file path. If None, saves to default location.
    
    Returns:
        dict: Filtered data in LayoutLMv3 format (if output_filter="layout")
        dict: Original data (if output_filter is None or other values)
    """
    
    # If no input data provided, find the most recent layout file
    if input_data is None:
        input_data = find_most_recent_layout_file()
    
    # Load data if input_data is a file path
    if isinstance(input_data, str):
        with open(input_data, 'r', encoding='utf-8') as f:
            data = json.load(f)
    else:
        data = input_data
    
    # Apply filter if specified
    if output_filter == "layout":
        filtered_data = transform_to_layoutlmv3_schema(data)
    else:
        filtered_data = data
    
    # Save to output file if specified
    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(filtered_data, f, indent=2, ensure_ascii=False)
        print(f"✅ Saved filtered data to: {output_file}")
    
    return filtered_data


def find_most_recent_layout_file():
    """Find the most recent layout file from the parsed MSFT data."""
    
    msft_parsed_dir = "../data/parsed/MSFT"
    layout_files = []
    
    # Search for layout files in all year subdirectories
    for year_dir in os.listdir(msft_parsed_dir):
        year_path = os.path.join(msft_parsed_dir, year_dir)
        if os.path.isdir(year_path):
            for file in os.listdir(year_path):
                if file.endswith("_layout.json"):
                    layout_files.append(os.path.join(year_path, file))
    
    if not layout_files:
        raise FileNotFoundError("No layout files found in the parsed MSFT data")
    
    # Sort by modification time and return the most recent
    layout_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
    most_recent = layout_files[0]
    
    print(f"📁 Using most recent layout file: {os.path.basename(most_recent)}")
    return most_recent


def transform_to_layoutlmv3_schema(layout_data):
    """
    Transform layout data to match LayoutLMv3 input schema.
    
    Args:
        layout_data (dict): Original layout data from the extraction process
    
    Returns:
        dict: Data formatted according to LayoutLMv3 schema
    """
    
    file_info = layout_data.get('file_info', {})
    page_layouts = layout_data.get('page_layouts', [])
    
    # Create the schema-compliant structure
    layoutlmv3_data = {
        "file_info": {
            "filename": file_info.get('filename', 'unknown'),
            "total_pages": len(page_layouts)
        },
        "page_layouts": []
    }
    
    # Transform each page layout
    for page_layout in page_layouts:
        # Extract basic page information
        page_number = page_layout.get('page_number', 0)
        page_width = page_layout.get('page_width', 612.0)
        page_height = page_layout.get('page_height', 792.0)
        word_boxes = page_layout.get('word_boxes', [])
        
        # Transform word boxes to the required format
        transformed_words = []
        for word_box in word_boxes:
            # Ensure we have the required fields
            text = word_box.get('text', '').strip()
            if not text:  # Skip empty text
                continue
                
            # Get bounding box coordinates
            x0 = word_box.get('x0', 0)
            y0 = word_box.get('y0', 0)
            x1 = word_box.get('x1', 0)
            y1 = word_box.get('y1', 0)
            
            # Normalize coordinates to 0-1000 range (LayoutLMv3 standard)
            # This assumes the original coordinates are in points/pixels
            normalized_bbox = [
                min(1000, max(0, int((x0 / page_width) * 1000))),
                min(1000, max(0, int((y0 / page_height) * 1000))),
                min(1000, max(0, int((x1 / page_width) * 1000))),
                min(1000, max(0, int((y1 / page_height) * 1000)))
            ]
            
            # Create word entry matching the schema
            word_entry = {
                "text": text,
                "bbox": normalized_bbox
            }
            
            transformed_words.append(word_entry)
        
        # Create page layout entry matching the schema
        page_entry = {
            "page_number": page_number,
            "page_width": float(page_width),
            "page_height": float(page_height),
            "words": transformed_words
        }
        
        layoutlmv3_data["page_layouts"].append(page_entry)
    
    # Print transformation summary
    total_words = sum(len(page["words"]) for page in layoutlmv3_data["page_layouts"])
    print(f"🔄 Transformed {len(layoutlmv3_data['page_layouts'])} pages with {total_words} words to LayoutLMv3 format")
    
    return layoutlmv3_data


def validate_layoutlmv3_schema(data):
    """
    Validate that the data matches the LayoutLMv3 schema requirements.
    
    Args:
        data (dict): Data to validate
    
    Returns:
        tuple: (is_valid, validation_errors)
    """
    
    errors = []
    
    # Check top-level structure
    if not isinstance(data, dict):
        errors.append("Data must be a dictionary")
        return False, errors
    
    # Check required top-level fields
    required_fields = ['file_info', 'page_layouts']
    for field in required_fields:
        if field not in data:
            errors.append(f"Missing required field: {field}")
    
    # Validate file_info
    file_info = data.get('file_info', {})
    if not isinstance(file_info, dict):
        errors.append("file_info must be a dictionary")
    else:
        if 'filename' not in file_info:
            errors.append("file_info missing required field: filename")
        if 'total_pages' not in file_info:
            errors.append("file_info missing required field: total_pages")
        elif not isinstance(file_info['total_pages'], (int, float)):
            errors.append("file_info.total_pages must be a number")
    
    # Validate page_layouts
    page_layouts = data.get('page_layouts', [])
    if not isinstance(page_layouts, list):
        errors.append("page_layouts must be an array")
    else:
        for i, page in enumerate(page_layouts):
            if not isinstance(page, dict):
                errors.append(f"page_layouts[{i}] must be a dictionary")
                continue
            
            # Check required page fields
            page_required = ['page_number', 'page_width', 'page_height', 'words']
            for field in page_required:
                if field not in page:
                    errors.append(f"page_layouts[{i}] missing required field: {field}")
            
            # Validate words array
            words = page.get('words', [])
            if not isinstance(words, list):
                errors.append(f"page_layouts[{i}].words must be an array")
            else:
                for j, word in enumerate(words):
                    if not isinstance(word, dict):
                        errors.append(f"page_layouts[{i}].words[{j}] must be a dictionary")
                        continue
                    
                    # Check required word fields
                    if 'text' not in word:
                        errors.append(f"page_layouts[{i}].words[{j}] missing required field: text")
                    if 'bbox' not in word:
                        errors.append(f"page_layouts[{i}].words[{j}] missing required field: bbox")
                    else:
                        bbox = word['bbox']
                        if not isinstance(bbox, list) or len(bbox) != 4:
                            errors.append(f"page_layouts[{i}].words[{j}].bbox must be an array of 4 numbers")
                        elif not all(isinstance(x, (int, float)) for x in bbox):
                            errors.append(f"page_layouts[{i}].words[{j}].bbox must contain only numbers")
    
    is_valid = len(errors) == 0
    return is_valid, errors


def save_layoutlmv3_output(data, output_file=None, validate=True):
    """
    Save data in LayoutLMv3 format with optional validation.
    
    Args:
        data (dict): Data to save
        output_file (str, optional): Output file path
        validate (bool): Whether to validate schema before saving
    
    Returns:
        str: Path to saved file
    """
    
    # Validate if requested
    if validate:
        is_valid, errors = validate_layoutlmv3_schema(data)
        if not is_valid:
            print("❌ Schema validation failed:")
            for error in errors:
                print(f"  - {error}")
            raise ValueError("Data does not match LayoutLMv3 schema")
        else:
            print("✅ Schema validation passed")
    
    # Determine output file path
    if output_file is None:
        # Create default output file name
        filename = data.get('file_info', {}).get('filename', 'unknown')
        base_name = filename.replace('.pdf', '').replace('.txt', '')
        output_file = f"../data/parsed/layoutlmv3_{base_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    
    # Ensure output directory exists
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    # Save the data
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    
    print(f"💾 LayoutLMv3 data saved to: {output_file}")
    return output_file


In [None]:
# Example 1: Basic usage - Filter most recent layout file to LayoutLMv3 format
print("🔄 Example 1: Converting most recent layout file to LayoutLMv3 format")
try:
    layoutlmv3_output = end(output_filter="layout")
    print(f"✅ Successfully transformed data with {len(layoutlmv3_output['page_layouts'])} pages")
    print(f"📊 Total words across all pages: {sum(len(page['words']) for page in layoutlmv3_output['page_layouts'])}")
except Exception as e:
    print(f"❌ Error: {e}")

print("\n" + "="*80 + "\n")

# Example 2: Specify a specific layout file
print("🔄 Example 2: Converting specific layout file to LayoutLMv3 format")
try:
    # You can specify a specific layout file path
    specific_layout_file = "../data/parsed/MSFT/2022/MSFT_10-K_20220728_000156459022026876_layout.json"
    if os.path.exists(specific_layout_file):
        layoutlmv3_output = end(
            output_filter="layout", 
            input_data=specific_layout_file,
            output_file="../data/parsed/example_layoutlmv3_output.json"
        )
        print(f"✅ Successfully converted specific file")
        print(f"📁 Output saved to: ../data/parsed/example_layoutlmv3_output.json")
    else:
        print("⚠️  Specific layout file not found, skipping this example")
except Exception as e:
    print(f"❌ Error: {e}")

print("\n" + "="*80 + "\n")

# Example 3: Return original data without filtering
print("🔄 Example 3: Return original data without filtering")
try:
    original_data = end()  # No filter specified
    print(f"✅ Retrieved original data structure")
    print(f"📋 Available keys: {list(original_data.keys())}")
except Exception as e:
    print(f"❌ Error: {e}")


## Integration with Main Processing Pipeline

To automatically generate LayoutLMv3-compatible output during the main PDF processing, you can modify the `process_single_pdf` function to include the `end()` function call. Here's how to integrate it:


In [25]:
process_all_pdfs()

🔍 Found 3 PDF files to process

📄 Processing: MSFT_10-K_20230727_000095017023035122.pdf
📅 Year: 2023
Exported integrated table 1 to ../data/parsed/MSFT/2023/tables/integrated_table_1.csv
Exported integrated table 2 to ../data/parsed/MSFT/2023/tables/integrated_table_2.csv
Exported integrated table 1 to ../data/parsed/MSFT/2023/tables/integrated_table_1.csv
Exported integrated table 2 to ../data/parsed/MSFT/2023/tables/integrated_table_2.csv
Exported integrated table 3 to ../data/parsed/MSFT/2023/tables/integrated_table_3.csv
Exported integrated table 1 to ../data/parsed/MSFT/2023/tables/integrated_table_1.csv
Exported integrated table 2 to ../data/parsed/MSFT/2023/tables/integrated_table_2.csv
Exported integrated table 3 to ../data/parsed/MSFT/2023/tables/integrated_table_3.csv
Exported integrated table 4 to ../data/parsed/MSFT/2023/tables/integrated_table_4.csv
Exported integrated table 5 to ../data/parsed/MSFT/2023/tables/integrated_table_5.csv
Exported integrated table 1 to ../data/

Use the below cells to refresh the script run

In [10]:


import shutil
import os

# Path to the MSFT folder within parsed directory
msft_path = "../data/parsed/MSFT"
tables_path = "../data/tables"
tabula_path = "../data/parsed/tabula_output/"
# Check if directory exists before attempting removal
if os.path.exists(msft_path):
    print(f"Removing {msft_path}...")
    try:
        shutil.rmtree(msft_path)
        shutil.rmtree(tabula_path)
        shutil.rmtree(tables_path)
        print("✅ MSFT folder successfully removed")
    except Exception as e:
        print(f"❌ Error removing MSFT folder: {e}")
else:
    print("⚠️ MSFT folder not found in parsed directory")


⚠️ MSFT folder not found in parsed directory
