# Optimized PDF Processing with Docling for Google Colab

This notebook processes PDFs using Docling with GPU acceleration, proper chunking (1024 tokens), and batch processing capabilities.

## Key Features:
- GPU acceleration with CUDA support
- Optimized batch processing for speed (8 PDFs per batch for Google Colab)
- 1024 token chunks (HybridChunker handles overlap internally)
- Proper logging and performance monitoring
- Automatic compression of outputs
- Compatible with Google Colab GPU runtime

## Setup Instructions for Google Colab:
1. Set Runtime to GPU: Runtime → Change runtime type → GPU
2. Upload your PDFs to a folder named `pdfs` in the root directory
3. Run all cells sequentially
4. Download the compressed results from the final cell

In [1]:
# Environment Setup and Dependencies Installation
import os
import subprocess
import sys

def install_package(package):
    """Install package with proper error handling"""
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"Successfully installed {package}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to install {package}: {e}")

# Install required packages
packages = [
    "docling",
    "docling-core",
    "tiktoken",
    "openai",
    "torch",
    "torchvision",
    "torchaudio",
    "pydantic",
    "pydantic-settings"
]

print("Installing required packages...")
for package in packages:
    install_package(package)

print("All packages installed successfully!")

# Check GPU availability
import torch
if torch.cuda.is_available():
    print(f"GPU detected: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")
else:
    print("No GPU detected - will use CPU")

Installing required packages...
Successfully installed docling
Successfully installed docling-core
Successfully installed tiktoken
Successfully installed openai
Successfully installed torch
Successfully installed torchvision
Successfully installed torchaudio
Successfully installed pydantic
Successfully installed pydantic-settings
All packages installed successfully!
GPU detected: Tesla T4
CUDA version: 12.4


In [8]:
# Import Required Libraries and Configure Logging
import json
import logging
import time
import zipfile
from collections.abc import Iterable
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime
import warnings

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Docling imports
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.chunking import HybridChunker
from docling_core.transforms.chunker.tokenizer.openai import OpenAITokenizer
from docling_core.types.doc import ImageRefMode
import tiktoken

# Configure comprehensive logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('pdf_processing.log')
    ]
)
logger = logging.getLogger(__name__)

# Performance tracking class
class PerformanceTracker:
    def __init__(self):
        self.start_time = None
        self.end_time = None
        self.stage_times = {}

    def start_stage(self, stage_name):
        self.stage_times[stage_name] = {'start': time.time()}

    def end_stage(self, stage_name):
        if stage_name in self.stage_times:
            self.stage_times[stage_name]['end'] = time.time()
            self.stage_times[stage_name]['duration'] = (
                self.stage_times[stage_name]['end'] -
                self.stage_times[stage_name]['start']
            )

    def get_stage_duration(self, stage_name):
        return self.stage_times.get(stage_name, {}).get('duration', 0)

    def get_report(self):
        total_time = sum(stage.get('duration', 0) for stage in self.stage_times.values())
        report = f"Performance Report:\n"
        report += f"Total Processing Time: {total_time:.2f} seconds\n\n"

        for stage, times in self.stage_times.items():
            duration = times.get('duration', 0)
            percentage = (duration / total_time * 100) if total_time > 0 else 0
            report += f"{stage}: {duration:.2f}s ({percentage:.1f}%)\n"

        return report

tracker = PerformanceTracker()
logger.info("Libraries imported and logging configured successfully")

In [9]:
# Configure GPU Acceleration and Pipeline Options
def setup_accelerator_options():
    """Configure optimal accelerator settings based on available hardware"""
    if torch.cuda.is_available():
        accelerator_options = AcceleratorOptions(
            num_threads=8,
            device=AcceleratorDevice.CUDA
        )
        logger.info("Using CUDA acceleration")
        print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        accelerator_options = AcceleratorOptions(
            num_threads=8,
            device=AcceleratorDevice.MPS
        )
        logger.info("Using MPS acceleration (Apple Silicon)")
    else:
        accelerator_options = AcceleratorOptions(
            num_threads=8,
            device=AcceleratorDevice.CPU
        )
        logger.info("Using CPU acceleration")

    return accelerator_options

def create_optimized_pipeline_options():
    """Create optimized pipeline options for maximum performance"""
    accelerator_options = setup_accelerator_options()

    pipeline_options = PdfPipelineOptions()
    pipeline_options.accelerator_options = accelerator_options
    pipeline_options.do_ocr = True
    pipeline_options.do_table_structure = True
    pipeline_options.table_structure_options.do_cell_matching = True
    pipeline_options.generate_page_images = False  # Disabled for speed

    logger.info("Pipeline options configured for optimal performance")
    return pipeline_options

# Configuration constants
CHUNK_SIZE = 1024
BATCH_SIZE = 8  # Optimized for Google Colab GPU - higher throughput

logger.info(f"Processing configuration: chunks={CHUNK_SIZE}, batch_size={BATCH_SIZE}")

In [10]:
# Setup Directory Structure and File Management
def setup_directories():
    """Create necessary directories and scan for PDF files"""
    # Check if we're in Colab
    try:
        from google.colab import files
        is_colab = True
        base_path = Path('/content')
    except ImportError:
        is_colab = False
        base_path = Path('.')

    # Setup directories
    pdfs_dir = base_path / 'pdfs'
    output_dir = base_path / 'processed_documents'

    if not pdfs_dir.exists():
        logger.error(f"PDFs directory not found: {pdfs_dir}")
        if is_colab:
            print("Please upload your PDFs to a folder named 'pdfs' in the root directory")
            print("You can create the folder and upload files using the file browser on the left")
        return None, None, []

    # Create output directory
    output_dir.mkdir(exist_ok=True)

    # Scan for PDF files
    pdf_files = list(pdfs_dir.glob("*.pdf"))

    if not pdf_files:
        logger.error("No PDF files found in the pdfs directory")
        return pdfs_dir, output_dir, []

    logger.info(f"Found {len(pdf_files)} PDF files in {pdfs_dir}")

    # Log file details
    for pdf_file in pdf_files:
        size_mb = pdf_file.stat().st_size / (1024 * 1024)
        logger.info(f"  - {pdf_file.name}: {size_mb:.1f} MB")

    return pdfs_dir, output_dir, pdf_files

# Setup directories and scan files
pdfs_dir, output_dir, pdf_files = setup_directories()

if pdf_files:
    print(f"Setup complete. Found {len(pdf_files)} PDF files to process.")
    print(f"Output will be saved to: {output_dir}")
else:
    print("No PDF files found. Please upload PDFs to the 'pdfs' folder.")

Setup complete. Found 15 PDF files to process.
Output will be saved to: /content/processed_documents


In [11]:
# Initialize Document Converter with Optimized Settings
class OptimizedDoclingProcessor:
    """Optimized Docling processor for batch processing with performance monitoring"""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir

        # Setup pipeline options
        pipeline_options = create_optimized_pipeline_options()

        # Initialize converter
        self.converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=pipeline_options
                )
            }
        )

        # Setup tokenizer and chunker for optimal chunk sizes
        self.tokenizer = OpenAITokenizer(
            tokenizer=tiktoken.encoding_for_model("text-embedding-3-small"),
            max_tokens=CHUNK_SIZE,
        )

        self.chunker = HybridChunker(
            tokenizer=self.tokenizer,
            merge_peers=True,
        )
        # NOTE: HybridChunker handles chunking internally without explicit overlap

        logger.info(f"OptimizedDoclingProcessor initialized with chunk_size={CHUNK_SIZE}")

    def process_batch(self, pdf_paths: List[Path]) -> Dict[str, Any]:
        """Process a batch of PDFs with detailed performance tracking"""
        batch_start = time.time()

        logger.info(f"Starting batch processing of {len(pdf_paths)} PDFs")

        # Track conversion stage
        tracker.start_stage("Document_Conversion")

        conv_results = self.converter.convert_all(
            pdf_paths,
            raises_on_error=False
        )

        tracker.end_stage("Document_Conversion")
        conversion_time = tracker.get_stage_duration("Document_Conversion")

        # Process results
        tracker.start_stage("Chunking_and_Export")

        results = {
            'success_count': 0,
            'partial_count': 0,
            'failure_count': 0,
            'total_chunks': 0,
            'processed_files': [],
            'failed_files': [],
            'conversion_time': conversion_time,
            'processing_details': []
        }

        for conv_res in conv_results:
            pdf_name = conv_res.input.file.stem
            file_start = time.time()

            try:
                if conv_res.status == ConversionStatus.SUCCESS:
                    # Chunk the document
                    chunks = list(self.chunker.chunk(conv_res.document))

                    # Export with optimized settings
                    self._export_document(conv_res, chunks)

                    file_time = time.time() - file_start
                    results['success_count'] += 1
                    results['total_chunks'] += len(chunks)
                    results['processed_files'].append(pdf_name)

                    details = {
                        'file': pdf_name,
                        'status': 'success',
                        'chunks': len(chunks),
                        'processing_time': file_time
                    }
                    results['processing_details'].append(details)

                    logger.info(f"SUCCESS: {pdf_name} - {len(chunks)} chunks ({file_time:.1f}s)")

                elif conv_res.status == ConversionStatus.PARTIAL_SUCCESS:
                    results['partial_count'] += 1
                    logger.warning(f"PARTIAL SUCCESS: {pdf_name}")
                else:
                    results['failure_count'] += 1
                    results['failed_files'].append(pdf_name)
                    logger.error(f"CONVERSION FAILED: {pdf_name}")

            except Exception as e:
                results['failure_count'] += 1
                results['failed_files'].append(pdf_name)
                logger.error(f"PROCESSING ERROR: {pdf_name} - {e}")

        tracker.end_stage("Chunking_and_Export")

        batch_time = time.time() - batch_start
        results['batch_time'] = batch_time

        return results

    def _extract_docling_metadata(self, chunk) -> Dict[str, Any]:
        """
        Extract rich metadata from Docling chunk - PROPER JSON serializable format.
        Based on the original docling_processor.py implementation.
        """
        try:
            meta = {}

            # Extract dl_meta structure properly
            if hasattr(chunk, 'meta') and chunk.meta:
                if hasattr(chunk.meta, 'doc_items') and chunk.meta.doc_items:
                    doc_items = []
                    for item in chunk.meta.doc_items:
                        doc_item = {
                            'self_ref': getattr(item, 'self_ref', ''),
                            'label': getattr(item, 'label', 'unknown'),
                        }

                        # Extract bbox if available
                        if hasattr(item, 'prov') and item.prov:
                            prov = item.prov[0] if item.prov else None
                            if prov:
                                doc_item['page_no'] = getattr(prov, 'page_no', 0)
                                # Convert BoundingBox to serializable dict
                                bbox = getattr(prov, 'bbox', None)
                                if bbox:
                                    doc_item['bbox'] = {
                                        'l': getattr(bbox, 'l', 0),
                                        't': getattr(bbox, 't', 0),
                                        'r': getattr(bbox, 'r', 0),
                                        'b': getattr(bbox, 'b', 0)
                                    }
                                else:
                                    doc_item['bbox'] = {}

                        doc_items.append(doc_item)

                    meta['doc_items'] = doc_items

                # Extract headings
                if hasattr(chunk.meta, 'headings') and chunk.meta.headings:
                    meta['headings'] = list(chunk.meta.headings)

                # Extract origin
                if hasattr(chunk.meta, 'origin'):
                    origin = chunk.meta.origin
                    meta['origin'] = {
                        'mimetype': getattr(origin, 'mimetype', ''),
                        'filename': getattr(origin, 'filename', ''),
                        'binary_hash': getattr(origin, 'binary_hash', None)
                    }

            return meta

        except Exception as e:
            logger.warning(f"Could not extract Docling metadata: {e}")
            return {'doc_items': [], 'headings': []}

    def _export_document(self, conv_res, chunks):
        """Export document in optimized formats"""
        doc_filename = conv_res.input.file.stem

        # Prepare chunk data with proper metadata
        chunk_data = {
            'metadata': {
                'source_file': conv_res.input.file.name,
                'processing_timestamp': datetime.now().isoformat(),
                'chunk_count': len(chunks),
                'chunking_method': 'docling_hybrid_optimized',
                'tokenizer': 'openai_tiktoken',
                'chunk_size': CHUNK_SIZE,
                'docling_version': 'latest'
            },
            'chunks': []
        }

        # Process chunks with enhanced metadata
        for idx, chunk in enumerate(chunks):
            # PROPER context enrichment using chunker.contextualize()
            enriched_content = self.chunker.contextualize(chunk=chunk)

            # Extract metadata properly to avoid JSON serialization errors
            docling_meta = self._extract_docling_metadata(chunk)

            chunk_info = {
                'id': f"{doc_filename}_{idx}",
                'content': chunk.text,
                'context_enriched_content': enriched_content,
                'chunk_index': idx,
                'token_count': len(self.tokenizer.tokenizer.encode(chunk.text)),
                'docling_meta': docling_meta
            }
            chunk_data['chunks'].append(chunk_info)

        # Save JSON file
        json_path = self.output_dir / f"{doc_filename}_docling_chunks.json"
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(chunk_data, f, indent=2, ensure_ascii=False)

        # Save readable TXT file
        txt_path = self.output_dir / f"{doc_filename}_docling_chunks.txt"
        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(f"Source: {conv_res.input.file.name}\n")
            f.write(f"Processed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total Chunks: {len(chunks)}\n")
            f.write(f"Chunk Size: {CHUNK_SIZE} tokens\n")
            f.write("=" * 60 + "\n\n")

            for idx, chunk in enumerate(chunks):
                f.write(f"CHUNK {idx + 1}:\n")
                f.write("-" * 40 + "\n")
                f.write(chunk.text)
                f.write("\n\n" + "=" * 40 + "\n\n")

# Initialize processor if we have files to process
if pdf_files and output_dir:
    processor = OptimizedDoclingProcessor(output_dir)
    print("Document processor initialized successfully")
else:
    print("Cannot initialize processor - no PDF files found")

GPU Memory: 15.8 GB
Document processor initialized successfully


In [12]:
# Batch PDF Processing with Performance Monitoring
def process_all_pdfs_optimized():
    """Process all PDFs with comprehensive monitoring and reporting"""
    if not pdf_files:
        print("No PDF files to process")
        return None

    tracker.start_stage("Total_Processing")

    total_results = {
        'total_files': len(pdf_files),
        'total_success': 0,
        'total_partial': 0,
        'total_failure': 0,
        'total_chunks': 0,
        'batch_results': [],
        'processing_start': datetime.now().isoformat()
    }

    # Process in batches
    total_batches = (len(pdf_files) + BATCH_SIZE - 1) // BATCH_SIZE

    print(f"Processing {len(pdf_files)} PDFs in {total_batches} batches of {BATCH_SIZE}")
    print("=" * 60)

    for i in range(0, len(pdf_files), BATCH_SIZE):
        batch_num = i // BATCH_SIZE + 1
        batch = pdf_files[i:i + BATCH_SIZE]

        print(f"Batch {batch_num}/{total_batches}: Processing {len(batch)} files")

        batch_results = processor.process_batch(batch)
        total_results['batch_results'].append(batch_results)

        # Update totals
        total_results['total_success'] += batch_results['success_count']
        total_results['total_partial'] += batch_results['partial_count']
        total_results['total_failure'] += batch_results['failure_count']
        total_results['total_chunks'] += batch_results['total_chunks']

        # Progress update
        print(f"Batch {batch_num} completed:")
        print(f"  Success: {batch_results['success_count']}")
        print(f"  Partial: {batch_results['partial_count']}")
        print(f"  Failed: {batch_results['failure_count']}")
        print(f"  Chunks: {batch_results['total_chunks']}")
        print(f"  Time: {batch_results['batch_time']:.1f}s")
        print("-" * 40)

    tracker.end_stage("Total_Processing")

    total_results['processing_end'] = datetime.now().isoformat()
    total_results['total_time'] = tracker.get_stage_duration("Total_Processing")

    return total_results

# Run the processing
if pdf_files:
    print("Starting optimized PDF processing...")
    results = process_all_pdfs_optimized()

    if results:
        print("\nPROCESSING COMPLETE!")
        print("=" * 60)
        print(f"Total Files: {results['total_files']}")
        print(f"Successful: {results['total_success']}")
        print(f"Partial Success: {results['total_partial']}")
        print(f"Failed: {results['total_failure']}")
        print(f"Total Chunks Generated: {results['total_chunks']}")
        print(f"Total Time: {results['total_time']:.1f} seconds")
        print(f"Average per file: {results['total_time']/results['total_files']:.1f} seconds")
        print("=" * 60)
else:
    print("No files to process")
    results = None

Starting optimized PDF processing...
Processing 15 PDFs in 2 batches of 8
Batch 1/2: Processing 8 files
Batch 1 completed:
  Success: 8
  Partial: 0
  Failed: 0
  Chunks: 1804
  Time: 1376.7s
----------------------------------------
Batch 2/2: Processing 7 files
Batch 2 completed:
  Success: 7
  Partial: 0
  Failed: 0
  Chunks: 1789
  Time: 1618.9s
----------------------------------------

PROCESSING COMPLETE!
Total Files: 15
Successful: 15
Partial Success: 0
Failed: 0
Total Chunks Generated: 3593
Total Time: 2996.0 seconds
Average per file: 199.7 seconds


In [13]:
# Generate Performance Reports and Compression
def create_performance_report(results):
    """Generate detailed performance report"""
    if not results:
        return "No results to report"

    report = []
    report.append("DETAILED PERFORMANCE REPORT")
    report.append("=" * 50)
    report.append(f"Processing Date: {results['processing_start']}")
    report.append(f"Total Processing Time: {results['total_time']:.2f} seconds")
    report.append(f"Average Time per PDF: {results['total_time']/results['total_files']:.2f} seconds")
    report.append("")

    report.append("SUMMARY:")
    report.append(f"  Total Files: {results['total_files']}")
    report.append(f"  Successful: {results['total_success']}")
    report.append(f"  Partial Success: {results['total_partial']}")
    report.append(f"  Failed: {results['total_failure']}")
    report.append(f"  Success Rate: {(results['total_success']/results['total_files']*100):.1f}%")
    report.append(f"  Total Chunks: {results['total_chunks']}")
    report.append(f"  Average Chunks per PDF: {results['total_chunks']/max(results['total_success'], 1):.1f}")
    report.append("")

    report.append("PERFORMANCE BREAKDOWN:")
    report.append(tracker.get_report())
    report.append("")

    report.append("BATCH DETAILS:")
    for i, batch in enumerate(results['batch_results'], 1):
        report.append(f"  Batch {i}:")
        report.append(f"    Conversion Time: {batch['conversion_time']:.2f}s")
        report.append(f"    Total Batch Time: {batch['batch_time']:.2f}s")
        report.append(f"    Success: {batch['success_count']}")
        report.append(f"    Failed: {batch['failure_count']}")
        report.append("")

    # Identify potential bottlenecks
    conversion_time = sum(b['conversion_time'] for b in results['batch_results'])
    processing_time = results['total_time'] - conversion_time

    report.append("BOTTLENECK ANALYSIS:")
    report.append(f"  Document Conversion: {conversion_time:.2f}s ({conversion_time/results['total_time']*100:.1f}%)")
    report.append(f"  Chunking & Export: {processing_time:.2f}s ({processing_time/results['total_time']*100:.1f}%)")

    if conversion_time > processing_time * 2:
        report.append("  RECOMMENDATION: Document conversion is the bottleneck. Consider:")
        report.append("    - Reducing OCR quality settings")
        report.append("    - Disabling table structure detection for simple documents")
        report.append("    - Using smaller batch sizes")
    elif processing_time > conversion_time * 2:
        report.append("  RECOMMENDATION: Chunking/Export is the bottleneck. Consider:")
        report.append("    - Increasing chunk sizes")
        report.append("    - Optimizing JSON serialization")
        report.append("    - Using faster storage")
    else:
        report.append("  ANALYSIS: Processing is well-balanced")

    return "\n".join(report)

def compress_outputs():
    """Compress all output files for easy download"""
    if not output_dir or not output_dir.exists():
        return None

    # Create ZIP file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_path = output_dir.parent / f"processed_documents_{timestamp}.zip"

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Add all processed files
        for file_path in output_dir.rglob('*'):
            if file_path.is_file():
                zipf.write(file_path, file_path.relative_to(output_dir))

        # Add performance report
        if results:
            report_content = create_performance_report(results)
            zipf.writestr("performance_report.txt", report_content)

        # Add processing log
        if Path('pdf_processing.log').exists():
            zipf.write('pdf_processing.log', 'pdf_processing.log')

    return zip_path

# Generate reports and compress outputs
if results:
    print("Generating performance report...")
    report = create_performance_report(results)
    print("\n" + report)

    print("\nCompressing outputs...")
    zip_path = compress_outputs()

    if zip_path:
        zip_size_mb = zip_path.stat().st_size / (1024 * 1024)
        print(f"Outputs compressed to: {zip_path}")
        print(f"Archive size: {zip_size_mb:.1f} MB")

        # For Google Colab users
        try:
            from google.colab import files
            print("\nDownloading compressed archive...")
            files.download(str(zip_path))
            print("Download started! Check your browser's downloads folder.")
        except ImportError:
            print(f"Archive ready for download: {zip_path}")

    # Save performance report separately
    if output_dir:
        report_path = output_dir / "performance_report.txt"
        with open(report_path, 'w') as f:
            f.write(report)
        print(f"Performance report saved to: {report_path}")

print("\nProcessing pipeline completed successfully!")

Generating performance report...

DETAILED PERFORMANCE REPORT
Processing Date: 2025-08-18T19:58:15.323078
Total Processing Time: 2995.96 seconds
Average Time per PDF: 199.73 seconds

SUMMARY:
  Total Files: 15
  Successful: 15
  Partial Success: 0
  Failed: 0
  Success Rate: 100.0%
  Total Chunks: 3593
  Average Chunks per PDF: 239.5

PERFORMANCE BREAKDOWN:
Performance Report:
Total Processing Time: 4614.88 seconds

Total_Processing: 2995.96s (64.9%)
Document_Conversion: 0.00s (0.0%)
Chunking_and_Export: 1618.93s (35.1%)


BATCH DETAILS:
  Batch 1:
    Conversion Time: 0.00s
    Total Batch Time: 1376.75s
    Success: 8
    Failed: 0

  Batch 2:
    Conversion Time: 0.00s
    Total Batch Time: 1618.93s
    Success: 7
    Failed: 0

BOTTLENECK ANALYSIS:
  Document Conversion: 0.00s (0.0%)
  Chunking & Export: 2995.96s (100.0%)
  RECOMMENDATION: Chunking/Export is the bottleneck. Consider:
    - Increasing chunk sizes
    - Optimizing JSON serialization
    - Using faster storage

Compre

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Download started! Check your browser's downloads folder.
Performance report saved to: /content/processed_documents/performance_report.txt

Processing pipeline completed successfully!
