# Parse PDFs to Delta Tables

This notebook:
1. Reads PDFs from Unity Catalog volumes
2. Parses content using `ai_parse_document()`
3. Chunks text for vector search
4. Saves to Delta tables with Change Data Feed enabled

In [None]:
%pip install databricks-sdk
# Restart the Python process so the freshly installed package is importable
# in later cells (this also clears any Python state defined before it).
# NOTE(review): databricks-sdk is not referenced in the cells visible here —
# confirm it is still needed.
dbutils.library.restartPython()

In [None]:
# Configuration - UPDATE THESE VALUES
CATALOG = "your_catalog"
SCHEMA = "rag_agents"

# Agent configurations: (volume_name, table_name).
# PDFs are read from /Volumes/{CATALOG}/{SCHEMA}/{volume_name} and the chunks
# are written to the Delta table {CATALOG}.{SCHEMA}.{table_name}.
AGENTS = [
    ("agent_a_pdfs", "agent_a_docs"),
    ("agent_b_pdfs", "agent_b_docs"),
    ("agent_c_pdfs", "agent_c_docs"),
]

# Chunk settings, measured in characters. Consecutive chunks share
# CHUNK_OVERLAP characters, so each new chunk starts
# CHUNK_SIZE - CHUNK_OVERLAP characters after the previous one.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Fail fast: with an overlap outside [0, CHUNK_SIZE) the chunking window
# cannot move forward through long documents.
if not 0 <= CHUNK_OVERLAP < CHUNK_SIZE:
    raise ValueError(
        f"CHUNK_OVERLAP ({CHUNK_OVERLAP}) must be >= 0 and smaller than "
        f"CHUNK_SIZE ({CHUNK_SIZE})"
    )

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType


def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into fixed-size, overlapping character chunks.

    Args:
        text: Text to split. ``None`` or ``""`` yields an empty list.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared by consecutive chunks.

    Returns:
        The chunks in document order. Text no longer than ``chunk_size`` is
        returned unchanged as a single chunk. The final chunk may be shorter
        than ``chunk_size``. Once a chunk reaches the end of the text no
        further chunk is produced, so no chunk lies entirely inside its
        predecessor.

    Raises:
        ValueError: If the text needs splitting but ``chunk_size <= 0`` or
            ``overlap >= chunk_size``; with either value the window would
            never advance.
    """
    if not text:
        return []
    if len(text) <= chunk_size:
        return [text]

    # Step between chunk starts; must be positive or the loop cannot progress.
    step = chunk_size - overlap
    if chunk_size <= 0 or step <= 0:
        raise ValueError(
            f"chunk_size ({chunk_size}) must be positive and larger than "
            f"overlap ({overlap})"
        )

    chunks = []
    for start in range(0, len(text), step):
        end = start + chunk_size
        chunks.append(text[start:end])
        # This chunk already covers the tail; another would only repeat it.
        if end >= len(text):
            break

    return chunks


# Register UDF. The configured CHUNK_SIZE / CHUNK_OVERLAP are bound
# explicitly: wrapping chunk_text directly would always apply its defaults
# (1000 / 200) and silently ignore edits to the configuration cell.
def _chunk_with_config(text):
    """Chunk ``text`` with the notebook-level CHUNK_SIZE and CHUNK_OVERLAP."""
    return chunk_text(text, CHUNK_SIZE, CHUNK_OVERLAP)


chunk_udf = F.udf(_chunk_with_config, ArrayType(StringType()))

In [None]:
def process_agent_pdfs(volume_name: str, table_name: str):
    """Parse and chunk one agent's PDFs and save them to a Delta table.

    Reads ``*.pdf`` files from ``/Volumes/{CATALOG}/{SCHEMA}/{volume_name}``,
    extracts their text with ``ai_parse_document``, splits it with
    ``chunk_udf`` and overwrites ``{CATALOG}.{SCHEMA}.{table_name}``. The
    table gets one row per chunk with these columns:

    - ``source``: path of the PDF.
    - ``chunk_id``: position of the chunk within that PDF.
    - ``content``: text of the chunk.
    - ``doc_id``: md5 of ``source`` concatenated with ``chunk_id``.

    Change Data Feed is enabled on the table.

    Args:
        volume_name: Name of the Unity Catalog volume holding the PDFs.
        table_name: Name of the Delta table to (over)write.

    Returns:
        Number of rows in the table after the write.
    """

    volume_path = f"/Volumes/{CATALOG}/{SCHEMA}/{volume_name}"
    table_full_name = f"{CATALOG}.{SCHEMA}.{table_name}"

    print(f"Processing: {volume_path} -> {table_full_name}")

    # Read raw PDF bytes with the binaryFile reader (provides `path`, `content`).
    pdf_df = spark.read.format("binaryFile").load(f"{volume_path}/*.pdf")

    # Parse documents using ai_parse_document. Its optional second argument
    # is an options MAP (not a mode string), and it returns a VARIANT whose
    # text lives in document.elements[*].content.
    parsed_df = pdf_df.selectExpr(
        "path AS source_path",
        "ai_parse_document(content) AS parsed_content",
    )

    # Join the element texts in document order into a single string per file.
    # concat_ws skips NULLs; a document with no extractable text becomes "".
    text_df = parsed_df.selectExpr(
        "source_path",
        "concat_ws('\\n\\n', transform("
        "try_cast(parsed_content:document:elements AS ARRAY<VARIANT>), "
        "element -> try_cast(element:content AS STRING))) AS full_text",
    )

    # Chunk the text. posexplode emits no rows for an empty chunk list, so
    # documents without text do not appear in the table.
    chunked_df = (
        text_df.withColumn("chunks", chunk_udf(F.col("full_text")))
        .select(
            F.col("source_path").alias("source"),
            F.posexplode("chunks").alias("chunk_id", "content"),
        )
        # Cast explicitly rather than rely on implicit int -> string coercion.
        .withColumn(
            "doc_id",
            F.md5(F.concat(F.col("source"), F.col("chunk_id").cast("string"))),
        )
    )

    # Write to Delta with Change Data Feed (required for Delta Sync index)
    chunked_df.write.format("delta").mode("overwrite").option(
        "delta.enableChangeDataFeed", "true"
    ).saveAsTable(table_full_name)
    # The writer option may not update properties of a table that already
    # exists, so set the property explicitly as well (idempotent).
    spark.sql(
        f"ALTER TABLE {table_full_name} "
        "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    )

    # Verify
    count = spark.table(table_full_name).count()
    print(f"  Written {count} chunks to {table_full_name}")

    return count

In [None]:
# Run the pipeline for every configured agent. A failure is recorded as an
# "Error: ..." entry so the remaining agents still get processed.
results = {}
for vol, tbl in AGENTS:
    try:
        results[tbl] = process_agent_pdfs(vol, tbl)
    except Exception as err:
        print(f"Error processing {vol}: {err}")
        results[tbl] = f"Error: {err}"

# Report each table's row count or its error message.
print("\n=== Summary ===")
for tbl, outcome in results.items():
    print(f"{tbl}: {outcome}")

## Verify Tables

In [None]:
# Preview a few rows from the table built for the first configured agent.
_, first_table_name = AGENTS[0]
first_table = ".".join((CATALOG, SCHEMA, first_table_name))
preview_df = spark.table(first_table).limit(5)
display(preview_df)