# Ingestion and the Vector Database

---

This notebook takes a closer look at the vector database: how to query it and, in particular, how to inspect and update its metadata.

Setup is minimal and assumes that `backend/data_vs.db` has already been written. If it has not, run the ingestion steps at the bottom of this notebook first.

In [17]:
from conversational_toolkit.vectorstores.chromadb import ChromaDBVectorStore
from pathlib import Path
# Assumes the notebook is started from the project root.
PROJECT_ROOT = Path.cwd()

# Build the absolute path to the vector store.
VS_PATH = PROJECT_ROOT / "backend" / "data_vs.db"
print(f"Current Working Directory:{PROJECT_ROOT}")
print(f"Path Variable:{VS_PATH}")
vs = ChromaDBVectorStore(db_path=str(VS_PATH))


Current Working Directory:/home/roger/Documents/DEV/sme-kt-zh-collaboration-rag
Path Variable:/home/roger/Documents/DEV/sme-kt-zh-collaboration-rag/backend/data_vs.db


## Query Metadata

In [16]:
results = vs.collection.get(include=["metadatas"])
for meta in results["metadatas"][:10]:
    print(meta)

{'title': '## **Water-Activated Tape**', 'source_file': 'EPD_tape_IPG_wateractivated (Copy 1).pdf', 'mime_type': 'text/markdown', 'source': 'EPD_tape_IPG_wateractivated (Copy 1).pdf', 'file_hash': '642d4738ef34f1d134ad583d5979b07967c6b062ec3da52317c3b31e7f4cdee9', 'chapters': '["## **Water-Activated Tape**"]'}
{'mime_type': 'text/markdown', 'source_file': 'EPD_tape_IPG_wateractivated (Copy 1).pdf', 'file_hash': '642d4738ef34f1d134ad583d5979b07967c6b062ec3da52317c3b31e7f4cdee9', 'source': 'EPD_tape_IPG_wateractivated (Copy 1).pdf', 'chapters': '["## **Water-Activated Tape**", "###### from"]', 'title': '###### from'}
{'chapters': '["## **Water-Activated Tape**", "###### from", "###### **Environmental** **Product** **Declaration**"]', 'mime_type': 'text/markdown', 'source': 'EPD_tape_IPG_wateractivated (Copy 1).pdf', 'title': '###### **Environmental** **Product** **Declaration**', 'source_file': 'EPD_tape_IPG_wateractivated (Copy 1).pdf', 'file_hash': '642d4738ef34f1d134ad583d5979b07967c6

The same metadata, formatted more readably as a DataFrame:

In [21]:
import pandas as pd

results = vs.collection.get(include=["metadatas"])

df = pd.DataFrame([
    {"id": chunk_id, **meta}
    for chunk_id, meta in zip(results["ids"], results["metadatas"])
])

df

Unnamed: 0,id,mime_type,source_file,chapters,source,file_hash,title,rows,sheet,columns
0,45d61dd4-931f-416b-87a3-ee7f99900215,text/markdown,EPD_tape_IPG_wateractivated (Copy 1).pdf,"[""## **Water-Activated Tape**""]",EPD_tape_IPG_wateractivated (Copy 1).pdf,642d4738ef34f1d134ad583d5979b07967c6b062ec3da5...,## **Water-Activated Tape**,,,
1,ddfadcf6-c973-4d96-88a9-5aa8b408c069,text/markdown,EPD_tape_IPG_wateractivated (Copy 1).pdf,"[""## **Water-Activated Tape**"", ""###### from""]",EPD_tape_IPG_wateractivated (Copy 1).pdf,642d4738ef34f1d134ad583d5979b07967c6b062ec3da5...,###### from,,,
2,3bd95a23-a5d4-462c-bae3-39b258a5df49,text/markdown,EPD_tape_IPG_wateractivated (Copy 1).pdf,"[""## **Water-Activated Tape**"", ""###### from"",...",EPD_tape_IPG_wateractivated (Copy 1).pdf,642d4738ef34f1d134ad583d5979b07967c6b062ec3da5...,###### **Environmental** **Product** **Declara...,,,
3,a8e63ce3-cdb4-4931-ac1a-8425b2d778eb,text/markdown,EPD_tape_IPG_wateractivated (Copy 1).pdf,"[""## **Water-Activated Tape**"", ""###### from"",...",EPD_tape_IPG_wateractivated (Copy 1).pdf,642d4738ef34f1d134ad583d5979b07967c6b062ec3da5...,###### 1,,,
4,78641a49-7ae9-40e2-8e9d-357729c858cc,text/markdown,EPD_tape_IPG_wateractivated (Copy 1).pdf,"[""## **Water-Activated Tape**"", ""## **EPD Prog...",EPD_tape_IPG_wateractivated (Copy 1).pdf,642d4738ef34f1d134ad583d5979b07967c6b062ec3da5...,## **EPD Programme Information**,,,
...,...,...,...,...,...,...,...,...,...,...
369,2d1463b6-47ee-4a20-9f50-7567b4e289f5,text/markdown,SPEC_tape_tesa_tesapack58297.pdf,"[""# tesapack\u00ae Eco & Ultra Strong ecoLogo\...",SPEC_tape_tesa_tesapack58297.pdf,300b4decc8927c6edf4fe0036de78d7080bc4b6c3bc0e5...,### **Eigenschaften / Leistungswerte**,,,
370,98b90ff3-3d1e-4457-b568-879e1b0a8550,text/markdown,SPEC_tape_tesa_tesapack58297.pdf,"[""# tesapack\u00ae Eco & Ultra Strong ecoLogo\...",SPEC_tape_tesa_tesapack58297.pdf,300b4decc8927c6edf4fe0036de78d7080bc4b6c3bc0e5...,# tesapack® Eco & Ultra Strong ecoLogo®,,,
371,4bef34b6-93be-4c1f-aa1b-32920254d9df,text/markdown,SPEC_tape_tesa_tesapack58297.pdf,"[""# tesapack\u00ae Eco & Ultra Strong ecoLogo\...",SPEC_tape_tesa_tesapack58297.pdf,300b4decc8927c6edf4fe0036de78d7080bc4b6c3bc0e5...,## Produkt Information,,,
372,39f2a524-3391-4216-bea0-68d026ed2ad0,text/markdown,SPEC_tape_tesa_tesapack58297.pdf,"[""# tesapack\u00ae Eco & Ultra Strong ecoLogo\...",SPEC_tape_tesa_tesapack58297.pdf,300b4decc8927c6edf4fe0036de78d7080bc4b6c3bc0e5...,### Haftungsausschluss,,,
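
The `chapters` field in the metadata above is a JSON-encoded string rather than a list. The following is a minimal sketch of decoding it into a readable heading path, using one record copied from the output above. `heading_path` is a hypothetical helper introduced here for illustration; it is not part of `conversational_toolkit`.

```python
import json

# One metadata record, copied from the query output above (fields shortened).
meta = {
    "title": "###### from",
    "source_file": "EPD_tape_IPG_wateractivated (Copy 1).pdf",
    "chapters": '["## **Water-Activated Tape**", "###### from"]',
}


def heading_path(meta: dict) -> list[str]:
    """Decode the JSON-encoded 'chapters' field and strip Markdown markup."""
    headings = json.loads(meta.get("chapters", "[]"))  # missing field -> empty path
    return [h.lstrip("#").replace("*", "").strip() for h in headings]


print(" > ".join(heading_path(meta)))  # Water-Activated Tape > from
```

Records without a `chapters` field (for example, the spreadsheet rows that carry `rows`, `sheet` and `columns` instead) yield an empty path in this sketch.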


## Updating Chunk Metadata After Ingestion

ChromaDB allows you to add or modify metadata fields on already-stored chunks
using `collection.update()`. This is useful for backfilling information you didn't
have at ingestion time — for example, a review status, a quality score, or (as we
will see later) a file hash for deduplication.

> **Important:** Depending on the ChromaDB version, `collection.update()` may replace the
> entire metadata dict of a chunk rather than merge new keys into it. Merging the existing
> metadata with your new field via `{**meta, "new_key": value}` is safe in either case and
> avoids wiping existing fields.

The cells below add a dummy `"demo"` field to every chunk as a minimal example,
then verify the result.


In [7]:
# Get all existing IDs and metadata
results = vs.collection.get(include=["metadatas"])

# Merge existing metadata with the new field
updated_metadatas = [
    {**meta, "demo": "test"}
    for meta in results["metadatas"]
]

vs.collection.update(ids=results["ids"], metadatas=updated_metadatas)
print(f"Updated {len(results['ids'])} chunks")

Updated 374 chunks


In [9]:
sample = vs.collection.get(ids=results["ids"][:5], include=["metadatas"])
pd.DataFrame(sample["metadatas"])

Unnamed: 0,mime_type,title,source,demo,source_file,chapters
0,text/markdown,# Supplier Sustainability Requirements,ART_internal_procurement_policy.pdf,test,ART_internal_procurement_policy.pdf,"[""# Supplier Sustainability Requirements""]"
1,text/markdown,## 1. Purpose and Scope,ART_internal_procurement_policy.pdf,test,ART_internal_procurement_policy.pdf,"[""# Supplier Sustainability Requirements"", ""##..."
2,text/markdown,## 2. Evidence Standards,ART_internal_procurement_policy.pdf,test,ART_internal_procurement_policy.pdf,"[""# Supplier Sustainability Requirements"", ""##..."
3,text/markdown,## 3. Requirements by Category,ART_internal_procurement_policy.pdf,test,ART_internal_procurement_policy.pdf,"[""# Supplier Sustainability Requirements"", ""##..."
4,text/markdown,### 3.1 All Suppliers and Products,ART_internal_procurement_policy.pdf,test,ART_internal_procurement_policy.pdf,"[""# Supplier Sustainability Requirements"", ""##..."


## Idempotent Ingestion: Deduplication via File Hashing

A common problem in document ingestion pipelines is **accidentally ingesting
the same file more than once**, for example when the pipeline is re-run after new
documents were added to the data folder, or when the corpus contains the same
document under different filenames. This bloats the vector store with duplicate
chunks and degrades retrieval quality.

### The idea

Instead of tracking filenames (which can change), we compute a **SHA-256 hash
of the file's raw bytes**. This hash is a practically unique fingerprint of the
file's content: if the content hasn't changed, the hash won't change either.
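
A minimal sketch of this behaviour, using temporary files so it runs anywhere. `file_hash_streaming` is a hypothetical variant introduced here for illustration: it reads the file in blocks, which keeps memory use flat for large files, whereas the helper used in this notebook reads the whole file at once. The file names are made up.

```python
import hashlib
import tempfile
from pathlib import Path


def file_hash_streaming(path: Path, block_size: int = 1 << 20) -> str:
    """SHA-256 hex digest of a file's bytes, read block by block."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        # iter() with a sentinel stops when read() returns an empty bytes object.
        for block in iter(lambda: handle.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    original = Path(tmp) / "report.pdf"
    renamed = Path(tmp) / "report (Copy 1).pdf"
    edited = Path(tmp) / "report_v2.pdf"

    original.write_bytes(b"same content")
    renamed.write_bytes(b"same content")   # identical bytes, different name
    edited.write_bytes(b"same content!")   # one extra byte

    hash_original = file_hash_streaming(original)
    hash_renamed = file_hash_streaming(renamed)
    hash_edited = file_hash_streaming(edited)

print(hash_original == hash_renamed)  # True
print(hash_original == hash_edited)   # False
```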

The strategy has three steps:

1. **Hash** — Before ingesting a file, compute its SHA-256 hash.
2. **Check** — Query the vector store for any chunk that already carries that
   hash as metadata. If one exists, the file is already ingested — skip it.
3. **Stamp** — If the file is new, add the hash as a `"file_hash"` metadata
   field on every chunk before inserting them. Future runs can then detect it.

This approach correctly handles renamed files (same hash → skip) and detects
modified files (different hash → re-ingest). The drawback is that only the first copy
of a file is ingested, so the filenames and folder paths of skipped duplicates are not
recorded in the store.
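
The three steps can be sketched end to end without a real vector store. This is a minimal illustration under stated assumptions: `InMemoryStore` and `ingest` are hypothetical names, chunking is simplified to one chunk per non-empty line, and nothing is embedded.

```python
import hashlib


class InMemoryStore:
    """Hypothetical stand-in for the vector store: a list of metadata dicts."""

    def __init__(self) -> None:
        self.records: list[dict] = []

    def has_hash(self, hash_value: str) -> bool:
        return any(r["file_hash"] == hash_value for r in self.records)

    def insert(self, records: list[dict]) -> None:
        self.records.extend(records)


def ingest(store: InMemoryStore, name: str, content: bytes) -> bool:
    """Ingest a file unless its content is already stored. Returns True if ingested."""
    hash_value = hashlib.sha256(content).hexdigest()      # 1. Hash
    if store.has_hash(hash_value):                        # 2. Check
        return False
    chunks = [
        {"text": line, "source_file": name, "file_hash": hash_value}  # 3. Stamp
        for line in content.decode().splitlines()
        if line.strip()
    ]
    store.insert(chunks)
    return True


store = InMemoryStore()
first = ingest(store, "policy.pdf", b"alpha\nbeta")            # new file
second = ingest(store, "policy (Copy 1).pdf", b"alpha\nbeta")  # same bytes, new name
third = ingest(store, "policy.pdf", b"alpha\nbeta\ngamma")     # modified content
print(first, second, third, len(store.records))  # True False True 5
```

In this sketch the two chunks of the earlier version stay in the store after the modified file is ingested; removing outdated chunks would need a separate step.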

The cells below demonstrate all three steps.

In [None]:
# Hash: fingerprint the file's raw bytes.
import hashlib

def file_hash(path: str) -> str:
    """Return the SHA-256 hex digest of the file's raw bytes."""
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()

In [None]:

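# Stamp: attach the hash to every chunk before insertion (this is where you'd extend load_chunks).
# `file_chunks` and `file_path` come from the surrounding ingestion loop.
hash_value = file_hash(str(file_path))  # hash once per file, not once per chunk
for chunk in file_chunks:
    chunk.metadata["file_hash"] = hash_value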


In [None]:

# Check: look for duplicates before ingesting a file.
def already_ingested(collection, hash_value: str) -> bool:
    results = collection.get(where={"file_hash": hash_value}, limit=1)
    return len(results["ids"]) > 0


# The guard then becomes:
hash_value = file_hash(str(file_path))
if already_ingested(vs.collection, hash_value):
    print(f"Skipping {file_path.name}: already in store")
else:
    pass  # embed and insert the chunks here

# Key point: ChromaDB's `where=` filter on `.get()` lets you query by any metadata field,
# so the hash works well as a lookup key.


## Changes Made to `feature0_baseline_rag.py`

Four functions were added or updated to support incremental, hash-based ingestion.

**`file_hash()`** is a new helper that computes a SHA-256 fingerprint of a file's
raw bytes. Crucially, the hash is based on file *content*, not the filename —
renaming a file produces the same hash, and modifying it produces a different one.

**`get_existing_hashes()`** is a new helper that reads the vector store metadata
and returns the set of file hashes already persisted. It is called before
`load_chunks()` so that file parsing can be skipped early for known files.

**`load_chunks()`** now accepts an `existing_hashes` parameter and maintains an
internal `seen_hashes` set as it iterates over files. This gives it two layers
of deduplication: it skips files whose hash is already in the store (cross-run
dedup), and it skips files with identical content to one already processed in the
current batch (within-run dedup). The expensive PDF parsing step is therefore
never wasted on a file that would be discarded anyway.

**`build_vector_store()`** replaces the all-or-nothing empty-store check with a
per-file check. Incoming chunks are grouped by `"file_hash"` and each group is
checked against the store before embedding. It logs a summary of how many
files and chunks were skipped versus embedded, and warns when two different
filenames in the current batch carry identical content.
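
The grouping and the duplicate warning described for `build_vector_store()` can be illustrated on toy data. In this sketch plain dicts stand in for `Chunk.metadata`, and the hash values and file names are placeholders.

```python
# Toy chunk metadata: two files share the hash "h1", one file has "h2".
chunks = [
    {"source_file": "a.pdf", "file_hash": "h1"},
    {"source_file": "a.pdf", "file_hash": "h1"},
    {"source_file": "a (Copy 1).pdf", "file_hash": "h1"},
    {"source_file": "b.pdf", "file_hash": "h2"},
]

# Group chunks by file hash; setdefault creates the list on first use.
chunks_by_hash: dict[str, list[dict]] = {}
for chunk in chunks:
    chunks_by_hash.setdefault(chunk.get("file_hash", "unknown"), []).append(chunk)

# A hash that maps to more than one filename means duplicate content.
# dict.fromkeys removes repeated names while keeping their first-seen order.
duplicates: dict[str, list[str]] = {}
for hash_value, group in chunks_by_hash.items():
    sources = list(dict.fromkeys(c["source_file"] for c in group))
    if len(sources) > 1:
        duplicates[hash_value] = sources

print(duplicates)  # {'h1': ['a.pdf', 'a (Copy 1).pdf']}
```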


In [None]:

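# Excerpt from feature0_baseline_rag.py. DATA_DIR, VS_PATH, _CHUNKERS, _IMAGE_EXTENSIONS,
# logger, Chunk and ChromaDBVectorStore are defined or imported elsewhere in that module.
import hashlib
from pathlib import Path


def file_hash(path: Path) -> str:
    """SHA-256 fingerprint of a file's raw bytes."""
    return hashlib.sha256(path.read_bytes()).hexdigest()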


def get_existing_hashes(db_path: Path = VS_PATH) -> set[str]:
    """Return file hashes already present in the vector store."""
    if not db_path.exists():
        return set()
    vs = ChromaDBVectorStore(db_path=str(db_path))
    result = vs.collection.get(include=["metadatas"])
    return {m["file_hash"] for m in result["metadatas"] if "file_hash" in m}


def load_chunks(max_files: int | None = None, existing_hashes: set[str] | None = None) -> list[Chunk]:
    """Load documents from DATA_DIR and split them into chunks.

    Supported formats:
        .pdf: converted to Markdown via pymupdf4llm, split on headings
        .xlsx, .xls: one chunk per sheet (Markdown table)

    Unsupported formats (e.g. standalone images) are logged as warnings and skipped.
    Images embedded inside PDFs are not extracted as text by default!

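    Pass 'max_files' to cap the total number of files processed. Useful for quick
    iteration during development before scaling to all files.

    Pass 'existing_hashes' (e.g. from get_existing_hashes()) to skip files whose
    content hash is already in the vector store before they are parsed.
    """
    all_chunks: list[Chunk] = []
    all_files = sorted(f for f in DATA_DIR.iterdir() if f.is_file())

    if max_files is not None:
        all_files = all_files[:max_files]
        logger.debug(f"Limiting ingestion to {len(all_files)} file(s)")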

    for f in all_files:
        ext = f.suffix.lower()
        if ext not in _CHUNKERS:
            if ext in _IMAGE_EXTENSIONS:
                logger.warning(f"Skipping image file (not supported): {f.name}")
            else:
                logger.warning(f"Skipping unsupported file type {ext!r}: {f.name}")

    supported_files = [f for f in all_files if f.suffix.lower() in _CHUNKERS]
    logger.info(f"Chunking {len(supported_files)} files from {DATA_DIR}")

    seen_hashes: set[str] = set()
    for file_path in supported_files:
        hash_value = file_hash(file_path)
        if existing_hashes is not None and hash_value in existing_hashes:
            logger.info(f"Skipping {file_path.name!r} — already in store (hash={hash_value[:8]}…)")
            continue
        if hash_value in seen_hashes:
            logger.warning(f"Skipping {file_path.name!r} — duplicate content in current batch (hash={hash_value[:8]}…)")
            continue
        seen_hashes.add(hash_value)
        chunker = _CHUNKERS[file_path.suffix.lower()]
        try:
            file_chunks = chunker.make_chunks(str(file_path))
            for chunk in file_chunks:
                chunk.metadata["file_hash"] = hash_value
                chunk.metadata["source_file"] = file_path.name
                chunk.metadata["source"] = file_path.name
                chunk.metadata["title"] = chunk.title
            all_chunks.extend(file_chunks)
            logger.debug(f"  {file_path.name}: {len(file_chunks)} chunks")
        except Exception as exc:
            logger.warning(f"Skipping {file_path.name}: {exc}")

    logger.info(f"Done, {len(all_chunks)} chunks total")
    return all_chunks

In [None]:
async def build_vector_store(
    chunks: list[Chunk],
    embedding_model: SentenceTransformerEmbeddings,
    db_path: Path = VS_PATH,
    reset: bool = False,
) -> ChromaDBVectorStore:
    """Embed 'chunks' and persist them in a ChromaDB vector store.

    Set 'reset=True' to delete and rebuild the store from scratch. Leave
    'reset=False' (default) to reuse an existing store: only chunks whose
    file hash is not yet in the store are embedded, which saves time on
    subsequent runs.
    """
    if reset and db_path.exists():
        import shutil
        shutil.rmtree(db_path)
        logger.info(f"Deleted existing vector store at {db_path}")

    vector_store = ChromaDBVectorStore(db_path=str(db_path))
    

    # Group chunks by file hash for per-file deduplication
    chunks_by_hash: dict[str, list[Chunk]] = {}
    for chunk in chunks:
        h = chunk.metadata.get("file_hash", "unknown")
        chunks_by_hash.setdefault(h, []).append(chunk)

    # Warn about same-content files in the current batch
    for hash_value, file_chunks in chunks_by_hash.items():
        sources = list(dict.fromkeys(c.metadata.get("source_file", "?") for c in file_chunks))
        if len(sources) > 1:
            logger.warning(f"Duplicate content detected across files (hash={hash_value[:8]}…): {sources} — only ingesting once.")

    new_chunks: list[Chunk] = []
    skipped_files = 0
    skipped_chunks = 0
    for hash_value, file_chunks in chunks_by_hash.items():
        existing = vector_store.collection.get(where={"file_hash": hash_value}, limit=1)
        if existing["ids"]:
            source = file_chunks[0].metadata.get("source_file", "?")
            logger.info(f"Skipping {source!r} — already in store (hash={hash_value[:8]}…)")
            skipped_files += 1
            skipped_chunks += len(file_chunks)
        else:
            new_chunks.extend(file_chunks)
    logger.info(
        f"Deduplication: {skipped_files} file(s) / {skipped_chunks} chunk(s) skipped, "
        f"{len(chunks_by_hash) - skipped_files} file(s) / {len(new_chunks)} chunk(s) to embed."
    )

    if not new_chunks:
        logger.info("All files already in store — nothing to embed.")
        return vector_store

    logger.info(f"Embedding {len(new_chunks)} new chunks with {embedding_model.model_name!r} …")
    embeddings = await embedding_model.get_embeddings([c.content for c in new_chunks])
    logger.info(f"Embedding matrix: shape={embeddings.shape}  dtype={embeddings.dtype}")
    await vector_store.insert_chunks(chunks=new_chunks, embedding=embeddings)
    logger.info(f"Done! Vector store written to {db_path}")
    return vector_store

In the notebook, `existing_hashes` must now be passed to `load_chunks()`. In addition, the mean-length calculation divides by the number of chunks, which is zero when every file is already in the store, so it needs a guard.

In [None]:
# Load documents from DATA_DIR and split them into chunks.
existing_hashes = get_existing_hashes()
chunks = load_chunks(max_files=None, existing_hashes=existing_hashes)
# Print a statistical summary and sampled content for visual inspection.
inspect_chunks(chunks)

# Print size distribution
char_lengths = [len(c.content) for c in chunks]
over_limit = sum(1 for n in char_lengths if n > 1024)
print(f"\nChunks total       : {len(chunks)}")
if char_lengths:
    # Guarded: the mean divides by len(char_lengths), which is 0 when nothing new was loaded.
    print(f"Mean length (chars): {sum(char_lengths) // len(char_lengths)}")
    print(f"Over 1024-char limit (≈256 tok embedding limit): {over_limit} / {len(chunks)}")
    print("\nSuccessfully loaded and chunked the documents!")
else:
    print("All files already in store; no new chunks loaded.")

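Helper to release the database and prevent locks:

In [23]:
# Drop the reference to the store so the database can be released.
# The guard avoids a NameError when the cell is re-run or `vs` was never created.
if "vs" in globals():
    del vs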