# Amazon Textract

Testing Amazon Textract but mostly having SubscriptionRequired errors

In [1]:
# Imports and Setup
import boto3
from pathlib import Path
from typing import Dict, List
from botocore.exceptions import ClientError, NoCredentialsError

In [2]:
# Initialize Textract Client Function
def initialize_textract_client(region_name: str = 'us-east-1', debug: bool = False) -> boto3.client:
    """
    Initialize AWS Textract client 
    
    Args:
        region_name (str): AWS region for Textract service
        debug (bool): Enable debug logging
    
    Returns:
        boto3.client: Configured Textract client
    
    Raises:
        NoCredentialsError: If AWS credentials are not configured
        Exception: For other AWS configuration issues
    """
    if debug:
        print(f"[DEBUG] Initializing Textract client for region: {region_name}")
    
    try:
        # Initialize the Textract client
        textract = boto3.client('textract', region_name=region_name)
        
        if debug:
            print("[DEBUG] Successfully created Textract client")
            # Test client by listing available operations (doesn't make API call)
            operations = [op for op in dir(textract) if not op.startswith('_') and callable(getattr(textract, op))][:5]
            print(f"[DEBUG] Available operations sample: {operations}")
        
        return textract
        
    except NoCredentialsError:
        error_msg = "AWS credentials not found. Please configure AWS credentials."
        if debug:
            print(f"[DEBUG] {error_msg}")
        raise
        
    except Exception as e:
        error_msg = f"Failed to initialize Textract client: {str(e)}"
        if debug:
            print(f"[DEBUG] {error_msg}")
        raise

In [3]:
# Test Textract Client Initialization
try:
    # Test the function with debug enabled
    textract_client = initialize_textract_client(debug=True)
    print("✅ Textract client initialized successfully!")
    print(f"Service name: {textract_client._service_model.service_name}")
    print(f"Region: {textract_client.meta.region_name}")
    
except NoCredentialsError:
    print("❌ AWS credentials not configured. Please set up your AWS credentials.")
    print("You can configure them using:")
    print("1. AWS CLI: aws configure")
    print("2. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
    print("3. IAM roles (if running on EC2)")
    
except Exception as e:
    print(f"❌ Error initializing Textract client: {e}")

[DEBUG] Initializing Textract client for region: us-east-1
[DEBUG] Successfully created Textract client
[DEBUG] Available operations sample: ['analyze_document', 'analyze_expense', 'analyze_id', 'can_paginate', 'close']
✅ Textract client initialized successfully!
Service name: textract
Region: us-east-1


In [4]:
# PDF Directory Scanner Function
def scan_pdf_directory(root_directory: str, debug: bool = False) -> List[Dict[str, str]]:
    """
    Recursively scan directory for PDF files and return structured information.
    
    Args:
        root_directory (str): Path to root directory containing PDFs
        debug (bool): Enable debug logging
    
    Returns:
        List[Dict]: List of PDF file information with keys: 'path', 'name', 'size', 'subfolder'
    """
    if debug:
        print(f"[DEBUG] Scanning directory: {root_directory}")
    
    pdf_files = []
    root_path = Path(root_directory)
    
    if not root_path.exists():
        error_msg = f"Directory does not exist: {root_directory}"
        if debug:
            print(f"[DEBUG] {error_msg}")
        return pdf_files
    
    if not root_path.is_dir():
        error_msg = f"Path is not a directory: {root_directory}"
        if debug:
            print(f"[DEBUG] {error_msg}")
        return pdf_files
    
    # Recursively find all PDF files
    pdf_pattern = "**/*.pdf"
    found_files = list(root_path.glob(pdf_pattern))
    
    if debug:
        print(f"[DEBUG] Found {len(found_files)} PDF files")
    
    for pdf_path in found_files:
        try:
            # Get file stats
            file_stats = pdf_path.stat()
            
            # Determine subfolder relative to root
            relative_path = pdf_path.relative_to(root_path)
            subfolder = str(relative_path.parent) if relative_path.parent != Path('.') else 'root'
            
            pdf_info = {
                'path': str(pdf_path.absolute()),
                'name': pdf_path.name,
                'size': file_stats.st_size,
                'subfolder': subfolder,
                'size_mb': round(file_stats.st_size / (1024 * 1024), 2)
            }
            
            pdf_files.append(pdf_info)
            
            if debug:
                print(f"[DEBUG] Found PDF: {pdf_info['name']} ({pdf_info['size_mb']} MB) in {subfolder}")
                
        except Exception as e:
            error_msg = f"Error processing file {pdf_path}: {str(e)}"
            if debug:
                print(f"[DEBUG] {error_msg}")
    
    # Sort by subfolder then by name
    pdf_files.sort(key=lambda x: (x['subfolder'], x['name']))
    
    if debug:
        print(f"[DEBUG] Successfully processed {len(pdf_files)} PDF files")
        
    return pdf_files

In [5]:
# Test PDF Directory Scanner
pdf_directory = "./pdfs"  # Change this to your actual directory path if needed

print("Scanning for PDF files...")
pdf_files = scan_pdf_directory(pdf_directory, debug=True)

print(f"\n Summary:")
print(f"Total PDFs found: {len(pdf_files)}")

if pdf_files:
    # Group by subfolder
    subfolders = {}
    total_size = 0
    
    for pdf in pdf_files:
        subfolder = pdf['subfolder']
        if subfolder not in subfolders:
            subfolders[subfolder] = []
        subfolders[subfolder].append(pdf)
        total_size += pdf['size']
    
    print(f"Total size: {round(total_size / (1024 * 1024), 2)} MB")
    print(f"Subfolders: {len(subfolders)}")
    
    print("\n📁 Files by subfolder:")
    for subfolder, files in subfolders.items():
        folder_size = sum(f['size'] for f in files)
        print(f"  {subfolder}: {len(files)} files ({round(folder_size / (1024 * 1024), 2)} MB)")
        
        # Show first few files as examples
        for i, file in enumerate(files[:3]):
            print(f"    - {file['name']} ({file['size_mb']} MB)")
        if len(files) > 3:
            print(f"    ... and {len(files) - 3} more")
else:
    print("❌ No PDF files found. Please check the directory path.")

Scanning for PDF files...
[DEBUG] Scanning directory: ./pdfs
[DEBUG] Found 1 PDF files
[DEBUG] Found PDF: deepshield-systems-employee-handbook-2023.pdf (0.01 MB) in root
[DEBUG] Successfully processed 1 PDF files

 Summary:
Total PDFs found: 1
Total size: 0.01 MB
Subfolders: 1

📁 Files by subfolder:
  root: 1 files (0.01 MB)
    - deepshield-systems-employee-handbook-2023.pdf (0.01 MB)


In [6]:
# PDF Text Extraction Function using Textract
def extract_text_from_pdf(pdf_path: str, textract_client: boto3.client, debug: bool = False) -> Dict:
    """
    Extract text from a PDF using AWS Textract (synchronous processing).
    
    Args:
        pdf_path (str): Path to the PDF file
        textract_client (boto3.client): Initialized Textract client
        debug (bool): Enable debug logging
    
    Returns:
        Dict: Extraction results with keys: 'success', 'text', 'confidence', 'error', 'page_count'
    """
    if debug:
        print(f"[DEBUG] Processing PDF: {pdf_path}")
    
    result = {
        'success': False,
        'text': '',
        'confidence': 0.0,
        'error': None,
        'page_count': 0,
        'file_size_mb': 0
    }
    
    try:
        # Check if file exists
        pdf_file = Path(pdf_path)
        if not pdf_file.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        
        # Get file size
        file_size = pdf_file.stat().st_size
        result['file_size_mb'] = round(file_size / (1024 * 1024), 2)
        
        if debug:
            print(f"[DEBUG] File size: {result['file_size_mb']} MB")
        
        # Check file size limit (10MB for synchronous processing)
        if file_size > 10 * 1024 * 1024:
            raise ValueError(f"File too large for synchronous processing: {result['file_size_mb']} MB (max 10MB)")
        
        # Read the PDF file
        with open(pdf_path, 'rb') as document:
            document_bytes = document.read()
        
        if debug:
            print("[DEBUG] File read successfully, calling Textract...")
        
        # Call Textract
        response = textract_client.detect_document_text(
            Document={'Bytes': document_bytes}
        )
        
        if debug:
            print(f"[DEBUG] Textract response received with {len(response.get('Blocks', []))} blocks")
        
        # Extract text from blocks
        extracted_text = []
        confidences = []
        
        for block in response.get('Blocks', []):
            if block['BlockType'] == 'LINE':
                text = block.get('Text', '')
                confidence = block.get('Confidence', 0)
                
                extracted_text.append(text)
                confidences.append(confidence)
                
                if debug and len(extracted_text) <= 3:  # Show first few lines
                    print(f"[DEBUG] Line: '{text}' (confidence: {confidence:.1f}%)")
        
        # Combine results
        result['text'] = '\n'.join(extracted_text)
        result['confidence'] = sum(confidences) / len(confidences) if confidences else 0
        result['page_count'] = len([block for block in response.get('Blocks', []) if block['BlockType'] == 'PAGE'])
        result['success'] = True
        
        if debug:
            print(f"[DEBUG] Extraction successful!")
            print(f"[DEBUG] Pages: {result['page_count']}")
            print(f"[DEBUG] Lines extracted: {len(extracted_text)}")
            print(f"[DEBUG] Average confidence: {result['confidence']:.1f}%")
            print(f"[DEBUG] Text length: {len(result['text'])} characters")
        
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_msg = f"AWS Textract error ({error_code}): {e.response['Error']['Message']}"
        result['error'] = error_msg
        if debug:
            print(f"[DEBUG] {error_msg}")
    
    except Exception as e:
        error_msg = f"Error processing PDF: {str(e)}"
        result['error'] = error_msg
        if debug:
            print(f"[DEBUG] {error_msg}")
    
    return result

In [7]:
# Test PDF Text Extraction
# First ensure we have a working Textract client
try:
    if 'textract_client' not in locals():
        print("🔄 Initializing Textract client...")
        textract_client = initialize_textract_client(debug=True)
except Exception as e:
    print(f"❌ Failed to initialize Textract client: {e}")
    textract_client = None

if 'pdf_files' in locals() and pdf_files and textract_client:
    print("🧪 Testing PDF text extraction...\n")
    
    # Test on first few PDFs (or all if less than 3)
    test_files = pdf_files[:min(3, len(pdf_files))]
    
    for i, pdf_info in enumerate(test_files, 1):
        print(f"📄 Test {i}/{len(test_files)}: {pdf_info['name']}")
        print(f"   Size: {pdf_info['size_mb']} MB | Subfolder: {pdf_info['subfolder']}")
        
        # Skip if file is too large for sync processing
        if pdf_info['size_mb'] > 10:
            print("   ⚠️  Skipping - File too large for synchronous processing (>10MB)")
            continue
        
        # Extract text
        result = extract_text_from_pdf(pdf_info['path'], textract_client, debug=True)
        
        if result['success']:
            print(f"   ✅ Success!")
            print(f"   📊 Pages: {result['page_count']} | Confidence: {result['confidence']:.1f}%")
            print(f"   📝 Text length: {len(result['text'])} characters")
            
            # Show first 200 characters of extracted text
            preview_text = result['text'][:200].replace('\n', ' ')
            print(f"   🔍 Preview: {preview_text}...")
            
        else:
            print(f"   ❌ Failed: {result['error']}")
        
        print("-" * 60)
    
    # Summary
    successful_extractions = sum(1 for pdf in test_files if pdf['size_mb'] <= 10)
    print(f"\n📈 Test Summary:")
    print(f"Files tested: {len(test_files)}")
    print(f"Files suitable for sync processing: {successful_extractions}")
    
else:
    print("❌ Cannot run test - missing PDF files or Textract client")
    print("Please run the previous cells first to scan for PDFs and initialize the client.")


🧪 Testing PDF text extraction...

📄 Test 1/1: deepshield-systems-employee-handbook-2023.pdf
   Size: 0.01 MB | Subfolder: root
[DEBUG] Processing PDF: c:\Code\Code for learning at Diligence\Text Chunking and Processing\PDF-Processing-Pipeline\pdfs\deepshield-systems-employee-handbook-2023.pdf
[DEBUG] File size: 0.01 MB
[DEBUG] File read successfully, calling Textract...
[DEBUG] AWS Textract error (SubscriptionRequiredException): The AWS Access Key Id needs a subscription for the service
   ❌ Failed: AWS Textract error (SubscriptionRequiredException): The AWS Access Key Id needs a subscription for the service
------------------------------------------------------------

📈 Test Summary:
Files tested: 1
Files suitable for sync processing: 1


# PDF Extraction Uses PyMuPDF

Since Textract isn't working, trying with PyMuPDF

In [8]:
import os
import fitz  # PyMuPDF
import re

In [9]:
# Use PyMuPDF for PDF Extraction
def extract_text_from_pdf(pdf_path, debug=False):
    """
    Extract text from a single PDF file
    
    Args:
        pdf_path (str): Path to the PDF file
        debug (bool): Print debug information
    
    Returns:
        str: Extracted text from the PDF
    """
    if debug:
        print(f"[DEBUG] Starting text extraction for: {pdf_path}")
    
    try:
        # Open the PDF document
        doc = fitz.open(pdf_path)
        text = ""
        
        if debug:
            print(f"[DEBUG] PDF opened successfully. Total pages: {doc.page_count}")
        
        # Iterate through each page
        for page_num in range(doc.page_count):
            page = doc[page_num]
            # Extract text from the page
            page_text = page.get_text()
            
            if debug:
                char_count = len(page_text.strip())
                print(f"[DEBUG] Page {page_num + 1}: Extracted {char_count} characters")
            
            text += f"\n--- Page {page_num + 1} ---\n"
            text += page_text
            text += "\n"
        
        # Close the document
        doc.close()
        
        if debug:
            total_chars = len(text.strip())
            print(f"[DEBUG] Completed extraction. Total characters: {total_chars}")
        
        return text
    
    except Exception as e:
        error_msg = f"Error extracting text from {pdf_path}: {str(e)}"
        if debug:
            print(f"[DEBUG ERROR] {error_msg}")
        else:
            print(error_msg)
        return ""

# Find PDF files in a directory (Don't care about file size unlike other function)
def find_pdf_files(directory, debug=False):
    """
    Find all PDF files in directory and subdirectories
    
    Args:
        directory (str): Root directory to search
        debug (bool): Print debug information
    
    Returns:
        list: List of PDF file paths
    """
    if debug:
        print(f"[DEBUG] Searching for PDF files in: {directory}")
    
    pdf_files = []
    directory_path = Path(directory)
    
    if not directory_path.exists():
        error_msg = f"Directory {directory} does not exist"
        if debug:
            print(f"[DEBUG ERROR] {error_msg}")
        else:
            print(error_msg)
        return []
    
    # Recursively find all PDF files
    for pdf_file in directory_path.rglob("*.pdf"):
        if pdf_file.is_file():
            pdf_files.append(str(pdf_file))
            if debug:
                print(f"[DEBUG] Found PDF: {pdf_file}")
    
    if debug:
        print(f"[DEBUG] Total PDF files found: {len(pdf_files)}")
    
    return pdf_files

def extract_pdfs_from_directory(input_directory, output_file=None, save_individual=False, debug=False):
    """
    Extract text from all PDFs in a directory and its subdirectories
    
    Args:
        input_directory (str): Directory containing PDF files
        output_file (str, optional): Path to save combined extracted text
        save_individual (bool): Whether to save each PDF's text individually
        debug (bool): Print debug information
    
    Returns:
        dict: Dictionary with PDF paths as keys and extracted text as values
    """
    if debug:
        print(f"[DEBUG] Starting PDF extraction from directory: {input_directory}")
        print(f"[DEBUG] Output file: {output_file}")
        print(f"[DEBUG] Save individual files: {save_individual}")
    
    if not os.path.exists(input_directory):
        error_msg = f"Directory {input_directory} does not exist"
        if debug:
            print(f"[DEBUG ERROR] {error_msg}")
        else:
            print(error_msg)
        return {}
    
    # Find all PDF files
    pdf_files = find_pdf_files(input_directory, debug=debug)
    
    if not pdf_files:
        warning_msg = f"No PDF files found in {input_directory}"
        if debug:
            print(f"[DEBUG WARNING] {warning_msg}")
        else:
            print(warning_msg)
        return {}
    
    print(f"Found {len(pdf_files)} PDF files")
    
    extracted_texts = {}
    all_text = ""
    successful_extractions = 0
    
    for i, pdf_path in enumerate(pdf_files, 1):
        if debug:
            print(f"[DEBUG] Processing file {i}/{len(pdf_files)}: {pdf_path}")
        else:
            print(f"Processing ({i}/{len(pdf_files)}): {os.path.basename(pdf_path)}")
        
        # Extract text from PDF
        text = extract_text_from_pdf(pdf_path, debug=debug)
        
        if text.strip():  # Check if there's actual content
            extracted_texts[pdf_path] = text
            successful_extractions += 1
            
            # Add to combined text
            all_text += f"\n{'='*50}\n"
            all_text += f"FILE: {pdf_path}\n"
            all_text += f"{'='*50}\n"
            all_text += text
            all_text += "\n"
            
            # Save individual file if requested
            if save_individual:
                output_path = pdf_path.replace('.pdf', '_extracted.txt')
                try:
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(text)
                    if debug:
                        print(f"[DEBUG] Saved individual text to: {output_path}")
                    else:
                        print(f"  → Saved individual text file")
                except Exception as e:
                    error_msg = f"Error saving individual file {output_path}: {str(e)}"
                    if debug:
                        print(f"[DEBUG ERROR] {error_msg}")
                    else:
                        print(f"  → Error saving individual file: {str(e)}")
        
        else:
            warning_msg = f"No text extracted from: {pdf_path}"
            if debug:
                print(f"[DEBUG WARNING] {warning_msg}")
            else:
                print(f"  → No text extracted")
    
    # Save combined text if output file specified
    if output_file and all_text:
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(all_text)
            if debug:
                print(f"[DEBUG] Saved combined text to: {output_file}")
            else:
                print(f"Saved combined text to: {output_file}")
        except Exception as e:
            error_msg = f"Error saving combined file {output_file}: {str(e)}"
            if debug:
                print(f"[DEBUG ERROR] {error_msg}")
            else:
                print(f"Error saving combined file: {str(e)}")
    
    # Print summary
    print(f"\nExtraction complete!")
    print(f"Successfully processed {successful_extractions}/{len(pdf_files)} PDF files")
    
    if debug:
        print(f"[DEBUG] Total characters in combined text: {len(all_text)}")
        print(f"[DEBUG] Extraction results dictionary has {len(extracted_texts)} entries")
    
    return extracted_texts

def simple_extract(directory_path, debug=False):
    """
    Simple function to extract text from all PDFs in a directory
    
    Args:
        directory_path (str): Path to directory containing PDFs
        debug (bool): Print debug information
    
    Returns:
        dict: Dictionary with PDF paths as keys and extracted text as values
    """
    if debug:
        print(f"[DEBUG] Simple extract called for directory: {directory_path}")
    
    return extract_pdfs_from_directory(directory_path, debug=debug)

In [10]:
extracted_texts = simple_extract(r".\pdfs")
extracted_texts

Found 1 PDF files
Processing (1/1): deepshield-systems-employee-handbook-2023.pdf

Extraction complete!
Successfully processed 1/1 PDF files


{'pdfs\\deepshield-systems-employee-handbook-2023.pdf': '\n--- Page 1 ---\nDeepShield Systems Employee Handbook 2023\n1. Introduction and Purpose\n1. This Employee Handbook ("Handbook") sets forth the policies, procedures, and working\nconditions applicable to all employees of DeepShield Systems, Inc. ("DeepShield" or the\n"Company"), a Delaware corporation with principal offices at 2100 Innovation Drive, Suite 400,\nWilmington, DE 19801.\n2. This Handbook supersedes all previous employee handbooks and management memos. The\npolicies contained herein are effective as of January 1, 2023.\n2. Employment Policies\n1. Equal Employment Opportunity\nDeepShield provides equal employment opportunities to all employees and applicants without regard\nto race, color, religion, sex, national origin, age, disability, genetic information, or any other\nprotected characteristic.\n2. Employment Classification\n-\nFull-time Regular: Employees scheduled to work 40 hours per week\n-\nPart-time Regular: E

In [11]:
# Different Text Extraction Strategy using PyMuPDF block based extraction
def extract_text_with_layout(pdf_path: str, debug: bool = False) -> str:
    """
    Extract text from PDF using PyMuPDF's block-based extraction for better paragraph detection.
    
    Args:
        pdf_path (str): Path to the PDF file
        debug (bool): Print debug information
    
    Returns:
        str: Extracted text with better paragraph preservation
    """
    if debug:
        print(f"[DEBUG] Starting layout-aware extraction for: {pdf_path}")
    
    try:
        doc = fitz.open(pdf_path)
        text = ""
        
        if debug:
            print(f"[DEBUG] PDF opened successfully. Total pages: {doc.page_count}")
        
        for page_num in range(doc.page_count):
            page = doc[page_num]
            
            # Use block-based extraction which preserves paragraph structure
            blocks = page.get_text("blocks")
            
            if debug:
                print(f"[DEBUG] Page {page_num + 1}: Found {len(blocks)} text blocks")
            
            text += f"\n--- Page {page_num + 1} ---\n"
            
            # Process blocks - each block is typically a paragraph or text unit
            for block_num, block in enumerate(blocks):
                # block format: (x0, y0, x1, y1, "text", block_no, block_type)
                # block_type 0 = text, 1 = image
                if block[6] == 0:  # Text block
                    block_text = block[4]
                    
                    # Clean up the block text
                    # Join lines within the block that are clearly continuations
                    lines = block_text.split('\n')
                    cleaned_lines = []
                    
                    for i, line in enumerate(lines):
                        line = line.strip()
                        if not line:
                            continue
                        
                        # Check if this line should be joined with the previous one
                        if cleaned_lines and not line[0].isupper() and not line[0].isdigit():
                            # Likely a continuation - join with previous line
                            cleaned_lines[-1] += " " + line
                        elif cleaned_lines and not cleaned_lines[-1].endswith(('.', ':', '-', ';')):
                            # Previous line doesn't end with punctuation - likely continuation
                            cleaned_lines[-1] += " " + line
                        else:
                            # New line/paragraph
                            cleaned_lines.append(line)
                    
                    # Join the cleaned lines within this block
                    block_text = ' '.join(cleaned_lines)
                    
                    if block_text.strip():
                        text += block_text + "\n\n"
                        
                        if debug and block_num < 3:
                            preview = block_text[:100].replace('\n', ' ')
                            print(f"[DEBUG]   Block {block_num}: {preview}...")
        
        doc.close()
        
        if debug:
            print(f"[DEBUG] Extraction complete. Total characters: {len(text)}")
        
        return text
    
    except Exception as e:
        error_msg = f"Error extracting text from {pdf_path}: {str(e)}"
        print(f"[DEBUG ERROR] {error_msg}")
        return ""

# Text Cleaning

In [43]:
def clean_text_for_rag(text: str, debug: bool = False) -> str:
    """
    Improved text cleaning with hierarchical section structure.
    Uses positional logic instead of keywords for generalization.
    """
    
    if debug:
        print(f"[DEBUG] Starting improved text cleaning. Original length: {len(text)} characters")
    
    original_text = text
    
    # Steps 1-3: Same as before (page markers, artifacts, whitespace)
    text = re.sub(r'--- Page \d+ ---\n?', '', text)
    text = re.sub(r'^[-_]{3,}\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'_{3,}', '[SIGNATURE LINE]', text)
    text = re.sub(r' {2,}', ' ', text)
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    text = '\n'.join(lines)
    
    if debug:
        print(f"[DEBUG] Steps 1-3 complete: Basic cleaning")
    
    # Step 4: Join line continuations
    if debug:
        print(f"[DEBUG] Step 4: Joining line continuations")
    
    lines = text.split('\n')
    joined_lines = []
    i = 0
    
    def is_section_header(line: str) -> bool:
        """Detect if a line is a section header."""
        if not re.match(r'^\d+\.\s+', line):
            return False
        if len(line) > 60:
            return False
        text_part = re.sub(r'^\d+\.\s+', '', line)
        if text_part.endswith(('.', '!', '?')):
            return False
        words = text_part.split()
        if len(words) > 1:
            capitalized = sum(1 for w in words if w and w[0].isupper())
            if capitalized / len(words) > 0.5:
                return True
        if len(words) <= 3 and len(line) < 40:
            return True
        return False
    
    while i < len(lines):
        current_line = lines[i].strip()
        
        is_bullet = re.match(r'^[-•*]\s+', current_line)
        is_signature = '[SIGNATURE LINE]' in current_line
        is_header = is_section_header(current_line)
        
        if is_bullet or is_signature or is_header:
            joined_lines.append(current_line)
            i += 1
            continue
        
        paragraph = [current_line]
        i += 1
        
        while i < len(lines):
            next_line = lines[i].strip()
            
            if (re.match(r'^[-•*]\s+', next_line) or 
                '[SIGNATURE LINE]' in next_line or 
                is_section_header(next_line)):
                break
            
            prev_line = paragraph[-1]
            first_char = next_line[0] if next_line else ''
            starts_lowercase = first_char.isalpha() and first_char.islower()
            
            prev_no_end_punct = not prev_line.endswith(('.', '!', '?', ':', ';'))
            prev_has_continuation = prev_line.rstrip().endswith((',', 'and', 'or', 'the', 'of', 'to', 'in', 'for', 'with', 'at', 'as', 'by'))
            
            if starts_lowercase or prev_no_end_punct or prev_has_continuation:
                if first_char.isdigit() and len(next_line) > 60:
                    break
                paragraph.append(next_line)
                i += 1
            else:
                break
        
        joined_lines.append(' '.join(paragraph))
    
    text = '\n'.join(joined_lines)
    
    if debug:
        print(f"[DEBUG] Step 4 complete")
    
    # Step 5: Detect hierarchy based on position
    if debug:
        print(f"[DEBUG] Step 5: Detecting hierarchy from position")
    
    lines = text.split('\n')
    structured_lines = []
    
    for i, line in enumerate(lines):
        line_stripped = line.strip()
        
        if not line_stripped or not is_section_header(line_stripped):
            structured_lines.append(line)
            continue
        
        # Check what came before this header
        prev_idx = i - 1
        while prev_idx >= 0 and not lines[prev_idx].strip():
            prev_idx -= 1
        
        # Determine if main section or subsection
        if prev_idx < 0:
            # First item in document - main section
            structured_lines.append(line_stripped + " [MAIN]")
        else:
            prev_line = lines[prev_idx].strip()
            prev_is_header = is_section_header(prev_line)
            
            if prev_is_header:
                # Header following header = subsection
                structured_lines.append(line_stripped + " [SUB]")
            else:
                # Header following content = new main section
                structured_lines.append(line_stripped + " [MAIN]")
        
        if debug:
            marker = "[MAIN]" if "[MAIN]" in structured_lines[-1] else "[SUB]"
            print(f"[DEBUG]   Line {i}: {marker} '{line_stripped}'")
    
    text = '\n'.join(structured_lines)
    
    # Step 6: Renumber based on hierarchy
    if debug:
        print(f"[DEBUG] Step 6: Renumbering")
    
    lines = text.split('\n')
    processed_lines = []
    current_main = None
    sub_counter = 1
    
    for line in lines:
        line_stripped = line.strip()
        
        if not line_stripped:
            processed_lines.append('')
            continue
        
        if '[MAIN]' in line_stripped:
            line_stripped = line_stripped.replace(' [MAIN]', '')
            match = re.match(r'^(\d+)\.\s+(.+)', line_stripped)
            if match:
                current_main = match.group(1)
                sub_counter = 1
                processed_lines.append('')
                processed_lines.append(line_stripped)
                processed_lines.append('')
            continue
        
        if '[SUB]' in line_stripped:
            line_stripped = line_stripped.replace(' [SUB]', '')
            if current_main:
                text_part = re.sub(r'^\d+\.\s+', '', line_stripped)
                new_line = f"{current_main}.{sub_counter} {text_part}"
                processed_lines.append(new_line)
                sub_counter += 1
            else:
                processed_lines.append(line_stripped)
            continue
        
        # Keep everything else as-is (no auto-numbering of content)
        processed_lines.append(line_stripped)
    
    text = '\n'.join(processed_lines)
    
    if debug:
        print(f"[DEBUG] Step 6 complete")
    
    # Final cleanup
    text = re.sub(r'^[-•*]\s+', '• ', text, flags=re.MULTILINE)
    text = re.sub(r'\.{2,}', '.', text)
    text = re.sub(r'\s*:\s*', ': ', text)
    
    lines = text.split('\n')
    cleaned_lines = []
    prev_empty = False
    
    for line in lines:
        line = line.strip()
        if line:
            cleaned_lines.append(line)
            prev_empty = False
        elif not prev_empty:
            cleaned_lines.append('')
            prev_empty = True
    
    text = '\n'.join(cleaned_lines).strip()
    
    if debug:
        print(f"[DEBUG] Complete. Final length: {len(text)} characters")
    
    return text

In [66]:
def clean_text_for_rag(text: str, debug: bool = False) -> str:
    """
    Improved text cleaning with hierarchical section structure.
    Detects main sections by observing when numbering resets to 1.
    """
    
    if debug:
        print(f"[DEBUG] Starting improved text cleaning. Original length: {len(text)} characters")
    
    original_text = text
    
    # Steps 1-3: Basic cleaning
    text = re.sub(r'--- Page \d+ ---\n?', '', text)
    text = re.sub(r'^[-_]{3,}\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'_{3,}', '[SIGNATURE LINE]', text)
    text = re.sub(r' {2,}', ' ', text)
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    text = '\n'.join(lines)
    
    # Step 4: Join line continuations
    lines = text.split('\n')
    joined_lines = []
    i = 0
    
    def is_section_header(line: str) -> bool:
        """Detect if a line is a section header."""
        if not re.match(r'^\d+\.\s+', line):
            return False
        if len(line) > 60:
            return False
        text_part = re.sub(r'^\d+\.\s+', '', line)
        if text_part.endswith(('.', '!', '?')):
            return False
        words = text_part.split()
        if len(words) > 1:
            capitalized = sum(1 for w in words if w and w[0].isupper())
            if capitalized / len(words) > 0.5:
                return True
        if len(words) <= 3 and len(line) < 40:
            return True
        return False
    
    while i < len(lines):
        current_line = lines[i].strip()
        
        is_bullet = re.match(r'^[-•*]\s+', current_line)
        is_signature = '[SIGNATURE LINE]' in current_line
        is_header = is_section_header(current_line)
        
        if is_bullet or is_signature or is_header:
            joined_lines.append(current_line)
            i += 1
            continue
        
        paragraph = [current_line]
        i += 1
        
        while i < len(lines):
            next_line = lines[i].strip()
            
            if (re.match(r'^[-•*]\s+', next_line) or 
                '[SIGNATURE LINE]' in next_line or 
                is_section_header(next_line)):
                break
            
            prev_line = paragraph[-1]
            first_char = next_line[0] if next_line else ''
            starts_lowercase = first_char.isalpha() and first_char.islower()
            
            prev_no_end_punct = not prev_line.endswith(('.', '!', '?', ':', ';'))
            prev_has_continuation = prev_line.rstrip().endswith((',', 'and', 'or', 'the', 'of', 'to', 'in', 'for', 'with', 'at', 'as', 'by'))
            
            if starts_lowercase or prev_no_end_punct or prev_has_continuation:
                if first_char.isdigit() and len(next_line) > 60:
                    break
                paragraph.append(next_line)
                i += 1
            else:
                break
        
        joined_lines.append(' '.join(paragraph))
    
    text = '\n'.join(joined_lines)
    
    # Step 5: Detect headers and classify using reset-to-1 logic
    if debug:
        print(f"\n[DEBUG] Step 5: Detecting headers with reset-to-1 logic")
    
    lines = text.split('\n')
    
    # First pass: find all headers
    header_positions = []
    for i, line in enumerate(lines):
        line_stripped = line.strip()
        if is_section_header(line_stripped):
            num_match = re.match(r'^(\d+)\.', line_stripped)
            number = int(num_match.group(1)) if num_match else 0
            header_positions.append({
                'index': i,
                'line': line_stripped,
                'number': number
            })
    
    if debug:
        print(f"[DEBUG] Found {len(header_positions)} headers")
        print(f"[DEBUG] Numbers: {[h['number'] for h in header_positions]}")
    
    # Second pass: classify based on reset-to-1 pattern
    # A header that is numbered "1" AFTER we've seen other numbers indicates
    # the START of a subsection group under a new main section
    structured_lines = []
    header_idx = 0
    current_main_num = 0
    in_subsection_group = False
    
    for i, line in enumerate(lines):
        line_stripped = line.strip()
        
        if header_idx < len(header_positions) and i == header_positions[header_idx]['index']:
            current_header = header_positions[header_idx]
            current_num = current_header['number']
            
            is_main = False
            
            if header_idx == 0:
                # First header
                if current_num == 1:
                    # Starts with 1, it's main
                    is_main = True
                    current_main_num = 1
                    in_subsection_group = False
                else:
                    # Starts with non-1, treat as main anyway
                    is_main = True
                    current_main_num = current_num
                    in_subsection_group = False
            else:
                prev_num = header_positions[header_idx - 1]['number']
                
                # Check if this is a reset to 1 after being in a sequence
                if current_num == 1 and prev_num > 1:
                    # This marks the start of a subsection group
                    # The PREVIOUS high number was actually a main section
                    # We need to retroactively fix this
                    in_subsection_group = True
                    is_main = False
                    if debug:
                        print(f"[DEBUG] Header {header_idx}: Detected reset to 1 (after {prev_num}), entering subsection group")
                elif in_subsection_group:
                    # We're in a subsection group, check if we should exit
                    if current_num > prev_num + 1:
                        # Big jump, this is a new main section
                        is_main = True
                        current_main_num = current_num
                        in_subsection_group = False
                        if debug:
                            print(f"[DEBUG] Header {header_idx}: Jump from {prev_num} to {current_num}, exiting subsection group")
                    else:
                        # Continue in subsection group
                        is_main = False
                else:
                    # Not in subsection group
                    # If we see a sequential increase, stay as main
                    if current_num == prev_num + 1 or current_num == prev_num:
                        # Might be continuing main or starting subsection
                        # Check ahead: if next header is 1, current is main
                        if header_idx + 1 < len(header_positions):
                            next_num = header_positions[header_idx + 1]['number']
                            if next_num == 1:
                                # Next is 1, so current is main
                                is_main = True
                                current_main_num = current_num
                            else:
                                # Unclear, default to main if sequential
                                is_main = True
                                current_main_num = current_num
                        else:
                            is_main = True
                            current_main_num = current_num
                    else:
                        is_main = True
                        current_main_num = current_num
            
            if is_main:
                structured_lines.append(line_stripped + " [MAIN]")
            else:
                structured_lines.append(line_stripped + " [SUB]")
            
            header_idx += 1
        else:
            structured_lines.append(line)
    
    text = '\n'.join(structured_lines)
    
    # Step 6: Renumber
    if debug:
        print(f"\n[DEBUG] Step 6: Renumbering")
    
    lines = text.split('\n')
    processed_lines = []
    current_main = None
    sub_counter = 1
    
    for line in lines:
        line_stripped = line.strip()
        
        if not line_stripped:
            processed_lines.append('')
            continue
        
        if '[MAIN]' in line_stripped:
            line_stripped = line_stripped.replace(' [MAIN]', '')
            match = re.match(r'^(\d+)\.\s+(.+)', line_stripped)
            if match:
                current_main = match.group(1)
                sub_counter = 1
                processed_lines.append('')
                processed_lines.append(line_stripped)
                processed_lines.append('')
                if debug:
                    print(f"[DEBUG]   Main {current_main}: '{line_stripped}'")
            continue
        
        if '[SUB]' in line_stripped:
            line_stripped = line_stripped.replace(' [SUB]', '')
            if current_main:
                text_part = re.sub(r'^\d+\.\s+', '', line_stripped)
                new_line = f"{current_main}.{sub_counter} {text_part}"
                processed_lines.append(new_line)
                if debug:
                    print(f"[DEBUG]   Sub {current_main}.{sub_counter}: '{text_part}'")
                sub_counter += 1
            else:
                processed_lines.append(line_stripped)
            continue
        
        processed_lines.append(line_stripped)
    
    text = '\n'.join(processed_lines)
    
    # Final cleanup
    text = re.sub(r'^[-•*]\s+', '• ', text, flags=re.MULTILINE)
    text = re.sub(r'\.{2,}', '.', text)
    text = re.sub(r'\s*:\s*', ': ', text)
    
    lines = text.split('\n')
    cleaned_lines = []
    prev_empty = False
    
    for line in lines:
        line = line.strip()
        if line:
            cleaned_lines.append(line)
            prev_empty = False
        elif not prev_empty:
            cleaned_lines.append('')
            prev_empty = True
    
    text = '\n'.join(cleaned_lines).strip()
    
    if debug:
        print(f"\n[DEBUG] Complete")
    
    return text

In [69]:
cleaned_text = clean_text_for_rag(next(iter(extracted_texts.values())), debug=True)

[DEBUG] Starting improved text cleaning. Original length: 4511 characters

[DEBUG] Step 5: Detecting headers with reset-to-1 logic
[DEBUG] Found 20 headers
[DEBUG] Numbers: [1, 2, 1, 2, 3, 3, 1, 2, 3, 4, 1, 2, 5, 1, 2, 6, 1, 2, 7, 8]
[DEBUG] Header 2: Detected reset to 1 (after 2), entering subsection group
[DEBUG] Header 6: Detected reset to 1 (after 3), entering subsection group
[DEBUG] Header 10: Detected reset to 1 (after 4), entering subsection group
[DEBUG] Header 12: Jump from 2 to 5, exiting subsection group
[DEBUG] Header 13: Detected reset to 1 (after 5), entering subsection group
[DEBUG] Header 15: Jump from 2 to 6, exiting subsection group
[DEBUG] Header 16: Detected reset to 1 (after 6), entering subsection group
[DEBUG] Header 18: Jump from 2 to 7, exiting subsection group

[DEBUG] Step 6: Renumbering
[DEBUG]   Main 1: '1. Introduction and Purpose'
[DEBUG]   Main 2: '2. Employment Policies'
[DEBUG]   Sub 2.1: 'Equal Employment Opportunity'
[DEBUG]   Sub 2.2: 'Employment C

In [68]:
print(cleaned_text)

DeepShield Systems Employee Handbook 2023

1. Introduction and Purpose

1. This Employee Handbook ("Handbook") sets forth the policies, procedures, and working conditions applicable to all employees of DeepShield Systems, Inc. ("DeepShield" or the "Company"), a Delaware corporation with principal offices at 2100 Innovation Drive, Suite 400, Wilmington, DE 19801.
2. This Handbook supersedes all previous employee handbooks and management memos. The policies contained herein are effective as of January 1, 2023.

2. Employment Policies

2.1 Equal Employment Opportunity
DeepShield provides equal employment opportunities to all employees and applicants without regard to race, color, religion, sex, national origin, age, disability, genetic information, or any other protected characteristic.
2.2 Employment Classification
• Full-time Regular: Employees scheduled to work 40 hours per week - Part-time Regular: Employees scheduled to work less than 40 hours per week - Temporary: Employees hired fo

# Text Chunking not using LangChain

In [16]:
# Chunk by section headers
def chunk_document_by_sections(cleaned_text: str, debug: bool = False) -> List[Dict[str, str]]:
    """
    Split the cleaned document into logical chunks based on sections.
    
    Args:
        cleaned_text (str): Cleaned text from clean_text_for_rag()
        debug (bool): If True, prints debug information about chunking process
        
    Returns:
        List[Dict[str, str]]: List of chunks with metadata
    """
    if debug:
        print(f"\n[DEBUG] Starting document chunking")
        print(f"[DEBUG] Input text length: {len(cleaned_text)} characters")
        print(f"[DEBUG] Input text lines: {len(cleaned_text.splitlines())}")
    
    chunks = []
    
    # First, let's see what section headers we can find
    section_headers = re.findall(r'^(\d+\.\s+[^\n]+)', cleaned_text, flags=re.MULTILINE)
    if debug:
        print(f"[DEBUG] Found potential section headers:")
        for i, header in enumerate(section_headers):
            print(f"[DEBUG]   {i+1}: '{header}'")
    
    # Split by major sections (numbered sections like "1. Introduction")
    # Use a more flexible pattern that captures the full header line
    sections = re.split(r'^(\d+\.\s+[^\n]*)', cleaned_text, flags=re.MULTILINE)
    
    if debug:
        print(f"[DEBUG] Split resulted in {len(sections)} segments")
        for i, segment in enumerate(sections[:6]):  # Show first 6 segments
            segment_preview = segment.strip()[:100].replace('\n', '\\n')
            print(f"[DEBUG]   Segment {i}: '{segment_preview}...'")
    
    current_section = "Document Header"
    current_content = ""
    segment_count = 0
    
    for i, segment in enumerate(sections):
        segment = segment.strip()
        
        if not segment:  # Skip empty segments
            continue
            
        # Check if this segment is a section header
        if re.match(r'^\d+\.\s+[A-Za-z]', segment):
            # This is a section header - save previous section if it has content
            if current_content.strip():
                chunk_length = len(current_content.strip())
                chunks.append({
                    'section': current_section,
                    'content': current_content.strip(),
                    'chunk_id': len(chunks)
                })
                if debug:
                    print(f"[DEBUG] Created chunk {len(chunks)-1}: '{current_section}' ({chunk_length} chars)")
            
            # Start new section
            current_section = segment
            current_content = ""
            segment_count += 1
            if debug:
                print(f"[DEBUG] Starting new section: '{current_section}'")
        else:
            # This is section content - add to current section
            if current_content:
                current_content += "\n\n" + segment
            else:
                current_content = segment
            if debug and len(segment) > 50:
                print(f"[DEBUG] Added content to '{current_section[:30]}...': {len(segment)} chars")
    
    # Add the last section if it has content
    if current_content.strip():
        chunk_length = len(current_content.strip())
        chunks.append({
            'section': current_section,
            'content': current_content.strip(),
            'chunk_id': len(chunks)
        })
        if debug:
            print(f"[DEBUG] Created final chunk {len(chunks)-1}: '{current_section}' ({chunk_length} chars)")
    
    if debug:
        print(f"\n[DEBUG] Chunking summary:")
        print(f"[DEBUG] - Found {len(section_headers)} section headers in text")
        print(f"[DEBUG] - Processed {segment_count} sections")
        print(f"[DEBUG] - Created {len(chunks)} chunks")
        
        total_content_length = sum(len(chunk['content']) for chunk in chunks)
        avg_chunk_length = total_content_length / len(chunks) if chunks else 0
        print(f"[DEBUG] - Total content length: {total_content_length} characters")
        print(f"[DEBUG] - Average chunk length: {avg_chunk_length:.1f} characters")
        
        # Show chunk size distribution
        chunk_sizes = [len(chunk['content']) for chunk in chunks]
        if chunk_sizes:
            print(f"[DEBUG] - Chunk size range: {min(chunk_sizes)} - {max(chunk_sizes)} characters")
            
        # Show all chunk sections
        print(f"[DEBUG] Final chunks created:")
        for chunk in chunks:
            print(f"[DEBUG]   Chunk {chunk['chunk_id']}: '{chunk['section'][:60]}...' ({len(chunk['content'])} chars)")
    
    return chunks
    
    return chunks

In [17]:
chunk_document_by_sections(cleaned_text, debug=False)

[{'section': 'Document Header',
  'content': 'DeepShield Systems Employee Handbook 2023',
  'chunk_id': 0},
 {'section': '2. Employment Policies',
  'content': '2.1 Equal Employment Opportunity\n2.1.1 DeepShield provides equal employment opportunities to all employees and applicants without regard to race, color, religion, sex, national origin, age, disability, genetic information, or any other protected characteristic.',
  'chunk_id': 1},
 {'section': '2. Employment Classification',
  'content': '• Full-time Regular: Employees scheduled to work 40 hours per week - Part-time Regular: Employees scheduled to work less than 40 hours per week - Temporary: Employees hired for a specific project or time period - Exempt: Salaried employees exempt from overtime requirements - Non-exempt: Employees eligible for overtime compensation',
  'chunk_id': 2},
 {'section': '3. Security Clearance Requirements',
  'content': "3.1 Given the nature of DeepShield's work in critical infrastructure protection

Has an issue where newlines are treated as new sections. Easy to see on Section 1 compared to section 1.1 which are both part of the same sentence

# Chunking using LangChain

In [18]:
from langchain.text_splitter import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter
from typing import List, Dict, Any

In [19]:
class LegalDocumentChunker:
    """
    Specialized chunker for legal documents that preserves section structure
    while ensuring chunks fit within LLM context limits.
    """
    
    def __init__(self, 
                 max_chunk_size: int = 1000,
                 chunk_overlap: int = 200,
                 preserve_section_hierarchy: bool = True):
        """
        Initialize the legal document chunker.
        
        Args:
            max_chunk_size: Maximum characters per chunk
            chunk_overlap: Number of characters to overlap between chunks
            preserve_section_hierarchy: Whether to maintain section context in metadata
        """
        self.max_chunk_size = max_chunk_size
        self.chunk_overlap = chunk_overlap
        self.preserve_section_hierarchy = preserve_section_hierarchy
        
        # Configure the recursive splitter for oversized sections
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
    
    def convert_to_markdown_headers(self, text: str) -> str:
        """
        Convert numbered sections to markdown headers for better splitting.
        """
        # Convert main sections (1. Title) to ## headers
        text = re.sub(r'^(\d+)\.\s+([A-Z][^\n]+)', r'## \1. \2', text, flags=re.MULTILINE)
        
        # Convert subsections (1.1 Title) to ### headers  
        text = re.sub(r'^(\d+\.\d+)\.?\s+([A-Z][^\n]+)', r'### \1 \2', text, flags=re.MULTILINE)
        
        # Convert sub-subsections (1.1.1 Title) to #### headers
        text = re.sub(r'^(\d+\.\d+\.\d+)\.?\s+([A-Z][^\n]+)', r'#### \1 \2', text, flags=re.MULTILINE)
        
        return text
    
    def chunk_legal_document(self, cleaned_text: str, document_title: str = None) -> List[Dict[str, Any]]:
        """
        Chunk a legal document while preserving section structure.
        
        Args:
            cleaned_text: Text that has been cleaned with clean_text_for_rag()
            document_title: Optional document title for metadata
            
        Returns:
            List of dictionaries containing chunk text and metadata
        """
        # Convert sections to markdown headers
        markdown_text = self.convert_to_markdown_headers(cleaned_text)
        
        # Define headers to split on
        headers_to_split_on = [
            ("##", "section"),
            ("###", "subsection"), 
            ("####", "subsubsection")
        ]
        
        # Create markdown header splitter
        markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        
        # Split by headers first
        header_chunks = markdown_splitter.split_text(markdown_text)
        
        # Process each chunk
        final_chunks = []
        
        for chunk in header_chunks:
            chunk_text = chunk.page_content
            chunk_metadata = chunk.metadata.copy()
            
            # Add document-level metadata
            if document_title:
                chunk_metadata["document_title"] = document_title
            
            # Check if chunk is too large and needs further splitting
            if len(chunk_text) > self.max_chunk_size:
                # Use recursive splitter for oversized chunks
                sub_chunks = self.recursive_splitter.split_text(chunk_text)
                
                for i, sub_chunk in enumerate(sub_chunks):
                    sub_metadata = chunk_metadata.copy()
                    sub_metadata["chunk_part"] = f"{i+1}/{len(sub_chunks)}"
                    sub_metadata["is_split_chunk"] = True
                    
                    final_chunks.append({
                        "text": sub_chunk,
                        "metadata": sub_metadata,
                        "chunk_size": len(sub_chunk)
                    })
            else:
                chunk_metadata["is_split_chunk"] = False
                final_chunks.append({
                    "text": chunk_text,
                    "metadata": chunk_metadata,
                    "chunk_size": len(chunk_text)
                })
        
        return final_chunks
    
    def create_section_hierarchy_context(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Add hierarchical context to chunks for better retrieval.
        """
        if not self.preserve_section_hierarchy:
            return chunks
        
        for chunk in chunks:
            metadata = chunk["metadata"]
            context_parts = []
            
            # Build hierarchical context
            if "section" in metadata:
                context_parts.append(f"Section: {metadata['section']}")
            if "subsection" in metadata:
                context_parts.append(f"Subsection: {metadata['subsection']}")
            if "subsubsection" in metadata:
                context_parts.append(f"Sub-subsection: {metadata['subsubsection']}")
            
            if context_parts:
                chunk["metadata"]["section_hierarchy"] = " > ".join(context_parts)
        
        return chunks


In [20]:
def process_legal_document(pdf_text: str, document_title: str = None, debug: bool = False) -> List[Dict[str, Any]]:
    """
    Complete pipeline: clean text and chunk for RAG system.
    
    Args:
        pdf_text: Raw PDF text
        document_title: Title of the document
        debug: Whether to print debug information
        
    Returns:
        List of chunks ready for embedding and vector storage
    """
    # First clean the text (using the function from the previous artifact)
    cleaned_text = clean_text_for_rag(pdf_text, debug=debug)
    
    # Then chunk it
    chunker = LegalDocumentChunker(
        max_chunk_size=1000,
        chunk_overlap=200,
        preserve_section_hierarchy=True
    )
    
    chunks = chunker.chunk_legal_document(cleaned_text, document_title)
    chunks = chunker.create_section_hierarchy_context(chunks)
    
    if debug:
        print(f"\n[DEBUG] Chunking complete:")
        print(f"[DEBUG] Total chunks: {len(chunks)}")
        print(f"[DEBUG] Average chunk size: {sum(c['chunk_size'] for c in chunks) / len(chunks):.0f} characters")
        print(f"[DEBUG] Chunk size range: {min(c['chunk_size'] for c in chunks)} - {max(c['chunk_size'] for c in chunks)} characters")
        
        # Show first few chunks
        for i, chunk in enumerate(chunks[:3]):
            print(f"\n[DEBUG] Chunk {i+1} metadata: {chunk['metadata']}")
            print(f"[DEBUG] Chunk {i+1} preview: {chunk['text'][:200]}...")
            
    return chunks

In [21]:
chunks = process_legal_document(cleaned_text, document_title="Sample Legal Document", debug=True)
chunks

[DEBUG] Starting improved text cleaning. Original length: 4500 characters
[DEBUG] Step 1: Removed 0 page markers
[DEBUG] Step 2: Cleaned formatting artifacts
[DEBUG] Step 3: Normalized whitespace and filtered empty lines
[DEBUG] Step 4: Starting intelligent line continuation joining
[DEBUG] Step 4 complete: Joined 1 lines
[DEBUG] Step 5: Identifying hierarchical structure
[DEBUG] Step 6: Auto-numbering with hierarchy
[DEBUG]   Main section 1: '1. Introduction and Purpose'
[DEBUG]   Main section 2: '2. Employment Policies'
[DEBUG]   Main section 2: '2. Employment Classification'
[DEBUG]   Main section 3: '3. Security Clearance Requirements'
[DEBUG]   Main section 3: '3. Workplace Policies'
[DEBUG]   Main section 2: '2. Cybersecurity Compliance'
[DEBUG]   Main section 3: '3. Remote Work Policy'
[DEBUG]   Main section 4: '4. Compensation and Benefits'
[DEBUG]   Main section 1: '1. Compensation'
[DEBUG]   Main section 2: '2. Benefits Package'
[DEBUG]   Main section 5: '5. Time Off and Leav

[{'text': 'DeepShield Systems Employee Handbook 2023',
  'metadata': {'document_title': 'Sample Legal Document',
   'is_split_chunk': False},
  'chunk_size': 41},
 {'text': '• Full-time Regular: Employees scheduled to work 40 hours per week - Part-time Regular: Employees scheduled to work less than 40 hours per week - Temporary: Employees hired for a specific project or time period - Exempt: Salaried employees exempt from overtime requirements - Non-exempt: Employees eligible for overtime compensation',
  'metadata': {'section': '2. Employment Classification',
   'document_title': 'Sample Legal Document',
   'is_split_chunk': False,
   'section_hierarchy': 'Section: 2. Employment Classification'},
  'chunk_size': 332},
 {'text': "• All employees must sign and comply with the Company's Confidentiality and Intellectual Property Agreement - Proprietary information includes but is not limited to: source code, security architectures, customer data, threat detection algorithms, and maritime 

In [22]:
chunks[1]

{'text': '• Full-time Regular: Employees scheduled to work 40 hours per week - Part-time Regular: Employees scheduled to work less than 40 hours per week - Temporary: Employees hired for a specific project or time period - Exempt: Salaried employees exempt from overtime requirements - Non-exempt: Employees eligible for overtime compensation',
 'metadata': {'section': '2. Employment Classification',
  'document_title': 'Sample Legal Document',
  'is_split_chunk': False,
  'section_hierarchy': 'Section: 2. Employment Classification'},
 'chunk_size': 332}

In [23]:
# Integration with LangChain vector stores
def create_langchain_documents(chunks: List[Dict[str, Any]]):
    """
    Convert chunks to LangChain Document objects for vector store ingestion.
    """
    from langchain.schema import Document
    
    documents = []
    for chunk in chunks:
        doc = Document(
            page_content=chunk["text"],
            metadata=chunk["metadata"]
        )
        documents.append(doc)
    
    return documents

langchian_docs = create_langchain_documents(chunks)
langchian_docs

[Document(metadata={'document_title': 'Sample Legal Document', 'is_split_chunk': False}, page_content='DeepShield Systems Employee Handbook 2023'),
 Document(metadata={'section': '2. Employment Classification', 'document_title': 'Sample Legal Document', 'is_split_chunk': False, 'section_hierarchy': 'Section: 2. Employment Classification'}, page_content='• Full-time Regular: Employees scheduled to work 40 hours per week - Part-time Regular: Employees scheduled to work less than 40 hours per week - Temporary: Employees hired for a specific project or time period - Exempt: Salaried employees exempt from overtime requirements - Non-exempt: Employees eligible for overtime compensation'),
 Document(metadata={'section': '3. Workplace Policies', 'subsection': '3.1 Confidentiality and Intellectual Property', 'document_title': 'Sample Legal Document', 'is_split_chunk': False, 'section_hierarchy': 'Section: 3. Workplace Policies > Subsection: 3.1 Confidentiality and Intellectual Property'}, page_

Same issue in LangChain chunking that treats a newline as a seperate chunk. Seems to be an issue with the scanning/cleaning parts

In [24]:
# Test chunking with the improved cleaned text
chunker = LegalDocumentChunker(
    max_chunk_size=1000,
    chunk_overlap=200,
    preserve_section_hierarchy=True
)

chunks = chunker.chunk_legal_document(cleaned_text, document_title="Employee Handbook")
chunks_with_context = chunker.create_section_hierarchy_context(chunks)

print(f"Total chunks created: {len(chunks_with_context)}")
print(f"\nFirst 3 chunks:")
print("="*60)

for i, chunk in enumerate(chunks_with_context[:3]):
    print(f"\nChunk {i+1}:")
    print(f"Metadata: {chunk['metadata']}")
    print(f"Text ({chunk['chunk_size']} chars): {chunk['text'][:150]}...")
    print("-"*60)

Total chunks created: 12

First 3 chunks:

Chunk 1:
Metadata: {'document_title': 'Employee Handbook', 'is_split_chunk': False}
Text (41 chars): DeepShield Systems Employee Handbook 2023...
------------------------------------------------------------

Chunk 2:
Metadata: {'section': '2. Employment Classification', 'document_title': 'Employee Handbook', 'is_split_chunk': False, 'section_hierarchy': 'Section: 2. Employment Classification'}
Text (332 chars): • Full-time Regular: Employees scheduled to work 40 hours per week - Part-time Regular: Employees scheduled to work less than 40 hours per week - Temp...
------------------------------------------------------------

Chunk 3:
Metadata: {'section': '3. Workplace Policies', 'subsection': '3.1 Confidentiality and Intellectual Property', 'document_title': 'Employee Handbook', 'is_split_chunk': False, 'section_hierarchy': 'Section: 3. Workplace Policies > Subsection: 3.1 Confidentiality and Intellectual Property'}
Text (371 chars): • All empl