# Chunking Experiments

This notebook contains experiments for comparing chunking strategies and libraries, along with reusable code snippets.

In [None]:
import os
from dotenv import load_dotenv

# Load settings from a local .env file into the process environment.
load_dotenv()

# Source PDF used throughout the notebook.
pdf_path = "../../data/cao-pdfs/Cao Bouw en Infra 2025 - 2027.pdf"

# Tokenizer settings.
TOKENIZER_ENCODING = "cl100k_base"  # For OpenAI models
TOKENIZER_MAX_TOKENS = 8192  # Adjust based on your chosen model

MAX_TOKENS = 8192  # Adjust based on your chosen model
VECTOR_DIM = 1536  # Adjust based on your chosen embeddings model

# Azure AI Search settings. Values without a default are None when the
# corresponding environment variable is not set.
AZURE_SEARCH_ENDPOINT = os.environ.get("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_API_KEY = os.environ.get("AZURE_SEARCH_API_KEY")  # Ensure this is your Admin Key
AZURE_SEARCH_INDEX_NAME = os.environ.get("AZURE_SEARCH_INDEX_NAME", "cao-rag-sample")

# Azure OpenAI settings.
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_API_VERSION = os.environ.get("AZURE_OPENAI_API_VERSION", "2024-10-21")
AZURE_OPENAI_CHAT_MODEL_NAME = os.environ.get("AZURE_OPENAI_CHAT_MODEL_NAME")

# Embedding deployment name; defaults to "text-embedding-3-large".
AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME = os.environ.get(
    "AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME", "text-embedding-3-large"
)
# Embedding model name; defaults to "text-embedding-3-large".
AZURE_OPENAI_EMBEDDING_MODEL_NAME = os.environ.get(
    "AZURE_OPENAI_EMBEDDING_MODEL_NAME", "text-embedding-3-large"
)

In [None]:
import os

import torch

# Report which compute device is available before configuring the converter.
print(f"PyTorch version: {torch.__version__}")

if torch.cuda.is_available():
    print(f"CUDA is available. Using GPU: {torch.cuda.get_device_name(0)}: {torch.cuda.device_count()} GPU(s)")
else:
    print("CUDA is not available. Using CPU.")
    # torch.cpu.device_count() counts CPU *devices*, not cores, so it cannot
    # back a "core(s)" message; os.cpu_count() reports the logical core count.
    print(f"{os.cpu_count()} CPU core(s) available")


In [None]:
from pathlib import Path
from docling.document_converter import DocumentConverter

# Converter with stock settings. A later cell rebinds `converter` to an
# explicitly configured instance before any document is converted.
converter = DocumentConverter() 

In [None]:
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
)
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption

# PDF pipeline: OCR off, table-structure recognition on (with cell matching),
# and an explicit accelerator request (CUDA, 8 threads).
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = False
pipeline_options.do_table_structure = True
pipeline_options.table_structure_options.do_cell_matching = True

accelerator_options = AcceleratorOptions(device=AcceleratorDevice.CUDA, num_threads=8)
pipeline_options.accelerator_options = accelerator_options

# Converter that applies the options above to PDF inputs.
converter = DocumentConverter(
    format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
)

In [None]:
# Convert the document
# Runs the configured converter on the PDF at `pdf_path`; the result object is
# reused by the following cells (page/table counts, chunking).
conversion_result = converter.convert(pdf_path)

In [None]:
# Directory for artifacts produced by this notebook; created together with any
# missing parents, and left untouched if it already exists.
output_dir = Path("outputs/02-chunking-experiments")
output_dir.mkdir(parents=True, exist_ok=True)

# Stem of the input file (presumably the file name without its extension),
# intended as a prefix for exported file names.
doc_filename = conversion_result.input.file.stem

In [None]:
# Quick sanity check on the conversion: number of pages and tables found.
print(f"Document has {len(conversion_result.document.pages)} pages and {len(conversion_result.document.tables)} tables.")
# Uncomment to print the Markdown export of the converted document:
# print(f"Document text content:\n{conversion_result.document.export_to_markdown()}...")

In [None]:
# Export tables
# Opt-in step: set EXPORT_TABLES = True to print every table as Markdown and
# save it as CSV. A flag is used instead of parking the code in a string
# literal, which Jupyter would echo as the cell's output and never syntax-check.
EXPORT_TABLES = False

if EXPORT_TABLES:
    import pandas as pd

    # Tables are numbered from 1 in both the printed heading and the file name.
    for table_no, table in enumerate(conversion_result.document.tables, start=1):
        table_df: pd.DataFrame = table.export_to_dataframe(doc=conversion_result.document)
        print(f"## Table {table_no}")
        print(table_df.to_markdown())

        # Save the table as CSV
        element_csv_filename = output_dir / f"{doc_filename}-table-{table_no}.csv"
        table_df.to_csv(element_csv_filename)

## Chunking

We split the converted document into smaller chunks for embedding and indexing. Docling's built-in hierarchical chunking preserves the document structure.


In [None]:
from docling.chunking import HierarchicalChunker, HybridChunker
import tiktoken
from docling_core.transforms.chunker.tokenizer.openai import OpenAITokenizer

# Initialize tiktoken encoding for OpenAI embedding models
encoding = tiktoken.get_encoding(TOKENIZER_ENCODING)

# Create Docling's OpenAITokenizer wrapper
tokenizer = OpenAITokenizer(tokenizer=encoding, max_tokens=TOKENIZER_MAX_TOKENS)

# `tokenizer` and `merge_peers` are HybridChunker options. HierarchicalChunker
# does not define them, so passing them there would leave chunk sizes
# unbounded by the token budget. HybridChunker starts from the hierarchical
# chunks and then splits/merges them to respect `max_tokens`.
chunker = HybridChunker(tokenizer=tokenizer, merge_peers=True)

In [None]:
from xxhash import xxh64

doc_chunks = list(chunker.chunk(conversion_result.document))

# Map content-hash id -> contextualized chunk text. The id later serves as the
# index key, so chunks with identical text would collide there anyway; keeping
# only the first occurrence avoids embedding the same text more than once.
unique_chunks = {}
for chunk in doc_chunks:
    # Chunk text enriched with its context, as produced by the chunker.
    chunk_text = chunker.contextualize(chunk)
    chunk_id = xxh64(chunk_text.encode("utf-8")).hexdigest()
    unique_chunks.setdefault(chunk_id, chunk_text)

# Same shape as before: a list of (chunk_id, chunk_text) tuples in document order.
all_chunks = list(unique_chunks.items())

duplicate_count = len(doc_chunks) - len(all_chunks)
if duplicate_count:
    print(f"Skipped {duplicate_count} duplicate chunk(s) with identical text.")

print(f"Total chunks from PDF: {len(all_chunks)}")

### Part 3: Create Azure AI Search Index and Push Chunk Embeddings
We’ll define a vector index in Azure AI Search, then embed each chunk using Azure OpenAI and upload the resulting documents in batches.

In [None]:
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    AzureOpenAIVectorizer,
    AzureOpenAIVectorizerParameters,
    HnswAlgorithmConfiguration,
    SearchableField,
    SearchField,
    SearchFieldDataType,
    SearchIndex,
    SimpleField,
    VectorSearch,
    VectorSearchProfile,
)

# Field names used by the index schema and by the search queries.
CONTENT_FIELD_NAME = "content"
VECTOR_FIELD_NAME = "content_vector"

# Client for index management operations (create / delete index).
index_client = SearchIndexClient(
    endpoint=AZURE_SEARCH_ENDPOINT,
    credential=AzureKeyCredential(AZURE_SEARCH_API_KEY),
)

def create_search_index(index_name: str) -> None:
    """Recreate the Azure AI Search index that stores the chunk embeddings.

    An existing index named ``index_name`` is deleted first, then a new index
    is created with three fields: a string key (``chunk_id``), the searchable
    chunk text, and a vector field of ``VECTOR_DIM`` dimensions. The vector
    field uses an HNSW algorithm configuration and an Azure OpenAI vectorizer
    configured from the module-level Azure OpenAI settings.

    Args:
        index_name: Name of the index to drop (if present) and create.

    Raises:
        Any error raised by the search service client other than "index not
        found" during the delete step propagates to the caller.
    """
    # azure.core is already a dependency of this notebook (AzureKeyCredential).
    from azure.core.exceptions import ResourceNotFoundError

    # One shared name ties the algorithm, profile and vectorizer together.
    config_name = "default"

    # Define fields
    fields = [
        SimpleField(name="chunk_id", type=SearchFieldDataType.String, key=True),
        SearchableField(name=CONTENT_FIELD_NAME, type=SearchFieldDataType.String),
        SearchField(
            name=VECTOR_FIELD_NAME,
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            filterable=False,
            sortable=False,
            facetable=False,
            vector_search_dimensions=VECTOR_DIM,
            vector_search_profile_name=config_name,
        ),
    ]

    # Vector search config with an AzureOpenAIVectorizer
    vector_search = VectorSearch(
        algorithms=[HnswAlgorithmConfiguration(name=config_name)],
        profiles=[
            VectorSearchProfile(
                name=config_name,
                algorithm_configuration_name=config_name,
                vectorizer_name=config_name,
            )
        ],
        vectorizers=[
            AzureOpenAIVectorizer(
                vectorizer_name=config_name,
                parameters=AzureOpenAIVectorizerParameters(
                    resource_url=AZURE_OPENAI_ENDPOINT,
                    deployment_name=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME,
                    model_name=AZURE_OPENAI_EMBEDDING_MODEL_NAME,
                    api_key=AZURE_OPENAI_API_KEY,
                ),
            )
        ],
    )

    new_index = SearchIndex(name=index_name, fields=fields, vector_search=vector_search)

    # Start from a clean index. Only "index does not exist yet" is expected
    # here; authentication or network failures should surface, not be hidden.
    try:
        index_client.delete_index(index_name)
    except ResourceNotFoundError:
        pass

    index_client.create_or_update_index(new_index)
    print(f"Index '{index_name}' created.")

In [None]:
# Note: create_search_index deletes any existing index with this name before
# creating it, so re-running this cell starts from an empty index.
create_search_index(AZURE_SEARCH_INDEX_NAME)

### Generate Embeddings and Upload to Azure AI Search

In [None]:
from azure.search.documents import SearchClient
from openai import AzureOpenAI

# Client for document operations (upload, search) on the index.
search_client = SearchClient(
    endpoint=AZURE_SEARCH_ENDPOINT,
    index_name=AZURE_SEARCH_INDEX_NAME,
    credential=AzureKeyCredential(AZURE_SEARCH_API_KEY),
)

# Azure OpenAI client shared by the embedding and chat helpers.
openai_client = AzureOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_version=AZURE_OPENAI_API_VERSION,
    api_key=AZURE_OPENAI_API_KEY,
)


def embed_text(text: str) -> list[float]:
    """Embed a single text with the Azure OpenAI embedding deployment.

    Args:
        text: The text to embed.

    Returns:
        The embedding of ``text``, requested with ``VECTOR_DIM`` dimensions.
    """
    response = openai_client.embeddings.create(
        input=text,
        # With Azure OpenAI the `model` argument selects the *deployment*, so
        # use the deployment name (the same one the index vectorizer uses)
        # rather than the underlying model name.
        model=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME,
        dimensions=VECTOR_DIM,
    )
    # A single input yields a single embedding entry.
    return response.data[0].embedding

In [None]:
# Embed every chunk (one embed_text call per chunk) and build the documents to
# upload. Field keys come from the same constants as the index schema so the
# payload cannot drift from the index definition.
upload_docs = [
    {
        "chunk_id": chunk_id,
        CONTENT_FIELD_NAME: chunk_text,
        VECTOR_FIELD_NAME: embed_text(chunk_text),
    }
    for chunk_id, chunk_text in all_chunks
]

In [None]:
# Upload the documents in fixed-size batches and keep track of every document
# the service reports as failed, so the final message reflects what happened.
BATCH_SIZE = 50
failed_keys = []
for i in range(0, len(upload_docs), BATCH_SIZE):
    subset = upload_docs[i : i + BATCH_SIZE]
    resp = search_client.upload_documents(documents=subset)

    batch_failures = [r for r in resp if not r.succeeded]
    failed_keys.extend(r.key for r in batch_failures)

    all_succeeded = not batch_failures
    print(
        f"Uploaded batch {i} -> {i + len(subset)}; all_succeeded: {all_succeeded}, "
        f"first_doc_status_code: {resp[0].status_code}"
    )
    for r in batch_failures:
        print(f"  failed: {r.key} (status {r.status_code}): {r.error_message}")

# Only claim success when no document failed.
if failed_keys:
    print(f"{len(failed_keys)} chunk(s) failed to upload: {failed_keys}")
else:
    print("All chunks uploaded to Azure Search.")

### Part 4: Perform RAG over PDF
Combine retrieval from Azure AI Search with Azure OpenAI Chat Completions (a.k.a. grounding your LLM).

In [None]:
from typing import Optional

from azure.search.documents.models import VectorizableTextQuery

def generate_chat_response(
    prompt: str, system_message: Optional[str] = None, temperature: float = 1
) -> Optional[str]:
    """
    Generates a single-turn chat response using Azure OpenAI Chat.
    If you need multi-turn conversation or follow-up queries, you'll have to
    maintain the messages list externally.

    Args:
        prompt: Content of the user message.
        system_message: Optional system message; when empty or None, only the
            user message is sent.
        temperature: Sampling temperature passed to the chat completion call.
            Defaults to 1, the value this helper previously hard-coded.

    Returns:
        The content of the first choice's message.
    """
    messages = []
    if system_message:
        messages.append({"role": "system", "content": system_message})
    messages.append({"role": "user", "content": prompt})

    completion = openai_client.chat.completions.create(
        model=AZURE_OPENAI_CHAT_MODEL_NAME, messages=messages, temperature=temperature
    )
    return completion.choices[0].message.content


user_query = "What are the Probationary period duration of 1-2 year employment contract as per the cao 2025-2027?"

# No client-side embedding call is made here: VectorizableTextQuery carries the
# raw text, and the vectorizer attached to the index's vector profile embeds it
# on the service side. Embedding the query locally as well would spend an
# extra Azure OpenAI request on a vector that nothing reads.
vector_query = VectorizableTextQuery(
    text=user_query,  # vectorized by the index's vectorizer
    k_nearest_neighbors=5,
    fields=VECTOR_FIELD_NAME,
)

In [None]:
# Hybrid retrieval: the query text is used for keyword search and, through
# `vector_query`, for vector search. Only the content field is returned.
search_results = search_client.search(
    search_text=user_query,
    vector_queries=[vector_query],
    select=[CONTENT_FIELD_NAME],
    top=10,
)

# Keep just the chunk text of each hit, in the order the service returned them.
retrieved_chunks = [hit[CONTENT_FIELD_NAME] for hit in search_results]

In [None]:
# Join the retrieved chunks into one context block, separated by "---" lines.
context_str = "\n---\n".join(retrieved_chunks)
# Single prompt containing the instructions, the retrieved context and the
# user's question, in that order.
rag_prompt = f"""
You are an AI assistant helping answering questions about Dutch CAO.
Use ONLY the text below to answer the user's question.
If the answer isn't in the text, say you don't know.

Context:
{context_str}

Question: {user_query}
Answer:
"""

# No system message is passed, so the whole prompt is sent as the user message.
final_answer = generate_chat_response(rag_prompt)

In [None]:
# Show the exact prompt that was sent to the chat model. Only the prompt is
# printed here, so the heading no longer mentions a response.
print("\nRAG Prompt:")
print(rag_prompt)

In [None]:
# Show the chat model's answer to the RAG prompt.
print("\nFinal Answer:")
print(final_answer)