# Docling PDF Ingestion with Voyage Context-3 Embeddings

This notebook reproduces the CLI pipeline in a sequential, cell-by-cell workflow. It ingests PDFs with Docling, extracts figure images, chunks text with the hybrid chunker, generates contextualized embeddings with VoyageAI, and writes chunk documents into MongoDB Atlas.

## 1. Environment & Dependencies

Execute the cell below to install the required libraries. Re-run if any dependency updates are needed.

In [1]:
# Quote each requirement so the shell does not treat ">" as output redirection.
!pip install --quiet --upgrade \
    "docling>=2.8.0" \
    "docling-core>=2.29.0" \
    "numpy==2.0.2" \
    "pymongo>=4.9.0" \
    "python-dotenv>=1.0.1" \
    "rich==13.7.1" \
    "tiktoken>=0.7.0" \
    "transformers>=4.39.0" \
    "voyageai>=0.2.3" \
    "pymupdf>=1.23.0" \
    torch \
    modelscope \
    "rapidocr_onnxruntime>=1.3.0"
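
After the install cell finishes, it can help to confirm which versions were actually resolved. The following is a minimal sketch using only the standard library; the `PACKAGES` list and the `installed_versions` helper are illustrative names, not part of the original pipeline.

```python
from importlib import metadata

# Distribution names mirror the install cell; adjust if that list changes.
PACKAGES = ["docling", "docling-core", "numpy", "pymongo", "python-dotenv", "voyageai", "pymupdf"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(PACKAGES).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```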

In [2]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [3]:
import os
from pathlib import Path

# Directory in Google Drive containing the PDF files
PDF_DIRECTORY_PATH = "/content/drive/MyDrive/SharedForColab/brn temp"
SOURCE_DIR = Path(PDF_DIRECTORY_PATH)

# Get all PDF file paths from the directory
source_urls = [os.path.join(PDF_DIRECTORY_PATH, f) for f in os.listdir(PDF_DIRECTORY_PATH) if f.lower().endswith('.pdf')]
len(source_urls)
# You can print the list of files to verify
# print("Found PDF files:")
# for pdf_file in source_urls:
#     print(pdf_file)

91

In [4]:
import torch
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from rich import print

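# Configure accelerator (prefers the Colab GPU when available)
if torch.cuda.is_available():
    GPU_DEVICE = torch.device('cuda')
    # FlashAttention-2 needs compute capability 8.0+ (Ampere or newer); a Colab T4 is 7.5.
    USE_FLASH_ATTENTION = torch.cuda.get_device_capability(0)[0] >= 8
    ACCELERATOR_OPTIONS = AcceleratorOptions(
        device=AcceleratorDevice.CUDA,
        cuda_use_flash_attention2=USE_FLASH_ATTENTION,
    )
    print('[bold green]CUDA GPU is enabled:[/bold green] {}'.format(torch.cuda.get_device_name(0)))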
elif getattr(torch.backends, 'mps', None) and torch.backends.mps.is_available():
    GPU_DEVICE = torch.device('mps')
    ACCELERATOR_OPTIONS = AcceleratorOptions(device=AcceleratorDevice.MPS)
    print('[bold green]Apple MPS GPU is enabled.[/bold green]')
else:
    GPU_DEVICE = torch.device('cpu')
    ACCELERATOR_OPTIONS = AcceleratorOptions(device=AcceleratorDevice.CPU)
    print('[yellow]No GPU detected; running on CPU.[/yellow]')

## 2. Imports & Helper Types

We import the core libraries and define serialization helpers to keep figure data referenced rather than embedded.

In [5]:
from __future__ import annotations

import hashlib
import json
import io
import logging
import base64
import os
import re
import time  # used by embed_chunks for retry backoff
from pathlib import Path
from typing import Any, Iterable, Optional, Sequence

import fitz
import numpy as np
import torch
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.errors import BulkWriteError
from rich import print
from voyageai import Client as VoyageClient

from docling.chunking import HybridChunker
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling_core.transforms.chunker.hierarchical_chunker import (
    ChunkingDocSerializer,
    ChunkingSerializerProvider,
)
from docling_core.transforms.serializer.base import SerializationResult
from docling_core.transforms.serializer.common import create_ser_result
from docling_core.transforms.serializer.markdown import (
    MarkdownParams,
    MarkdownPictureSerializer,
)
from docling_core.types.doc.document import DoclingDocument, PictureItem
from transformers import AutoTokenizer
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer

## 3. Figure Serialization & Utility Functions

The helpers below mirror the CLI functionality: figure extraction without bounding boxes, chunk preparation, embedding normalization, and Mongo persistence.
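
To illustrate the embedding normalization step on its own: a contextualized embedding response can arrive nested (one list of chunk vectors per document), so the vectors are flattened into a single list before being paired with chunks. The sketch below is a standalone toy version with a hypothetical name, `flatten_vectors`, and made-up 3-dimensional vectors.

```python
from typing import Any


def flatten_vectors(payload: Any) -> list[list[float]]:
    """Recursively collect numeric vectors from an arbitrarily nested list."""
    if isinstance(payload, (list, tuple)) and payload and isinstance(payload[0], (int, float)):
        # A non-empty sequence of numbers is treated as one embedding vector.
        return [[float(x) for x in payload]]
    if isinstance(payload, (list, tuple)):
        vectors: list[list[float]] = []
        for item in payload:
            vectors.extend(flatten_vectors(item))
        return vectors
    raise ValueError(f"Unexpected payload type {type(payload)!r}")


# Toy payload: one document containing two 3-dimensional chunk vectors.
nested = [[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]]
flat = flatten_vectors(nested)
print(len(flat), flat[0])
```

The real helper additionally checks that the number of vectors equals the number of chunks, which catches responses that cannot be aligned with the records.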

In [6]:
class FigureReferencePictureSerializer(MarkdownPictureSerializer):
    """Render picture items as references instead of inline images."""

    def serialize(  # type: ignore[override]
        self,
        *,
        item: PictureItem,
        doc_serializer: ChunkingDocSerializer,
        doc: DoclingDocument,
        **kwargs: Any,
    ) -> SerializationResult:
        caption = ""
        if hasattr(item, "caption_text"):
            caption = (item.caption_text(doc=doc) or "").strip()
        text = f"[Figure {item.self_ref}]"
        if caption:
            text = f"{text} — {caption}"
        return create_ser_result(text=text, span_source=item)


class FigureReferenceSerializerProvider(ChunkingSerializerProvider):
    """Provide chunk serializers that keep figures as metadata references."""

    def get_serializer(self, doc: DoclingDocument) -> ChunkingDocSerializer:
        return ChunkingDocSerializer(
            doc=doc,
            params=MarkdownParams(image_placeholder=""),
            picture_serializer=FigureReferencePictureSerializer(),
        )


MIN_FIGURE_HEIGHT_PX = 120
BINARY_IMAGE_FORMAT = "png"
BINARY_MIME_TYPE = "image/png"
BINARY_FILE_EXTENSION = BINARY_IMAGE_FORMAT

EMBED_MAX_RETRIES = 3
EMBED_RETRY_BASE_DELAY = 2.0

def slugify(value: str) -> str:
    value = value.lower()
    value = re.sub(r"[^a-z0-9]+", "-", value)
    value = value.strip("-")
    return value or "document"


def hash_path(path: Path) -> str:
    return hashlib.sha1(str(path).encode("utf-8")).hexdigest()[:8]


def filter_figures_by_height(figure_map: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Return a subset of figures whose height exceeds the minimum allowed threshold."""
    return {
        figure_id: figure
        for figure_id, figure in figure_map.items()
        if figure.get("height", 0) > MIN_FIGURE_HEIGHT_PX
    }


def extract_figures(doc: DoclingDocument, doc_id: str, image_root: Path) -> dict[str, dict[str, Any]]:
    image_root.mkdir(parents=True, exist_ok=True)
    figures: dict[str, dict[str, Any]] = {}
    for picture in getattr(doc, "pictures", []):
        if not isinstance(picture, PictureItem):
            continue

        image_ref = getattr(picture, "image", None)
        if image_ref is None or getattr(image_ref, "pil_image", None) is None:
            continue

        pil_image = image_ref.pil_image
        file_name = f"{slugify(picture.self_ref)}.png"
        image_path = image_root / file_name

        buffer = io.BytesIO()
        pil_image.save(buffer, format="PNG")
        image_bytes = buffer.getvalue()
        image_path.write_bytes(image_bytes)
        image_b64 = base64.b64encode(image_bytes).decode("ascii")
        width, height = pil_image.size

        caption = ""
        if hasattr(picture, "caption_text"):
            caption = (picture.caption_text(doc=doc) or "").strip()

        provenance_data = []
        for prov in getattr(picture, "prov", []) or []:
            prov_entry: dict[str, Any] = {}
            for attr in ("page_no", "page_id", "line_no"):
                if hasattr(prov, attr):
                    prov_entry[attr] = getattr(prov, attr)
            if prov_entry:
                provenance_data.append(prov_entry)

        figures[picture.self_ref] = {
            "figure_id": picture.self_ref,
            "doc_id": doc_id,
            "caption": caption or None,
            "image_path": str(image_path.resolve()),
            "image_b64": image_b64,
            "width": int(width),
            "height": int(height),
            "provenance": provenance_data,
        }

    return figures



def build_light_and_binary_maps(figure_map: dict[str, dict[str, Any]], *, image_root: Path) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
    """Split a raw figure map into lightweight references and n8n-style binary entries."""
    figure_refs_light: dict[str, dict[str, Any]] = {}
    figure_binary_map: dict[str, dict[str, Any]] = {}
    for figure_id, figure in figure_map.items():
        rel_path = os.path.relpath(figure["image_path"], image_root)
        figure_refs_light[figure_id] = {
            "figure_id": figure_id,
            "doc_id": figure.get("doc_id"),
            "caption": figure.get("caption"),
            "image_path": rel_path,
            "width": figure.get("width"),
            "height": figure.get("height"),
            "provenance": figure.get("provenance", []),
        }
        file_name = Path(rel_path).name
        image_b64 = figure.get("image_b64")
        if image_b64:
            figure_binary_map[figure_id] = {
                "data": image_b64,
                "mimeType": BINARY_MIME_TYPE,
                "fileExtension": BINARY_FILE_EXTENSION,
                "fileName": file_name,
            }
    return figure_refs_light, figure_binary_map


def build_chunk_binary_for_refs(
    figure_refs: list[dict[str, Any]],
    figure_binary_map: dict[str, dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    """Build the per-chunk binary map containing only referenced figures."""
    chunk_binary: dict[str, dict[str, Any]] = {}
    for ref in figure_refs:
        figure_id = ref.get("figure_id")
        if not figure_id:
            continue
        binary_entry = figure_binary_map.get(figure_id)
        if not binary_entry:
            continue
        chunk_binary[figure_id] = dict(binary_entry)
    return chunk_binary


def figure_subset(
    doc_items: Iterable[Any],
    figure_refs_light: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
    figure_refs: list[dict[str, Any]] = []
    for doc_item in doc_items:
        ref = None
        if isinstance(doc_item, PictureItem):
            ref = figure_refs_light.get(doc_item.self_ref)
        else:
            item_ref = getattr(doc_item, "self_ref", None)
            if item_ref:
                ref = figure_refs_light.get(item_ref)
        if not ref:
            continue
        figure_refs.append(dict(ref))
    return figure_refs

def cleanup_figure_dir(figure_dir: Path, *, debug_enabled: bool, doc_id: str) -> None:
    if debug_enabled:
        logging.info("DEBUG_CHUNK_IMGS enabled; retaining extracted figures for %s at %s.", doc_id, figure_dir)
        return
    if not figure_dir.exists():
        return
    deleted_files: list[str] = []
    for path in sorted(figure_dir.glob('*')):
        if path.is_file():
            try:
                path.unlink()
                deleted_files.append(path.name)
            except OSError as exc:
                logging.warning("Failed to delete %s for %s: %s", path, doc_id, exc)
    try:
        figure_dir.rmdir()
    except OSError:
        pass
    if deleted_files:
        logging.info(
            "Deleted %d extracted figure file(s) for %s: %s",
            len(deleted_files),
            doc_id,
            ", ".join(deleted_files),
        )
    else:
        logging.info("No extracted figure files to delete for %s.", doc_id)



def normalize_embeddings(raw_embeddings: Any, expected_length: int) -> list[list[float]]:
    if raw_embeddings is None:
        return []

    def is_vector(candidate: Any) -> bool:
        # Wrap in bool() so an empty sequence yields False rather than the sequence itself.
        return isinstance(candidate, (list, tuple)) and bool(candidate) and isinstance(candidate[0], (float, int))

    def flatten_embeddings(candidate: Any) -> list[list[float]]:
        if is_vector(candidate):
            return [[float(x) for x in candidate]]  # type: ignore[arg-type]
        if isinstance(candidate, list):
            vectors: list[list[float]] = []
            for item in candidate:
                vectors.extend(flatten_embeddings(item))
            return vectors
        raise ValueError(
            "Unexpected embedding payload type "
            f"{type(candidate)!r}; unable to flatten contextualized embeddings."
        )

    vectors = flatten_embeddings(raw_embeddings)
    if len(vectors) != expected_length:
        raise ValueError(
            "Unable to align contextualized embeddings with chunk outputs; "
            f"expected {expected_length} vectors, received {len(vectors)}."
        )
    return vectors


def prepare_chunk_records(
    *,
    doc: DoclingDocument,
    doc_id: str,
    source_path: Path,
    chunker: HybridChunker,
    figure_refs_light: dict[str, dict[str, Any]],
    figure_binary_map: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
    records: list[dict[str, Any]] = []
    chunks = list(chunker.chunk(dl_doc=doc))
    for idx, chunk in enumerate(chunks):
        enriched_text = chunker.contextualize(chunk=chunk)
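        doc_items = getattr(chunk.meta, "doc_items", []) or []
        # Prefer a `pages` attribute if chunk.meta exposes one; otherwise derive
        # page numbers from the provenance of each item in the chunk.
        pages = getattr(chunk.meta, "pages", None) or sorted({
            prov.page_no
            for item in doc_items
            for prov in (getattr(item, "prov", None) or [])
            if getattr(prov, "page_no", None) is not None
        })
        figure_refs = figure_subset(doc_items, figure_refs_light)
        record = {
            "chunk_id": f"{doc_id}::chunk-{idx:04d}",
            "document_id": doc_id,
            "chunk_index": idx,
            "text": enriched_text,
            "metadata": {
                "source_pdf": str(source_path),
                "pages": pages,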
                "headings": getattr(chunk.meta, "headings", []),
                "doc_items": [getattr(item, "self_ref", None) for item in doc_items],
                "figure_refs": figure_refs,
            },
            "binary": build_chunk_binary_for_refs(figure_refs, figure_binary_map),
        }
        records.append(record)
    return records



def embed_chunks(
    *,
    voyage_client: VoyageClient,
    records: Sequence[dict[str, Any]],
    model_name: str,
    output_dimension: int | None,
    output_dtype: str,
) -> list[list[float]]:
    texts = [record["text"] for record in records]
    if not texts:
        return []

    total_chars = sum(len(text) for text in texts)
    for attempt in range(1, EMBED_MAX_RETRIES + 1):
        logging.info(
            "Embedding %d chunk(s) (%d chars) with Voyage model '%s' [attempt %d/%d]",
            len(texts),
            total_chars,
            model_name,
            attempt,
            EMBED_MAX_RETRIES,
        )
        try:
            response = voyage_client.contextualized_embed(
                inputs=[texts],
                model=model_name,
                input_type="document",
                output_dimension=output_dimension,
                output_dtype=output_dtype,
            )
            request_id = getattr(response, "request_id", None)
            if request_id:
                logging.info("Voyage embedding request succeeded (request_id=%s).", request_id)
            raw_embeddings = None
            if hasattr(response, "embeddings"):
                raw_embeddings = response.embeddings
            elif hasattr(response, "results"):
                raw_embeddings = response.results[0].embeddings  # type: ignore[index]
            return normalize_embeddings(raw_embeddings, len(records))
        except Exception as exc:
            if attempt == EMBED_MAX_RETRIES:
                logging.error(
                    "Embedding failed after %d attempt(s) for %d chunk(s): %s",
                    attempt,
                    len(texts),
                    exc,
                )
                raise
            backoff = EMBED_RETRY_BASE_DELAY * (2 ** (attempt - 1))
            logging.warning(
                "Embedding attempt %d/%d failed (%s). Retrying in %.1f seconds...",
                attempt,
                EMBED_MAX_RETRIES,
                exc,
                backoff,
            )
            time.sleep(backoff)

    raise RuntimeError("Unexpected embedding retry loop exit")


def attach_embeddings(records: list[dict[str, Any]], embeddings: Sequence[Sequence[float]]) -> None:
    if len(records) != len(embeddings):
        raise ValueError("Mismatch between records and embeddings length.")
    for record, embedding in zip(records, embeddings):
        record["embedding"] = np.asarray(embedding, dtype=np.float32).tolist()


def persist_records(collection: Collection, records: Sequence[dict[str, Any]]) -> None:
    if not records:
        return
    try:
        collection.insert_many(records, ordered=False)
    except BulkWriteError as exc:
        print("[red]MongoDB bulk write encountered an error:", exc)
        raise


def write_metadata_doc(
    *,
    metadata_path: Path,
    doc_id: str,
    source_pdf: Path,
    figure_refs_light: dict[str, dict[str, Any]],
    chunk_count: int,
    voyage_model: str,
    conversion_strategy: str,
) -> None:
    metadata_path.parent.mkdir(parents=True, exist_ok=True)
    metadata_content = {
        "doc_id": doc_id,
        "source_pdf": str(source_pdf),
        "figure_inventory": [
            dict(fig)
            for fig in figure_refs_light.values()
        ],
        "chunk_count": chunk_count,
        "voyage_model": voyage_model,
        "conversion_strategy": conversion_strategy,
    }
    metadata_path.write_text(json.dumps(metadata_content, indent=2), encoding="utf-8")
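
The retry loop in `embed_chunks` sleeps for an exponentially growing delay after each failed attempt and re-raises on the last one. As a small worked example, the sketch below computes that schedule from the two constants defined in the cell above; `backoff_delays` is an illustrative helper, not part of the pipeline.

```python
EMBED_MAX_RETRIES = 3          # same values as the constants in the cell above
EMBED_RETRY_BASE_DELAY = 2.0


def backoff_delays(max_retries: int, base_delay: float) -> list[float]:
    """Delays slept after each failed attempt; the final attempt re-raises instead of sleeping."""
    return [base_delay * (2 ** (attempt - 1)) for attempt in range(1, max_retries)]


print(backoff_delays(EMBED_MAX_RETRIES, EMBED_RETRY_BASE_DELAY))  # [2.0, 4.0]
```

With the defaults, a document that keeps failing therefore costs about six seconds of waiting before the error propagates.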


## 3a. OCR & Conversion Helpers

Utility functions that configure Docling's OCR, provide fallback conversion paths, and keep track of the active accelerator.
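
The fallback idea can be sketched independently of Docling: try each conversion strategy in order and keep the first result that passes a usability check. The names below (`first_successful`, `broken_ocr`) are hypothetical and stand in for the real converters; this is a sketch of the pattern, not the notebook's implementation.

```python
from typing import Callable, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def first_successful(
    strategies: Sequence[Tuple[str, Callable[[], T]]],
    is_usable: Callable[[T], bool],
) -> Tuple[Optional[str], Optional[T]]:
    """Try each named strategy in order and return the first usable result."""
    for name, run in strategies:
        try:
            result = run()
        except Exception as exc:  # broad on purpose: any failure triggers the next strategy
            print(f"{name} failed: {exc}")
            continue
        if is_usable(result):
            return name, result
        print(f"{name} produced no usable output")
    return None, None


def broken_ocr() -> str:
    """Stand-in for an OCR conversion that fails."""
    raise RuntimeError("model download failed")


name, text = first_successful(
    [("ocr", broken_ocr), ("plain", lambda: "page text")],
    is_usable=lambda value: bool(value.strip()),
)
print(name, text)
```

Recording the winning strategy name alongside the result mirrors how the ingestion loop stores `conversion_strategy` in each metadata document.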


In [7]:
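# Reuse the accelerator configured in the earlier cell; fall back to None only if that cell was skipped.
ACCELERATOR_OPTIONS: Optional[AcceleratorOptions] = globals().get('ACCELERATOR_OPTIONS')
GPU_DEVICE: Optional[torch.device] = globals().get('GPU_DEVICE')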


def build_pdf_pipeline_options() -> PdfPipelineOptions:
    options = PdfPipelineOptions()
    options.generate_picture_images = True
    options.generate_page_images = False
    options.do_picture_description = False
    if ACCELERATOR_OPTIONS is not None:
        options.accelerator_options = ACCELERATOR_OPTIONS
    return options

def convert_with_rapid(pdf_path: Path) -> DoclingDocument:

    from docling.datamodel.document import ConversionResult
    from huggingface_hub import hf_hub_download
    from docling.datamodel.pipeline_options import RapidOcrOptions

    # Minimal English-only RapidOCR assets (single-file downloads)
    print("Downloading RapidOCR models (English only, ONNX)")
    det_model_path = hf_hub_download(
        repo_id="SWHL/RapidOCR",
        filename="PP-OCRv4/en_PP-OCRv3_det_infer.onnx",
    )  # detection model, ~2.4MB

    rec_model_path = hf_hub_download(
        repo_id="SWHL/RapidOCR",
        filename="PP-OCRv3/en_PP-OCRv3_rec_infer.onnx",
    )  # recognition model, ~9.0MB

    rec_keys_path = hf_hub_download(
        repo_id="deepghs/paddleocr",
        filename="rec/en_PP-OCRv4_rec/dict.txt",
    )

    ocr_options = RapidOcrOptions(
        det_model_path=det_model_path,
        rec_model_path=rec_model_path,
        rec_keys_path=rec_keys_path,
        use_cls=False,  # avoid downloading the CLS model
    )

    pipeline_options = PdfPipelineOptions(
        generate_picture_images=True,
        generate_page_images=False,
        do_picture_description=False,
        ocr_options=ocr_options,
    )
    if ACCELERATOR_OPTIONS is not None:
        pipeline_options.accelerator_options = ACCELERATOR_OPTIONS

    converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_options=pipeline_options,
            ),
        }
    )

    conversion_result: ConversionResult = converter.convert(source=str(pdf_path))
    result = conversion_result.document
    return result

def convert_with_docling(pdf_path: Path) -> DoclingDocument:
    converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_options=build_pdf_pipeline_options(),
            ),
        }
    )
    result = converter.convert(source=str(pdf_path))
    return result.document


def document_has_text(doc: DoclingDocument) -> bool:
    try:
        iterator = doc.iterate_items()  # type: ignore[attr-defined]
    except Exception:
        iterator = []
    for item in iterator:
        text = getattr(item, 'text', None)
        if isinstance(text, str) and text.strip():
            return True
    return False


def convert_with_pymupdf(
    pdf_path: Path,
    *,
    figure_dir: Path,
    doc_id: str,
) -> tuple[list[dict[str, Any]], dict[str, dict[str, Any]]]:
    figure_dir.mkdir(parents=True, exist_ok=True)
    doc = fitz.open(pdf_path)
    plain_chunks: list[dict[str, Any]] = []
    figure_map: dict[str, dict[str, Any]] = {}
    for page_index, page in enumerate(doc):
        page_text = page.get_text('text').strip()
        if page_text:
            plain_chunks.append({'text': page_text, 'pages': [page_index]})
        for image_index, img in enumerate(page.get_images(full=True)):
            xref = img[0]
            pix = fitz.Pixmap(doc, xref)
            if pix.n >= 4:
                pix = fitz.Pixmap(fitz.csRGB, pix)
            width, height = pix.width, pix.height
            image_filename = 'page{page:03d}_img{idx:02d}.png'.format(
                page=page_index + 1,
                idx=image_index + 1,
            )
            image_path = figure_dir / image_filename
            pix.save(str(image_path))
            image_bytes = Path(image_path).read_bytes()
            image_b64 = base64.b64encode(image_bytes).decode("ascii")
            figure_id = 'p{page}_img{idx}'.format(
                page=page_index + 1,
                idx=image_index + 1,
            )
            figure_map[figure_id] = {
                'figure_id': figure_id,
                'doc_id': doc_id,
                'caption': None,
                'image_path': str(image_path.resolve()),
                'image_b64': image_b64,
                'width': int(width),
                'height': int(height),
                'provenance': [{'page_no': page_index}],
            }
    doc.close()  # release the PyMuPDF file handle once all pages are processed
    figure_map = filter_figures_by_height(figure_map)
    return plain_chunks, figure_map


def build_records_from_plain_chunks(
    *,
    chunks: list[dict[str, Any]],
    doc_id: str,
    source_path: Path,
    figure_refs_light: dict[str, dict[str, Any]],
    figure_binary_map: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
    records: list[dict[str, Any]] = []
    for idx, chunk in enumerate(chunks):
        pages = chunk.get('pages', [])
        figure_refs: list[dict[str, Any]] = []
        for fig in figure_refs_light.values():
            prov_pages = {prov.get('page_no') for prov in fig.get('provenance', [])}
            if prov_pages.intersection(pages):
                figure_refs.append(dict(fig))
        record = {
            'chunk_id': '{}::chunk-{:04d}'.format(doc_id, idx),
            'document_id': doc_id,
            'chunk_index': idx,
            'text': chunk.get('text', ''),
            'metadata': {
                'source_pdf': str(source_path),
                'pages': pages,
                'headings': [],
                'doc_items': [],
                'figure_refs': figure_refs,
            },
            'binary': build_chunk_binary_for_refs(figure_refs, figure_binary_map),
        }
        records.append(record)
    return records
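
In the PyMuPDF fallback, figures are attached to a chunk when their provenance pages overlap the chunk's pages. The sketch below shows that matching rule on toy data; `figures_on_pages` is an illustrative name, and the page numbers are 0-based to match what `convert_with_pymupdf` records.

```python
def figures_on_pages(figures: dict, pages: list) -> list:
    """Return IDs of figures whose provenance pages overlap the chunk's pages."""
    wanted = set(pages)
    matched = []
    for figure_id, figure in figures.items():
        figure_pages = {prov.get("page_no") for prov in figure.get("provenance", [])}
        if figure_pages & wanted:
            matched.append(figure_id)
    return matched


# Toy figure map: one image on the first page, one on the third (0-based indices).
figures = {
    "p1_img1": {"provenance": [{"page_no": 0}]},
    "p3_img1": {"provenance": [{"page_no": 2}]},
}
print(figures_on_pages(figures, pages=[0]))  # ['p1_img1']
```

Because the fallback emits one chunk per page, every sufficiently tall image on a page ends up referenced by that page's chunk.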

## 4. Load Environment Variables & Configure Paths

We read credentials from `.env` (VOYAGE_API_KEY, MONGODB_URI, etc.) and define directories for source PDFs, figure images, and metadata.

### Environment Loading, Path Config



In [8]:
from dotenv import load_dotenv
import os

# Construct the full path to the .env file
dotenv_path = os.path.join(os.getcwd(), '.env')

# Load the .env file
load_dotenv(dotenv_path)

VOYAGE_API_KEY = os.getenv('VOYAGE_API_KEY')
if not VOYAGE_API_KEY:
    raise RuntimeError('VOYAGE_API_KEY must be set in the environment or .env file.')

MONGODB_URI = os.getenv('MONGODB_URI')
if not MONGODB_URI:
    raise RuntimeError('MONGODB_URI must be set in the environment or .env file.')

MONGODB_DB = os.getenv('MONGODB_DB')
if not MONGODB_DB:
    raise RuntimeError('MONGODB_DB must be set in the environment or .env file.')
MONGODB_COLLECTION = os.getenv('MONGODB_COLLECTION')
if not MONGODB_COLLECTION:
    raise RuntimeError('MONGODB_COLLECTION must be set in the environment or .env file.')

# SOURCE_DIR is set in the Drive cell above. To read it from the environment instead:
# SOURCE_DIR = Path(os.getenv('SOURCE_DIR', '.')).expanduser().resolve()
IMAGE_ROOT = Path(os.getenv('IMAGE_ROOT', 'artifacts/images')).expanduser().resolve()
METADATA_ROOT = Path(os.getenv('METADATA_ROOT', 'artifacts/documents')).expanduser().resolve()
DEBUG_CHUNK_IMGS = os.getenv('DEBUG_CHUNK_IMGS', 'false').lower() in {'1', 'true', 'yes', 'on'}
DO_OCR = os.getenv('DO_OCR', 'true').lower() in {'1', 'true', 'yes', 'on'}

stop_ingest_raw = os.getenv('stop_ingest')

STOP_INGEST: Optional[int] = None
if stop_ingest_raw and stop_ingest_raw.strip():
    try:
        STOP_INGEST = int(stop_ingest_raw.strip())
    except ValueError as exc:
        raise ValueError('stop_ingest must be an integer when provided.') from exc
    if STOP_INGEST < 0:
        raise ValueError('stop_ingest must be non-negative.')

# Chunking & embedding configuration
TOKENIZER_MODEL = 'voyageai/voyage-context-3'  # Hugging Face public tokenizer
VOYAGE_MODEL = 'voyage-context-3'  # VoyageAI embedding model identifier
CHUNK_MAX_TOKENS = int(os.getenv('CHUNK_MAX_TOKENS', '768'))
CHUNK_MERGE_PEERS = os.getenv('CHUNK_MERGE_PEERS', 'true').lower() != 'false'
VOYAGE_OUTPUT_DIM = os.getenv('VOYAGE_OUTPUT_DIM')
VOYAGE_OUTPUT_DTYPE = os.getenv('VOYAGE_OUTPUT_DTYPE', 'float')
VOYAGE_OUTPUT_DIM = int(VOYAGE_OUTPUT_DIM) if VOYAGE_OUTPUT_DIM and VOYAGE_OUTPUT_DIM.lower() != 'none' else None

print(f"[bold green]Source directory:[/bold green] {SOURCE_DIR}")
print(f"[bold green]Image root:[/bold green] {IMAGE_ROOT}")
print(f"[bold green]Metadata root:[/bold green] {METADATA_ROOT}")
print(f"[bold green]DEBUG_CHUNK_IMGS:[/bold green] {DEBUG_CHUNK_IMGS}")
print(f"[bold green]DO_OCR:[/bold green] {DO_OCR}")
if STOP_INGEST is None:
    print('[bold green]stop_ingest:[/bold green] unbounded')
else:
    print(f"[bold green]stop_ingest:[/bold green] {STOP_INGEST}")
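
The boolean settings above are parsed inline, and `CHUNK_MERGE_PEERS` uses a slightly different rule from `DO_OCR` and `DEBUG_CHUNK_IMGS`. One way to keep the parsing consistent is a small helper; the sketch below is a suggestion with hypothetical names (`env_flag`, `EXAMPLE_FLAG`), not code from the original pipeline.

```python
import os

TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment, falling back to `default` when unset or blank."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    return raw.strip().lower() in TRUE_VALUES


os.environ["EXAMPLE_FLAG"] = "Yes"  # illustrative variable, not read by the notebook
print(env_flag("EXAMPLE_FLAG"))
print(env_flag("EXAMPLE_UNSET_FLAG", default=True))
```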

## 5. Initialize Docling Converter, Chunker, Voyage Client, and Mongo Client

In [9]:
# Ensure output directories exist
IMAGE_ROOT.mkdir(parents=True, exist_ok=True)
METADATA_ROOT.mkdir(parents=True, exist_ok=True)

# Hybrid chunker configuration (the tokenizer runs on CPU and is used only to count tokens against CHUNK_MAX_TOKENS)
hf_tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_MODEL)
chunk_tokenizer = HuggingFaceTokenizer(
    tokenizer=hf_tokenizer,
    max_tokens=CHUNK_MAX_TOKENS,
)
serializer_provider = FigureReferenceSerializerProvider()
chunker = HybridChunker(
    tokenizer=chunk_tokenizer,
    merge_peers=CHUNK_MERGE_PEERS,
    serializer_provider=serializer_provider,
)

# VoyageAI client for contextualized embeddings
voyage_client = VoyageClient(api_key=VOYAGE_API_KEY)

# MongoDB client (Atlas connection string recommended)
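mongo_kwargs = {}
if MONGODB_URI.startswith('mongodb://'):
    # Pass credentials only when they are actually set, so None is never sent to MongoClient.
    for key, env_name in (('username', 'MONGODB_USERNAME'), ('password', 'MONGODB_PASSWORD')):
        value = os.getenv(env_name)
        if value:
            mongo_kwargs[key] = value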

mongo_client = MongoClient(MONGODB_URI, **mongo_kwargs)
mongo_collection = mongo_client[MONGODB_DB][MONGODB_COLLECTION]

print('[bold green]Initialization complete.[/bold green]')




In [10]:
from pymongo.errors import OperationFailure
from pymongo.operations import SearchIndexModel

def ensure_vector_search_index(collection, *, index_name: str | None = None) -> None:
    """Ensure the Atlas vector search index exists on the target collection."""
    index_name = index_name or os.getenv("VECTOR_INDEX") or "embedding_vector_index"

    try:
        for existing in collection.list_search_indexes():
            if existing.get("name") == index_name:
                print(f"[yellow]Search index '{index_name}' already exists on '{collection.name}'.[/yellow]")
                return
    except OperationFailure as exc:
        raise RuntimeError(
            f"Unable to list search indexes on {collection.full_name}: {exc}"
        ) from exc

    model = SearchIndexModel(
      definition={
        "fields": [
          {
            "type": "vector",
            "numDimensions": VOYAGE_OUTPUT_DIM or 1024,  # must equal the stored embedding length
            "path": "embedding",
            "similarity": "cosine"
          }
        ]
      },
      name=index_name,
      type="vectorSearch",
    )

    collection.create_search_index(model=model)
    print(f"[bold green]Created search index '{index_name}' on '{collection.name}'.[/bold green]")
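
Once the index exists, chunks can be retrieved with an Atlas `$vectorSearch` aggregation stage. The sketch below only builds the pipeline as plain Python data; `build_vector_search_pipeline` and its default values are assumptions for illustration, and the projected fields follow the record layout used in this notebook.

```python
from typing import Any, Sequence


def build_vector_search_pipeline(
    query_vector: Sequence[float],
    *,
    index_name: str = "embedding_vector_index",
    limit: int = 5,
    num_candidates: int = 100,
) -> list[dict[str, Any]]:
    """Build an aggregation pipeline that runs an approximate nearest-neighbour search."""
    return [
        {
            "$vectorSearch": {
                "index": index_name,
                "path": "embedding",  # field written by attach_embeddings
                "queryVector": list(query_vector),
                "numCandidates": num_candidates,
                "limit": limit,
            }
        },
        {
            "$project": {
                "_id": 0,
                "chunk_id": 1,
                "text": 1,
                "score": {"$meta": "vectorSearchScore"},
            }
        },
    ]


# Toy 4-dimensional vector; a real query vector must match the index dimensions.
pipeline = build_vector_search_pipeline([0.1, 0.2, 0.3, 0.4], limit=3)
print(pipeline[0]["$vectorSearch"]["limit"])
```

In the notebook, the pipeline would be passed to `mongo_collection.aggregate(pipeline)`, with the query vector presumably produced by the same Voyage model using `input_type="query"`.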


## 6. Run Ingestion over PDFs

This cell converts each PDF, extracts figures, chunks content, embeds each chunk with Voyage Context-3, and stores everything in MongoDB along with JSON metadata.
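
Each document ID in the ingestion loop combines a slug of the file stem with a short hash of the path, so two PDFs with the same name in different folders do not collide. The sketch below restates that derivation in one standalone function; `make_doc_id` and the example paths are hypothetical, and unlike the loop it hashes the path exactly as given rather than resolving it first.

```python
import hashlib
import re
from pathlib import Path


def make_doc_id(pdf_path: Path) -> str:
    """Combine a URL-safe slug of the file stem with an 8-character SHA-1 prefix of the path."""
    slug = re.sub(r"[^a-z0-9]+", "-", pdf_path.stem.lower()).strip("-") or "document"
    digest = hashlib.sha1(str(pdf_path).encode("utf-8")).hexdigest()[:8]
    return f"{slug}-{digest}"


doc_id = make_doc_id(Path("/data/reports/Annual Report (2023).pdf"))
other_id = make_doc_id(Path("/data/archive/Annual Report (2023).pdf"))
print(doc_id)
print(doc_id != other_id)  # same stem, different folder, different ID
```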

In [11]:
def pause():
    input("Paused. Press Enter to continue...")

In [12]:
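# force=True replaces any handlers the notebook runtime installed, so INFO messages are shown.
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s', force=True)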

pdf_paths = sorted(
    path
    for path in SOURCE_DIR.rglob('*')
    if path.suffix.lower() == '.pdf' and path.is_file()
)

if not pdf_paths:
    print(f"[yellow]No PDF files found in {SOURCE_DIR}. Nothing to ingest.")
else:
    print(f"[bold cyan]Discovered {len(pdf_paths)} PDF(s) under {SOURCE_DIR}.[/bold cyan]")

processed_docs = 0
ingest_summary: list[dict[str, Any]] = []
total_chunks_computed = 0

for pdf_path in pdf_paths:
    if STOP_INGEST is not None and processed_docs >= STOP_INGEST:
        print(f"[yellow]stop_ingest limit of {STOP_INGEST} reached; ending ingestion early.")
        break

    doc_slug = slugify(pdf_path.stem)
    doc_hash = hash_path(pdf_path.resolve())
    doc_id = f'{doc_slug}-{doc_hash}'
    doc_image_dir = IMAGE_ROOT / doc_id
    relative_source = pdf_path.relative_to(SOURCE_DIR)

    print(f"[bold cyan]Processing[/bold cyan] {pdf_path} \u2192 {doc_id}")

    figure_map: dict[str, dict[str, Any]] = {}
    figure_refs_light: dict[str, dict[str, Any]] = {}
    figure_binary_map: dict[str, dict[str, Any]] = {}
    records: list[dict[str, Any]] = []
    conversion_strategy = 'docling_rapidocr'

    if DO_OCR:
        try:
            dl_doc = convert_with_rapid(pdf_path)
            # pause()
            if not document_has_text(dl_doc):
                raise ValueError('RapidOCR produced no textual content.')
            figure_map = extract_figures(dl_doc, doc_id, doc_image_dir)
            figure_map = filter_figures_by_height(figure_map)
            figure_refs_light, figure_binary_map = build_light_and_binary_maps(figure_map, image_root=IMAGE_ROOT)
            records = prepare_chunk_records(
                doc=dl_doc,
                doc_id=doc_id,
                source_path=relative_source,
                chunker=chunker,
                figure_refs_light=figure_refs_light,
                figure_binary_map=figure_binary_map,
            )
        except Exception as rapid_error:
            logging.warning('RapidOCR conversion failed for %s: %s', pdf_path, rapid_error)
            conversion_strategy = 'pymupdf'
            plain_chunks, figure_map = convert_with_pymupdf(
                pdf_path,
                figure_dir=doc_image_dir,
                doc_id=doc_id,
            )
            figure_map = filter_figures_by_height(figure_map)
            figure_refs_light, figure_binary_map = build_light_and_binary_maps(figure_map, image_root=IMAGE_ROOT)
            if not plain_chunks:
                logging.error('PyMuPDF fallback produced no textual content for %s; skipping.', pdf_path)
                continue
            records = build_records_from_plain_chunks(
                chunks=plain_chunks,
                doc_id=doc_id,
                source_path=relative_source,
                figure_refs_light=figure_refs_light,
                figure_binary_map=figure_binary_map,
            )
    else:
        conversion_strategy = 'pymupdf'
        logging.info('DO_OCR is false; falling back to PyMuPDF for %s.', pdf_path)
        plain_chunks, figure_map = convert_with_pymupdf(
            pdf_path,
            figure_dir=doc_image_dir,
            doc_id=doc_id,
        )
        figure_map = filter_figures_by_height(figure_map)
        figure_refs_light, figure_binary_map = build_light_and_binary_maps(figure_map, image_root=IMAGE_ROOT)
        if not plain_chunks:
            logging.error('PyMuPDF produced no textual content for %s; skipping.', pdf_path)
            continue
        records = build_records_from_plain_chunks(
            chunks=plain_chunks,
            doc_id=doc_id,
            source_path=relative_source,
            figure_refs_light=figure_refs_light,
            figure_binary_map=figure_binary_map,
        )

    if not records:
        logging.error('No chunk records produced for %s; skipping.', pdf_path)
        continue

    embeddings = embed_chunks(
        voyage_client=voyage_client,
        records=records,
        model_name=VOYAGE_MODEL,
        output_dimension=VOYAGE_OUTPUT_DIM,
        output_dtype=VOYAGE_OUTPUT_DTYPE,
    )
    attach_embeddings(records, embeddings)
    persist_records(mongo_collection, records)

    metadata_path = METADATA_ROOT / f'{doc_id}.json'
    write_metadata_doc(
        metadata_path=metadata_path,
        doc_id=doc_id,
        source_pdf=relative_source,
        figure_refs_light=figure_refs_light,
        chunk_count=len(records),
        voyage_model=VOYAGE_MODEL,
        conversion_strategy=conversion_strategy,
    )

    cleanup_figure_dir(doc_image_dir, debug_enabled=DEBUG_CHUNK_IMGS, doc_id=doc_id)

    binary_total = sum(len(record.get('binary', {})) for record in records)
    ingest_summary.append(
        {
            'doc_id': doc_id,
            'chunks': len(records),
            'figures': len(figure_refs_light),
            'binary_previews': binary_total,
            'metadata_file': metadata_path,
            'strategy': conversion_strategy,
        }
    )
    total_chunks_computed += len(records)

    storage_note = (
        f"figure images retained under {doc_image_dir}" if DEBUG_CHUNK_IMGS else
        "figure images cleaned from disk after ingest"
    )
    print(
        f"[green]Inserted[/green] {len(records)} chunk(s) via {conversion_strategy}; {storage_note}."
    )

    processed_docs += 1

if STOP_INGEST is not None and processed_docs < len(pdf_paths):
    print(
        f"[yellow]Processed {processed_docs} of {len(pdf_paths)} PDF(s) (stop_ingest={STOP_INGEST}).[/yellow]"
    )

print('[bold green]Ingestion run complete.[/bold green]')
print(f"[bold green]Total chunks computed in this session:[/bold green] {total_chunks_computed}")

In [13]:
from pymongo import MongoClient
import os

MONGODB_URI = os.getenv('MONGODB_URI')
MONGODB_DB = os.getenv('MONGODB_DB')
MONGODB_COLLECTION = os.getenv('MONGODB_COLLECTION')

if not MONGODB_URI or not MONGODB_DB or not MONGODB_COLLECTION:
    print('[red]MONGODB_URI, MONGODB_DB, or MONGODB_COLLECTION not set.[/red]')
else:
    try:
        client = MongoClient(MONGODB_URI)
        db = client[MONGODB_DB]
        collection = db[MONGODB_COLLECTION]

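        # List all search indexes on the collection. Materialize the cursor first:
        # a cursor object is always truthy, so the emptiness check needs a list.
        indexes = list(collection.list_search_indexes())
        if indexes: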
            print(f"[bold cyan]Existing search indexes on collection '{MONGODB_COLLECTION}':[/bold cyan]")
            for index in indexes:
                print(index)
        else:
            print(f"[yellow]No search indexes found on collection '{MONGODB_COLLECTION}'.[/yellow]")
        client.close()
    except Exception as e:
        print(f"[red]Error connecting to MongoDB or listing indexes: {e}[/red]")


If the cell above lists any vector search indexes, particularly ones whose `definition` has no `mappings` field, you may need to delete them in the MongoDB Atlas UI. After deleting them, re-run the `ensure_vector_search_index` cell (cell `LOx5ZT6CFdKC`).
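
The check described above can also be scripted. The sketch below is a minimal, hypothetical helper (`summarize_search_index` is not part of this notebook). It reduces one index document from `list_search_indexes()` to the facts that matter here: its name, its type, and whether its definition carries `mappings` or `fields`. It assumes the definition may appear under either `latestDefinition` or `definition`, so verify the key names against the output printed by the cell above.

```python
from typing import Any


def summarize_search_index(index_doc: dict[str, Any]) -> dict[str, Any]:
    """Summarize one search index document (hypothetical helper)."""
    # Assumption: the definition lives under 'latestDefinition' or 'definition'.
    definition = index_doc.get('latestDefinition') or index_doc.get('definition') or {}
    return {
        'name': index_doc.get('name'),
        'type': index_doc.get('type'),
        'has_mappings': 'mappings' in definition,
        'has_fields': 'fields' in definition,
    }


# Illustrative documents only; real ones come from collection.list_search_indexes().
sample_indexes = [
    {
        'name': 'vector_index',
        'type': 'vectorSearch',
        'latestDefinition': {'fields': [{'type': 'vector', 'path': 'embedding'}]},
    },
    {
        'name': 'default',
        'type': 'search',
        'latestDefinition': {'mappings': {'dynamic': True}},
    },
]

for doc in sample_indexes:
    print(summarize_search_index(doc))
```

Recent PyMongo releases also expose `collection.drop_search_index(name)`, which may be an alternative to deleting an index in the Atlas UI. Confirm that it is available in your installed version before relying on it.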

## 7a. Confirm MongoDB Atlas State

Check your MongoDB Atlas instance to confirm that the data was ingested correctly. Once you have confirmed, run the next cell and answer `yes` to create the vector search index.

In [14]:
# Prompt for confirmation before creating the vector index
user_confirmation = input("Have you checked MongoDB Atlas and confirmed the ingestion? (yes/no): ").strip().lower()

if user_confirmation == 'yes':
    ensure_vector_search_index(mongo_collection)
else:
    print("[yellow]Vector index creation skipped based on user confirmation.[/yellow]")

Have you checked MongoDB Atlas and confirmed the ingestion? (yes/no): no
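
Part of the ingestion check can be done from the notebook instead of the Atlas UI. The sketch below is a hypothetical helper (`summarize_ingestion` is not defined elsewhere in this notebook). It counts the stored chunks and reads the length of one stored vector. It assumes vectors are stored as plain arrays under the `embedding` field and that the collection object offers PyMongo's `count_documents` and `find_one`.

```python
from typing import Any, Optional


def summarize_ingestion(collection: Any, expected_dim: Optional[int] = None) -> dict[str, Any]:
    """Return basic facts about an ingested collection (hypothetical helper)."""
    has_embedding = {'embedding': {'$exists': True}}
    total = collection.count_documents({})
    with_embedding = collection.count_documents(has_embedding)
    # Fetch a single document, projecting only the vector, to read its length.
    sample = collection.find_one(has_embedding, {'embedding': 1})
    dim = len(sample['embedding']) if sample else None
    return {
        'total_chunks': total,
        'chunks_with_embedding': with_embedding,
        'embedding_dim': dim,
        # True when no expectation is given or the sampled length matches it.
        'dim_matches': expected_dim is None or dim == expected_dim,
    }


# Usage in this notebook (assumes the names defined in earlier cells):
# print(summarize_ingestion(mongo_collection, expected_dim=VOYAGE_OUTPUT_DIM))
```

If `chunks_with_embedding` is lower than `total_chunks`, or `dim_matches` is false, it is probably worth investigating before answering `yes` to the prompt.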


## 7. Inspect Summary

Review the documents processed in this session.

In [15]:
if ingest_summary:
    total_binary_previews = 0
    for item in ingest_summary:
        print(
            f"• [bold]{item['doc_id']}[/bold]: {item['chunks']} chunk(s), "
            f"{item['figures']} figure(s), binary previews → {item['binary_previews']}, "
            f"strategy → {item['strategy']}, metadata → {item['metadata_file']}"
        )
        total_binary_previews += item['binary_previews']
    print(f"[cyan]Total binary previews attached:[/cyan] {total_binary_previews}")
    print('[cyan]Metadata figure_refs remain lightweight (no embedded base64).[/cyan]')
else:
    print('[yellow]No documents were ingested in this session.[/yellow]')



## 8. Next Steps

- Confirm your MongoDB Atlas vector index is configured on the `embedding` field (float32 vectors).
- In n8n, inspect a retrieved item and ensure the top-level `binary` map surfaces PNG previews while `metadata.figure_refs` stays base64-free.
- To try other preprocessors or chunking strategies, adjust the chunker configuration cells above.
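
As a reference for the first item, the sketch below builds the general shape of an Atlas Vector Search index definition on the `embedding` field. The helper name `build_vector_index_definition`, the `cosine` similarity, and the 1024 dimensions are illustrative assumptions. The definition that `ensure_vector_search_index` actually creates is not shown in this section, and the dimension count should match your `VOYAGE_OUTPUT_DIM`.

```python
from typing import Any


def build_vector_index_definition(
    num_dimensions: int,
    path: str = 'embedding',
    similarity: str = 'cosine',
) -> dict[str, Any]:
    """Build an Atlas Vector Search index definition (illustrative helper)."""
    if similarity not in {'cosine', 'dotProduct', 'euclidean'}:
        raise ValueError(f'Unsupported similarity: {similarity}')
    return {
        'fields': [
            {
                'type': 'vector',
                'path': path,
                'numDimensions': num_dimensions,
                'similarity': similarity,
            }
        ]
    }


# Assumption: 1024 dimensions; replace with the value of VOYAGE_OUTPUT_DIM.
definition = build_vector_index_definition(num_dimensions=1024)
print(definition)
```

Comparing this shape with the definition Atlas reports for your index is a quick way to confirm that `path` and `numDimensions` match what the pipeline writes.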
