# Basic Example

This tutorial demonstrates the core concepts of TableVault through a practical example: building a document processing pipeline with searchable embeddings.

You can find the Colab version of this tutorial at: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](TODO_COLAB_LINK)

## Step 1: Install Dependencies

First, install TableVault.

In [None]:
# Install TableVault
!pip install tablevault

## Step 2: Setup ArangoDB

Install and run ArangoDB directly on the Colab VM with vector index support enabled.


In [None]:
# Install ArangoDB 3.12 with root password pre-configured via debconf
!sudo apt-get update -q
!sudo apt-get install -y curl gnupg
!sudo rm -f /etc/apt/sources.list.d/arangodb.list
!sudo rm -f /usr/share/keyrings/arangodb-3.12.gpg
!curl -fsSL https://download.arangodb.com/arangodb312/DEBIAN/Release.key | sudo gpg --dearmor -o /usr/share/keyrings/arangodb-3.12.gpg
!echo 'deb [signed-by=/usr/share/keyrings/arangodb-3.12.gpg] https://download.arangodb.com/arangodb312/DEBIAN/ /' | sudo tee /etc/apt/sources.list.d/arangodb.list
!printf 'arangodb3 arangodb3/password password rootpassword\narangodb3 arangodb3/password_again password rootpassword\n' | sudo debconf-set-selections
!sudo apt-get update -q
!sudo DEBIAN_FRONTEND=noninteractive apt-get install -y arangodb3

In [None]:
import subprocess
import time

# Stop any existing ArangoDB started by package post-install scripts
subprocess.run(["sudo", "pkill", "-f", "arangod"], check=False)
time.sleep(1)

# Ensure runtime/data directories exist and are writable by the arangodb user
subprocess.run(["sudo", "mkdir", "-p",
    "/var/log/arangodb3", "/var/lib/arangodb3", "/var/lib/arangodb3-apps", "/var/run/arangodb3"],
    check=True)
subprocess.run(["sudo", "chown", "-R", "arangodb:arangodb",
    "/var/log/arangodb3", "/var/lib/arangodb3", "/var/lib/arangodb3-apps", "/var/run/arangodb3"],
    check=True)

# Make sure the experimental vector index flag is present exactly once
config_path = "/etc/arangodb3/arangod.conf"
subprocess.run(["sudo", "sed", "-i",
    "/^[[:space:]]*experimental-vector-index[[:space:]]*=.*/d", config_path],
    check=True)
subprocess.run(["sudo", "sed", "-i",
    "/^[[]server[]]/a experimental-vector-index = true", config_path],
    check=True)

# Start arangod in the background as the arangodb user
log_path = "/tmp/arangod.log"
log_file = open(log_path, "w")
proc = subprocess.Popen(
    ["sudo", "-u", "arangodb", "arangod", "--config", config_path],
    stdout=log_file, stderr=log_file,
)

time.sleep(2)
if proc.poll() is not None:
    log_file.close()
    raise RuntimeError(
        f"arangod exited early with code {proc.returncode}. Check {log_path} for details."
    )

print("arangod started.")
print(f"Logs: {log_path}")

Verify ArangoDB is running and vector index creation works:

In [None]:
import time
from arango import ArangoClient

client = ArangoClient(hosts="http://localhost:8529")

for attempt in range(60):
    try:
        sys_db = client.db("_system", username="root", password="rootpassword")
        info = sys_db.version()
        version = info.get("version") if isinstance(info, dict) else info
        print(f"ArangoDB is ready: {version}")
        break
    except Exception:
        if attempt == 59:
            raise RuntimeError("ArangoDB did not become ready in time. Check /tmp/arangod.log.")
        time.sleep(1)
        print(f"Waiting for ArangoDB... ({attempt + 1}/60)")

# Explicitly validate vector-index capability
test_col = "__tv_colab_vector_check"
if sys_db.has_collection(test_col):
    sys_db.delete_collection(test_col)
col = sys_db.create_collection(test_col)
try:
    col.add_index({
        "type": "vector",
        "name": "vec_idx",
        "fields": ["embedding_4"],
        "params": {"metric": "cosine", "dimension": 4, "nLists": 1},
    })
    print("Vector index creation succeeded.")
finally:
    sys_db.delete_collection(test_col)

## Step 3: Initialize the Vault

Create a TableVault instance connected to your ArangoDB. The `process_name` identifies this run — all data written through this vault is attributed to the `document_pipeline` process, making it easy to trace where each item came from later.

In [None]:
from tablevault import Vault

# Create a new TableVault process
vault = Vault(
    user_id="tutorial_user",
    process_name="document_pipeline",
    arango_url="http://localhost:8529",
    arango_db="tutorial_db",
    new_arango_db=True,  # Start fresh
    arango_root_password="rootpassword"
)

print("Vault initialized successfully!")

## Step 4: Create Item Lists

TableVault organizes data into typed lists:

- **Document lists**: Store text content
- **Embedding lists**: Store vector embeddings
- **Record lists**: Store structured metadata

Each list stores items at sequential integer positions. Items across lists can be linked by position range to track lineage — for example, recording that embedding position 2 was derived from document positions 2–3.
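
To make the position model concrete, here is a minimal in-memory sketch. It is not TableVault's implementation: the `ItemList` and `Link` classes are hypothetical, and treating the parent range as half-open (`[start, end)`) is an assumption chosen to match how this tutorial passes `[idx, idx + 1]` for a single source document.

```python
from dataclasses import dataclass, field


@dataclass
class ItemList:
    """Hypothetical stand-in for a typed list: items live at positions 0, 1, 2, ..."""
    name: str
    items: list = field(default_factory=list)

    def append(self, item) -> int:
        """Store an item and return the position it was assigned."""
        self.items.append(item)
        return len(self.items) - 1


@dataclass(frozen=True)
class Link:
    """Records that one child position was derived from a range of parent positions."""
    child_list: str
    child_position: int
    parent_list: str
    parent_start: int  # inclusive
    parent_end: int    # exclusive (assumption, see lead-in)


docs = ItemList("research_papers")
embs = ItemList("paper_embeddings")
links = []

for text in ["first chunk", "second chunk"]:
    doc_pos = docs.append(text)
    emb_pos = embs.append([0.0, 1.0])  # placeholder vector, not a real embedding
    links.append(Link(embs.name, emb_pos, docs.name, doc_pos, doc_pos + 1))

print(links[1])
```

The point of the sketch is that a link needs only list names and integer positions, so lineage can be recorded without copying any content.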

In [None]:
# Create a document list for storing text chunks
vault.create_document_list("research_papers")

# Create an embedding list (using 384-dim for this example)
EMBEDDING_DIM = 384
vault.create_embedding_list("paper_embeddings", ndim=EMBEDDING_DIM)

# Create a record list for metadata
vault.create_record_list("paper_metadata", column_names=["title", "author", "chunk_id"])

print("Item lists created!")


## Step 5: Add Data with Lineage

We'll add sample documents and their embeddings, tracking the lineage between them. The `input_items` argument on `append_embedding` records which source positions the embedding was derived from, forming an explicit link that can be queried later.

In [None]:
# Sample document chunks
documents = [
    "Machine learning is a subset of artificial intelligence.",
    "Neural networks are inspired by biological neurons.",
    "Deep learning has revolutionized computer vision.",
    "Transformers have changed natural language processing.",
]

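# Mock embedding function (replace with a real model such as sentence-transformers)
import hashlib
import random

def get_embedding(text):
    """Return a deterministic pseudo-random vector; it carries no semantic meaning."""
    # Seed the generator from a hash of the text so the same input always maps to the same vector
    seed = int.from_bytes(hashlib.sha256(text.encode()).digest(), "big")
    rng = random.Random(seed)
    return [rng.random() for _ in range(EMBEDDING_DIM)]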

print(f"Mock embedding dimension: {len(get_embedding('test'))}")
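
Because the mock seeds its random generator from a SHA-256 hash of the text, the same input always yields the same vector, but the vectors say nothing about meaning. The sketch below restates the mock so that it runs on its own and checks both properties. `cosine_similarity` is a helper introduced here for illustration and is not part of TableVault.

```python
import hashlib
import math
import random

EMBEDDING_DIM = 384


def get_embedding(text):
    """Same mock as above: a deterministic pseudo-random vector per input string."""
    seed = int.from_bytes(hashlib.sha256(text.encode()).digest(), "big")
    rng = random.Random(seed)
    return [rng.random() for _ in range(EMBEDDING_DIM)]


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


same = get_embedding("neural networks") == get_embedding("neural networks")
score = cosine_similarity(get_embedding("neural networks"), get_embedding("deep learning"))
print(f"Deterministic: {same}")
print(f"Similarity of two different mock vectors: {score:.3f}")
```

Every component is drawn from [0, 1), so any two different mock vectors score roughly 0.75 regardless of their text. Rankings from similarity searches in this tutorial are therefore arbitrary until a real embedding model is swapped in.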


In [None]:
# Add documents and their embeddings with lineage tracking
for idx, doc in enumerate(documents):
    # Add document
    vault.append_document("research_papers", doc)

    # Generate and add embedding with lineage tracking
    embedding = get_embedding(doc)
    vault.append_embedding(
        "paper_embeddings",
        embedding,
        input_items={"research_papers": [idx, idx + 1]},  # Links to source document
        index_rebuild_count=max(0, len(documents) - 1),  # Force index build for small demo sets
    )

    # Add metadata
    vault.append_record("paper_metadata", {
        "title": f"Paper Section {idx + 1}",
        "author": "Tutorial Author",
        "chunk_id": idx
    })

    print(f"Added document {idx + 1}: {doc[:50]}...")

    # All writes for this item are complete — safe to stop or pause the process here
    vault.checkpoint_execution()

has_index = vault.has_vector_index(EMBEDDING_DIM)
print(f"\nVector index created: {has_index}")
if not has_index:
    print("Vector index was not created; approximate search may be unavailable on this ArangoDB setup.")
print("All documents added with lineage tracking!")

`vault.checkpoint_execution()` marks the end of each loop iteration as a safe point for stopping or pausing. TableVault uses these markers to coordinate with other processes: if an external request to pause or stop this pipeline has been issued, it takes effect only at a checkpoint, never in the middle of a write or a pending API call. The same holds in the other direction: a paused process resumes only at a checkpoint, which keeps the pipeline state consistent. Without checkpoints, a stop or pause request would be deferred indefinitely; with them, the pipeline can be interrupted or resumed cleanly between items.
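
The cooperative pattern described here can be illustrated without TableVault. The sketch below is an analogy under stated assumptions, not the library's mechanism: a hypothetical `stop_requested` event stands in for an external stop request, and a hypothetical `checkpoint()` function is the only place where the worker honors it.

```python
import threading

stop_requested = threading.Event()  # hypothetical stand-in for an external stop request
processed = []


class StopAtCheckpoint(Exception):
    """Raised only at a checkpoint, never in the middle of an item."""


def checkpoint():
    """Safe point: honor a pending stop request, otherwise continue."""
    if stop_requested.is_set():
        raise StopAtCheckpoint


def run(items):
    for item in items:
        # Simulate a stop request arriving while item "b" is being processed.
        if item == "b":
            stop_requested.set()
        processed.append(item)  # all writes for this item finish first
        checkpoint()            # the request takes effect only here


try:
    run(["a", "b", "c"])
except StopAtCheckpoint:
    print(f"Stopped cleanly after: {processed}")
```

Item `"b"` is fully written even though the request arrived while it was in progress, and item `"c"` is never started.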

## Step 6: Add Descriptions

Each item list can have a description: a short text and optional embedding that annotates what the list contains. Descriptions serve two purposes: they make lists self-documenting, and they act as a semantic filter when querying. In Step 9 you will see how `description_text` narrows a search to only the lists relevant to your query.

In [None]:
# Add semantic descriptions for queryability
vault.create_description(
    "research_papers",
    description="Collection of machine learning research paper excerpts",
    embedding=get_embedding("machine learning research papers")
)

vault.create_description(
    "paper_embeddings",
    description="Vector embeddings of research paper text chunks",
    embedding=get_embedding("document embeddings vectors")
)

print("Descriptions added!")

## Step 7: Query Content

Now let's query the stored content.

In [None]:
# Get all documents
all_docs = vault.query_item_content("research_papers")
print("All documents:")
for i, doc in enumerate(all_docs):
    print(f"  [{i}]: {doc}")

In [None]:
# Get specific document by index
first_doc = vault.query_item_content("research_papers", index=0)
print(f"First document: {first_doc}")

In [None]:
# Get metadata for the item list
metadata = vault.query_item_list("research_papers")
print(f"Document list info: {metadata}")

## Step 8: Query Lineage

Lineage lets you trace exactly which source data produced each derived item. This is useful for debugging data quality issues, reproducing results, and auditing how your pipeline transformed data over time. You can traverse in either direction: from a derived item back to its sources, or from a source forward to everything derived from it.
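
As a rough mental model of the two traversal directions, the sketch below stores lineage links as plain tuples and looks them up both ways. The tuple layout and the `parents_of` and `children_of` helpers are hypothetical, and half-open parent ranges are an assumption; the actual return format of TableVault's lineage queries is not shown here.

```python
# Each hypothetical link: (child_list, child_position, parent_list, parent_start, parent_end)
# Assumption: parent ranges are half-open, so (0, 1) covers only position 0.
links = [
    ("paper_embeddings", 0, "research_papers", 0, 1),
    ("paper_embeddings", 1, "research_papers", 1, 2),
    ("paper_embeddings", 2, "research_papers", 2, 3),
]


def parents_of(child_list, position):
    """Walk backward: which source ranges produced this derived item?"""
    return [
        (parent, start, end)
        for child, pos, parent, start, end in links
        if child == child_list and pos == position
    ]


def children_of(parent_list, position):
    """Walk forward: which derived items used this source position?"""
    return [
        (child, pos)
        for child, pos, parent, start, end in links
        if parent == parent_list and start <= position < end
    ]


print(parents_of("paper_embeddings", 2))
print(children_of("research_papers", 1))
```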

In [None]:
# Find what the embeddings were derived from
parents = vault.query_item_parent("paper_embeddings")
print(f"Embedding parents: {parents}")

In [None]:
# Find what was derived from the documents
children = vault.query_item_child("research_papers")
print(f"Document children: {children}")

In [None]:
# Get specific range lineage
first_embedding_source = vault.query_item_parent(
    "paper_embeddings",
    start_position=0,
    end_position=1
)
print(f"First embedding came from: {first_embedding_source}")

## Step 9: Search

TableVault supports both vector similarity search over embeddings and full-text search over documents. Both query types accept optional filters to narrow the search scope: `description_text` restricts results to lists whose description matches, and `code_text` restricts to lists created by processes whose source code contains the given string.
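
For intuition about what a vector similarity search computes, the sketch below ranks stored vectors against a query with a brute-force cosine scan. It is a generic illustration rather than TableVault's query path; the `top_k` helper and the tiny 3-dimensional vectors are made up for the example.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def top_k(query, stored, k=2):
    """Return (position, score) pairs for the k stored vectors most similar to query."""
    scored = [(pos, cosine_similarity(query, vec)) for pos, vec in enumerate(stored)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


stored = [
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
]
results = top_k([1.0, 0.05, 0.0], stored)
for pos, score in results:
    print(f"position {pos}: {score:.3f}")
```

A scan like this touches every stored vector, which is why vector indexes exist: they trade some accuracy for speed on large collections.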

In [None]:
# Search by embedding similarity
query_text = "artificial intelligence and deep learning"
query_embedding = get_embedding(query_text)

# Find similar embeddings
similar = vault.query_embedding_list(
    embedding=query_embedding,
    use_approx=False,
)
print(f"Similar embeddings: {similar}")

In [None]:
# Search documents by text
results = vault.query_document_list(
    document_text="neural networks"
)
print(f"Documents matching 'neural networks': {results}")

Combining these filters is especially useful in large vaults with many lists: you can target exactly the data that is semantically relevant and was produced by the right pipeline stage.
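
The effect of combining filters can be sketched with a small in-memory registry. Everything here is hypothetical: the registry layout, the `filter_lists` helper, the `meeting_notes` list, and the choice of case-insensitive substring matching joined with a logical AND. TableVault's own matching rules may differ.

```python
# Hypothetical registry: each list has a description and the source code of its creating process.
lists = [
    {"name": "research_papers",
     "description": "Collection of machine learning research paper excerpts",
     "code": "vault.append_document('research_papers', doc)"},
    {"name": "meeting_notes",
     "description": "Weekly meeting notes",
     "code": "vault.append_document('meeting_notes', note)"},
    {"name": "paper_embeddings",
     "description": "Vector embeddings of research paper text chunks",
     "code": "vault.append_embedding('paper_embeddings', vec)"},
]


def filter_lists(description_text=None, code_text=None):
    """Keep lists that satisfy every filter that was supplied (assumed logical AND)."""
    selected = []
    for entry in lists:
        if description_text and description_text.lower() not in entry["description"].lower():
            continue
        if code_text and code_text not in entry["code"]:
            continue
        selected.append(entry["name"])
    return selected


print(filter_lists(description_text="research paper"))
print(filter_lists(description_text="research paper", code_text="append_document"))
```

Each added filter can only shrink the candidate set, so the search that follows runs over fewer lists.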

In [None]:
# Filter embedding search by description text
# Only searches within lists whose description matches "document embeddings"
similar_filtered = vault.query_embedding_list(
    embedding=query_embedding,
    description_text="document embeddings",
    use_approx=False,
)
print(f"Embeddings filtered by description: {similar_filtered}")

In [None]:
# Filter document search by description text and code text
# description_text: restricts to lists whose description mentions "research paper"
# code_text: restricts to lists created by processes whose code contains "append_document"
results_filtered = vault.query_document_list(
    document_text="neural networks",
    description_text="research paper",
    code_text="append_document",
)
print(f"Documents filtered by description and code: {results_filtered}")

## Step 10: Query Processes

Every item in TableVault is attributed to the process that created it. Process queries let you audit the full output of a given pipeline run, or find which process was responsible for a particular item, which is useful when you have multiple pipelines writing to the same vault.
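
Attribution amounts to a two-way index between processes and the items they wrote. The sketch below builds such an index from a hypothetical write log; the log format and the `cleanup_job` process are invented for illustration and do not reflect how TableVault stores this information.

```python
from collections import defaultdict

# Hypothetical attribution log: (process_name, list_name) recorded at write time.
writes = [
    ("document_pipeline", "research_papers"),
    ("document_pipeline", "paper_embeddings"),
    ("document_pipeline", "paper_metadata"),
    ("cleanup_job", "paper_metadata"),
]

items_by_process = defaultdict(set)
processes_by_item = defaultdict(set)
for process, item in writes:
    items_by_process[process].add(item)
    processes_by_item[item].add(process)

# Audit everything one pipeline produced
print(sorted(items_by_process["document_pipeline"]))
# Find every process that wrote to one list
print(sorted(processes_by_item["paper_metadata"]))
```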

In [None]:
# Find which process created these items
creation_process = vault.query_item_creation_process("research_papers")
print(f"Created by process: {creation_process}")

In [None]:
# Find all items created in this process
process_items = vault.query_process_item("document_pipeline")
print(f"Items in process: {process_items}")