# Fabric Document Intelligence Pipeline
This notebook orchestrates Azure AI Document Intelligence against documents stored in a Fabric lakehouse. It is designed to be executed from a Fabric Data Factory (Data Pipeline) event trigger when new files land in the document folders.

**Steps performed:**
1. Resolve pipeline parameters (Document Intelligence endpoint, lakehouse name, document types).
2. Discover new documents under `Files/documents/<subfolder>/` within the lakehouse (`invoices` for the `invoice` type, `utility-bills` for the `utility-bill` type).
3. Invoke the Azure AI Document Intelligence REST API for each document.
4. Persist normalized JSON outputs back to `Files/raw/document-intelligence/...` and emit manifests.
5. Materialize silver Delta tables (`silver_invoice_*`, `silver_utility_bill_*`).

Configure your pipeline trigger to supply the required parameters (see repository README for details).


In [ ]:
from notebookutils import mssparkutils
import os
import json
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from collections import defaultdict
import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the Spark session used by every cell below (getOrCreate returns the
# already-active session when one exists instead of building a second one).
spark = SparkSession.builder.getOrCreate()


In [ ]:
def get_parameter(name: str, default: Optional[str] = None) -> Optional[str]:
    """Resolve a pipeline parameter, falling back to environment variables.

    Candidates are tried in this order and the first usable one wins:

    1. ``mssparkutils.env.getJobParameter(name)``
    2. the environment variable ``name``
    3. the environment variable ``name.upper()``

    A candidate is unusable when it is ``None``, blank, or the text ``none``
    (case-insensitive, surrounding whitespace ignored). An unusable value in
    one source never hides a usable value in a later source.

    Args:
        name: Parameter name as configured on the pipeline.
        default: Value returned when no source yields a usable value.

    Returns:
        The first usable candidate, unmodified, or ``default``.
    """

    def _is_usable(candidate: Any) -> bool:
        if candidate is None:
            return False
        text = str(candidate).strip()
        return bool(text) and text.lower() != "none"

    try:
        value = mssparkutils.env.getJobParameter(name)
    except Exception:
        # Best effort: any failure of the job-parameter lookup falls through
        # to the environment variables below.
        value = None
    if _is_usable(value):
        return value

    for env_name in (name, name.upper()):
        env_value = os.getenv(env_name)
        if _is_usable(env_value):
            return env_value

    return default


# Comma-separated list of document types to process. Blank tokens are dropped
# and names are lower-cased. str() guards against a non-string parameter value.
document_types_raw = str(get_parameter("documentTypes", "invoice,utility-bill"))
document_types = [token.strip().lower() for token in document_types_raw.split(',') if token.strip()]

# Parse the per-type document cap up front so a bad value fails with a message
# that names the offending parameter instead of a bare int() error.
max_documents_raw = get_parameter("maxDocumentsPerType", "25")
try:
    max_documents = int(max_documents_raw)
except (TypeError, ValueError) as err:
    raise ValueError(
        f"Pipeline parameter 'maxDocumentsPerType' must be an integer, got {max_documents_raw!r}."
    ) from err

# Run-wide settings. Pipeline parameters win; the explicit os.getenv() defaults
# cover the differently-named environment variables.
config: Dict[str, Any] = {
    "endpoint": get_parameter("documentIntelligenceEndpoint", os.getenv("DOCUMENT_INTELLIGENCE_ENDPOINT", "")),
    "api_version": get_parameter("documentIntelligenceApiVersion", "2023-07-31"),
    "lakehouse": get_parameter("documentLakehouseName", os.getenv("DOCUMENT_LAKEHOUSE_NAME", "")),
    "workspace_id": get_parameter("workspaceId", os.getenv("FABRIC_WORKSPACE_ID", "")),
    "max_documents": max_documents,
    # Only the text "true" (any case, surrounding whitespace ignored) enables reprocessing.
    "force_reprocess": str(get_parameter("forceReprocess", "false")).strip().lower() == "true",
}

if not config["endpoint"]:
    raise ValueError("Document Intelligence endpoint is required. Provide pipeline parameter 'documentIntelligenceEndpoint'.")
if not config["lakehouse"]:
    raise ValueError("Lakehouse name is required. Provide pipeline parameter 'documentLakehouseName'.")

try:
    spark.sql(f"USE {config['lakehouse']}")
except Exception as err:
    # Best effort: failing to switch databases (for example because the
    # lakehouse database does not exist yet) is reported but is not fatal.
    print(f"[WARN] Could not switch to lakehouse database '{config['lakehouse']}': {err}")

# Built-in document types: the lakehouse subfolder they are read from, the
# pipeline parameter that overrides their model id, and their silver tables.
_DOC_TYPE_DEFAULTS: Dict[str, Dict[str, Any]] = {
    "invoice": {
        "source_subfolder": "invoices",
        "parameter_name": "invoiceModelId",
        "header_table": "silver_invoice_header",
        "line_table": "silver_invoice_line",
    },
    "utility-bill": {
        "source_subfolder": "utility-bills",
        "parameter_name": "utilityModelId",
        "header_table": "silver_utility_bill_header",
        "line_table": "silver_utility_bill_line",
    },
}

# Model used when a type-specific model parameter is not supplied; resolved
# once because it does not depend on the document type.
_fallback_model_id = get_parameter("invoiceModelId", "prebuilt-invoice")
_lakehouse_files = f"lakehouse://{config['lakehouse']}/Files"

doc_type_configs: Dict[str, Dict[str, Any]] = {}
for doc_type in document_types:
    defaults = _DOC_TYPE_DEFAULTS.get(doc_type)
    if defaults is None:
        # Unknown types keep falling back to the invoice settings, but the
        # fallback is now visible in the run output instead of silent.
        print(f"[WARN] Unknown document type '{doc_type}'; using the 'invoice' folder, model and tables.")
        defaults = _DOC_TYPE_DEFAULTS["invoice"]

    subfolder = defaults["source_subfolder"]
    doc_type_configs[doc_type] = {
        "model_id": get_parameter(defaults["parameter_name"], _fallback_model_id),
        "source_root": f"{_lakehouse_files}/documents/{subfolder}",
        "output_root": f"{_lakehouse_files}/raw/document-intelligence/{subfolder}",
        "processed_root": f"{_lakehouse_files}/raw/document-intelligence/_processed/{subfolder}",
        "header_table": defaults["header_table"],
        "line_table": defaults["line_table"],
    }

print("Configuration loaded:")
print(json.dumps({"documentTypes": document_types, **config}, indent=2))
print("Document type mapping:")
print(json.dumps(doc_type_configs, indent=2))


In [ ]:
def ensure_directory(path: str) -> None:
    """Create the parent directory of the file at *path* (best effort).

    The parent is everything before the last ``/``. A path with no ``/`` or
    with an empty parent has nothing to create, so the function returns
    without touching the file system.

    Args:
        path: File path whose parent directory should exist.
    """
    directory, separator, _ = path.rpartition('/')
    if not separator or not directory:
        # No parent component: never call mkdirs on the file name itself.
        return
    try:
        mssparkutils.fs.mkdirs(directory)
    except Exception as err:
        # Deliberately non-fatal; a genuinely unusable directory will surface
        # when the caller attempts the actual write.
        print(f"[WARN] Could not create directory {directory}: {err}")


def file_exists(path: str) -> bool:
    """Report whether *path* exists; any failure of the lookup counts as missing."""
    found = False
    try:
        found = bool(mssparkutils.fs.exists(path))
    except Exception:
        found = False
    return found


def read_binary(path: str) -> bytes:
    """Return the entire contents of the file at *path*, opened in binary mode."""
    # NOTE(review): relies on `mssparkutils.fs.open` being available in the
    # target runtime - confirm against the notebookutils API in use.
    with mssparkutils.fs.open(path, 'rb') as stream:
        data = stream.read()
    return data


def write_json(path: str, payload: Dict[str, Any]) -> None:
    """Write *payload* to *path* as 2-space-indented JSON.

    The parent directory is ensured first; the text is then handed to
    ``mssparkutils.fs.put`` with ``True`` as its third positional argument.
    """
    ensure_directory(path)
    serialized = json.dumps(payload, indent=2)
    mssparkutils.fs.put(path, serialized, True)


def list_files_recursive(root: str) -> List[str]:
    """Return the paths of all files under *root*, descending into directories.

    A listing failure at any level yields an empty result for that level
    rather than an exception.
    """
    try:
        children = mssparkutils.fs.ls(root)
    except Exception:
        return []

    collected: List[str] = []
    for child in children:
        if getattr(child, 'isDir', False):
            # Directory entry: gather its files depth-first.
            collected += list_files_recursive(child.path)
        else:
            collected.append(child.path)
    return collected


def guess_content_type(path: str) -> str:
    """Map a file name to a MIME type based on its extension (case-insensitive).

    Unrecognized extensions map to ``application/octet-stream``.
    """
    suffix_to_mime = (
        (('.pdf',), 'application/pdf'),
        (('.png',), 'image/png'),
        (('.jpg', '.jpeg'), 'image/jpeg'),
    )
    name = path.lower()
    for suffixes, mime in suffix_to_mime:
        if name.endswith(suffixes):
            return mime
    return 'application/octet-stream'


def get_cognitive_token() -> str:
    """Fetch a token for Cognitive Services and return it as a plain string.

    Handles three shapes of the ``getToken`` result: a dict (first truthy of
    ``token`` / ``accessToken``, else an empty string), an object with a
    ``token`` attribute, or anything else (converted with ``str``).
    """
    raw = mssparkutils.credentials.getToken('https://cognitiveservices.azure.com/.default')
    if isinstance(raw, dict):
        for key in ('token', 'accessToken'):
            candidate = raw.get(key)
            if candidate:
                return candidate
        return ''
    return raw.token if hasattr(raw, 'token') else str(raw)


def _extract_analysis(payload: Dict[str, Any], analyze_result: Dict[str, Any]) -> Dict[str, Any]:
    """Project a Document Intelligence response onto the keys used downstream."""
    return {
        "documents": analyze_result.get('documents', []),
        "pages": analyze_result.get('pages', []),
        "modelId": analyze_result.get('modelId') or payload.get('modelId'),
        "modelVersion": analyze_result.get('modelVersion'),
        "apiVersion": payload.get('apiVersion', config['api_version']),
        "createdDateTime": payload.get('createdDateTime'),
        "lastUpdatedDateTime": payload.get('lastUpdatedDateTime'),
    }


def analyze_document(
    content: bytes,
    content_type: str,
    model_id: str,
    poll_interval: float = 2.0,
    max_wait_seconds: float = 600.0,
    request_timeout: float = 60.0,
) -> Dict[str, Any]:
    """Submit a document for analysis and return the trimmed result.

    The document is POSTed to the model's ``:analyze`` route. A 202 response
    is followed by polling the returned operation URL until the operation
    reports ``succeeded`` or ``failed``; any other response is treated as a
    synchronous result.

    Args:
        content: Raw bytes of the document.
        content_type: MIME type sent as the ``Content-Type`` header.
        model_id: Document Intelligence model to run.
        poll_interval: Seconds to sleep between polls.
        max_wait_seconds: Upper bound on total polling time, so an operation
            that never leaves a non-terminal state cannot hang the notebook.
        request_timeout: Timeout in seconds applied to every HTTP call.

    Returns:
        Dict with ``documents``, ``pages``, ``modelId``, ``modelVersion``,
        ``apiVersion``, ``createdDateTime`` and ``lastUpdatedDateTime``.

    Raises:
        RuntimeError: No operation URL was returned, or the analysis failed.
        TimeoutError: The analysis did not finish within ``max_wait_seconds``.
        requests.HTTPError: A request returned an error status.
    """
    endpoint = config['endpoint'].rstrip('/')
    url = f"{endpoint}/formrecognizer/documentModels/{model_id}:analyze?api-version={config['api_version']}&stringIndexType=unicodeCodePoint"
    auth_header = {"Authorization": f"Bearer {get_cognitive_token()}", "Content-Type": content_type}
    response = requests.post(url, headers=auth_header, data=content, timeout=request_timeout)

    if response.status_code != 202:
        # Synchronous answer (or an error status, which raises here).
        response.raise_for_status()
        payload = response.json()
        return _extract_analysis(payload, payload.get('analyzeResult', payload))

    # requests exposes response headers case-insensitively, so one lookup suffices.
    operation_url = response.headers.get('Operation-Location')
    if not operation_url:
        raise RuntimeError('Document Intelligence did not return an operation URL to poll.')

    poll_headers = {"Authorization": auth_header["Authorization"]}
    deadline = time.monotonic() + max_wait_seconds
    while True:
        poll_response = requests.get(operation_url, headers=poll_headers, timeout=request_timeout)
        poll_response.raise_for_status()
        payload = poll_response.json()
        # `or ''` tolerates an explicit null status as well as a missing one.
        status = (payload.get('status') or '').lower()
        if status == 'succeeded':
            return _extract_analysis(payload, payload.get('analyzeResult', {}))
        if status == 'failed':
            raise RuntimeError(f"Document Intelligence analysis failed: {json.dumps(payload)}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Document Intelligence analysis did not finish within {max_wait_seconds} seconds "
                f"(last status: {status or 'unknown'})."
            )
        time.sleep(poll_interval)


def field_value(field: Optional[Dict[str, Any]]) -> Any:
    """Extract the usable value from a Document Intelligence field.

    The first non-null typed value wins, checked in the fixed order below.
    A dict-typed value is reduced to its ``amount`` entry. When no typed
    value is present the raw ``content`` is returned if the key exists;
    otherwise ``None``.
    """
    if not field:
        return None

    typed_keys = ('valueString', 'valueDate', 'valueTime', 'valueInteger', 'valueNumber', 'valueCurrency')
    typed = next(
        (field[key] for key in typed_keys if key in field and field[key] is not None),
        None,
    )
    if typed is not None:
        return typed.get('amount') if isinstance(typed, dict) else typed

    return field['content'] if 'content' in field else None


def field_confidence(field: Optional[Dict[str, Any]]) -> Optional[float]:
    """Return the field's ``confidence`` entry, or ``None`` for a missing/empty field."""
    return field.get('confidence') if field else None


# Header columns and the Document Intelligence field names that may supply
# them, in priority order. The first name is the one originally read; the
# extra names are the prebuilt invoice model's spellings for the totals.
_HEADER_FIELD_MAP: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
    ('invoice_id', ('InvoiceId',)),
    ('purchase_order', ('PurchaseOrder',)),
    ('invoice_date', ('InvoiceDate',)),
    ('due_date', ('DueDate',)),
    ('total_amount', ('Total', 'InvoiceTotal')),
    ('subtotal_amount', ('Subtotal', 'SubTotal')),
    ('tax_amount', ('Tax', 'TotalTax')),
    ('amount_due', ('AmountDue',)),
    ('currency', ('Currency',)),
    ('currency_code', ('CurrencyCode',)),
    ('customer_name', ('CustomerName',)),
    ('customer_id', ('CustomerId',)),
    ('vendor_name', ('VendorName',)),
    ('vendor_address', ('VendorAddress',)),
    ('billing_address', ('BillingAddress',)),
    ('service_address', ('ServiceAddress',)),
    ('billing_recipient', ('BillingAddressRecipient',)),
    ('service_recipient', ('ServiceAddressRecipient',)),
    ('remittance_address', ('RemittanceAddress',)),
    ('remittance_recipient', ('RemittanceAddressRecipient',)),
)

# Line-item columns and the sub-field of each item object that supplies them.
_LINE_FIELD_MAP: Tuple[Tuple[str, str], ...] = (
    ('description', 'Description'),
    ('product_code', 'ProductCode'),
    ('quantity', 'Quantity'),
    ('unit', 'Unit'),
    ('unit_price', 'UnitPrice'),
    ('line_amount', 'Amount'),
    ('line_tax', 'Tax'),
    ('line_date', 'Date'),
    ('line_currency', 'Currency'),
)


def _first_field_value(fields: Dict[str, Any], names: Tuple[str, ...]) -> Any:
    """Return the first non-null value among the fields called *names*."""
    for name in names:
        value = field_value(fields.get(name))
        if value is not None:
            return value
    return None


def normalize_analysis(doc_type: str, file_name: str, source_path: str, analysis: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]], Dict[str, Any]]:
    """Flatten the first analyzed document into header, line and manifest data.

    Args:
        doc_type: Logical document type recorded on every row.
        file_name: Source file name recorded on the header row.
        source_path: Full source path recorded on the header row.
        analysis: Result dict as returned by ``analyze_document``.

    Returns:
        ``(header_row, line_rows, manifest_info)`` where ``line_rows`` holds
        one dict per entry of the ``Items`` (or ``LineItems``) array.

    Raises:
        RuntimeError: ``analysis`` contains no documents.
    """
    from datetime import timezone  # local import: only `datetime` is imported at file level

    documents = analysis.get('documents', [])
    if not documents:
        raise RuntimeError('Document Intelligence returned no documents to normalize.')

    # Only the first document of the response is normalized.
    document = documents[0]
    fields = document.get('fields') or {}

    header_row = {
        'document_type': doc_type,
        # A fresh UUID is generated when the response carries no usable docId.
        'document_id': document.get('docId') or str(uuid.uuid4()),
        'source_file_name': file_name,
        'source_file_path': source_path,
        **{column: _first_field_value(fields, names) for column, names in _HEADER_FIELD_MAP},
        'confidence': document.get('confidence'),
        'model_id': analysis.get('modelId'),
        'model_version': analysis.get('modelVersion'),
        'api_version': analysis.get('apiVersion'),
        # Naive UTC timestamp, same text format as the deprecated utcnow().
        'analyzed_at': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }

    items_field = fields.get('Items') or fields.get('LineItems') or {}
    line_rows: List[Dict[str, Any]] = []
    for index, item_entry in enumerate(items_field.get('valueArray') or [], start=1):
        item_entry = item_entry or {}
        item_obj = item_entry.get('valueObject') or {}

        # Prefer the Amount field's confidence. Fall back to the confidence of
        # the array entry itself (the item object only holds the sub-fields).
        # An explicit None test keeps a legitimate 0.0.
        line_confidence = field_confidence(item_obj.get('Amount'))
        if line_confidence is None:
            line_confidence = item_entry.get('confidence')

        line_rows.append({
            'document_type': doc_type,
            'document_id': header_row['document_id'],
            'line_number': index,
            **{column: field_value(item_obj.get(name)) for column, name in _LINE_FIELD_MAP},
            'confidence': line_confidence,
        })

    manifest_info = {
        'modelId': analysis.get('modelId'),
        'apiVersion': analysis.get('apiVersion'),
        'documentId': header_row['document_id'],
        'confidence': document.get('confidence'),
        'lineItemCount': len(line_rows),
    }

    return header_row, line_rows, manifest_info


In [ ]:
# Rows collected during this run, keyed by destination table name.
header_tables: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
line_tables: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
# One entry per document type; consumed by the summary cell.
summary: List[Dict[str, Any]] = []

_SUPPORTED_EXTENSIONS = ('.pdf', '.png', '.jpg', '.jpeg')

for doc_type, type_config in doc_type_configs.items():
    files = sorted(
        path
        for path in list_files_recursive(type_config['source_root'])
        if path.lower().endswith(_SUPPORTED_EXTENSIONS)
    )
    processed = 0

    for file_path in files:
        # Check the cap before doing any work so a cap of 0 (or less)
        # processes nothing instead of one document.
        if processed >= config['max_documents']:
            break

        file_name = file_path.split('/')[-1]
        output_path = f"{type_config['output_root']}/{file_name}.json"
        manifest_path = f"{type_config['processed_root']}/{file_name}.json"

        # An existing output file marks the document as already processed.
        if not config['force_reprocess'] and file_exists(output_path):
            continue

        try:
            content_bytes = read_binary(file_path)
            analysis = analyze_document(content_bytes, guess_content_type(file_name), type_config['model_id'])
            header_row, line_rows, manifest_info = normalize_analysis(doc_type, file_name, file_path, analysis)

            normalized_payload = {
                'header': header_row,
                'lineItems': line_rows,
                'modelId': manifest_info['modelId'],
                'apiVersion': manifest_info['apiVersion'],
                'sourcePath': file_path
            }
            write_json(output_path, normalized_payload)

            manifest_payload = {
                'documentType': doc_type,
                'sourcePath': file_path,
                'outputPath': output_path,
                'processedAt': datetime.utcnow().isoformat(),
                'modelId': manifest_info['modelId'],
                'apiVersion': manifest_info['apiVersion'],
                'header': header_row,
                'lineItemCount': manifest_info['lineItemCount']
            }
            write_json(manifest_path, manifest_payload)

            header_tables[type_config['header_table']].append(header_row)
            if line_rows:
                line_tables[type_config['line_table']].extend(line_rows)

            processed += 1

        except Exception as err:
            # One bad document must not stop the batch; report and move on.
            print(f"[WARN] Failed to process {file_path}: {err}")

    summary.append({
        'document_type': doc_type,
        'processed': processed,
        'total_detected': len(files)
    })


def _write_silver_tables(tables: Dict[str, List[Dict[str, Any]]], mode: str) -> None:
    """Save each non-empty list of row dicts to its table using *mode*."""
    for table_name, rows in tables.items():
        if not rows:
            continue
        # NOTE(review): the schema is inferred from the row dicts; a column
        # that is None in every row may not be inferable - consider passing
        # an explicit schema.
        spark.createDataFrame(rows).write.mode(mode).saveAsTable(table_name)


# Incremental runs skip documents processed earlier, so their rows must be
# appended; overwriting would drop everything written by previous runs.
# A forced reprocess re-analyzes the documents and therefore replaces the tables.
_write_mode = 'overwrite' if config['force_reprocess'] else 'append'
_write_silver_tables(header_tables, _write_mode)
_write_silver_tables(line_tables, _write_mode)


In [ ]:
# Build the per-type summary frame; with no summary rows an explicit schema is
# supplied so an empty frame can still be created.
if summary:
    summary_df = spark.createDataFrame(summary)
else:
    summary_df = spark.createDataFrame([], 'document_type string, processed int, total_detected int')

display(summary_df)
