In [None]:
import json
import shutil
from pathlib import Path

import dotenv

from open_deep_researcher.retriever.local import local_search, process_documents, search_local_documents

dotenv.load_dotenv()

## Set Up Document Directory

First, let's set up a directory with some sample documents to process. You can customize this to use your own documents.


In [None]:
# Directory that holds the sample documents used throughout this notebook
doc_dir = Path("./tmp/sample_docs")
doc_dir.mkdir(exist_ok=True, parents=True)

# Sample corpus as (filename, content) pairs
sample_docs = [
    (
        "ai_intro.txt",
        "Artificial intelligence (AI) is intelligence demonstrated by machines. Modern AI systems can learn from data and make predictions.",
    ),
    (
        "python_basics.md",
        "# Python Basics\n\nPython is a high-level programming language. It's known for readability and simplicity.",
    ),
    (
        "data_science.txt",
        "Data science combines domain expertise, programming skills, and statistical knowledge to extract insights from data.",
    ),
]

# Write the sample documents to the directory.
# The encoding is explicit so the files do not depend on the platform's
# default locale encoding.
for filename, content in sample_docs:
    file_path = doc_dir / filename
    file_path.write_text(content, encoding="utf-8")
    print(f"Created: {file_path}")

## Configure Process Documents Parameters

Now, let's set up the parameters for the `process_documents` function.


In [None]:
# Directory in which the vector store is kept (created if missing)
vector_store_path = Path("./tmp/vector_store")
vector_store_path.mkdir(parents=True, exist_ok=True)

# Keyword arguments that are later unpacked into process_documents(**params)
params = dict(
    local_document_path=str(doc_dir),
    vector_store_path=str(vector_store_path),
    embedding_provider="openai",  # Currently only OpenAI embeddings are supported
    embedding_model="text-embedding-3-small",
    collection_name="sample_collection",  # Optional: custom collection name
)

# Echo the configuration, one "key: value" line per parameter
print("Processing documents with the following parameters:")
print(*(f"  {name}: {setting}" for name, setting in params.items()), sep="\n")

## Process Documents

Now let's call the `process_documents` function to actually process the documents and create embeddings.


In [None]:
# Async wrapper around process_documents so the same step can be re-run later in the notebook
async def run_process_documents():
    """Run ``process_documents`` with the notebook-level ``params`` and return its result.

    ``params`` is read from the notebook's global namespace at call time, so
    the parameters cell must have been executed first. A message is printed
    before and after the call so progress shows up in the cell output.
    """
    print("Starting document processing...")
    store = await process_documents(**params)
    print("Document processing completed.")
    return store


# Run the async function and keep its result for the following cells.
# NOTE: a bare top-level `await` is only valid when the cell is executed by
# IPython/Jupyter; in a plain Python script use asyncio.run(run_process_documents()).
vector_store = await run_process_documents()

## Examine the Vector Store

Let's look at what was created in the vector store.


In [None]:
# List the entries in the vector store directory.
# glob() yields paths in arbitrary order, so sort them for a stable listing across runs.
print("Files in the vector store directory:")
for file_path in sorted(vector_store_path.glob("*")):
    print(f"  {file_path.name}")

# Examine the metadata file if it exists.
# NOTE(review): the file name pattern is assumed to match what process_documents writes — confirm.
metadata_path = vector_store_path / f"doc_metadata_{params['collection_name']}.json"
if metadata_path.exists():
    # Read with an explicit encoding rather than the platform default
    with open(metadata_path, encoding="utf-8") as f:
        metadata = json.load(f)
    print("\nDocument metadata:")
    for doc_path, doc_hash in metadata.items():
        print(f"  {doc_path}: {doc_hash}")
else:
    # Say so explicitly instead of printing nothing when the file is missing
    print(f"\nNo metadata file found at {metadata_path}")

## Test Incremental Processing

One of the key features of `process_documents` is that it tracks document changes and only reprocesses documents that have changed. Let's test this by modifying one document and adding a new one.


In [None]:
# Modify an existing document by appending one extra paragraph.
modified_file = doc_dir / "ai_intro.txt"
original_content = modified_file.read_text(encoding="utf-8")
extra_paragraph = "\n\nAI systems are used in many applications including natural language processing, computer vision, and robotics."

# Append only once: re-running this cell must not keep growing the file.
if original_content.endswith(extra_paragraph):
    new_content = original_content
    print(f"Already modified: {modified_file}")
else:
    new_content = original_content + extra_paragraph
    modified_file.write_text(new_content, encoding="utf-8")
    print(f"Modified: {modified_file}")


# Add a new document (overwritten with the same content on re-run)
new_file = doc_dir / "machine_learning.txt"
new_file.write_text(
    "Machine learning is a subset of AI that enables systems to learn from data without being explicitly programmed.",
    encoding="utf-8",
)
print(f"Created: {new_file}")

In [None]:
# Run process_documents again
print("Running process_documents again to process changed files...")
updated_vector_store = await run_process_documents()

# Check the updated metadata
if metadata_path.exists():
    with open(metadata_path) as f:
        updated_metadata = json.load(f)
    print("\nUpdated document metadata:")
    for doc_path, doc_hash in updated_metadata.items():
        print(f"  {doc_path}: {doc_hash}")

## Working with the Vector Store

The `process_documents` function returns a Chroma vector store instance. We can use this directly to perform operations on the vector store.


In [None]:
# Report basic facts about the collection behind the vector store, when one is available
if updated_vector_store is None:
    print("Vector store not available.")
else:
    # NOTE: `_collection` is a private attribute of the store object
    collection = updated_vector_store._collection

    # Number of entries currently held by the collection
    count = collection.count()
    print(f"Number of document chunks in vector store: {count}")

    # Name under which the collection is stored
    print(f"Collection name: {collection.name}")

## Simple Query Example

Let's try a simple query against the vector store to verify that it's working.


In [None]:
# Run one similarity query directly against the vector store and print every hit
if updated_vector_store is not None:
    query = "What is artificial intelligence?"
    results = updated_vector_store.similarity_search_with_relevance_scores(query, k=4)

    print(f"Query: '{query}'")
    print("Results:")
    # Each entry pairs a document with its relevance score; number the hits from 1
    for rank, (hit, relevance) in enumerate(results, start=1):
        print(f"\nResult {rank} (Score: {relevance:.4f})")
        print(f"Content: {hit.page_content}")
        print(f"Source: {hit.metadata.get('source', 'Unknown')}")

In [None]:
# Async wrapper around search_local_documents for a single query
async def run_search_local_documents(query, vector_store_path, collection_name="sample_collection", top_k=2):
    """Call ``search_local_documents`` for one query and return its result unchanged.

    Args:
        query: Query text, forwarded as ``query``.
        vector_store_path: Location of the vector store; converted with ``str()``
            before being forwarded, so a ``Path`` or a string both work.
        collection_name: Collection to search.
        top_k: Forwarded as ``top_k``.

    Returns:
        Whatever ``search_local_documents`` returns.
    """
    return await search_local_documents(
        query=query,
        vector_store_path=str(vector_store_path),
        collection_name=collection_name,
        top_k=top_k,
    )


# Backward-compatible alias: the helper was originally defined under this misspelled name
run_search_local_docments = run_search_local_documents


async def run_local_search(query, vector_store_path, collection_name="sample_collection", top_k=2):
    """Call ``local_search`` for one query and return its result unchanged.

    ``local_search`` takes a list of queries, so the single ``query`` is wrapped
    in a one-element list. ``vector_store_path`` is converted with ``str()``
    before being forwarded.
    """
    search_kwargs = {
        "search_queries": [query],
        "vector_store_path": str(vector_store_path),
        "collection_name": collection_name,
        "top_k": top_k,
    }
    return await local_search(**search_kwargs)


# Define a test query
test_query = "What is artificial intelligence?"
print(f"Query: '{test_query}'")


# Define a test query
test_query = "What is artificial intelligence?"
print(f"Query: '{test_query}'")

# Get results using local_search function
print("\n=== RESULTS USING search_local_documents FUNCTION ===")
local_search_results = await run_search_local_docments(test_query, vector_store_path)
for i, doc in enumerate(local_search_results[0]["results"], 1):
    print(f"\nResult {i} (Score: {doc['score']:.4f})")
    print(f"Content: {doc['content']}")
    print(f"Source: {doc.get('url', 'Unknown')}")

# Get results using direct Chroma similarity search
print("\n=== RESULTS USING DIRECT CHROMA QUERY ===")
if updated_vector_store is not None:
    chroma_results = updated_vector_store.similarity_search_with_relevance_scores(test_query, k=2)

    for i, (doc, score) in enumerate(chroma_results, 1):
        print(f"\nResult {i} (Score: {score:.4f})")
        print(f"Content: {doc.page_content}")
        print(f"Source: {doc.metadata.get('source', 'Unknown')}")

# Get results using local_search function
print("\n=== RESULTS USING local_search FUNCTION ===")
local_search_results = await run_local_search(test_query, vector_store_path)
print(local_search_results)

## Cleanup (Optional)

If you want to clean up the directories created in this notebook, you can run the following cell.


In [None]:
# Remove everything this notebook created under ./tmp.
# Set CLEAN_UP to False to keep the sample documents and vector store for inspection.
CLEAN_UP = True

if CLEAN_UP:
    # ignore_errors=True keeps the cell from failing when ./tmp is already gone
    shutil.rmtree("./tmp", ignore_errors=True)
    print("Cleaned up sample directories.")
else:
    print("Skipped cleanup; ./tmp was left in place.")