# Intelligent Document Processing End-to-End Demo

This notebook demonstrates the Databricks-based intelligent document processing (IDP) pipeline built on Azure Document Intelligence abstractions. Instead of calling the Azure service directly, it uses a lightweight LLM-backed proxy that returns results in the same schema as the Azure SDK, so routing, parsing, summarisation, enrichment, and asynchronous batching can all be exercised locally.

## What you will see
- Automatic routing based on document layout with optional overrides.
- Parsing via an Azure Document Intelligence-compatible proxy that extracts text, tables, and fields.
- Canonical normalisation with deterministic summaries and optional enrichment.
- An asynchronous batching loop that mimics the SNS/SQS-driven orchestration.

## 1. Environment setup

In [None]:
import importlib
import subprocess
import sys

# Best-effort installation of optional libraries used later in the notebook:
# PyMuPDF (imported as `fitz`), openpyxl and fpdf2. A package that cannot be
# installed is reported and skipped; later cells have fallback paths.
packages = [
    ('fitz', 'pymupdf'),
    ('openpyxl', 'openpyxl'),
    ('fpdf', 'fpdf2'),
]
installed_any = False
for module_name, package_name in packages:
    try:
        importlib.import_module(module_name)
    except Exception:  # noqa: BLE001 - a missing or half-installed package may fail with more than ImportError
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--quiet', package_name])
        except Exception as exc:  # noqa: BLE001
            print(f'Package {package_name} is unavailable ({exc}); continuing with fallback paths.')
        else:
            installed_any = True

if installed_any:
    # Modules installed after interpreter start-up may be invisible to the import
    # system's cached directory listings until the finder caches are invalidated.
    importlib.invalidate_caches()


## 2. Imports and logging

In [None]:
import asyncio
import base64
import csv
import json
import logging
import mimetypes
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
import re
import textwrap

from urllib import request as urllib_request

from idp_service.document_intelligence_storage import InMemoryDocumentResultStore
from idp_service.document_intelligence_workflow import DocumentIntelligenceWorkflow, WorkflowConfig
from idp_service.enrichment import EnrichmentProvider, EnrichmentRequest, EnrichmentResponse
from idp_service.llm_document_intelligence_proxy import LLMAzureDocumentIntelligenceClient
from idp_service.routing.router import (
    DocumentRouter,
    HeuristicLayoutAnalyser,
    OverrideSet,
    PatternOverride,
    PyMuPDFLayoutAnalyser,
    RouterConfig,
    StrategyConfig,
    DocumentAnalysis,
)
from parsers.adapters import AzureDocumentIntelligenceAdapter

# Route INFO-and-above records through the root logger as "[LEVEL] message".
# NOTE: basicConfig() is a no-op when the root logger already has handlers, so a
# notebook host that pre-configures logging keeps its own level and format.
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)


## 3. Download or synthesise sample documents

In [None]:
import csv
import mimetypes
import textwrap
from pathlib import Path
from urllib import request as urllib_request

from idp_service.sample_documents_embedded import write_embedded

# Folder (relative to the working directory) that holds every sample document.
DATA_DIR = Path('docs', 'sample_documents')
DATA_DIR.mkdir(exist_ok=True, parents=True)


def download_file(url: str, target: Path, timeout: float = 30.0) -> str:
    """Fetch *url* into *target* and report whether it worked.

    The body is read completely before anything is written, so a failed transfer
    leaves *target* untouched.

    Args:
        url: Source URL passed to ``urllib.request.urlopen``.
        target: Destination file path.
        timeout: Seconds to wait on blocking network operations. Without it an
            unreachable host can stall the notebook indefinitely.

    Returns:
        ``'downloaded'`` on success. Otherwise ``'fallback (<ExcType>: <message>)'``,
        which tells callers to use a local fallback.
    """
    try:
        with urllib_request.urlopen(url, timeout=timeout) as response:
            target.write_bytes(response.read())
        return 'downloaded'
    except Exception as exc:  # noqa: BLE001 - any failure means "use the fallback"
        return f'fallback ({exc.__class__.__name__}: {exc})'


def create_pdf(text: str, target: Path) -> bool:
    """Render *text* into a PDF at *target* with fpdf2.

    Returns:
        ``False`` without touching *target* when ``fpdf`` cannot be imported.
        ``True`` once the PDF has been written.
    """
    try:
        from fpdf import FPDF
    except Exception:  # pragma: no cover - optional dependency
        return False

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font('Helvetica', size=12)
    # One multi_cell call wraps the text to the page width on its own. Calling
    # multi_cell(0, ...) once per pre-wrapped line breaks under fpdf2: its
    # multi_cell leaves the cursor at the right edge by default, so the next
    # full-width cell has no horizontal space left and raises.
    pdf.multi_cell(0, 8, text)
    pdf.output(str(target))
    return True


def create_excel(target: Path) -> bool:
    """Save a small one-sheet operating budget workbook to *target*.

    Returns:
        ``False`` without touching *target* when openpyxl cannot be imported.
        ``True`` after the workbook (sheet ``Budget``: a header row plus three
        budget lines) has been saved.
    """
    try:
        from openpyxl import Workbook
    except Exception:  # pragma: no cover - optional dependency
        return False

    header = ['Category', 'Amount', 'Notes']
    budget_lines = (
        ['Cloud spend', 25000, 'Azure + Databricks'],
        ['Staff training', 8000, 'LLM, security, governance'],
        ['Contingency', 5000, 'Reserved for overruns'],
    )

    workbook = Workbook()
    sheet = workbook.active
    sheet.title = 'Budget'
    for row in (header, *budget_lines):
        sheet.append(row)
    workbook.save(target)
    return True


def ensure_pdf(target: Path) -> str:
    """Make sure a sample PDF exists at *target* and report how it was obtained.

    Strategies are tried in order:
    1. An existing file.
    2. A download.
    3. The embedded sample written by ``write_embedded``.
    4. A PDF rendered locally by ``create_pdf``.

    NOTE(review): the download URL points at a generic ``dummy.pdf`` test file,
    presumably not a financial report. Confirm it suits the demo.

    Returns:
        ``'existing'``, ``'downloaded'``, ``'embedded'`` or ``'generated'``.

    Raises:
        RuntimeError: If every strategy fails.
    """
    if target.exists():
        return 'existing'

    sample_text = (
        'Contoso Retail consolidated financial summary. Revenue increased 18% year-on-year with strong growth in the '
        'North region. Key initiatives include expanded e-commerce investments and cost optimisation in operations.'
    )
    # Each provisioner is tried lazily; the first truthy result wins.
    provisioners = (
        (
            'downloaded',
            lambda: download_file('https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf', target)
            == 'downloaded',
        ),
        ('embedded', lambda: write_embedded('financial_report.pdf', target)),
        ('generated', lambda: create_pdf(sample_text, target)),
    )
    for outcome, provision in provisioners:
        if provision():
            return outcome
    raise RuntimeError('Unable to provision sample PDF; please provide one manually.')


def ensure_csv(target: Path) -> str:
    """Make sure a sample CSV exists at *target* and report how it was obtained.

    An existing file is kept. Otherwise a public CSV is downloaded, and if that
    fails a four-row regional sales table is written locally.

    NOTE(review): the download URL is plotly's ``2014_usa_states.csv``, which by
    its name is not sales data, although the demo labels this file ``sales_data``.
    Confirm this is intended.

    Returns:
        ``'existing'``, ``'downloaded'`` or ``'generated'``.
    """
    if target.exists():
        return 'existing'

    source_url = 'https://raw.githubusercontent.com/plotly/datasets/master/2014_usa_states.csv'
    if download_file(source_url, target) == 'downloaded':
        return 'downloaded'

    header = ['Region', 'Product', 'Units', 'Revenue']
    sales_rows = [
        ['North', 'Notebook', '120', '54000'],
        ['West', 'Tablet', '75', '33750'],
        ['South', 'Monitor', '55', '17600'],
    ]
    with target.open('w', newline='') as handle:
        csv.writer(handle).writerows([header, *sales_rows])
    return 'generated'


def ensure_excel(target: Path) -> str:
    """Make sure a sample XLSX exists at *target* and report how it was obtained.

    Strategies are tried in order:
    1. An existing file.
    2. A download.
    3. The embedded sample written by ``write_embedded``.
    4. A workbook built locally by ``create_excel``.

    Returns:
        ``'existing'``, ``'downloaded'``, ``'embedded'`` or ``'generated'``.

    Raises:
        RuntimeError: If every strategy fails.
    """
    if target.exists():
        return 'existing'

    # Each provisioner is tried lazily; the first truthy result wins.
    provisioners = (
        (
            'downloaded',
            lambda: download_file('https://github.com/plotly/datasets/raw/master/2011_us_ag_exports.xlsx', target)
            == 'downloaded',
        ),
        ('embedded', lambda: write_embedded('operating_budget.xlsx', target)),
        ('generated', lambda: create_excel(target)),
    )
    for outcome, provision in provisioners:
        if provision():
            return outcome
    raise RuntimeError('Unable to provision sample XLSX; please provide one manually.')


pdf_path = DATA_DIR / 'financial_report.pdf'
csv_path = DATA_DIR / 'quarterly_sales.csv'
xlsx_path = DATA_DIR / 'operating_budget.xlsx'
email_path = DATA_DIR / 'customer_email.eml'

# Provision the standalone files first; the email below attaches the CSV and XLSX.
pdf_status = ensure_pdf(pdf_path)
csv_status = ensure_csv(csv_path)
xlsx_status = ensure_excel(xlsx_path)

if email_path.exists():
    email_status = 'existing'
else:
    from email.message import EmailMessage

    message = EmailMessage()
    for header_name, header_value in (
        ('Subject', 'Quarterly performance review'),
        ('From', 'finance.lead@example.com'),
        ('To', 'cfo@example.com'),
    ):
        message[header_name] = header_value
    message.set_content(
        'Hi, attached are the revenue extracts and the updated operating budget. '
        'Please review before the leadership sync. The north region still leads growth.'
    )
    for attachment_path in (csv_path, xlsx_path):
        # guess_type may return None for an unknown extension; fall back to a generic binary type.
        content_type = mimetypes.guess_type(attachment_path.name)[0] or 'application/octet-stream'
        maintype, subtype = content_type.split('/', 1)
        message.add_attachment(
            attachment_path.read_bytes(),
            maintype=maintype,
            subtype=subtype,
            filename=attachment_path.name,
        )
    email_path.write_bytes(message.as_bytes())
    email_status = 'generated'

for status_label, status_value in (
    ('PDF source:', pdf_status),
    ('CSV source:', csv_status),
    ('Excel source:', xlsx_status),
    ('Email status:', email_status),
):
    print(status_label, status_value)

sample_assets = {
    'pdf': pdf_path,
    'csv': csv_path,
    'xlsx': xlsx_path,
    'email': email_path,
}
sample_assets



## 4. Configure the router

In [None]:
# Map each layout category (keys presumably mirror the router's category values;
# confirm against idp_service.routing) to the parser strategy that should handle it.
router_config = RouterConfig(
    default_strategy_map={
        'short_form': {'name': 'prebuilt-layout'},
        'long_form': {'name': 'custom-long-form'},
        'table_heavy': {'name': 'prebuilt-table'},
        'form_heavy': {'name': 'prebuilt-form'},
        'unknown': {'name': 'prebuilt-layout'},
    },
    fallback_strategy={'name': 'prebuilt-layout'},
)

# Prefer the PyMuPDF-backed analyser. Fall back to the heuristic one if it cannot
# be constructed, and say so: the fallback changes routing decisions, and a silent
# swap makes that hard to diagnose.
try:
    layout_analyser = PyMuPDFLayoutAnalyser()
except Exception as exc:  # noqa: BLE001 - any failure means "use the heuristic analyser"
    logging.getLogger(__name__).warning(
        'PyMuPDF layout analyser unavailable (%s: %s); falling back to HeuristicLayoutAnalyser.',
        exc.__class__.__name__,
        exc,
    )
    layout_analyser = HeuristicLayoutAnalyser()

# Tabular files always go to the table parser, whatever their layout. The match is
# case-insensitive so 'REPORT.CSV' / 'Budget.XLSX' are routed the same way.
override_set = OverrideSet(
    pattern_overrides=[
        PatternOverride(pattern=re.compile(r'.*\.csv$', re.IGNORECASE), strategy=StrategyConfig(name='prebuilt-table')),
        PatternOverride(pattern=re.compile(r'.*\.xlsx$', re.IGNORECASE), strategy=StrategyConfig(name='prebuilt-table')),
    ]
)

router = DocumentRouter(router_config, layout_analyser)
router_config, override_set


## 5. Instantiate the parsing workflow and enrichment provider

In [None]:
# Shared collaborators for every workflow run:
# - the LLM-backed proxy standing in for the Azure Document Intelligence client,
# - the adapter passed to WorkflowConfig,
# - an in-memory result store.
proxy_client, adapter, result_store = (
    LLMAzureDocumentIntelligenceClient(),
    AzureDocumentIntelligenceAdapter(),
    InMemoryDocumentResultStore(),
)

class KeywordEnrichmentProvider:
    """Demo enrichment provider that tags each document with its most frequent long words.

    It exposes ``name``, ``max_batch_size``, ``timeout_seconds`` and ``enrich``.
    That is presumably the shape the workflow expects of ``enrichment_providers``
    (cf. the ``EnrichmentProvider`` import); TODO confirm against ``idp_service.enrichment``.
    """

    # Provider identifier; the demo jobs request it via ``enrich_with=['keyword_insights']``.
    name = 'keyword_insights'
    max_batch_size = 8
    timeout_seconds: Optional[float] = 5.0

    # Candidate keywords: runs of six or more ASCII letters.
    _word_pattern = re.compile(r'[A-Za-z]{6,}')
    # Maximum number of keywords reported per document.
    _keyword_limit = 6

    def enrich(self, requests: List[EnrichmentRequest]) -> List[EnrichmentResponse]:
        """Return one ``keyword_summary`` enrichment response per request, in request order.

        The enrichment content holds two entries:
        - ``keywords``: up to six lower-cased words, most frequent first, with ties
          broken alphabetically so the output is deterministic.
        - ``token_count``: the number of whitespace-separated tokens across all of
          the document's text spans.
        """
        from collections import Counter

        responses: List[EnrichmentResponse] = []
        for request in requests:
            text = ' '.join(span.content for span in request.document.text_spans)
            counts = Counter(word.lower() for word in self._word_pattern.findall(text))
            # Rank by frequency. Sorting the word set and truncating would simply
            # return the alphabetically-first long words, whatever their prominence.
            ranked = sorted(counts, key=lambda word: (-counts[word], word))
            keywords = ranked[: self._keyword_limit]
            enrichments = [
                {
                    'enrichment_type': 'keyword_summary',
                    'content': {
                        'keywords': keywords,
                        'token_count': len(text.split()),
                    },
                }
            ]
            responses.append(
                EnrichmentResponse(
                    document_id=request.document_id,
                    enrichments=enrichments,
                    metadata={'provider': 'keyword'},
                )
            )
        return responses

# One provider instance, handed to every workflow run via enrichment_providers.
keyword_provider = KeywordEnrichmentProvider()
(keyword_provider.name, keyword_provider.max_batch_size)


## 6. Describe document jobs

In [None]:
@dataclass()
class DocumentJob:
    """One unit of work for the demo pipeline: a file plus how to process it."""

    # Identifier passed to the workflow as ``document_id``.
    document_id: str
    # Human-readable name shown in the event summaries.
    label: str
    # File that is read and sent to the router and workflow.
    path: Path
    # Expected keys: 'mime_type', 'document_type' and 'object_key'
    # (route_document indexes 'object_key' directly).
    metadata: Dict[str, str]
    # When set, added to the router request body as 'parser_override'.
    parser_override: Optional[str] = None
    # Enrichment provider names forwarded to the workflow.
    enrich_with: Optional[List[str]] = None
    # Forwarded to the workflow's ``force`` flag.
    force: bool = False

def _sample_job(
    document_id: str,
    label: str,
    asset_key: str,
    mime_type: str,
    document_type: str,
    **options: object,
) -> DocumentJob:
    """Build a keyword-enriched DocumentJob for ``sample_assets[asset_key]``.

    Extra keyword arguments (e.g. ``parser_override``, ``force``) go straight to DocumentJob.
    """
    asset_path = sample_assets[asset_key]
    return DocumentJob(
        document_id=document_id,
        label=label,
        path=asset_path,
        metadata={
            'mime_type': mime_type,
            'document_type': document_type,
            'object_key': asset_path.name,
        },
        enrich_with=['keyword_insights'],
        **options,
    )


# The three 'quarterly-sales' jobs share a document_id on purpose: a baseline run, a
# duplicate without force, and a forced re-run with a different parser.
jobs: List[DocumentJob] = [
    _sample_job('financial-report', 'Financial report PDF', 'pdf', 'application/pdf', 'financial_report'),
    _sample_job('quarterly-sales', 'Sales CSV baseline', 'csv', 'text/csv', 'sales_data'),
    _sample_job('quarterly-sales', 'Sales CSV duplicate without force', 'csv', 'text/csv', 'sales_data'),
    _sample_job(
        'quarterly-sales',
        'Sales CSV override with force',
        'csv',
        'text/csv',
        'sales_data',
        parser_override='prebuilt-layout',
        force=True,
    ),
    _sample_job(
        'operating-budget',
        'Operating budget spreadsheet',
        'xlsx',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'budget',
    ),
    _sample_job('customer-email', 'Customer email with attachments', 'email', 'message/rfc822', 'email'),
]
len(jobs)


## 7. Helper functions for routing and processing

In [None]:
def build_router_body(job: DocumentJob, document_bytes: bytes) -> Dict[str, object]:
    """Assemble the request body handed to ``router.route`` for *job*.

    The body always carries the base64-encoded bytes and the job's MIME and document
    types. A ``parser_override`` entry is added only when the job sets one.
    """
    job_metadata = job.metadata
    body: Dict[str, object] = {
        'documentBytes': base64.b64encode(document_bytes).decode('ascii'),
        'documentMetadata': {
            'mimeType': job_metadata.get('mime_type'),
            'documentType': job_metadata.get('document_type'),
        },
    }
    override = job.parser_override
    if override:
        body['parser_override'] = override
    return body

def route_document(job: DocumentJob) -> tuple[bytes, DocumentAnalysis]:
    """Read *job*'s file and ask the module-level ``router`` how to parse it.

    Returns the raw bytes, so they are read only once, together with the routing analysis.
    """
    raw_bytes = job.path.read_bytes()
    routing = router.route(
        build_router_body(job, raw_bytes),
        job.metadata['object_key'],
        override_set,
    )
    return raw_bytes, routing

def run_workflow(job: DocumentJob, document_bytes: bytes, analysis: DocumentAnalysis):
    """Process *job* with the strategy chosen by *analysis* and return the workflow result.

    A fresh workflow is built per call because the model id comes from the routing
    decision. The shared proxy client, result store, adapter and keyword provider
    are reused.
    """
    job_metadata = job.metadata
    workflow_metadata = {
        'document_type': job_metadata.get('document_type'),
        'mime_type': job_metadata.get('mime_type'),
        'routing': analysis.to_metadata_record({'document_id': job.document_id}),
    }
    workflow_config = WorkflowConfig(
        model_id=analysis.strategy.name,
        adapter=adapter,
        enrichment_providers=(keyword_provider,),
    )
    workflow = DocumentIntelligenceWorkflow(
        client=proxy_client,
        store=result_store,
        config=workflow_config,
    )
    return workflow.process(
        document_id=job.document_id,
        document_bytes=document_bytes,
        source_uri=str(job.path),
        metadata=workflow_metadata,
        content_type=job_metadata.get('mime_type'),
        force=job.force,
        enrich_with=job.enrich_with,
    )

async def process_jobs_async(job_list: List[DocumentJob], worker_count: int = 3):
    """Route and process *job_list* with a pool of asyncio workers.

    Jobs are enqueued in order, followed by one ``None`` sentinel per worker. Each
    worker pulls jobs until it sees a sentinel. The blocking routing and workflow
    calls run in threads via ``asyncio.to_thread``.

    Jobs that share a ``document_id`` are processed one at a time, in submission
    order. Demo steps such as "duplicate without force" followed by "override with
    force" therefore see each other's stored results deterministically. Jobs for
    different documents still run concurrently.
    NOTE(review): this assumes ``result_store`` tolerates concurrent use from
    threads; confirm.

    A job that raises is logged with its traceback and skipped. It produces no event,
    and the remaining jobs still run.

    Args:
        job_list: Jobs to process.
        worker_count: Number of concurrent workers; must be at least 1.

    Returns:
        ``(events, final_documents)``:
        - ``events`` holds one dict per successful job (keys ``worker``, ``job``,
          ``analysis``, ``workflow_result``) in completion order.
        - ``final_documents`` maps each ``document_id`` to the document of its last
          non-skipped workflow result.

    Raises:
        ValueError: If ``worker_count`` is less than 1.
    """
    if worker_count < 1:
        # With no workers nothing would drain the queue and queue.join() would hang.
        raise ValueError(f'worker_count must be at least 1, got {worker_count}')

    logger = logging.getLogger(__name__)
    queue: asyncio.Queue[Optional[DocumentJob]] = asyncio.Queue()
    for job in job_list:
        await queue.put(job)
    # One sentinel per worker so every worker eventually leaves its loop.
    for _ in range(worker_count):
        await queue.put(None)

    events: List[Dict[str, object]] = []
    final_documents: Dict[str, object] = {}
    # One lock per document_id. asyncio.Lock wakes waiters first-in, first-out, and
    # each worker requests the lock in the same event-loop step as its dequeue, so
    # same-document jobs run in queue order.
    document_locks: Dict[str, asyncio.Lock] = {}

    async def process_one(name: str, job: DocumentJob) -> None:
        """Route and run a single job while holding its document's lock."""
        async with document_locks.setdefault(job.document_id, asyncio.Lock()):
            document_bytes, analysis = await asyncio.to_thread(route_document, job)
            workflow_result = await asyncio.to_thread(run_workflow, job, document_bytes, analysis)
            events.append(
                {
                    'worker': name,
                    'job': job,
                    'analysis': analysis,
                    'workflow_result': workflow_result,
                }
            )
            if not workflow_result.skipped and workflow_result.document is not None:
                final_documents[job.document_id] = workflow_result.document

    async def worker(name: str) -> None:
        while True:
            job = await queue.get()
            try:
                if job is None:
                    return
                await process_one(name, job)
            except Exception:  # noqa: BLE001 - one bad document must not stall the pool
                # Previously an exception skipped task_done() and killed the worker,
                # leaving its sentinel queued, so queue.join() never returned.
                logger.exception('%s failed to process %r (%s)', name, job.document_id, job.label)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker(f'worker-{idx + 1}')) for idx in range(worker_count)]
    await queue.join()
    for worker_task in workers:
        await worker_task
    return events, final_documents


## 8. Run the asynchronous pipeline

In [None]:
# asyncio.run() refuses to start when an event loop is already running in this
# thread, which is the case inside Jupyter/IPython kernels. In that situation drive
# the pipeline on a fresh event loop in a helper thread and block until it finishes.
try:
    asyncio.get_running_loop()
except RuntimeError:
    # Plain interpreter: no loop is running, so asyncio.run() is safe.
    events, final_documents = asyncio.run(process_jobs_async(jobs, worker_count=3))
else:
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as _pipeline_pool:
        events, final_documents = _pipeline_pool.submit(
            lambda: asyncio.run(process_jobs_async(jobs, worker_count=3))
        ).result()
len(events), list(final_documents.keys())


## 9. Inspect routing decisions and canonical outputs

In [None]:
from typing import Any

def _normalise_adi_result(raw: Any) -> Any:
    """Best-effort conversion of a raw Document Intelligence result into plain data.

    Strategies are tried in order:
    1. ``None`` is returned as ``None``, and a dict is returned unchanged.
    2. The result of the first callable ``to_dict()`` / ``as_dict()`` that does not raise.
    3. The object's public ``__dict__`` attributes.
    4. As a last resort, ``{"repr": repr(raw)}``.
    """
    if raw is None or isinstance(raw, dict):
        return raw

    for converter_name in ("to_dict", "as_dict"):
        converter = getattr(raw, converter_name, None)
        if not callable(converter):
            continue
        try:
            return converter()
        except Exception:
            # The converter exists but failed; try the next strategy.
            continue

    if hasattr(raw, "__dict__"):
        try:
            return {name: value for name, value in vars(raw).items() if not name.startswith("_")}
        except Exception:
            pass

    return {"repr": repr(raw)}

def describe_event(event: Dict[str, object]) -> Dict[str, object]:
    """Flatten one pipeline event into a JSON-friendly summary for display.

    *event* is a dict with ``job``, ``analysis`` and ``workflow_result`` entries, as
    produced by the async pipeline. When the workflow produced no document, the
    document-derived fields stay ``None`` and ``denormalized_records`` is empty.
    """
    job: DocumentJob = event['job']  # type: ignore[assignment]
    analysis: DocumentAnalysis = event['analysis']  # type: ignore[assignment]
    result = event['workflow_result']
    routing_record = analysis.to_metadata_record({'document_id': job.document_id})

    doc = result.document
    if doc is None:
        canonical_payload = None
        document_summary = None
        enrichment_summary = None
        attachments = None
        denorm_records = []
    else:
        canonical_payload = doc.to_dict()
        has_summary = bool(doc.summaries)
        lead_summary = doc.summaries[0] if has_summary else None
        document_summary = {
            'title': lead_summary.title if has_summary else None,
            'summary': lead_summary.summary if has_summary else None,
            'text_span_count': len(doc.text_spans),
            'table_count': len(doc.tables),
        }
        enrichment_summary = [item.to_dict() for item in doc.enrichments]
        attachments = (
            [
                {
                    'attachment_id': item.attachment_id,
                    'file_name': item.file_name,
                    'mime_type': item.mime_type,
                    'document_id': item.document.document_id if item.document else None,
                }
                for item in doc.attachments
            ]
            if doc.attachments
            else None
        )
        denorm_records = [record.to_dict() for record in result.records]

    adi_result = _normalise_adi_result(getattr(result, 'raw_result', None))

    return {
        'label': job.label,
        'document_id': job.document_id,
        'strategy': analysis.strategy.name,
        'strategy_reason': analysis.strategy.reason,
        'category': analysis.category.value,
        'overrides_applied': analysis.overrides_applied,
        'skipped': result.skipped,
        'summary': document_summary,
        'enrichments': enrichment_summary,
        'routing_metadata': routing_record,
        'adi_result': adi_result,
        'canonical_document': canonical_payload,
        'denormalized_records': denorm_records,
        'attachments': attachments,
    }

# One flattened, display-friendly summary per pipeline event, in event order.
event_summaries = list(map(describe_event, events))
event_summaries


## 10. Normalised payloads ready for downstream consumption

In [None]:
# Serialise every final canonical document and preview the start of the JSON.
canonical_payloads = {doc_id: doc.to_dict() for doc_id, doc in final_documents.items()}
canonical_json = json.dumps(canonical_payloads, indent=2)
preview_chars = 2000
# print() renders real line breaks. Returning the string as the cell value would
# display its repr with escaped "\n" sequences. The ellipsis is added only when
# the preview actually cuts the JSON short.
print(canonical_json[:preview_chars] + ('...' if len(canonical_json) > preview_chars else ''))


The JSON preview above (the first 2,000 characters) shows the canonical schema with text spans, tables, fields, summaries, and enrichment outputs. Downstream services can rely on this contract regardless of which parser handled the raw document.