# Document Ingestion Pipeline

This notebook implements a complete document ingestion and chunking pipeline for PDF documents. The pipeline:

1. **Loads PDF files** from a Unity Catalog Volume
2. **Parses documents** using Databricks AI functions to extract structured content
3. **Chunks content** into logical paragraphs, each prefixed with its page and section headers for context
4. **Prepares data** for downstream RAG (Retrieval-Augmented Generation) applications

The final output is stored in the `docs_chunked` table with Change Data Feed enabled for incremental processing.

## Step 1: Configure Source Volume

Define the Unity Catalog Volume path where the PDF documents are stored. This volume is the source for every document to be ingested. Keep the trailing `/`: Step 2 appends `*.pdf` to this string directly.

```python
volume_path = "/Volumes/mkr_gcp_sandbox_euw3/default/source_vol/docs/"
```
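Because Step 2 builds its glob by appending `*.pdf` to `volume_path`, a missing trailing slash would silently produce a wrong path. The sketch below is a hypothetical helper (`pdf_glob` is not part of any Databricks API) that checks the path follows the Unity Catalog layout `/Volumes/<catalog>/<schema>/<volume>/...` and joins the glob whether or not the slash is present.

```python
import posixpath


def pdf_glob(volume_path: str) -> str:
    """Hypothetical helper: validate a UC Volume path and return a *.pdf glob under it."""
    # Expected layout: /Volumes/<catalog>/<schema>/<volume>[/<subdirectories>]
    parts = volume_path.strip("/").split("/")
    if len(parts) < 4 or parts[0] != "Volumes" or not all(parts[1:4]):
        raise ValueError(f"not a Unity Catalog Volume path: {volume_path!r}")
    # posixpath.join adds the "/" separator only when it is missing
    return posixpath.join(volume_path, "*.pdf")


print(pdf_glob("/Volumes/mkr_gcp_sandbox_euw3/default/source_vol/docs/"))
# /Volumes/mkr_gcp_sandbox_euw3/default/source_vol/docs/*.pdf
print(pdf_glob("/Volumes/mkr_gcp_sandbox_euw3/default/source_vol/docs"))
# /Volumes/mkr_gcp_sandbox_euw3/default/source_vol/docs/*.pdf
```

With such a helper, the load in Step 2 could use `pdf_glob(volume_path)` instead of string concatenation.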


## Step 2: Parse PDF Documents

Use the `ai_parse_document()` function to extract structured content from PDF files:

* **Reads binary PDF files** from the volume using the `binaryFile` data source
* **Extracts structured elements** including pages, text, tables, headers, and metadata
* **Stores parsed results** in the `docs_parsed` table for inspection

The parsed output includes:
* `pages`: Page-level information
* `elements`: Structured document elements (headers, paragraphs, tables)
* `error_status`: Parsing error information (if any)
* `metadata`: Document metadata

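```python
from pyspark.sql.functions import expr

df = (
    spark.read.format("binaryFile")
    .load(volume_path + "*.pdf")
    # ai_parse_document returns a VARIANT holding the parsed document structure
    .withColumn("parsed", expr("ai_parse_document(content)"))
    .select(
        "path",
        # Explicit aliases so later cells can rely on these column names
        expr("parsed:document:pages").alias("pages"),
        expr("parsed:document:elements").alias("elements"),
        expr("parsed:error_status").alias("error_status"),
        expr("parsed:metadata").alias("metadata"),
    )
)

# Overwrite so the cell can be re-run without failing on an existing table
df.write.mode("overwrite").saveAsTable("docs_parsed")

# Read the saved result back so later cells don't invoke ai_parse_document again
df = spark.table("docs_parsed")
display(df)
```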
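Because Step 3 keeps only `text` and `table` elements as chunk content, it helps to know which element types the parser actually produced. The sketch below works on plain Python dicts shaped like the `elements` list selected above, assuming (as the Step 3 UDF does) that each element carries `type` and `content` keys; `summarize_element_types` and the sample data are hypothetical. In the notebook, such dicts would come from the same `toPython()` conversion the UDF applies to each VARIANT.

```python
from collections import Counter

CONTENT_TYPES = {"text", "table"}  # element types the Step 3 chunker keeps as content


def summarize_element_types(elements):
    """Count element types, split into chunk content vs. everything else."""
    counts = Counter(e.get("type") for e in elements if e)
    content = {t: n for t, n in counts.items() if t in CONTENT_TYPES}
    # Headers land here too: they are not chunk content, but they supply chunk context
    other = {t: n for t, n in counts.items() if t not in CONTENT_TYPES}
    return content, other


# Hypothetical sample, not real parser output
sample = [
    {"type": "page_header", "content": "Annual Report"},
    {"type": "section_header", "content": "1. Overview"},
    {"type": "text", "content": "Revenue grew in all regions."},
    {"type": "table", "content": "Region | Revenue"},
    {"type": "figure", "content": ""},
    {"type": "text", "content": "Costs were flat."},
]

content, other = summarize_element_types(sample)
print(content)  # {'text': 2, 'table': 1}
print(other)    # {'page_header': 1, 'section_header': 1, 'figure': 1}
```

A large count of skipped types (for example many figures) is a hint that useful information may be missing from the chunks.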


## Step 3: Create Contextual Chunks

Implement a custom chunking strategy that preserves document structure and context:

### Chunking Logic
The `extract_paragraph_texts` UDF processes document elements and creates chunks that include:
* **Page headers**: Top-level context for each page
* **Section headers**: every section header seen since the last page header, in document order
* **Content**: Text and table content grouped into logical paragraphs

### Strategy
* A chunk is emitted whenever a new page or section header is encountered, and once more at the end of the document for any remaining content
* Each chunk includes the relevant page header and section header for context
* Only `text` and `table` elements are included in the content
* This approach aims to make each chunk self-contained, with enough context for retrieval

```python
from pyspark.sql.functions import expr, udf
from pyspark.sql.types import ArrayType, StringType


@udf(ArrayType(StringType()))
def extract_paragraph_texts(elements):
    """Group parsed elements into chunks prefixed with their page and section headers."""
    if not elements:
        return []

    def join(*parts):
        # Join the non-empty parts with blank lines (no leading or doubled separators)
        return "\n\n".join(p for p in parts if p)

    results = []
    current_page_header = ""
    current_section_header = ""
    current_paragraph = ""

    for element in elements:
        if element is None:
            continue
        e = element.toPython()  # VARIANT -> Python dict
        e_type = e.get("type")
        content = e.get("content") or ""  # guard against elements without content
        match e_type:
            case "page_header" | "section_header":
                # A new header closes the paragraph collected so far
                if current_paragraph:
                    results.append(join(current_page_header, current_section_header, current_paragraph))
                current_paragraph = ""
                if e_type == "page_header":
                    # A page header resets the section context
                    current_page_header = content
                    current_section_header = ""
                else:
                    # Section headers accumulate until the next page header
                    current_section_header = join(current_section_header, content)
            case "text" | "table":
                current_paragraph = join(current_paragraph, content)
            case _:
                pass  # any other element type is skipped

    # Flush the last paragraph at the end of the document
    if current_paragraph:
        results.append(join(current_page_header, current_section_header, current_paragraph))
    return results


df_with_paragraphs = df.withColumn(
    "paragraph_texts",
    extract_paragraph_texts(expr("cast(elements as array<VARIANT>)")),
)
display(df_with_paragraphs.select("path", "paragraph_texts"))
```
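Python UDFs are awkward to unit-test on a cluster. One option, sketched here as an assumption rather than part of the original notebook, is to keep the chunking loop in a plain function over dicts (`chunk_elements` is a hypothetical name) that follows the same grouping rules as `extract_paragraph_texts`: headers close the open paragraph, section headers accumulate until the next page header, and only `text` and `table` content is kept.

```python
def chunk_elements(elements):
    """Group parsed-element dicts into header-prefixed chunks."""

    def join(*parts):
        # Join the non-empty parts with a blank line between them
        return "\n\n".join(p for p in parts if p)

    chunks = []
    page_header = section_header = paragraph = ""
    for e in elements or []:
        e_type, content = e.get("type"), e.get("content") or ""
        if e_type in ("page_header", "section_header"):
            # A new header closes the paragraph collected so far
            if paragraph:
                chunks.append(join(page_header, section_header, paragraph))
            paragraph = ""
            if e_type == "page_header":
                page_header, section_header = content, ""
            else:
                section_header = join(section_header, content)
        elif e_type in ("text", "table"):
            paragraph = join(paragraph, content)
        # Any other element type is skipped
    if paragraph:
        chunks.append(join(page_header, section_header, paragraph))
    return chunks


# Hypothetical elements, not real parser output
sample = [
    {"type": "page_header", "content": "Report"},
    {"type": "section_header", "content": "Intro"},
    {"type": "text", "content": "First."},
    {"type": "section_header", "content": "Details"},
    {"type": "text", "content": "Second."},
    {"type": "figure", "content": "chart"},
]
for chunk in chunk_elements(sample):
    print(repr(chunk))
# 'Report\n\nIntro\n\nFirst.'
# 'Report\n\nIntro\n\nDetails\n\nSecond.'
```

Note how the second chunk carries both `Intro` and `Details`, because section headers accumulate. With this split, the UDF body could reduce to `return chunk_elements([e.toPython() for e in elements or [] if e is not None])`.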

## Step 4: Create Final Chunked Table

Explode the array of chunks into individual rows and create the final `docs_chunked` table:

### Table Schema
* `id`: Unique identifier for each chunk, generated with `monotonically_increasing_id()` (unique and increasing, but not consecutive)
* `path`: Source document path in the volume
* `paragraph_text`: The chunk content with headers and context

This table structure is a good fit for:
* Vector embedding generation
* Semantic search and retrieval
* RAG (Retrieval-Augmented Generation) applications

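```python
from pyspark.sql.functions import explode, monotonically_increasing_id

exploded_df = (
    df_with_paragraphs
    # One row per chunk; documents with an empty chunk array produce no rows
    .withColumn("paragraph_text", explode("paragraph_texts"))
    # Unique and increasing, but not consecutive
    .withColumn("id", monotonically_increasing_id())
    .select("id", "path", "paragraph_text")
)

# The default save mode fails if the table already exists
exploded_df.write.saveAsTable("docs_chunked")

# Display the stored table: monotonically_increasing_id() is non-deterministic,
# so recomputing exploded_df could show different ids than were written
display(spark.table("docs_chunked"))
```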
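The `id` column is unique and increasing but not consecutive. Per the PySpark documentation, the current implementation of `monotonically_increasing_id()` puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The pure-Python sketch below reproduces that arithmetic to show why IDs jump between partitions; `expected_id` is an illustrative helper, and the bit layout is an implementation detail Spark does not promise to keep.

```python
def expected_id(partition_id: int, row_in_partition: int) -> int:
    """Mimic monotonically_increasing_id(): partition ID in the upper 31 bits, row index in the lower 33."""
    return (partition_id << 33) + row_in_partition


# Three rows in partition 0, then two rows in partition 1
ids = [expected_id(0, i) for i in range(3)] + [expected_id(1, i) for i in range(2)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```

Because the values depend on how the data happens to be partitioned, IDs generated this way may differ if the table is rebuilt, so they are best treated as keys for a single load rather than stable identifiers across runs.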

## Step 5: Enable Change Data Feed

Enable Change Data Feed (CDF) on the `docs_chunked` table to support:

* **Incremental processing**: Track inserts, updates, and deletes
* **Downstream pipelines**: Enable efficient incremental updates to vector indexes
* **Audit trail**: Maintain history of changes to the chunked documents

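With CDF enabled, downstream systems can process only the changed data rather than reprocessing the entire table. CDF records only changes committed after it is enabled, so the initial write of `docs_chunked` is not part of the feed.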

```sql
%sql
ALTER TABLE docs_chunked
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
```
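Downstream consumers can read the feed with the Delta reader option `readChangeFeed`, for example `spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", <version>).table("docs_chunked")`; each returned row carries `_change_type`, `_commit_version`, and `_commit_timestamp` columns. The plain-Python sketch below (the `fold_changes` helper and the sample rows are hypothetical) shows one way to turn those rows into index actions: `insert` and `update_postimage` become upserts, `delete` becomes a delete, and `update_preimage` is skipped because it only records the old value.

```python
def fold_changes(rows):
    """Reduce CDF rows to the latest action per id: ('upsert', text) or ('delete', None)."""
    actions = {}
    # Apply changes in commit order so later versions overwrite earlier ones
    for row in sorted(rows, key=lambda r: r["_commit_version"]):
        change = row["_change_type"]
        if change in ("insert", "update_postimage"):
            actions[row["id"]] = ("upsert", row["paragraph_text"])
        elif change == "delete":
            actions[row["id"]] = ("delete", None)
        # update_preimage rows hold the old value and are skipped
    return actions


# Hypothetical change rows, not real feed output
rows = [
    {"id": 1, "paragraph_text": "v1", "_change_type": "insert", "_commit_version": 3},
    {"id": 2, "paragraph_text": "old", "_change_type": "insert", "_commit_version": 3},
    {"id": 1, "paragraph_text": "v1", "_change_type": "update_preimage", "_commit_version": 4},
    {"id": 1, "paragraph_text": "v2", "_change_type": "update_postimage", "_commit_version": 4},
    {"id": 2, "paragraph_text": "old", "_change_type": "delete", "_commit_version": 5},
]
print(fold_changes(rows))  # {1: ('upsert', 'v2'), 2: ('delete', None)}
```

Folding to the latest action per `id` keeps the downstream index update idempotent when a batch contains several changes to the same chunk.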