# Document Processing Pipeline
## Extract and Chunk PDFs using Amazon Textract Textractor and LangChain

**Purpose**: Process 6 sample PDF documents through Textractor extraction, text post-processing, and LangChain recursive splitting to prepare them for BM25 indexing.

**Library**: Uses `amazon-textract-textractor` for layout-aware text extraction

**Outputs**: 
- Raw extracted text (saved as checkpoint)
- Post-processed text (saved as checkpoint)
- Processed chunks with metadata (ready for indexing)

**Next Step**: `02_indexing.ipynb`

---
## 1. Setup & Imports

In [None]:
import json
import boto3
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
import pandas as pd
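try:
    # Newer LangChain releases ship the splitters in the `langchain-text-splitters` package
    from langchain_text_splitters import RecursiveCharacterTextSplitter
except ImportError:
    # Older releases expose the splitter under langchain.text_splitter
    from langchain.text_splitter import RecursiveCharacterTextSplitter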

# Amazon Textract Textractor library
from textractor import Textractor
from textractor.data.constants import TextractFeatures

# Display settings
pd.set_option('display.max_colwidth', 100)

# Logging setup
import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print("✓ Imports completed")
print("  - Using amazon-textract-textractor library for extraction")

In [None]:
# Initialize AWS clients
session = boto3.Session()
s3_client = session.client('s3')

# Verify credentials
sts = session.client('sts')
identity = sts.get_caller_identity()
print(f"✓ AWS Identity: {identity['Arn']}")
print(f"✓ Region: {session.region_name}")

---
## 2. Configuration

Centralize all parameters for reproducibility and easy experimentation.

In [None]:
class ProcessingConfig:
    """Configuration for document processing pipeline."""
    
    # Document sources
    PDF_SOURCE_TYPE = "s3"  # or "local"
    S3_BUCKET = "your-bucket-name"
    S3_PREFIX = "raw-pdfs/"
    LOCAL_PDF_DIR = "./data/pdfs/"
    
    # Output paths
    CHECKPOINT_DIR = "./checkpoints/"
    RAW_EXTRACTION_FILE = "raw_extractions.json"
    PROCESSED_EXTRACTIONS_FILE = "processed_extractions.json"
    PROCESSED_CHUNKS_FILE = "processed_chunks.json"
    
    # Textract settings (using amazon-textract-textractor)
    # Features: LAYOUT, TABLES, FORMS, SIGNATURES, QUERIES
    TEXTRACT_FEATURES = ["LAYOUT", "TABLES"]  # List of feature names as strings
    
    # Chunking parameters
    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    SEPARATORS = ["\n\n", "\n", ". ", " ", ""]
    LENGTH_FUNCTION = len
    
    # Processing
    BATCH_SIZE = 6  # Number of documents
    PROCESSING_TIMESTAMP = datetime.utcnow().isoformat()
    
    @classmethod
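    def to_dict(cls) -> Dict[str, Any]:
        """Export config as dictionary for logging.

        Only UPPER_CASE, non-callable attributes are exported. The `to_dict`
        and `save` classmethod objects also live in `cls.__dict__`; they are
        not callable and would make `json.dump` fail if included.
        """
        return {
            k: v for k, v in cls.__dict__.items()
            if k.isupper() and not callable(v)
        }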
    
    @classmethod
    def save(cls, filepath: str):
        """Save configuration to JSON."""
        with open(filepath, 'w') as f:
            json.dump(cls.to_dict(), f, indent=2)

# Display current configuration
config_df = pd.DataFrame([
    {"Parameter": k, "Value": v} 
    for k, v in ProcessingConfig.to_dict().items()
])
print("\n📋 Current Configuration:")
display(config_df)

In [None]:
# Create checkpoint directory
checkpoint_dir = Path(ProcessingConfig.CHECKPOINT_DIR)
checkpoint_dir.mkdir(parents=True, exist_ok=True)

# Save config for reproducibility
ProcessingConfig.save(checkpoint_dir / "config.json")
print(f"✓ Configuration saved to {checkpoint_dir / 'config.json'}")
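Because the configuration is a class, one way to run an experiment is to subclass it and override only the parameters under test. One caveat: `cls.__dict__` lists only attributes defined on that exact class, so `to_dict()` on a subclass would export just the overrides. The sketch below uses hypothetical `BaseConfig` and `SmallChunkConfig` classes (a trimmed stand-in for `ProcessingConfig`) and reads settings through `dir(cls)`, which also walks inherited attributes.

```python
from typing import Any, Dict


class BaseConfig:
    """Hypothetical, trimmed stand-in for ProcessingConfig."""

    CHUNK_SIZE = 1000
    CHUNK_OVERLAP = 200
    SEPARATORS = ["\n\n", "\n", " ", ""]
    LENGTH_FUNCTION = len  # callable, so it is left out of the exported dict

    @classmethod
    def to_dict(cls) -> Dict[str, Any]:
        # dir(cls) includes inherited attributes, unlike cls.__dict__;
        # only UPPER_CASE, non-callable attributes count as settings.
        return {
            name: getattr(cls, name)
            for name in dir(cls)
            if name.isupper() and not callable(getattr(cls, name))
        }


class SmallChunkConfig(BaseConfig):
    """Experiment variant: override only the parameters under test."""

    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50


print(SmallChunkConfig.to_dict())
```

The inherited `SEPARATORS` value appears in the subclass export alongside the two overrides, so the saved `config.json` describes the full experiment.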

---
## 3. Processing Classes

Three separate classes for extraction, processing, and chunking.

**Design Pattern:**
- `TextExtractor`: PDF → Raw text (via Textractor)
- `TextProcessor`: Raw text → Cleaned/processed text
- `TextChunker`: Processed text → Chunks

This separation allows independent testing, customization, and checkpointing at each stage.
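A minimal sketch of that contract, using plain functions as hypothetical stand-ins for the three classes (no AWS or LangChain calls): each stage takes and returns plain dicts, so any stage can be replaced by a stub or fed from a checkpoint. The processing stand-in returns a new dict instead of editing its input, which keeps the raw stage's output raw.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def fake_extract(doc_id: str) -> Record:
    """Stand-in for TextExtractor.extract_from_pdf (no AWS call)."""
    return {"doc_id": doc_id, "status": "success", "full_text": "Hello world.\n\n\n\nBye."}


def collapse_blank_lines(record: Record) -> Record:
    """Stand-in for TextProcessor.process: returns a new dict, input untouched."""
    out = dict(record)
    paragraphs = [p.strip() for p in out["full_text"].split("\n\n") if p.strip()]
    out["full_text"] = "\n\n".join(paragraphs)
    out["processed"] = True
    return out


def split_paragraphs(record: Record) -> List[Record]:
    """Stand-in for TextChunker.chunk: one chunk per paragraph."""
    return [
        {"chunk_id": f"{record['doc_id']}_chunk_{i}", "doc_id": record["doc_id"], "text": para}
        for i, para in enumerate(record["full_text"].split("\n\n"))
    ]


def run_pipeline(
    doc_ids: List[str],
    extract: Callable[[str], Record],
    process: Callable[[Record], Record],
    chunk: Callable[[Record], List[Record]],
) -> Tuple[List[Record], List[Record], List[Record]]:
    """Run the stages in order and keep every intermediate result (one per checkpoint)."""
    raw = [extract(doc_id) for doc_id in doc_ids]
    processed = [process(r) for r in raw]
    chunks = [c for r in processed for c in chunk(r)]
    return raw, processed, chunks


raw, processed, chunks = run_pipeline(["doc_001"], fake_extract, collapse_blank_lines, split_paragraphs)
print(raw[0]["full_text"] != processed[0]["full_text"], [c["chunk_id"] for c in chunks])
```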

### 3.1 TextExtractor Class

In [None]:
class TextExtractor:
    """
    Handles PDF extraction using amazon-textract-textractor library.
    
    The Textractor library provides:
    - High-level interface to Textract async API (start_document_analysis)
    - Automatic polling and result retrieval
    - Built-in parsing of Textract responses
    - Access to layout elements (titles, headers, paragraphs, tables, etc.)
    - Linearized text in reading order
    """
    
    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
        
        # Initialize Textractor
        # Will use the default boto3 session/credentials
        self.extractor = Textractor(profile_name=None)
        
    def extract_from_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """
        Extract text from PDF using Textractor library.
        
        Textractor handles:
        - Calling start_document_analysis (async) or analyze_document (sync)
        - Polling for job completion
        - Retrieving and parsing all result pages
        - Organizing text in reading order
        
        Args:
            pdf_path: S3 URI (s3://bucket/key) or local file path
            doc_id: Unique identifier for this document
            
        Returns:
            Dictionary with extracted text and metadata
        """
        self.logger.info(f"Extracting text from: {pdf_path}")
        
        try:
            # Determine which features to use
            features = [
                getattr(TextractFeatures, f) 
                for f in self.config.TEXTRACT_FEATURES
            ]
            
            self.logger.info(f"Using Textract features: {self.config.TEXTRACT_FEATURES}")
            
            # Call Textractor - it handles async API automatically
            if pdf_path.startswith('s3://'):
                # S3 path - use start_document_analysis (async)
                self.logger.info("Starting async document analysis...")
                document = self.extractor.start_document_analysis(
                    file_source=pdf_path,
                    features=features,
                    save_image=False  # Don't save image overlays in POC
                )
            else:
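                # Local file - synchronous AnalyzeDocument only accepts single-page PDFs;
                # upload multi-page PDFs to S3 and use start_document_analysis instead
                self.logger.info("Processing local file with analyze_document (sync, single-page only)...")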
                document = self.extractor.analyze_document(
                    file_source=pdf_path,
                    features=features,
                    save_image=False
                )
            
            self.logger.info("✓ Document analysis complete, parsing results...")
            
            # Extract page-level information
            pages_data = []
            for page_num, page in enumerate(document.pages, start=1):
                # Get text for this page using Textractor's linearization
                # This respects reading order based on layout
                page_text = page.get_text()
                
                # Get layout information if available
                layout_elements = {}
                if hasattr(page, 'page_layout') and page.page_layout:
                    layout = page.page_layout
                    # Count different layout element types
                    layout_elements = {
                        'titles': len(layout.titles) if hasattr(layout, 'titles') else 0,
                        'headers': len(layout.headers) if hasattr(layout, 'headers') else 0,
                        'sections': len(layout.section_headers) if hasattr(layout, 'section_headers') else 0,
                        'paragraphs': len(layout.texts) if hasattr(layout, 'texts') else 0,
                        'tables': len(layout.tables) if hasattr(layout, 'tables') else 0,
                        'lists': len(layout.lists) if hasattr(layout, 'lists') else 0,
                        'figures': len(layout.figures) if hasattr(layout, 'figures') else 0,
                    }
                
                pages_data.append({
                    'page_number': page_num,
                    'text': page_text,
                    'char_count': len(page_text),
                    'layout_elements': layout_elements
                })
            
            # Compile full document text
            # Textractor's get_text() returns linearized text in reading order;
            # strip once so 'full_text' and 'total_char_count' stay consistent
            full_text = document.get_text().strip()
            
            # Get table information
            tables_info = []
            for table_idx, table in enumerate(document.tables):
                tables_info.append({
                    'table_index': table_idx,
                    'page': table.page if hasattr(table, 'page') else None,
                    'rows': table.n_rows if hasattr(table, 'n_rows') else 0,
                    'cols': table.n_cols if hasattr(table, 'n_cols') else 0,
                })
            
            result = {
                'doc_id': doc_id,
                'doc_name': Path(pdf_path).name,
                'source_path': pdf_path,
                'full_text': full_text.strip(),
                'pages': pages_data,
                'page_count': len(pages_data),
                'total_char_count': len(full_text),
                'tables': tables_info,
                'table_count': len(tables_info),
                'extraction_timestamp': datetime.utcnow().isoformat(),
                'processed': False,  # Not yet post-processed
                'status': 'success'
            }
            
            self.logger.info(
                f"✓ Extracted {len(pages_data)} pages, "
                f"{len(full_text):,} characters, "
                f"{len(tables_info)} tables"
            )
            
            return result
            
        except Exception as e:
            self.logger.error(f"✗ Extraction failed: {str(e)}")
            import traceback
            self.logger.error(traceback.format_exc())
            
            return {
                'doc_id': doc_id,
                'doc_name': Path(pdf_path).name,
                'source_path': pdf_path,
                'status': 'failed',
                'error': str(e),
                'extraction_timestamp': datetime.utcnow().isoformat()
            }
    
    def extract_batch(self, pdf_sources: List[tuple]) -> List[Dict[str, Any]]:
        """
        Extract text from multiple PDFs.
        
        Note: This processes sequentially. For production:
        - Submit all jobs first (store job IDs)
        - Poll all jobs together
        - Process results as they complete
        
        Args:
            pdf_sources: List of (pdf_path, doc_id) tuples
            
        Returns:
            List of extraction result dictionaries
        """
        self.logger.info(f"Extracting batch of {len(pdf_sources)} documents")
        
        results = []
        for pdf_path, doc_id in pdf_sources:
            result = self.extract_from_pdf(pdf_path, doc_id)
            results.append(result)
        
        success_count = sum(1 for r in results if r['status'] == 'success')
        self.logger.info(
            f"✓ Extraction complete: {success_count}/{len(pdf_sources)} successful"
        )
        
        return results

print("✓ TextExtractor class defined (using amazon-textract-textractor library)")
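The production approach described in the `extract_batch` docstring (submit all jobs first, then poll them together) could look roughly like the sketch below. It calls the boto3 Textract client directly (`start_document_analysis`, `get_document_analysis`) rather than Textractor, and the helper names `submit_jobs`, `poll_jobs`, and `fetch_blocks` are hypothetical. The returned `Blocks` are raw Textract JSON; turning them into the page/layout structure used above is not shown.

```python
import time
from typing import Any, Callable, Dict, List, Tuple


def submit_jobs(textract, sources: List[Tuple[str, str]], features: List[str]) -> Dict[str, str]:
    """Start one async analysis job per S3 PDF; return {doc_id: job_id}."""
    job_ids = {}
    for s3_uri, doc_id in sources:
        bucket, key = s3_uri.replace("s3://", "", 1).split("/", 1)
        response = textract.start_document_analysis(
            DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}},
            FeatureTypes=features,  # e.g. ["LAYOUT", "TABLES"]
        )
        job_ids[doc_id] = response["JobId"]
    return job_ids


def poll_jobs(
    textract,
    job_ids: Dict[str, str],
    interval_s: float = 5.0,
    timeout_s: float = 900.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Check every pending job each round until none is IN_PROGRESS; return {doc_id: status}."""
    pending = dict(job_ids)
    statuses: Dict[str, str] = {}
    waited = 0.0
    while pending:
        for doc_id, job_id in list(pending.items()):
            # MaxResults=1 keeps the status probe small; blocks are fetched later
            status = textract.get_document_analysis(JobId=job_id, MaxResults=1)["JobStatus"]
            if status != "IN_PROGRESS":  # SUCCEEDED, FAILED or PARTIAL_SUCCESS
                statuses[doc_id] = status
                del pending[doc_id]
        if pending:
            if waited >= timeout_s:
                raise TimeoutError(f"Jobs still running: {sorted(pending)}")
            sleep(interval_s)
            waited += interval_s
    return statuses


def fetch_blocks(textract, job_id: str) -> List[Dict[str, Any]]:
    """Collect all Blocks of a finished job, following NextToken pagination."""
    blocks: List[Dict[str, Any]] = []
    token = None
    while True:
        kwargs: Dict[str, Any] = {"JobId": job_id}
        if token:
            kwargs["NextToken"] = token
        response = textract.get_document_analysis(**kwargs)
        blocks.extend(response.get("Blocks", []))
        token = response.get("NextToken")
        if not token:
            return blocks


# Usage (requires AWS credentials and S3-hosted PDFs):
# textract = boto3.Session().client("textract")
# job_ids = submit_jobs(textract, pdf_sources, ProcessingConfig.TEXTRACT_FEATURES)
# statuses = poll_jobs(textract, job_ids)
# blocks = {d: fetch_blocks(textract, j) for d, j in job_ids.items() if statuses[d] == "SUCCEEDED"}
```

Injecting `sleep` keeps the polling loop testable without waiting. As an alternative to polling, Textract can publish job completion to an SNS topic through the `NotificationChannel` parameter of `start_document_analysis`.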

### 3.2 TextProcessor Class

In [None]:
import re

class TextProcessor:
    """
    Handles text cleaning and post-processing after extraction.
    
    This is where you add custom logic for:
    - Cleaning OCR artifacts
    - Removing headers/footers
    - Fixing hyphenation
    - Normalizing whitespace
    - Any domain-specific processing
    """
    
    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def process(self, extraction_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply all post-processing steps to extracted text.
        
        Args:
            extraction_result: Output from TextExtractor.extract_from_pdf()
            
        Returns:
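            A copy of extraction_result with processed text; the input dict
            is not modified, so the raw extraction stays available
        """
        if extraction_result['status'] != 'success':
            self.logger.warning(f"Skipping processing for failed extraction: {extraction_result['doc_id']}")
            return extraction_result
        
        self.logger.info(f"Processing text for: {extraction_result['doc_id']}")
        
        try:
            # Shallow copy: only top-level keys are reassigned below, so this keeps
            # the caller's raw extraction (and anything built from it) intact
            extraction_result = dict(extraction_result)
            text = extraction_result['full_text']
            original_length = len(text)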
            
            # Apply processing pipeline
            processing_steps = []
            
            # Step 1: Clean whitespace
            text = self._clean_whitespace(text)
            processing_steps.append('clean_whitespace')
            
            # Step 2: Fix hyphenation (words split across lines)
            text = self._fix_hyphenation(text)
            processing_steps.append('fix_hyphenation')
            
            # Step 3: Remove common headers/footers patterns
            text = self._remove_headers_footers(text)
            processing_steps.append('remove_headers_footers')
            
            # Step 4: Normalize unicode characters
            text = self._normalize_unicode(text)
            processing_steps.append('normalize_unicode')
            
            # Step 5: Remove page numbers
            text = self._remove_page_numbers(text)
            processing_steps.append('remove_page_numbers')
            
            # ADD YOUR CUSTOM PROCESSING STEPS HERE
            # text = self._your_custom_method(text)
            # processing_steps.append('your_custom_step')
            
            # Update extraction result
            extraction_result['full_text'] = text
            extraction_result['processed'] = True
            extraction_result['processing_steps'] = processing_steps
            extraction_result['processing_timestamp'] = datetime.utcnow().isoformat()
            extraction_result['char_count_after_processing'] = len(text)
            extraction_result['chars_removed'] = original_length - len(text)
            
            self.logger.info(
                f"✓ Processed: {original_length:,} → {len(text):,} chars "
                f"({extraction_result['chars_removed']:,} removed)"
            )
            
            return extraction_result
            
        except Exception as e:
            self.logger.error(f"✗ Processing failed: {str(e)}")
            extraction_result['status'] = 'processing_failed'
            extraction_result['error'] = str(e)
            return extraction_result
    
    def _clean_whitespace(self, text: str) -> str:
        """Remove excessive whitespace while preserving paragraph structure."""
        # Replace multiple spaces with single space
        text = re.sub(r' +', ' ', text)
        # Replace more than 2 newlines with exactly 2 (paragraph break)
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Remove trailing/leading whitespace per line
        text = '\n'.join(line.strip() for line in text.split('\n'))
        return text.strip()
    
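    def _fix_hyphenation(self, text: str) -> str:
        """
        Fix words split across lines with hyphens.
        Example: 'exam-\nple' → 'example'

        Only spaces/tabs are allowed around the newline, so a hyphen at the end
        of a paragraph is never glued to the next paragraph. Genuine compounds
        broken at a line end (e.g. 'well-\nknown') are still joined.
        """
        # Pattern: word character, hyphen, optional spaces/tabs, newline, word character
        text = re.sub(r'(\w)-[ \t]*\n[ \t]*(\w)', r'\1\2', text)
        return text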
    
    def _remove_headers_footers(self, text: str) -> str:
        """
        Remove common header/footer patterns.
        
        NOTE: This is a simple heuristic. For production, you may want:
        - Page-aware processing (using extraction_result['pages'])
        - Machine learning based detection
        - Custom patterns based on your document types
        """
        # Remove standalone numbers on their own line (usually page numbers; note
        # this also drops numeric-only lines such as some table cells)
        text = re.sub(r'^\d+$', '', text, flags=re.MULTILINE)
        
        # Remove common footer patterns like "Page X of Y"
        text = re.sub(r'Page \d+ of \d+', '', text, flags=re.IGNORECASE)
        
        return text
    
    def _normalize_unicode(self, text: str) -> str:
        """Normalize unicode characters to standard forms."""
        # Replace smart quotes with standard quotes
        replacements = {
            '\u2018': "'",  # Left single quote
            '\u2019': "'",  # Right single quote
            '\u201c': '"',  # Left double quote
            '\u201d': '"',  # Right double quote
            '\u2013': '-',  # En dash
            '\u2014': '-',  # Em dash
            '\u2026': '...', # Ellipsis
        }
        
        for old, new in replacements.items():
            text = text.replace(old, new)
        
        return text
    
    def _remove_page_numbers(self, text: str) -> str:
        """
        Remove standalone page numbers that appear in text.
        Be careful not to remove legitimate numbers!
        """
        # Remove lines like "Page 5" (bare numbers on their own line are
        # already removed by _remove_headers_footers)
        text = re.sub(r'\n\s*Page\s+\d+\s*\n', '\n', text, flags=re.IGNORECASE)
        return text
    
    def process_batch(self, extraction_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Process multiple extraction results.
        
        Args:
            extraction_results: List of extraction result dictionaries
            
        Returns:
            List of processed extraction results
        """
        self.logger.info(f"Processing batch of {len(extraction_results)} documents")
        
        processed_results = []
        for extraction in extraction_results:
            processed = self.process(extraction)
            processed_results.append(processed)
        
        success_count = sum(1 for r in processed_results if r.get('processed', False))
        self.logger.info(f"✓ Processing complete: {success_count}/{len(extraction_results)} successful")
        
        return processed_results

print("✓ TextProcessor class defined")
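A standalone run of the cleaning regexes on a made-up string shows what each step does and why order matters. This sketch mirrors the `TextProcessor` methods with two deliberate differences: the hyphenation pattern allows only spaces/tabs around the newline (`[ \t]*` instead of `\s*`), and whitespace cleanup runs again at the end, because deleting marker lines leaves extra blank lines behind. The function names are hypothetical.

```python
import re


def clean_whitespace(text: str) -> str:
    """Collapse runs of spaces, cap blank lines at one, strip each line."""
    text = re.sub(r" +", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = "\n".join(line.strip() for line in text.split("\n"))
    return text.strip()


def fix_hyphenation(text: str) -> str:
    """Join a word split by a hyphen at a line end (spaces/tabs only around the newline)."""
    return re.sub(r"(\w)-[ \t]*\n[ \t]*(\w)", r"\1\2", text)


def drop_page_markers(text: str) -> str:
    """Remove bare-number lines and 'Page X of Y' markers."""
    text = re.sub(r"^\d+$", "", text, flags=re.MULTILINE)
    return re.sub(r"Page \d+ of \d+", "", text, flags=re.IGNORECASE)


sample = "The  exam-\nple   shows\n\n\n\nPage 3 of 10\n42\nwell-\n\nknown"

# Same order as TextProcessor's first three steps, plus a final whitespace pass
cleaned = clean_whitespace(drop_page_markers(fix_hyphenation(clean_whitespace(sample))))
print(repr(cleaned))
```

With the original `\s*` pattern, `'well-\n\nknown'` becomes `'wellknown'`, gluing two paragraphs together; the pattern here leaves it alone. A compound broken at a single line end (`'well-\nknown'`) is joined by either pattern.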

### 3.3 TextChunker Class

In [None]:
class TextChunker:
    """Handles text chunking using LangChain RecursiveCharacterTextSplitter."""
    
    def __init__(self, config: ProcessingConfig):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
        
        # Initialize text splitter
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.CHUNK_SIZE,
            chunk_overlap=config.CHUNK_OVERLAP,
            separators=config.SEPARATORS,
            length_function=config.LENGTH_FUNCTION,
        )
    
    def chunk(self, extraction_result: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Chunk processed document text using LangChain splitter.
        
        Args:
            extraction_result: Output from TextProcessor.process() or TextExtractor.extract_from_pdf()
            
        Returns:
            List of chunk dictionaries with text and metadata
        """
        if extraction_result['status'] != 'success':
            self.logger.warning(f"Skipping chunking for failed document: {extraction_result['doc_id']}")
            return []
        
        self.logger.info(f"Chunking document: {extraction_result['doc_id']}")
        
        try:
            # Split text into chunks
            text = extraction_result['full_text']
            chunks = self.text_splitter.split_text(text)
            
            # Create chunk objects with metadata
            chunk_objects = []
            for idx, chunk_text in enumerate(chunks):
                chunk_obj = {
                    'chunk_id': f"{extraction_result['doc_id']}_chunk_{idx}",
                    'doc_id': extraction_result['doc_id'],
                    'doc_name': extraction_result['doc_name'],
                    'chunk_index': idx,
                    'text': chunk_text,
                    'char_count': len(chunk_text),
                    'page_numbers': self._estimate_page_numbers(
                        chunk_text, 
                        extraction_result.get('pages', [])
                    ),
                    'processing_timestamp': self.config.PROCESSING_TIMESTAMP,
                    'was_processed': extraction_result.get('processed', False)
                }
                chunk_objects.append(chunk_obj)
            
            self.logger.info(f"✓ Created {len(chunk_objects)} chunks")
            return chunk_objects
            
        except Exception as e:
            self.logger.error(f"✗ Chunking failed: {str(e)}")
            return []
    
    def _estimate_page_numbers(self, chunk_text: str, pages: List[Dict]) -> List[int]:
        """
        Estimate which pages a chunk spans based on text matching.
        
        NOTE: This is a simple heuristic. For production, consider:
        - Character position tracking during extraction
        - More sophisticated text matching algorithms
        - Handling post-processing effects on page boundaries
        """
        if not pages:
            return []
        
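        # Look for the chunk's first 100 characters in each page's text.
        # Only the chunk's starting page can be found this way, and post-processing
        # (whitespace, hyphenation, unicode) often prevents any match at all.
        chunk_snippet = chunk_text[:100]
        matching_pages = []
        
        for page in pages:
            if chunk_snippet in page['text']:
                matching_pages.append(page['page_number'])
        
        # Fallback when nothing matches: page 1 (a guess, not a measurement)
        return matching_pages if matching_pages else [1]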
    
    def chunk_batch(self, extraction_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Chunk multiple documents.
        
        Args:
            extraction_results: List of extraction/processing result dictionaries
            
        Returns:
            List of all chunks from all documents
        """
        self.logger.info(f"Chunking batch of {len(extraction_results)} documents")
        
        all_chunks = []
        for extraction in extraction_results:
            chunks = self.chunk(extraction)
            all_chunks.extend(chunks)
        
        self.logger.info(f"✓ Chunking complete: {len(all_chunks)} total chunks created")
        
        return all_chunks

print("✓ TextChunker class defined")
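The `_estimate_page_numbers` docstring suggests character-position tracking as a sturdier alternative. A minimal sketch, assuming the text being chunked is exactly the page texts joined with `\n` (not `document.get_text()`, whose output may differ, and not post-processed text, whose offsets shift): record where each page starts, then map a chunk's `[begin, end)` span to pages with a binary search. `page_start_offsets` and `pages_for_span` are hypothetical helpers.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import Dict, List


def page_start_offsets(pages: List[Dict], sep: str = "\n") -> List[int]:
    """Offset where each page begins in sep.join(page['text'] for page in pages)."""
    lengths = [len(page["text"]) + len(sep) for page in pages]
    return [0] + list(accumulate(lengths))[:-1]


def pages_for_span(starts: List[int], pages: List[Dict], begin: int, end: int) -> List[int]:
    """Page numbers overlapped by the half-open character span [begin, end)."""
    if not pages:
        return []
    first = bisect_right(starts, begin) - 1
    last = bisect_right(starts, max(begin, end - 1)) - 1
    return [pages[i]["page_number"] for i in range(first, last + 1)]


pages = [
    {"page_number": 1, "text": "alpha beta"},
    {"page_number": 2, "text": "gamma"},
    {"page_number": 3, "text": "delta epsilon"},
]
full_text = "\n".join(page["text"] for page in pages)  # the text that gets chunked
starts = page_start_offsets(pages)

chunk = "beta\ngamma"  # a chunk that crosses the page 1 -> 2 boundary
begin = full_text.find(chunk)
print(pages_for_span(starts, pages, begin, begin + len(chunk)))  # → [1, 2]
```

LangChain text splitters accept `add_start_index=True`, which stores each chunk's starting offset as `start_index` in the document metadata when chunks are built with `create_documents`; that offset could replace the `str.find` call here, which only locates the first occurrence of a chunk.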

---
## 4. Document Discovery & Validation

Locate and validate the 6 sample PDFs.

In [None]:
# Define your 6 PDF documents
# Modify these paths based on your actual document locations

if ProcessingConfig.PDF_SOURCE_TYPE == "s3":
    # S3 sources
    pdf_sources = [
        (f"s3://{ProcessingConfig.S3_BUCKET}/{ProcessingConfig.S3_PREFIX}doc1.pdf", "doc_001"),
        (f"s3://{ProcessingConfig.S3_BUCKET}/{ProcessingConfig.S3_PREFIX}doc2.pdf", "doc_002"),
        (f"s3://{ProcessingConfig.S3_BUCKET}/{ProcessingConfig.S3_PREFIX}doc3.pdf", "doc_003"),
        (f"s3://{ProcessingConfig.S3_BUCKET}/{ProcessingConfig.S3_PREFIX}doc4.pdf", "doc_004"),
        (f"s3://{ProcessingConfig.S3_BUCKET}/{ProcessingConfig.S3_PREFIX}doc5.pdf", "doc_005"),
        (f"s3://{ProcessingConfig.S3_BUCKET}/{ProcessingConfig.S3_PREFIX}doc6.pdf", "doc_006"),
    ]
else:
    # Local sources
    pdf_dir = Path(ProcessingConfig.LOCAL_PDF_DIR)
    pdf_files = sorted(pdf_dir.glob("*.pdf"))[:ProcessingConfig.BATCH_SIZE]  # Take the first BATCH_SIZE PDFs
    pdf_sources = [
        (str(pdf_path), f"doc_{i:03d}") 
        for i, pdf_path in enumerate(pdf_files, start=1)
    ]

# Display document list
doc_list_df = pd.DataFrame([
    {
        "Doc ID": doc_id,
        "Source Path": path,
        "Filename": Path(path).name
    }
    for path, doc_id in pdf_sources
])

print(f"\n📄 Found {len(pdf_sources)} documents:")
display(doc_list_df)
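Instead of hard-coding `doc1.pdf` through `doc6.pdf`, the S3 branch could list the prefix, mirroring the local branch (sorted, first N, `doc_001`-style IDs). A sketch with `pdf_sources_from_listing` as a hypothetical helper; it consumes the response pages produced by boto3's `list_objects_v2` paginator, so it can be tried on plain dicts without AWS access.

```python
from typing import Dict, Iterable, List, Tuple


def pdf_sources_from_listing(
    pages: Iterable[Dict], bucket: str, limit: int
) -> List[Tuple[str, str]]:
    """Build (s3_uri, doc_id) tuples from list_objects_v2 response pages."""
    keys = sorted(
        obj["Key"]
        for page in pages
        for obj in page.get("Contents", [])  # an empty listing has no 'Contents'
        if obj["Key"].lower().endswith(".pdf")
    )[:limit]
    return [(f"s3://{bucket}/{key}", f"doc_{i:03d}") for i, key in enumerate(keys, start=1)]


# With AWS access, the pages come from boto3's paginator:
# paginator = s3_client.get_paginator("list_objects_v2")
# pages = paginator.paginate(Bucket=ProcessingConfig.S3_BUCKET, Prefix=ProcessingConfig.S3_PREFIX)
# pdf_sources = pdf_sources_from_listing(pages, ProcessingConfig.S3_BUCKET, ProcessingConfig.BATCH_SIZE)

example_pages = [
    {"Contents": [{"Key": "raw-pdfs/b.pdf"}, {"Key": "raw-pdfs/notes.txt"}]},
    {"Contents": [{"Key": "raw-pdfs/a.PDF"}]},
    {},
]
print(pdf_sources_from_listing(example_pages, "your-bucket-name", limit=6))
```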

In [None]:
# Validation: Check if documents are accessible
print("\nüîç Validating document access...\n")

validation_results = []

for pdf_path, doc_id in pdf_sources:
    try:
        if pdf_path.startswith('s3://'):
            # Check S3 object exists
            parts = pdf_path.replace('s3://', '').split('/', 1)
            bucket, key = parts[0], parts[1]
            response = s3_client.head_object(Bucket=bucket, Key=key)
            size_mb = response['ContentLength'] / (1024 * 1024)
            status = "✓ Accessible"
        else:
            # Check local file exists
            path = Path(pdf_path)
            if path.exists():
                size_mb = path.stat().st_size / (1024 * 1024)
                status = "✓ Accessible"
            else:
                size_mb = 0
                status = "✗ Not found"
        
        validation_results.append({
            "Doc ID": doc_id,
            "Filename": Path(pdf_path).name,
            "Size (MB)": f"{size_mb:.2f}",
            "Status": status
        })
        
    except Exception as e:
        validation_results.append({
            "Doc ID": doc_id,
            "Filename": Path(pdf_path).name,
            "Size (MB)": "N/A",
            "Status": f"✗ Error: {str(e)[:50]}"
        })

validation_df = pd.DataFrame(validation_results)
display(validation_df)

# Check if all documents are accessible
accessible_count = sum(1 for r in validation_results if "✓" in r["Status"])
if accessible_count == len(pdf_sources):
    print(f"\n✓ All {len(pdf_sources)} documents are accessible and ready for processing")
else:
    print(f"\n⚠️ Warning: Only {accessible_count}/{len(pdf_sources)} documents are accessible")

---
## 5. Raw Extraction (Textractor)

Extract text from all PDFs using amazon-textract-textractor and save checkpoint.

In [None]:
# Initialize pipeline components
extractor = TextExtractor(ProcessingConfig)
processor = TextProcessor(ProcessingConfig)
chunker = TextChunker(ProcessingConfig)

print("✓ Processing pipeline initialized")
print("  - TextExtractor: Ready (using Textractor library)")
print("  - TextProcessor: Ready")
print("  - TextChunker: Ready")
print("\n  Configuration:")
print(f"  - Chunk size: {ProcessingConfig.CHUNK_SIZE}")
print(f"  - Chunk overlap: {ProcessingConfig.CHUNK_OVERLAP}")
print(f"  - Textract features: {ProcessingConfig.TEXTRACT_FEATURES}")

In [None]:
# Extract text from all documents
print("\n🔄 Starting extraction process...\n")
print("=" * 80)

raw_extractions = []

for pdf_path, doc_id in pdf_sources:
    print(f"\nProcessing: {doc_id} - {Path(pdf_path).name}")
    print("-" * 80)
    
    extraction_result = extractor.extract_from_pdf(pdf_path, doc_id)
    raw_extractions.append(extraction_result)
    
    # Display result summary
    if extraction_result['status'] == 'success':
        print(f"  ✓ Pages: {extraction_result['page_count']}")
        print(f"  ✓ Characters: {extraction_result['total_char_count']:,}")
        print(f"  ✓ Tables: {extraction_result['table_count']}")
        
        # Show layout element counts
        total_layout = sum(
            sum(page['layout_elements'].values()) 
            for page in extraction_result['pages']
            if page['layout_elements']
        )
        if total_layout > 0:
            print(f"  ✓ Layout elements detected: {total_layout}")
    else:
        print(f"  ✗ Error: {extraction_result.get('error', 'Unknown error')}")

print("\n" + "=" * 80)
print("\n✓ Extraction phase complete")

In [None]:
# Save raw extractions checkpoint
extraction_checkpoint_path = checkpoint_dir / ProcessingConfig.RAW_EXTRACTION_FILE

# extract_from_pdf() returns only JSON-serializable values; the pop() below is a
# safeguard in case a Textractor Document object is ever stored under
# 'textractor_document' (Document objects are not JSON serializable)
serializable_extractions = []
for extraction in raw_extractions:
    extraction_copy = extraction.copy()
    extraction_copy.pop('textractor_document', None)
    serializable_extractions.append(extraction_copy)

with open(extraction_checkpoint_path, 'w') as f:
    json.dump(serializable_extractions, f, indent=2)

print(f"💾 Raw extractions saved to: {extraction_checkpoint_path}")
print(f"   File size: {extraction_checkpoint_path.stat().st_size / 1024:.2f} KB")
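If the kernel dies mid-write, `open(path, 'w')` can leave a truncated checkpoint behind. A common pattern is to write to a temporary file in the same directory and swap it in with `os.replace`, which is atomic on POSIX when both paths are on the same filesystem. A sketch with hypothetical helpers `save_checkpoint` and `load_checkpoint`:

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def save_checkpoint(data: Any, path: Path) -> None:
    """Write JSON to a temp file next to `path`, then swap it into place."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        # Atomic on POSIX when source and target share a filesystem
        os.replace(tmp_name, path)
    except BaseException:
        # Never leave a half-written temp file behind
        os.unlink(tmp_name)
        raise


def load_checkpoint(path: Path, default: Any = None) -> Any:
    """Return the saved JSON, or `default` when the checkpoint does not exist yet."""
    path = Path(path)
    if not path.exists():
        return default
    with path.open(encoding="utf-8") as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "raw_extractions.json"
    save_checkpoint([{"doc_id": "doc_001", "status": "success"}], target)
    print(load_checkpoint(target))
```

Returning `default` for a missing file makes re-runs cheap: the extraction loop only needs to run when `load_checkpoint(extraction_checkpoint_path)` returns `None`.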

---
## 6. Extraction Quality Review

Inspect extracted text and layout information for quality issues.

In [None]:
# Summary statistics
extraction_stats = []

for extraction in raw_extractions:
    if extraction['status'] == 'success':
        # Calculate layout element totals
        total_layout = sum(
            sum(page['layout_elements'].values()) 
            for page in extraction['pages']
            if page['layout_elements']
        )
        
        extraction_stats.append({
            "Doc ID": extraction['doc_id'],
            "Doc Name": extraction['doc_name'],
            "Pages": extraction['page_count'],
            "Characters": f"{extraction['total_char_count']:,}",
            # max(..., 1) avoids a ZeroDivisionError for a document with no pages
            "Avg Chars/Page": f"{extraction['total_char_count'] // max(extraction['page_count'], 1):,}",
            "Tables": extraction['table_count'],
            "Layout Elements": total_layout,
            "Status": "✓ Success"
        })
    else:
        extraction_stats.append({
            "Doc ID": extraction['doc_id'],
            "Doc Name": extraction['doc_name'],
            "Pages": "N/A",
            "Characters": "N/A",
            "Avg Chars/Page": "N/A",
            "Tables": "N/A",
            "Layout Elements": "N/A",
            "Status": "✗ Failed"
        })
        })

stats_df = pd.DataFrame(extraction_stats)
print("\n📊 Extraction Statistics:\n")
display(stats_df)

# Overall stats
successful = [e for e in raw_extractions if e['status'] == 'success']
if successful:
    total_pages = sum(e['page_count'] for e in successful)
    total_chars = sum(e['total_char_count'] for e in successful)
    total_tables = sum(e['table_count'] for e in successful)
    
    print(f"\n📈 Overall:")
    print(f"   Total pages extracted: {total_pages}")
    print(f"   Total characters: {total_chars:,}")
    print(f"   Total tables: {total_tables}")
    print(f"   Average document size: {total_chars // len(successful):,} characters")

In [None]:
# Display sample text from each document
print("\n📝 Sample Text from Each Document (Linearized by Textractor):\n")
print("=" * 80)

for extraction in raw_extractions:
    if extraction['status'] == 'success':
        print(f"\n{extraction['doc_id']} - {extraction['doc_name']}")
        print("-" * 80)
        # Show first 500 characters
        sample_text = extraction['full_text'][:500]
        print(sample_text)
        if len(extraction['full_text']) > 500:
            print("\n[... truncated ...]")
        print()

In [None]:
# Display layout information
print("\n🏗️ Layout Analysis:\n")

for extraction in raw_extractions:
    if extraction['status'] == 'success':
        print(f"\n{extraction['doc_id']} - Layout Element Breakdown:")
        
        # Aggregate layout elements across all pages
        aggregated_layout = {}
        for page in extraction['pages']:
            if page['layout_elements']:
                for element_type, count in page['layout_elements'].items():
                    aggregated_layout[element_type] = aggregated_layout.get(element_type, 0) + count
        
        if aggregated_layout:
            for element_type, count in aggregated_layout.items():
                print(f"  {element_type.capitalize()}: {count}")
        else:
            print("  No layout elements detected")
        print()

In [None]:
# Quality checks
print("\n🔍 Quality Checks:\n")

quality_issues = []

for extraction in raw_extractions:
    if extraction['status'] == 'success':
        text = extraction['full_text']
        doc_id = extraction['doc_id']
        
        # Check for potential issues
        issues = []
        
        # Very short extraction
        if len(text) < 100:
            issues.append("Very short text (< 100 chars)")
        
        # Check for excessive special characters (possible OCR issues);
        # the `if text` guard avoids a ZeroDivisionError on empty text
        special_char_ratio = (
            sum(1 for c in text if not c.isalnum() and not c.isspace()) / len(text)
            if text else 0.0
        )
        if special_char_ratio > 0.3:
            issues.append(f"High special char ratio ({special_char_ratio:.1%})")
        
        # Check for repeated characters (OCR artifact)
        if '.....' in text or '-----' in text:
            issues.append("Repeated characters detected")
        
        if issues:
            quality_issues.append({
                "Doc ID": doc_id,
                "Issues": "; ".join(issues)
            })

if quality_issues:
    quality_df = pd.DataFrame(quality_issues)
    print("⚠️ Potential Quality Issues Detected:\n")
    display(quality_df)
else:
    print("✓ No obvious quality issues detected")
    print("   Note: Manual review of sample text above is still recommended")

---
## 6.5 Text Post-Processing

Apply cleaning and normalization to extracted text before chunking.

**Processing Steps:**
1. Clean whitespace
2. Fix hyphenation (words split across lines)
3. Remove headers/footers
4. Normalize unicode characters
5. Remove page numbers

**Custom processing:** Modify `TextProcessor` class methods to add domain-specific logic.
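The steps listed above live in the notebook's `TextProcessor` class. As a standalone illustration of what such steps can look like, here is a minimal sketch using only the standard library. The function names (`normalize_unicode`, `fix_hyphenation`, `remove_page_numbers`, `remove_repeated_lines`, `clean_whitespace`, `post_process`) are hypothetical and are not the `TextProcessor` API; the regular expressions and the 50% repetition threshold for headers/footers are assumptions to tune for your documents.

```python
import math
import re
import unicodedata
from collections import Counter


def normalize_unicode(text: str) -> str:
    """NFKC-normalize: folds ligatures such as U+FB01 into 'fi' and non-breaking spaces into spaces."""
    return unicodedata.normalize("NFKC", text)


def fix_hyphenation(text: str) -> str:
    """Join words split across a line break: 'implemen-\\ntation' -> 'implementation'."""
    # Only joins when a lowercase letter follows the break; genuine compounds split
    # across lines (e.g. 'well-' / 'known') will also be joined, so review the output
    return re.sub(r"(\w)-\n(?=[a-z])", r"\1", text)


def remove_page_numbers(text: str) -> str:
    """Blank out lines that contain only a page number, e.g. '12' or 'Page 3 of 10'."""
    pattern = re.compile(r"^[ \t]*(?:page[ \t]+)?\d+(?:[ \t]+of[ \t]+\d+)?[ \t]*$",
                         re.IGNORECASE | re.MULTILINE)
    return pattern.sub("", text)


def remove_repeated_lines(pages: list[str], min_fraction: float = 0.5) -> list[str]:
    """Drop lines that recur on at least `min_fraction` of pages (likely running headers/footers)."""
    # Count each distinct stripped line once per page
    counts = Counter(ln for page in pages for ln in {s.strip() for s in page.splitlines()} if ln)
    threshold = max(2, math.ceil(len(pages) * min_fraction))  # a line must recur on 2+ pages
    repeated = {ln for ln, n in counts.items() if n >= threshold}
    return ["\n".join(s for s in page.splitlines() if s.strip() not in repeated) for page in pages]


def clean_whitespace(text: str) -> str:
    """Collapse runs of spaces/tabs, trim spaces around line breaks, keep at most one blank line."""
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r" *\n *", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def post_process(text: str) -> str:
    """Run the single-string steps in order; header/footer removal runs separately on per-page text."""
    # Normalize first so non-breaking spaces become plain spaces before whitespace cleanup
    for step in (normalize_unicode, fix_hyphenation, remove_page_numbers, clean_whitespace):
        text = step(text)
    return text
```

`remove_repeated_lines` takes a list of per-page strings, because running headers and footers are only recognizable as lines that recur across pages; the other functions operate on a single string. Note that `remove_page_numbers` can also drop a standalone number that is a legitimate table cell.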

In [None]:
# Process all successfully extracted documents
print("\n🔄 Starting text processing...\n")
print("=" * 80)

processed_extractions = []

for extraction in raw_extractions:
    if extraction['status'] == 'success':
        print(f"\nProcessing: {extraction['doc_id']} - {extraction['doc_name']}")
        print("-" * 80)
        
        original_length = extraction['total_char_count']
        
        processed = processor.process(extraction)
        processed_extractions.append(processed)
        
        if processed.get('processed', False):
            new_length = processed['char_count_after_processing']
            chars_removed = processed['chars_removed']
            # Guard against division by zero for empty extractions
            pct_removed = chars_removed / original_length * 100 if original_length else 0.0
            print(f"  ✓ Original: {original_length:,} chars")
            print(f"  ✓ Processed: {new_length:,} chars")
            print(f"  ✓ Removed: {chars_removed:,} chars ({pct_removed:.1f}%)")
            print(f"  ✓ Steps: {', '.join(processed['processing_steps'])}")
        else:
            print("  ✗ Processing failed")
    else:
        # Keep failed extractions in the list
        processed_extractions.append(extraction)
        print(f"\nSkipping: {extraction['doc_id']} (extraction failed)")

print("\n" + "=" * 80)
print("\n✓ Text processing complete")

In [None]:
# Save processed extractions checkpoint
processed_checkpoint_path = checkpoint_dir / ProcessingConfig.PROCESSED_EXTRACTIONS_FILE

# Create serializable version (remove textractor_document if present)
serializable_processed = []
for extraction in processed_extractions:
    extraction_copy = extraction.copy()
    if 'textractor_document' in extraction_copy:
        del extraction_copy['textractor_document']
    serializable_processed.append(extraction_copy)

with open(processed_checkpoint_path, 'w') as f:
    json.dump(serializable_processed, f, indent=2)

print(f"💾 Processed extractions saved to: {processed_checkpoint_path}")
print(f"   File size: {processed_checkpoint_path.stat().st_size / 1024:.2f} KB")

### Processing Quality Analysis

In [None]:
# Compare before/after statistics
print("\n📊 Processing Impact Analysis:\n")

comparison_data = []
total_original = 0
total_processed = 0

# processed_extractions was built one-to-one with raw_extractions, so zip keeps them aligned
for extraction, processed in zip(raw_extractions, processed_extractions):
    if extraction['status'] == 'success' and processed.get('processed', False):
        original = extraction['total_char_count']
        after = processed['char_count_after_processing']
        removed = processed['chars_removed']
        total_original += original
        total_processed += after

        comparison_data.append({
            "Doc ID": extraction['doc_id'],
            "Original Chars": f"{original:,}",
            "Processed Chars": f"{after:,}",
            "Chars Removed": f"{removed:,}",
            "Reduction %": f"{removed / original * 100:.1f}%" if original else "n/a"
        })

if comparison_data:
    comparison_df = pd.DataFrame(comparison_data)
    display(comparison_df)

    # Overall statistics from the integer totals (no need to re-parse the formatted strings)
    total_removed = total_original - total_processed
    total_pct = total_removed / total_original * 100 if total_original else 0.0

    print("\n📈 Overall Impact:")
    print(f"   Total original: {total_original:,} characters")
    print(f"   Total processed: {total_processed:,} characters")
    print(f"   Total removed: {total_removed:,} characters ({total_pct:.1f}%)")

In [None]:
# Display before/after samples
print("\n📝 Before/After Text Samples:\n")
print("=" * 80)

# Show the first two documents as examples
for i, extraction in enumerate(raw_extractions[:2]):
    if extraction['status'] == 'success':
        processed = processed_extractions[i]
        if processed.get('processed', False):
            print(f"\n{extraction['doc_id']} - {extraction['doc_name']}")
            print("-" * 80)
            print("\nBEFORE (first 300 chars):")
            print(extraction['full_text'][:300])
            print("\nAFTER (first 300 chars):")
            print(processed['full_text'][:300])
            print()

---
## 7. Text Chunking

Apply LangChain RecursiveCharacterTextSplitter to create chunks from processed text.
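`RecursiveCharacterTextSplitter` tries a list of separators in order (by default `"\n\n"`, `"\n"`, `" "`, then `""`), splits on the first one present in the text, recursively re-splits any piece that is still longer than `chunk_size`, and then merges neighbouring pieces into chunks while carrying up to `chunk_overlap` characters forward. The sketch below is a simplified pure-Python reimplementation of that idea for intuition only; `recursive_split` is a hypothetical name, it is not LangChain's code, and it omits details such as whitespace stripping, a custom `length_function`, and carrying overlap into recursively re-split pieces.

```python
def recursive_split(
    text: str,
    chunk_size: int,
    chunk_overlap: int,
    separators: tuple[str, ...] = ("\n\n", "\n", " ", ""),
) -> list[str]:
    """Simplified recursive character splitting, for intuition only (not LangChain's code)."""
    # Use the first separator that occurs in the text; "" means split into single characters
    sep, rest = separators[-1], ()
    for i, candidate in enumerate(separators):
        if candidate == "" or candidate in text:
            sep, rest = candidate, separators[i + 1:]
            break
    pieces = list(text) if sep == "" else [p for p in text.split(sep) if p]

    chunks: list[str] = []
    window: list[str] = []  # consecutive pieces being merged into the next chunk

    def window_len() -> int:
        return len(sep.join(window))

    for piece in pieces:
        if len(piece) > chunk_size and rest:
            # Piece is too long on its own: flush the window, then re-split with finer separators
            if window:
                chunks.append(sep.join(window))
                window.clear()
            chunks.extend(recursive_split(piece, chunk_size, chunk_overlap, rest))
            continue
        if window and window_len() + len(sep) + len(piece) > chunk_size:
            chunks.append(sep.join(window))
            # Drop leading pieces until the rest fits the overlap budget and leaves room for `piece`
            while window and (
                window_len() > chunk_overlap
                or window_len() + len(sep) + len(piece) > chunk_size
            ):
                window.pop(0)
        window.append(piece)

    if window:
        chunks.append(sep.join(window))
    return chunks
```

For example, `recursive_split("one two three four five six", 10, 4)` yields `["one two", "two three", "four five", "five six"]`: consecutive chunks share a trailing word whenever it fits within the 4-character overlap budget. In the notebook itself the real splitter is used; a direct call looks like `RecursiveCharacterTextSplitter(chunk_size=..., chunk_overlap=...).split_text(text)`.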

In [None]:
# Chunk all successfully processed documents
print("\n🔄 Starting chunking process...\n")
print("=" * 80)

all_chunks = []

for extraction in processed_extractions:
    if extraction['status'] == 'success':
        print(f"\nChunking: {extraction['doc_id']} - {extraction['doc_name']}")
        print("-" * 80)
        
        chunks = chunker.chunk(extraction)
        all_chunks.extend(chunks)
        
        print(f"  ✓ Created {len(chunks)} chunks")
        if chunks:
            avg_chunk_size = sum(c['char_count'] for c in chunks) / len(chunks)
            print(f"  ✓ Average chunk size: {avg_chunk_size:.0f} characters")
            print(f"  ✓ Used processed text: {chunks[0]['was_processed']}")

print("\n" + "=" * 80)
print(f"\n✓ Chunking complete: {len(all_chunks)} total chunks created")

---
## 8. Chunk Analysis

Analyze chunk distribution and quality.
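The boundary-analysis cell in this section tests overlap by looking for any shared 20-character window. Because the splitter's overlap usually means the next chunk starts by repeating the tail of the previous one, a tighter measurement is the longest suffix of one chunk that is also a prefix of the next. A minimal sketch, assuming chunks are dicts with the `doc_id`, `chunk_id`, and `text` keys used in this notebook; the helper names are hypothetical.

```python
from typing import Optional


def suffix_prefix_overlap(a: str, b: str, max_len: Optional[int] = None) -> int:
    """Length of the longest suffix of `a` that is also a prefix of `b`."""
    limit = min(len(a), len(b)) if max_len is None else min(len(a), len(b), max_len)
    for k in range(limit, 0, -1):  # try the longest candidates first
        if a.endswith(b[:k]):
            return k
    return 0


def overlap_report(chunks: list[dict]) -> list[tuple[str, str, int]]:
    """Measure overlap for each consecutive pair of chunks that belong to the same document."""
    report = []
    for prev, nxt in zip(chunks, chunks[1:]):
        if prev["doc_id"] == nxt["doc_id"]:
            overlap = suffix_prefix_overlap(prev["text"], nxt["text"])
            report.append((prev["chunk_id"], nxt["chunk_id"], overlap))
    return report
```

Calling `overlap_report(all_chunks)` returns one `(chunk_id, next_chunk_id, overlap_chars)` tuple per same-document pair. Small values such as 1 or 2 can be coincidental matches, and the first chunk after a recursively re-split oversized piece may show no overlap at all, so treat the numbers as a diagnostic rather than an exact reading of `CHUNK_OVERLAP`.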

In [None]:
# Chunk distribution by document
import matplotlib.pyplot as plt

if all_chunks:
    # Group chunks by document
    chunks_by_doc = {}
    for chunk in all_chunks:
        doc_id = chunk['doc_id']
        if doc_id not in chunks_by_doc:
            chunks_by_doc[doc_id] = []
        chunks_by_doc[doc_id].append(chunk)
    
    # Create distribution table
    distribution_data = []
    for doc_id, chunks in chunks_by_doc.items():
        chunk_sizes = [c['char_count'] for c in chunks]
        distribution_data.append({
            "Doc ID": doc_id,
            "Chunk Count": len(chunks),
            "Avg Size": f"{sum(chunk_sizes) / len(chunks):.0f}",
            "Min Size": min(chunk_sizes),
            "Max Size": max(chunk_sizes),
            "Total Chars": f"{sum(chunk_sizes):,}"
        })
    
    dist_df = pd.DataFrame(distribution_data)
    print("\n📊 Chunk Distribution by Document:\n")
    display(dist_df)
    
    # Visualize chunk size distribution
    all_chunk_sizes = [c['char_count'] for c in all_chunks]
    
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.hist(all_chunk_sizes, bins=30, edgecolor='black', alpha=0.7)
    plt.axvline(ProcessingConfig.CHUNK_SIZE, color='red', linestyle='--', label=f'Target: {ProcessingConfig.CHUNK_SIZE}')
    plt.xlabel('Chunk Size (characters)')
    plt.ylabel('Frequency')
    plt.title('Chunk Size Distribution')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    doc_ids = [d["Doc ID"] for d in distribution_data]
    chunk_counts = [d["Chunk Count"] for d in distribution_data]
    plt.bar(doc_ids, chunk_counts, alpha=0.7, edgecolor='black')
    plt.xlabel('Document ID')
    plt.ylabel('Number of Chunks')
    plt.title('Chunks per Document')
    plt.xticks(rotation=45)
    
    plt.tight_layout()
    plt.savefig(checkpoint_dir / 'chunk_distribution.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    print(f"\n📈 Visualization saved to: {checkpoint_dir / 'chunk_distribution.png'}")

In [None]:
# Display example chunks
print("\n📝 Example Chunks:\n")
print("=" * 80)

# Show first chunk from each document
if all_chunks:
    docs_shown = set()
    for chunk in all_chunks:
        if chunk['doc_id'] not in docs_shown:
            docs_shown.add(chunk['doc_id'])
            print(f"\n{chunk['chunk_id']}")
            print(f"Doc: {chunk['doc_name']}, Pages: {chunk['page_numbers']}, Size: {chunk['char_count']} chars")
            print("-" * 80)
            # Show first 300 characters
            print(chunk['text'][:300])
            if len(chunk['text']) > 300:
                print("\n[... truncated ...]")
            print()

In [None]:
# Chunk boundary analysis
print("\n🔍 Chunk Boundary Analysis:\n")

if all_chunks:
    # Check for very small chunks (might indicate issues)
    small_chunks = [c for c in all_chunks if c['char_count'] < ProcessingConfig.CHUNK_SIZE * 0.3]
    
    # Check for chunks at max size (might be cut mid-sentence)
    large_chunks = [c for c in all_chunks if c['char_count'] >= ProcessingConfig.CHUNK_SIZE * 0.95]
    
    print(f"Small chunks (< 30% of target): {len(small_chunks)} / {len(all_chunks)} ({len(small_chunks)/len(all_chunks)*100:.1f}%)")
    print(f"Large chunks (>= 95% of target): {len(large_chunks)} / {len(all_chunks)} ({len(large_chunks)/len(all_chunks)*100:.1f}%)")
    
    # Check overlap effectiveness on up to 3 consecutive pairs from the same document
    print("\nOverlap check: comparing consecutive chunks...")
    window_size = 20  # characters per window used for the simple overlap test
    pairs_checked = 0
    overlap_examples = 0
    for idx in range(min(3, len(all_chunks) - 1)):
        chunk1 = all_chunks[idx]
        chunk2 = all_chunks[idx + 1]
        if chunk1['doc_id'] != chunk2['doc_id']:
            continue  # Pairs that straddle a document boundary are not expected to overlap
        pairs_checked += 1

        end_of_first = chunk1['text'][-100:]    # Last 100 chars of the first chunk
        start_of_second = chunk2['text'][:100]  # First 100 chars of the second chunk

        # Simple overlap detection: does any 20-char window from the end of chunk1 appear in the start of chunk2?
        overlap_found = any(
            end_of_first[j:j + window_size] in start_of_second
            for j in range(len(end_of_first) - window_size + 1)
        )
        if overlap_found:
            overlap_examples += 1

    print(f"  Overlap detected in {overlap_examples}/{pairs_checked} sampled same-document chunk pairs")
    print(f"  (Expected when CHUNK_OVERLAP > 0; configured overlap={ProcessingConfig.CHUNK_OVERLAP})")

---
## 9. Final Output Preparation

Validate and save processed chunks for indexing.
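The schema-validation cell in this section only confirms that required fields exist. A stricter variant can also check value types and basic invariants. This is a minimal sketch: `validate_chunk` and `EXPECTED_TYPES` are hypothetical names, and the expected types and the rule that `char_count` equals `len(text)` are assumptions inferred from how the fields are used in this notebook, so adjust them if `TextChunker` produces something different.

```python
from typing import Any

# Assumed field types, inferred from how each field is used in this notebook (adjust as needed)
EXPECTED_TYPES: dict[str, type] = {
    "chunk_id": str,
    "doc_id": str,
    "doc_name": str,
    "text": str,
    "chunk_index": int,
    "char_count": int,
    "page_numbers": list,
}


def validate_chunk(chunk: dict[str, Any]) -> list[str]:
    """Return the problems found in one chunk; an empty list means the chunk passed."""
    problems = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in chunk:
            problems.append(f"missing '{field}'")
        elif isinstance(chunk[field], bool) or not isinstance(chunk[field], expected):
            # bool is a subclass of int, so reject it explicitly
            problems.append(f"'{field}' should be {expected.__name__}, got {type(chunk[field]).__name__}")
    if problems:
        return problems  # the invariant checks below assume the types are right
    if not chunk["text"].strip():
        problems.append("empty text")
    if chunk["char_count"] != len(chunk["text"]):
        problems.append("char_count does not match len(text)")
    if not all(isinstance(p, int) and not isinstance(p, bool) for p in chunk["page_numbers"]):
        problems.append("page_numbers should contain ints")
    return problems
```

For example, `[(c.get('chunk_id'), p) for c in all_chunks if (p := validate_chunk(c))]` collects every failing chunk together with its list of problems.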

In [None]:
# Schema validation
print("\n✅ Schema Validation:\n")

# Includes every field read when building final_chunks below
required_fields = ['chunk_id', 'doc_id', 'doc_name', 'text', 'chunk_index', 'char_count',
                   'page_numbers', 'processing_timestamp', 'was_processed']

validation_passed = bool(all_chunks)  # an empty chunk list should not count as a pass
for chunk in all_chunks:  # check every chunk, not just a sample
    missing_fields = [field for field in required_fields if field not in chunk]
    if missing_fields:
        print(f"✗ Chunk {chunk.get('chunk_id', 'unknown')} missing fields: {missing_fields}")
        validation_passed = False

if validation_passed:
    print(f"✓ All {len(all_chunks)} chunks have the required fields")
    print(f"✓ Schema: {', '.join(required_fields)}")
else:
    print("\n⚠️ Warning: no chunks were produced, or some chunks are missing fields")

In [None]:
# Prepare final output format
# Add any additional metadata or formatting needed for OpenSearch

final_chunks = []
for chunk in all_chunks:
    # Create a clean version for indexing
    final_chunk = {
        'id': chunk['chunk_id'],  # Use as document ID in OpenSearch
        'text': chunk['text'],
        'metadata': {
            'doc_id': chunk['doc_id'],
            'doc_name': chunk['doc_name'],
            'chunk_index': chunk['chunk_index'],
            'char_count': chunk['char_count'],
            'page_numbers': chunk['page_numbers'],
            'processing_timestamp': chunk['processing_timestamp'],
            'was_processed': chunk['was_processed']
        }
    }
    final_chunks.append(final_chunk)

print(f"✓ Prepared {len(final_chunks)} chunks for indexing")
if final_chunks:
    print("\nSample output format:")
    print(json.dumps(final_chunks[0], indent=2))

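If `02_indexing.ipynb` sends these chunks through the OpenSearch bulk API, each chunk becomes two newline-delimited JSON lines: an action line carrying `_index` and `_id`, then the document source, and the whole request body must end with a newline. The opensearch-py client's `helpers.bulk` function can assemble this for you; the hand-rolled sketch below is mainly useful for inspecting or saving the exact request body. `to_bulk_ndjson` is a hypothetical name, the index name is whatever you pass as `index_name`, and it assumes every field other than `id` belongs in the document source (adjust to match your index mapping).

```python
import json


def to_bulk_ndjson(final_chunks: list[dict], index_name: str) -> str:
    """Build an OpenSearch _bulk request body: one action line plus one source line per chunk."""
    lines = []
    for chunk in final_chunks:
        # Using the chunk id as _id means re-indexing overwrites documents instead of duplicating them
        lines.append(json.dumps({"index": {"_index": index_name, "_id": chunk["id"]}}))
        # Source line: every field except "id", which already travels as _id
        lines.append(json.dumps({k: v for k, v in chunk.items() if k != "id"}))
    # The bulk API requires the body to end with a newline; an empty input yields an empty body
    return "\n".join(lines) + "\n" if lines else ""
```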
In [None]:
# Save processed chunks
chunks_output_path = checkpoint_dir / ProcessingConfig.PROCESSED_CHUNKS_FILE

with open(chunks_output_path, 'w') as f:
    json.dump(final_chunks, f, indent=2)

print(f"💾 Processed chunks saved to: {chunks_output_path}")
print(f"   File size: {chunks_output_path.stat().st_size / 1024:.2f} KB")
print(f"   Total chunks: {len(final_chunks)}")

---
## 10. Summary & Next Steps

In [None]:
# Generate final summary
print("\n" + "=" * 80)
print("📋 PROCESSING SUMMARY")
print("=" * 80)

successful_extractions = [e for e in raw_extractions if e['status'] == 'success']
successful_processing = [e for e in processed_extractions if e.get('processed', False)]

print("\n📄 Document Processing:")
print(f"   Total documents: {len(pdf_sources)}")
print(f"   Successfully extracted: {len(successful_extractions)}")
print(f"   Successfully processed: {len(successful_processing)}")
print(f"   Failed extractions: {len(pdf_sources) - len(successful_extractions)}")

if successful_extractions:
    total_pages = sum(e['page_count'] for e in successful_extractions)
    total_chars_raw = sum(e['total_char_count'] for e in successful_extractions)
    total_tables = sum(e['table_count'] for e in successful_extractions)
    print(f"   Total pages: {total_pages}")
    print(f"   Total characters (raw): {total_chars_raw:,}")
    print(f"   Total tables detected: {total_tables}")

if successful_processing:
    total_chars_processed = sum(e['char_count_after_processing'] for e in successful_processing)
    total_removed = total_chars_raw - total_chars_processed
    print(f"   Total characters (processed): {total_chars_processed:,}")
    print(f"   Characters removed by processing: {total_removed:,} ({total_removed/total_chars_raw*100:.1f}%)")

print("\n📦 Chunk Generation:")
print(f"   Total chunks created: {len(all_chunks)}")
if all_chunks:
    avg_chunk_size = sum(c['char_count'] for c in all_chunks) / len(all_chunks)
    chunks_from_processed = sum(1 for c in all_chunks if c.get('was_processed', False))
    print(f"   Average chunk size: {avg_chunk_size:.0f} characters")
    print(f"   Target chunk size: {ProcessingConfig.CHUNK_SIZE} characters")
    print(f"   Chunk overlap: {ProcessingConfig.CHUNK_OVERLAP} characters")
    print(f"   Chunks from processed text: {chunks_from_processed}/{len(all_chunks)}")

print("\n💾 Outputs:")
print(f"   Config: {checkpoint_dir / 'config.json'}")
print(f"   Raw extractions: {checkpoint_dir / ProcessingConfig.RAW_EXTRACTION_FILE}")
print(f"   Processed extractions: {checkpoint_dir / ProcessingConfig.PROCESSED_EXTRACTIONS_FILE}")
print(f"   Final chunks: {checkpoint_dir / ProcessingConfig.PROCESSED_CHUNKS_FILE}")
print(f"   Visualization: {checkpoint_dir / 'chunk_distribution.png'}")

print("\n🔧 Pipeline Components Used:")
print("   1. TextExtractor (amazon-textract-textractor) → Layout-aware text extraction")
print("   2. TextProcessor → Cleaned and normalized text")
print("   3. TextChunker (LangChain) → Final chunks for indexing")

print("\n⏭️  Next Steps:")
print(f"   1. Review the layout analysis in Section 6")
print(f"   2. Review the before/after samples in Section 6.5")
print(f"   3. Review the chunk samples in Section 8")
print(f"   4. If satisfied, proceed to: 02_indexing.ipynb")
print(f"   5. If adjustments needed:")
print(f"      - Modify TextProcessor methods for different cleaning")
print(f"      - Modify config and re-run from Section 6.5 (load raw checkpoint)")
print(f"      - Modify chunking params and re-run from Section 7 (load processed checkpoint)")

print("\n" + "=" * 80)
print("✓ Notebook execution complete")
print("=" * 80)

---
## Notes & Observations

**Architecture:**
- Three-class design: `TextExtractor`, `TextProcessor`, `TextChunker`
- Using amazon-textract-textractor library for layout-aware extraction
- Separation of concerns allows independent testing and iteration
- Each stage has its own checkpoint for efficient experimentation
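
Because each stage writes its own checkpoint, re-running a later stage only needs the previous stage's file. One way to make that pattern explicit is a small load-or-compute helper. This is a minimal sketch, not code from this notebook; `load_or_compute` is a hypothetical name, and it assumes the stage output is JSON-serializable (as the checkpoints here are once `textractor_document` is removed).

```python
import json
from pathlib import Path
from typing import Any, Callable


def load_or_compute(path: Path, compute: Callable[[], Any], force: bool = False) -> Any:
    """Return the JSON checkpoint at `path` if it exists; otherwise compute, save, and return it."""
    if path.exists() and not force:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    result = compute()
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    return result
```

For example, `processed_extractions = load_or_compute(checkpoint_dir / ProcessingConfig.PROCESSED_EXTRACTIONS_FILE, run_processing)`, where `run_processing` is a hypothetical zero-argument function wrapping the Section 6.5 loop. Pass `force=True` after changing `TextProcessor` so the stale checkpoint is regenerated.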

**Extraction Quality:**
- [Add observations about Textractor layout detection]
- [Note quality of linearized text vs raw text]
- [Document any problematic documents]

**Layout Detection:**
- [Note which layout elements were detected (titles, headers, tables, etc.)]
- [Assess if layout-aware extraction improved text quality]
- [Consider using layout info for chunking in future]

**Text Processing:**
- [Document effectiveness of cleaning steps]
- [Note any additional processing needed]
- [List custom methods added to TextProcessor]

**Chunking Strategy:**
- [Document why you chose these chunk parameters]
- [Note any adjustments made during experimentation]
- [Consider layout-aware chunking for next iteration]

**Issues Encountered:**
- [List any errors or unexpected behavior]

**Next Experiments:**
- [Try different processing strategies]
- [Alternative chunk sizes or overlap values]
- [Layout-based chunking (chunk by sections instead of character count)]
- [Use table data for structured retrieval]
- [Metadata enhancements from layout elements]