# Exploring Docling

## Docling Architecture

<img src="https://docling-project.github.io/docling/assets/docling_arch.png" />

<img src="https://codecut.ai/wp-content/uploads/2025/07/image-with-caption.png" />

## Ingest a document

Let's ingest a PDF document and see how the Docling pipeline works.

In [None]:
# URL of the PDF (hosted on arXiv) that the cells below hand to the converter
report_url = "https://arxiv.org/pdf/2408.09869"

In [None]:
from pathlib import Path

from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
)
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption

# Explicitly set the accelerator
# ------------------------------
# Several alternative AcceleratorOptions definitions are listed below; only
# one should be left uncommented. Pick the one that matches your hardware.

## Auto selection
# accelerator_options = AcceleratorOptions(
#     num_threads=8, device=AcceleratorDevice.AUTO
# )

## CPU
# accelerator_options = AcceleratorOptions(
#     num_threads=8, device=AcceleratorDevice.CPU
# )

## Mac (MPS) -- the variant that is currently active
accelerator_options = AcceleratorOptions(
    num_threads=8, device=AcceleratorDevice.MPS
)

## NVIDIA GPU
# accelerator_options = AcceleratorOptions(
#     num_threads=8, device=AcceleratorDevice.CUDA
# )

# EasyOCR doesn't support cuda:N allocation; it defaults to cuda:0
# accelerator_options = AcceleratorOptions(num_threads=8, device="cuda:1")

In [None]:
# Building the pipeline
# ------------------------------
# Switch on pipeline timing profiling so the time spent can be measured
settings.debug.profile_pipeline_timings = True

# PDF pipeline configuration: run OCR and table-structure extraction on the
# accelerator selected earlier, with cell matching enabled for tables.
pipeline_options = PdfPipelineOptions(
    accelerator_options=accelerator_options,
    do_ocr=True,
    do_table_structure=True,
)
pipeline_options.table_structure_options.do_cell_matching = True

# Attach the options above to PDF inputs and build the converter
pdf_format_option = PdfFormatOption(pipeline_options=pipeline_options)
converter = DocumentConverter(
    format_options={InputFormat.PDF: pdf_format_option},
)

In [None]:
# Converting a document
# ------------------------------
# Run the configured converter on the PDF URL; the result carries the parsed
# document as well as the timing data read below.
conversion_result = converter.convert(report_url)

doc = conversion_result.document

# List with total time per document
doc_conversion_secs = conversion_result.timings["pipeline_total"].times

# Export to markdown and save it to disk
# ---------------------------------
md = doc.export_to_markdown()

# Write with an explicit UTF-8 encoding: without it open() uses the platform's
# default encoding, which can fail or mangle non-ASCII characters in the text.
with open("output.md", "w", encoding="utf-8") as f:
    f.write(md)
# print(md)
# print(f"Conversion secs: {doc_conversion_secs}")

In [None]:
# Print scores and grades
#
# ConversionResult data type: https://docling-project.github.io/docling/reference/document_converter/#docling.document_converter.ConversionResult
#
# ------------------------------
# Uncomment the next line to dump the whole confidence report as JSON:
# conversion_result.confidence.model_dump_json()

## Confidence Scores

Users can and should safely focus on the document-level grade fields — `mean_grade` and `low_grade` — to assess overall conversion quality. Numerical scores are used internally and are for informational purposes only; their computation and weighting may change in the future.

In [None]:
# Document-level mean grade of the conversion
conversion_result.confidence.mean_grade

In [None]:
# Document-level low grade of the conversion
conversion_result.confidence.low_grade

## Simple RAG with Milvus

In [None]:
from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections, utility
from docling.chunking import HybridChunker
import numpy as np
from sentence_transformers import SentenceTransformer
from loguru import logger

# Connect to Milvus Lite
connections.connect(uri='rag.db')

# Collection layout: auto-generated integer primary key, the chunk text, and
# the embedding vector for that text.
collection_name = "docling_rag_collection"
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    # dim has to equal the embedding model's output size (384 is assumed here)
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=384),
]
schema = CollectionSchema(fields, description="docling_rag_collection")

# Start from a clean slate: drop a same-named collection if one already exists
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)
collection = Collection(name=collection_name, schema=schema)

# Index the vector field (IVF_FLAT, L2 distance)
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 1024},
}
collection.create_index(field_name="vector", index_params=index_params)

# Split the converted Docling document into chunks with HybridChunker.
# chunk() hands back an iterator, so it is materialized into a list to allow
# len() and slicing below.
chunker = HybridChunker()
chunks_iterator = chunker.chunk(doc)
chunks = [*chunks_iterator]

# Debugging: report the chunk count and preview up to the first five chunks
logger.info(f"Number of chunks: {len(chunks)}")
preview = chunks[:5]
if preview:
    logger.info("First few chunks:")
for idx, piece in enumerate(preview):
    # Only the first 100 characters of each chunk are shown
    logger.info(f"Chunk {idx}: {piece.text[:100]}...")

# Load a pre-trained sentence transformer model
# (swap in a different model name if it suits your needs better)
model = SentenceTransformer('all-MiniLM-L6-v2')


def create_embeddings(text):
    """Embed ``text`` with the notebook-level ``model``.

    The output of ``model.encode`` is converted with ``.tolist()`` so the
    caller receives plain Python values (a list of floats for a single
    string) rather than an array object.
    """
    encoded = model.encode(text)
    return encoded.tolist()

# Embed every chunk and insert the text/vector columns into Milvus
# ----------------------------------------------------------------
texts = [chunk.text for chunk in chunks]

# Encode all chunk texts in one call: create_embeddings forwards its argument
# to model.encode, which accepts a list of sentences, so one batched call
# replaces a separate encode() per chunk. Skip the call when nothing was chunked.
embeddings = create_embeddings(texts) if texts else []

# Convert embeddings to a float32 NumPy array
embeddings_np = np.array(embeddings, dtype=np.float32)

# Debugging: Print the shape of the embeddings array
logger.info(f"Shape of embeddings array: {embeddings_np.shape}")

# Column-ordered payload: one entry per non-auto field of the collection
# schema ("text" first, then "vector"); the "id" field is auto-generated.
entities = [texts, embeddings_np]

# Debugging: Print the structure of the entities before insertion
logger.info(f"Structure of entities: {[len(e) for e in entities]}")

if texts:
    logger.info(f"First text entity: {texts[0][:100]}...")
    logger.info(f"First embedding entity: {embeddings_np[0][:10]}...")

    # Insert chunks and embeddings into Milvus
    collection.insert(entities)
else:
    # An empty document yields no rows; say so instead of sending an empty
    # payload to Milvus.
    logger.warning("No chunks to embed; skipping insert into Milvus.")

### Example search query

In [None]:
# Load collection for search
collection.load()

# Embed the user's question with the same model used for the chunks
query_text = input("Enter your query:")
query_embedding = create_embeddings(query_text)

# Perform search. Requesting the "text" field via output_fields returns the
# stored chunk text with each hit, so no follow-up query per hit ID is needed.
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    [query_embedding],
    "vector",
    search_params,
    limit=3,
    output_fields=["text"],
)

# Aggregate and print search results in a single output
logger.info("Search Results:")
result_parts = []
for hits in results:
    for hit in hits:
        # With the L2 metric the score is a distance: smaller means closer
        result_parts.append(f"ID: {hit.id}, Score: {hit.score}\n")
        result_parts.append(f"Text: {hit.entity.get('text')}\n\n")
all_results_text = "".join(result_parts)

print(all_results_text)