# RAGdoll Key Modules Demo

Hands-on walkthrough for ingestion, chunking, embeddings, storage layers, and orchestration. Every example relies on the sample assets under `tests/test_data`, so with `USE_OPENAI = False` and the LLM cells skipped, the notebook can run offline.

> **Note:** 
- Set `USE_OPENAI = True` in the setup cell to use real OpenAI embeddings, or `False` for fake embeddings.
- Sections 5 and 7 call OpenAI's GPT endpoints via `get_llm_caller`. Export `OPENAI_API_KEY` (or add it to `.env`) before running them.
- **SSL Certificate Issues**: If you encounter `SSL: CERTIFICATE_VERIFY_FAILED` errors (common in corporate networks), see the SSL workaround cell below to disable verification for development purposes.

## What you'll see

1. **Ingestion** – `DocumentLoaderService` from [`docs/ingestion.md`](../docs/ingestion.md).
2. **Chunking** – `ragdoll.chunkers` helpers from [`docs/chunking.md`](../docs/chunking.md).
3. **Embeddings** – provider factory from [`docs/embeddings.md`](../docs/embeddings.md), controlled by the `USE_OPENAI` flag.
4. **Vector stores** – `vector_store_from_config` customization from [`docs/vector_stores.md`](../docs/vector_stores.md).
5. **Graph stores** – `get_graph_store` JSON persistence from [`docs/graph_stores.md`](../docs/graph_stores.md).
6. **Entity extraction** – `EntityExtractionService` for building knowledge graphs.
7. **LLMs** – `get_llm_caller`/`call_llm_sync` bridge described in [`docs/llm_integration.md`](../docs/llm_integration.md) hitting your real OpenAI model.
8. **Pipeline** – `IngestionPipeline` snapshot from [`docs/architecture.md`](../docs/architecture.md).
9. **Retrieval** – New modular retrieval system with `VectorRetriever`, `GraphRetriever`, and `HybridRetriever` from [`docs/retrieval.md`](../docs/retrieval.md).
10. **Advanced Patterns** – Hybrid retrieval modes, graph traversal strategies, and async retrieval.

Each cell builds on the previous ones so you can treat this as a scratchpad for experimenting with new loaders or configuration overrides.

In [8]:
from pathlib import Path
from pprint import pprint
import shutil
import time

from langchain_core.documents import Document

from ragdoll import Ragdoll
from ragdoll.app_config import bootstrap_app
from ragdoll.ingestion import DocumentLoaderService
from ragdoll.chunkers import get_text_splitter, split_documents
from ragdoll.embeddings import get_embedding_model
from ragdoll.vector_stores import vector_store_from_config
from ragdoll.config.base_config import VectorStoreConfig
from ragdoll.graph_stores import get_graph_store
from ragdoll.entity_extraction import EntityExtractionService
from ragdoll.entity_extraction.models import Graph, GraphNode, GraphEdge
from ragdoll.entity_extraction.graph_persistence import GraphPersistenceService
from ragdoll.retrieval import VectorRetriever, GraphRetriever, HybridRetriever
from ragdoll.llms import get_llm_caller
from ragdoll.llms.callers import call_llm_sync
from ragdoll.pipeline import ingest_documents, IngestionPipeline, IngestionOptions
from dotenv import load_dotenv

In [9]:
load_dotenv(override=True)

DATA_DIR = Path('../tests/test_data').resolve()
STATE_DIR = Path('demo_state').resolve()
STATE_DIR.mkdir(exist_ok=True)

SAMPLE_TXT = DATA_DIR / 'test_txt.txt'
#SAMPLE_TXT = DATA_DIR / '*'

# Set to True to use real OpenAI embeddings, False for fake embeddings
USE_OPENAI = True

app_config = bootstrap_app(
    overrides={
        'monitor': {'enabled': False, 'collect_metrics': False},
    }
)


def normalize_documents(raw_docs):
    docs = []
    for entry in raw_docs:
        if isinstance(entry, Document):
            docs.append(entry)
        elif isinstance(entry, dict):
            docs.append(
                Document(
                    page_content=str(entry.get('page_content', '')),
                    metadata=entry.get('metadata', {}) or {},
                )
            )
        else:
            docs.append(Document(page_content=str(entry), metadata={}))
    return docs


def reset_subdir(name: str) -> Path:
    path = STATE_DIR / name
    if path.exists():
        for attempt in range(5):
            try:
                shutil.rmtree(path)
                break
            except PermissionError:
                time.sleep(0.5)
        else:
            timestamped = STATE_DIR / f"{name}_{int(time.time())}"
            timestamped.mkdir(parents=True, exist_ok=True)
            print(f"Warning: {path} was locked, using {timestamped} instead.")
            return timestamped
    path.mkdir(parents=True, exist_ok=True)
    return path

In [17]:
import os

# Confirm the key is present without echoing any part of it.
k = os.getenv("OPENAI_API_KEY")
print("Key len:", len(k) if k else "not set")

Key len: 164


## 1. Load sample data
`DocumentLoaderService` fans out across the loader registry defined in `ragdoll/config/default_config.yaml`. We point it at the lightweight TXT fixture so the demo does not need optional dependencies.


In [18]:
loader = DocumentLoaderService(
    app_config=app_config,
    use_cache=False,
    collect_metrics=False,
)

raw_documents = loader.ingest_documents([str(SAMPLE_TXT)])
documents = normalize_documents(raw_documents)

print(f"Loaded {len(documents)} document(s) from {SAMPLE_TXT.name}")
print('Metadata sample:')
pprint(documents[0].metadata)
print('Preview:')
print(documents[0].page_content[:400])


Loaded 1 document(s) from test_txt.txt
Metadata sample:
{'source': 'C:\\dev\\RAGdoll\\tests\\test_data\\test_txt.txt'}
Preview:
Rag graph, or Retrieval-Augmented Generation with graph-based knowledge representation, enhances traditional RAG systems by structuring retrieved information into a dynamic knowledge graph rather than treating it as flat text chunks. At its core, it begins with a user query that triggers a retrieval step from a vector database or search index. Instead of directly feeding retrieved documents into a


## 2. Chunk documents
`ragdoll.chunkers.get_text_splitter` mirrors the strategies in [`docs/chunking.md`](../docs/chunking.md). Reusing the splitter instance keeps experiments consistent when you tweak chunk sizes/overlap.


In [19]:
splitter = get_text_splitter(
    splitter_type='recursive',
    chunk_size=250,
    chunk_overlap=40,
    app_config=app_config,
)
chunks = split_documents(documents, text_splitter=splitter)

print(f"Created {len(chunks)} chunk(s)")
for idx, chunk in enumerate(chunks[:3], start=1):
    preview = chunk.page_content[:180].replace('\n', ' ')
    print(f"Chunk {idx} metadata: {chunk.metadata}")
    print(preview)
    print('---')


Created 20 chunk(s)
Chunk 1 metadata: {'source': 'C:\\dev\\RAGdoll\\tests\\test_data\\test_txt.txt'}
Rag graph, or Retrieval-Augmented Generation with graph-based knowledge representation, enhances traditional RAG systems by structuring retrieved information into a dynamic knowled
---
Chunk 2 metadata: {'source': 'C:\\dev\\RAGdoll\\tests\\test_data\\test_txt.txt'}
it as flat text chunks. At its core, it begins with a user query that triggers a retrieval step from a vector database or search index. Instead of directly feeding retrieved docume
---
Chunk 3 metadata: {'source': 'C:\\dev\\RAGdoll\\tests\\test_data\\test_txt.txt'}
the system parses these documents to extract entities (such as people, organizations, or concepts) and relationships (like "works at" or "caused by") using named entity recognition
---
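
To make the interplay between `chunk_size` and `chunk_overlap` concrete, here is a minimal sketch of a fixed-size character window splitter. It is a simplified stand-in, not the recursive splitter that `get_text_splitter` returns, and the function name `sliding_window_chunks` is hypothetical.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size character windows that overlap.

    Hypothetical helper for illustration only; ragdoll's recursive
    splitter also respects separators such as paragraphs and sentences.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


sample = "abcdefghij" * 5  # 50 characters
windows = sliding_window_chunks(sample, chunk_size=20, chunk_overlap=5)
print(len(windows), [len(w) for w in windows])
# The tail of one window repeats at the head of the next.
print(windows[0][-5:] == windows[1][:5])
```

A larger overlap preserves more context across chunk boundaries at the cost of more chunks to embed and store.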


## 3. Create embeddings
`ragdoll.embeddings.get_embedding_model` instantiates providers dynamically. Passing `provider="fake"` gives deterministic vectors without hitting OpenAI/HuggingFace, but the rest of the flow matches production usage.


In [None]:
embedding_inputs = [chunk.page_content for chunk in chunks[:3]]
if not embedding_inputs:
    embedding_inputs = [documents[0].page_content]

embeddings = (
    get_embedding_model() if USE_OPENAI 
    else get_embedding_model(provider='fake', size=256)
)
vectors = embeddings.embed_documents(embedding_inputs)

print(f"Generated {len(vectors)} embedding vector(s) with dimension {len(vectors[0])}")
print('First vector slice:', vectors[0][:8])
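
For intuition about what "deterministic vectors" means, the sketch below derives a unit-length vector from a SHA-256 hash of the text. This is an assumption about how a fake provider could behave, not ragdoll's actual implementation; `fake_embedding` is a hypothetical name.

```python
import hashlib
import math


def fake_embedding(text: str, size: int = 8) -> list[float]:
    """Map text to a deterministic unit-length vector (illustrative only)."""
    values: list[float] = []
    counter = 0
    while len(values) < size:
        # Re-hash with a counter so any vector size can be filled.
        digest = hashlib.sha256(f"{counter}:{text}".encode("utf-8")).digest()
        # Each byte (0..255) becomes a float in [-1.0, 1.0].
        values.extend(byte / 127.5 - 1.0 for byte in digest)
        counter += 1
    values = values[:size]
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]


first = fake_embedding("graph RAG", size=16)
second = fake_embedding("graph RAG", size=16)
other = fake_embedding("vector store", size=16)
print(len(first), first == second, first == other)
```

Vectors like these carry no semantic meaning, so similarity results obtained with fake embeddings only exercise the plumbing, not retrieval quality.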

## 4. Build a vector store
`vector_store_from_config` consumes a `VectorStoreConfig`, so you can swap FAISS/Chroma/etc. on demand. This cell provisions a Chroma collection under `demo_state` and runs a quick similarity query.


In [None]:
core_store_dir = reset_subdir('chroma_core_demo')
vector_config = VectorStoreConfig(
    enabled=True,
    store_type='chroma',
    params={
        'collection_name': 'ragdoll_core_demo',
        'persist_directory': str(core_store_dir),
    },
)

demo_vector_store = vector_store_from_config(
    vector_config,
    embedding=embeddings,
)

demo_vector_store.add_documents(chunks)
question = 'What content lives in the txt sample?'
results = demo_vector_store.similarity_search(question, k=2)
for idx, doc in enumerate(results, start=1):
    snippet = doc.page_content[:160].replace('\n', ' ')
    print(f"Result {idx} (source={doc.metadata.get('source')}) -> {snippet}")
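
Conceptually, a similarity query ranks stored vectors by their closeness to the query vector. The sketch below shows that ranking with cosine similarity over a toy in-memory index; the vectors and the helper names `cosine_similarity` and `top_k` are made up for illustration, and a real store such as Chroma uses its own index structures and distance metric.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec, indexed, k=2):
    """Return the k (score, text) pairs most similar to query_vec."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in indexed]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


# Toy index: (text, vector) pairs with hand-picked 3-dimensional vectors.
toy_index = [
    ("entities and relationships", [0.9, 0.1, 0.0]),
    ("vector database retrieval", [0.1, 0.9, 0.1]),
    ("iterative refinement", [0.0, 0.2, 0.9]),
]
hits = top_k([0.8, 0.3, 0.0], toy_index, k=2)
for score, text in hits:
    print(f"{score:.3f} {text}")
```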


## 5. Run entity extraction + persist a graph
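With `extract_entities=True`, `IngestionPipeline` runs entity extraction over the chunks using the configured LLM caller (a stub fallback is used when no API key is available). The pipeline writes chunks to a Chroma collection and persists the graph through a `networkx` graph store at `graph.pkl` under `demo_state/graph_demo/`. A quick visualization lets you inspect the extracted entities before wiring them into a retriever.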

In [None]:
import os

# Create directories
graph_store_dir = reset_subdir('graph_demo')
graph_image_path = graph_store_dir / 'graph_demo.png'
graph_store_file = graph_store_dir / 'graph.pkl'

# Initialize LLM (define the fallback first so both branches can use it)
class ExampleFallbackLLM:
    async def call(self, prompt: str) -> str:
        return "Fallback response (no API key for entity extraction)"


if not os.getenv('OPENAI_API_KEY'):
    print("OPENAI_API_KEY not set, using fallback LLM for demo.")
    llm_caller = ExampleFallbackLLM()
else:
    llm_caller = get_llm_caller(app_config=app_config)
    if llm_caller is None:
        print("Unable to initialize LLM, using fallback.")
        llm_caller = ExampleFallbackLLM()
    else:
        print("LLM initialized for entity extraction.")

# Configure ingestion options - use the shared embeddings from earlier
options = IngestionOptions(
    batch_size=5,
    extract_entities=True,
    chunking_options={'chunk_size': 1000, 'chunk_overlap': 200},
    vector_store_options={
        "store_type": "chroma",
        "params": {
            "collection_name": "graph_demo",
            "persist_directory": str(graph_store_dir / "vector"),
        },
    },
    graph_store_options={
        "store_type": "networkx",
        "output_file": str(graph_store_file),
    },
    llm_caller=llm_caller,
    entity_extraction_options={
        "entity_types": ["Person", "Organization", "Location"],
    },
)


In [None]:
# Run ingestion using a custom pipeline with shared embeddings
from ragdoll.vector_stores import vector_store_from_config

# Create vector store for graph demo with shared embeddings
graph_vector_store = vector_store_from_config(
    VectorStoreConfig(
        enabled=True,
        store_type="chroma",
        params={
            "collection_name": "graph_demo",
            "persist_directory": str(graph_store_dir / "vector"),
        },
    ),
    embedding=embeddings,  # Use the shared embeddings instance
)

# Create custom pipeline with shared embeddings
graph_pipeline = IngestionPipeline(
    app_config=app_config,
    content_extraction_service=DocumentLoaderService(
        app_config=app_config,
        use_cache=False,
        collect_metrics=False,
    ),
    embedding_model=embeddings,  # Use shared embeddings
    vector_store=graph_vector_store,
    options=options,
)

sources = [str(SAMPLE_TXT)]
stats = await graph_pipeline.ingest(sources)
graph = getattr(graph_pipeline, "last_graph", None)
graph_store = graph_pipeline.get_graph_store()

# Print results
print("✅ Ingestion complete!")
print(f"Documents processed: {stats.get('documents_processed')}")
print(f"Chunks created: {stats.get('chunks_created')}")
print(f"Entities extracted: {stats.get('entities_extracted')}")
print(f"Relationships extracted: {stats.get('relationships_extracted')}")
print(f"Vector entries added: {stats.get('vector_entries_added')}")
print(f"Graph entries added: {stats.get('graph_entries_added')}")

if stats.get("errors"):
    print("⚠️ Warnings/Errors:")
    for error in stats["errors"]:
        print(f"  - {error}")

# Visualize the graph
try:
    import matplotlib.pyplot as plt
    import networkx as nx

    if graph and hasattr(graph, 'nodes') and graph.nodes:
        viz_graph = nx.Graph()
        for node in graph.nodes:
            viz_graph.add_node(node.id, label=node.name, type=node.type)
        for edge in graph.edges:
            viz_graph.add_edge(edge.source, edge.target, relationship=edge.type)

        if viz_graph.number_of_nodes() > 0:
            plt.figure(figsize=(6, 5))
            positions = nx.spring_layout(viz_graph, seed=42)
            labels = {node_id: viz_graph.nodes[node_id].get('label', node_id) for node_id in viz_graph.nodes}
            nx.draw_networkx_nodes(viz_graph, positions, node_color='#c7d2fe', linewidths=1)
            nx.draw_networkx_edges(viz_graph, positions, edge_color='#4c51bf')
            nx.draw_networkx_labels(viz_graph, positions, labels=labels, font_size=9)
            plt.axis('off')
            plt.tight_layout()
            plt.savefig(graph_image_path, dpi=160)
            plt.show()
            print(f'Saved visualization to {graph_image_path}')
        else:
            print('Graph has no nodes to visualize.')
    else:
        print('No graph available to visualize.')
except Exception as exc:
    print(f'Skipped visualization: {exc}')


## 6. Query vector + graph context
`GraphPersistenceService` reloads the extracted graph into a LangChain retriever so we can compare vector hits against graph nodes for the same question. This mirrors the hybrid pattern we feed into the orchestration demo later on.

In [None]:
graph_persistence = GraphPersistenceService(
    output_format='custom_graph_object',
    retriever_backend='simple',
    retriever_config={'top_k': 3, 'include_edges': True},
)

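# `graph` comes from Section 5 and is None if no graph was produced.
if graph is None:
    raise RuntimeError('No graph available; run Section 5 with entity extraction enabled first.')
_ = graph_persistence.save(graph)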
graph_retriever = graph_persistence.create_retriever()

hybrid_question = 'Which people or organizations are mentioned in the txt sample?'
vector_hits = demo_vector_store.similarity_search(hybrid_question, k=2)
graph_hits = graph_retriever.get_relevant_documents(hybrid_question)

print(f"Vector store returned {len(vector_hits)} document(s)")
for idx, doc in enumerate(vector_hits, start=1):
    snippet = doc.page_content[:200].replace('\n', ' ')
    print(f"- Vector hit {idx} (source={doc.metadata.get('source')}): {snippet}")

print(f"Graph retriever returned {len(graph_hits)} node(s)")
for doc in graph_hits:
    print(
        f"- Node {doc.metadata.get('node_id')} ({doc.metadata.get('node_type')}): {doc.page_content}"
    )

## 7. Wire up an LLM caller
`get_llm_caller` now instantiates the OpenAI chat model defined in `ragdoll/config/default_config.yaml` (defaults to `gpt-4o-mini`). Make sure `OPENAI_API_KEY` is available before running the next cell.


In [None]:
import os

if not os.getenv('OPENAI_API_KEY'):
    raise EnvironmentError('Set OPENAI_API_KEY before calling the real OpenAI demo cell.')

openai_llm_caller = get_llm_caller(app_config=app_config)

sample_text = chunks[0].page_content if chunks else documents[0].page_content
prompt = (
    'Summarize the following text sample in one sentence. Mention what the document is about and highlight any key people, organizations, or actions.\n\n'
    f"{sample_text[:2048]}"
)
print('Sample excerpt:', sample_text[:360].replace('\n', ' '))
llm_reply = call_llm_sync(openai_llm_caller, prompt)
print('OpenAI response:', llm_reply)


## 8. Run the ingestion pipeline (async)
`IngestionPipeline` stitches together the loader, chunker, embeddings, vector store, and optional graph/entity stages. We disable entity extraction to keep the run lightweight and await the coroutine directly inside the notebook.


In [None]:
pipeline_store_dir = reset_subdir('chroma_pipeline_demo')
pipeline_vector_config = VectorStoreConfig(
    enabled=True,
    store_type='chroma',
    params={
        'collection_name': 'ragdoll_pipeline_demo',
        'persist_directory': str(pipeline_store_dir),
    },
)
pipeline_vector_store = vector_store_from_config(
    pipeline_vector_config,
    embedding=embeddings,
)

pipeline = IngestionPipeline(
    app_config=app_config,
    content_extraction_service=DocumentLoaderService(
        app_config=app_config,
        use_cache=False,
        collect_metrics=False,
    ),
    embedding_model=embeddings,
    vector_store=pipeline_vector_store,
    options=IngestionOptions(
        batch_size=2,
        extract_entities=False,
        skip_graph_store=True,
        chunking_options={'chunk_size': 300, 'chunk_overlap': 60, 'splitter_type': 'recursive'},
    ),
)

pipeline_stats = await pipeline.ingest([str(SAMPLE_TXT)])
pipeline_stats
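
As a mental model of how the stages fit together, the sketch below chains a splitter, an embedder, and a list-backed store, processing chunks in batches. All names here (`batched`, `run_pipeline`, the lambda stages) are hypothetical stand-ins; only the stats keys mirror those printed in Section 5, and the real `IngestionPipeline` is asynchronous and does considerably more.

```python
from typing import Callable, Iterator


def batched(items: list, batch_size: int) -> Iterator[list]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def run_pipeline(
    texts: list[str],
    split: Callable[[str], list[str]],
    embed: Callable[[str], list[float]],
    store: list,
    batch_size: int = 2,
) -> dict:
    """Toy load -> chunk -> embed -> store loop that returns simple stats."""
    stats = {"documents_processed": 0, "chunks_created": 0, "vector_entries_added": 0}
    for text in texts:
        stats["documents_processed"] += 1
        chunks = split(text)
        stats["chunks_created"] += len(chunks)
        for batch in batched(chunks, batch_size):
            vectors = [embed(chunk) for chunk in batch]
            store.extend(zip(batch, vectors))  # one (chunk, vector) entry each
            stats["vector_entries_added"] += len(batch)
    return stats


toy_store: list = []
toy_stats = run_pipeline(
    ["One. Two. Three", "Four. Five"],
    split=lambda text: text.split(". "),      # stand-in splitter
    embed=lambda chunk: [float(len(chunk))],  # stand-in embedder
    store=toy_store,
    batch_size=2,
)
print(toy_stats)
```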


## 9. Use the new retrieval module
The refactored `ragdoll.retrieval` module cleanly separates vector, graph, and hybrid retrieval strategies. `VectorRetriever`, `GraphRetriever`, and `HybridRetriever` can each be used directly and expose the LangChain-style `get_relevant_documents` interface.

In [None]:
from ragdoll.retrieval import VectorRetriever, GraphRetriever, HybridRetriever

# Setup vector retriever with the demo vector store
vector_retriever = VectorRetriever(
    vector_store=demo_vector_store,
    top_k=3,
    search_type="similarity"
)

# Query about graph RAG concepts
query = "How does graph RAG handle entity extraction and relationships?"

print("=== Vector Retrieval ===")
vector_results = vector_retriever.get_relevant_documents(query)
for idx, doc in enumerate(vector_results, start=1):
    snippet = doc.page_content[:200].replace('\n', ' ')
    print(f"Result {idx}: {snippet}...")
    print(f"  Score: {doc.metadata.get('relevance_score', 'N/A')}\n")

# If we have a graph store from earlier, demonstrate graph retrieval
if 'graph_store' in globals() and graph_store is not None:
    print("\n=== Graph Retrieval ===")
    graph_retriever = GraphRetriever(
        graph_store=graph_store,
        top_k=5,
        max_hops=2,
        traversal_strategy="bfs"
    )
    
    graph_results = graph_retriever.get_relevant_documents(query)
    for idx, doc in enumerate(graph_results, start=1):
        node_id = doc.metadata.get('node_id', 'unknown')
        node_type = doc.metadata.get('node_type', 'unknown')
        hop_distance = doc.metadata.get('hop_distance', 0)
        print(f"Node {idx}: {node_type} '{node_id}' (hop distance: {hop_distance})")
        print(f"  Content: {doc.page_content[:150]}...\n")
    
    # Demonstrate hybrid retrieval combining both
    print("\n=== Hybrid Retrieval (Vector + Graph) ===")
    hybrid_retriever = HybridRetriever(
        vector_store=demo_vector_store,
        graph_store=graph_store,
        vector_top_k=3,
        graph_top_k=3,
        graph_max_hops=1,
        mode="concat"  # Can also be "rerank", "weighted", or "expand"
    )
    
    hybrid_results = hybrid_retriever.get_relevant_documents(query)
    print(f"Retrieved {len(hybrid_results)} total documents from hybrid search")
    for idx, doc in enumerate(hybrid_results, start=1):
        source = doc.metadata.get('source', 'unknown')
        snippet = doc.page_content[:120].replace('\n', ' ')
        print(f"{idx}. [{source}] {snippet}...")
else:
    print("\n(Graph store not available - run Section 5 to create it)")

# Demonstrate different retrieval strategies
print("\n=== MMR Search (Maximal Marginal Relevance) ===")
mmr_retriever = VectorRetriever(
    vector_store=demo_vector_store,
    top_k=3,
    search_type="mmr",
    search_kwargs={"fetch_k": 10, "lambda_mult": 0.5}
)
mmr_results = mmr_retriever.get_relevant_documents("What is multi-hop reasoning in RAG systems?")
for idx, doc in enumerate(mmr_results, start=1):
    print(f"{idx}. {doc.page_content[:150].replace(chr(10), ' ')}...")

## 10. Advanced Retrieval Patterns

The retrieval module supports several hybrid modes, graph traversal strategies, and vector search types:

### Hybrid Retrieval Modes
- **concat**: Simple concatenation of vector and graph results
- **rerank**: Rerank combined results by relevance score
- **weighted**: Weighted combination (adjust vector_weight/graph_weight)
- **expand**: Use vector results as seeds for graph expansion
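
To illustrate the difference between two of these modes, here is a rough sketch of `concat` and `weighted` merging over `(doc_id, score)` pairs. It assumes both retrievers return scores on a comparable 0 to 1 scale; the function names are hypothetical and `HybridRetriever` may normalize, deduplicate, or score differently.

```python
def merge_concat(vector_hits, graph_hits):
    """Vector hits first, then graph hits, dropping duplicate ids."""
    seen, merged = set(), []
    for doc_id, score in vector_hits + graph_hits:
        if doc_id not in seen:
            seen.add(doc_id)
            merged.append((doc_id, score))
    return merged


def merge_weighted(vector_hits, graph_hits, vector_weight=0.6, graph_weight=0.4):
    """Sum weighted scores per id, then sort by the combined score."""
    combined: dict[str, float] = {}
    for doc_id, score in vector_hits:
        combined[doc_id] = combined.get(doc_id, 0.0) + vector_weight * score
    for doc_id, score in graph_hits:
        combined[doc_id] = combined.get(doc_id, 0.0) + graph_weight * score
    return sorted(combined.items(), key=lambda item: item[1], reverse=True)


# Made-up scores; "chunk-2" is found by both retrievers.
toy_vector_hits = [("chunk-1", 0.9), ("chunk-2", 0.5)]
toy_graph_hits = [("node-a", 1.0), ("chunk-2", 0.8)]

concat_ids = [doc_id for doc_id, _ in merge_concat(toy_vector_hits, toy_graph_hits)]
weighted_ids = [doc_id for doc_id, _ in merge_weighted(toy_vector_hits, toy_graph_hits)]
print(concat_ids)
print(weighted_ids)
```

In this toy data the document found by both retrievers rises to the top under `weighted`, while `concat` simply preserves the order in which results arrived.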

### Graph Traversal Strategies
- **BFS (Breadth-First)**: Explores neighbors level by level (default)
- **DFS (Depth-First)**: Follows paths deeply before backtracking
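
The two strategies can be compared on a small adjacency list. This is a generic sketch with a hop limit analogous to `max_hops`; the graph contents and the function names are illustrative assumptions, not `GraphRetriever` internals.

```python
from collections import deque


def bfs_with_hops(adjacency, seed, max_hops):
    """Return {node: hop_distance} for nodes within max_hops of seed."""
    distances = {seed: 0}
    queue = deque([seed])
    while queue:
        node = queue.popleft()
        if distances[node] == max_hops:
            continue  # do not expand beyond the hop limit
        for neighbor in adjacency.get(node, []):
            if neighbor not in distances:
                distances[neighbor] = distances[node] + 1
                queue.append(neighbor)
    return distances


def dfs_order(adjacency, seed, max_hops):
    """Return nodes in depth-first visit order, limited to max_hops."""
    order, visited = [], set()

    def visit(node, depth):
        visited.add(node)
        order.append(node)
        if depth == max_hops:
            return
        for neighbor in adjacency.get(node, []):
            if neighbor not in visited:
                visit(neighbor, depth + 1)

    visit(seed, 0)
    return order


toy_graph = {
    "RAG": ["Retriever", "LLM"],
    "Retriever": ["VectorDB"],
    "LLM": ["Prompt"],
    "VectorDB": ["Index"],
}
bfs_result = bfs_with_hops(toy_graph, "RAG", max_hops=2)
dfs_result = dfs_order(toy_graph, "RAG", max_hops=2)
print(bfs_result)
print(dfs_result)
```

BFS reaches both one-hop neighbors before any two-hop node, whereas DFS follows `Retriever` down to `VectorDB` before returning to `LLM`. Neither visits `Index`, which sits three hops from the seed.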

### Search Types
- **similarity**: Standard vector similarity search
- **mmr**: Maximal Marginal Relevance for diverse results
- **similarity_score_threshold**: Filter by minimum similarity score
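
MMR trades relevance against redundancy: each pick maximizes `lambda_mult * relevance - (1 - lambda_mult) * similarity_to_already_selected`. The sketch below is a generic greedy version over toy 2-dimensional vectors; the data and the names `cosine` and `mmr_select` are assumptions for illustration, not the vector store's implementation.

```python
import math


def cosine(a, b):
    """Cosine similarity, returning 0.0 for zero-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(query_vec, candidates, k=2, lambda_mult=0.5):
    """Greedy MMR over a list of (label, vector) candidates."""
    remaining = list(candidates)
    selected = []
    while remaining and len(selected) < k:

        def mmr_score(item):
            relevance = cosine(query_vec, item[1])
            # Redundancy is the closest match among documents already chosen.
            redundancy = max(
                (cosine(item[1], chosen[1]) for chosen in selected), default=0.0
            )
            return lambda_mult * relevance - (1.0 - lambda_mult) * redundancy

        best = max(remaining, key=mmr_score)
        selected.append(best)
        remaining.remove(best)
    return [label for label, _ in selected]


toy_candidates = [
    ("extraction", [1.0, 0.1]),
    ("extraction-near-duplicate", [1.0, 0.15]),
    ("traversal", [0.6, -0.8]),
]
toy_query = [1.0, 0.0]
diverse = mmr_select(toy_query, toy_candidates, k=2, lambda_mult=0.5)
relevance_only = mmr_select(toy_query, toy_candidates, k=2, lambda_mult=1.0)
print(diverse)
print(relevance_only)
```

With `lambda_mult=1.0` the selection reduces to plain similarity ranking and returns the near-duplicate pair; lowering it to `0.5` swaps the duplicate for the less similar but more novel candidate.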

In [None]:
# Example: Weighted hybrid retrieval for balanced results
if 'graph_store' in globals() and graph_store is not None:
    weighted_retriever = HybridRetriever(
        vector_store=demo_vector_store,
        graph_store=graph_store,
        vector_top_k=5,
        graph_top_k=5,
        mode="weighted",
        vector_weight=0.6,  # Favor vector results slightly
        graph_weight=0.4
    )
    
    query = "Explain how graph RAG systems handle iterative refinement"
    weighted_results = weighted_retriever.get_relevant_documents(query)
    
    print("=== Weighted Hybrid Results (vector=0.6, graph=0.4) ===")
    print(f"Retrieved {len(weighted_results)} documents\n")
    
    for idx, doc in enumerate(weighted_results[:5], start=1):
        score = doc.metadata.get('relevance_score', 0)
        source_type = 'graph' if 'node_id' in doc.metadata else 'vector'
        snippet = doc.page_content[:150].replace('\n', ' ')
        print(f"{idx}. [{source_type}] Score: {score:.3f}")
        print(f"   {snippet}...\n")
    
    # Example: Graph expansion mode - use vector hits to seed graph traversal
    expand_retriever = HybridRetriever(
        vector_store=demo_vector_store,
        graph_store=graph_store,
        vector_top_k=2,
        graph_max_hops=2,
        mode="expand"
    )
    
    expand_query = "What are graph neural networks used for in RAG?"
    expand_results = expand_retriever.get_relevant_documents(expand_query)
    
    print("\n=== Expand Mode (vector seeds + graph traversal) ===")
    print(f"Retrieved {len(expand_results)} documents from graph expansion\n")
    
    for idx, doc in enumerate(expand_results[:3], start=1):
        if 'node_id' in doc.metadata:
            node_id = doc.metadata['node_id']
            hop = doc.metadata.get('hop_distance', 0)
            print(f"{idx}. Node: {node_id} (hop: {hop})")
            print(f"   {doc.page_content[:120]}...\n")
else:
    print("Graph store not available. Run Section 5 to enable graph retrieval examples.")

In [None]:
# Example: Async retrieval for concurrent queries
import asyncio

async def demo_async_retrieval():
    """Demonstrate async retrieval capabilities."""
    queries = [
        "What is entity extraction?",
        "How do graph algorithms help RAG?",
        "Explain knowledge graph construction"
    ]
    
    # Create retrievers
    vector_ret = VectorRetriever(vector_store=demo_vector_store, top_k=2)
    
    print("=== Async Concurrent Retrieval ===")
    # Retrieve all queries concurrently
    results = await asyncio.gather(*[
        vector_ret.aget_relevant_documents(q) for q in queries
    ])
    
    for query, docs in zip(queries, results):
        print(f"\nQuery: {query}")
        print(f"  Found {len(docs)} documents")
        if docs:
            print(f"  Top result: {docs[0].page_content[:100].replace(chr(10), ' ')}...")

# Run async demo
await demo_async_retrieval()

# Get retriever statistics
print("\n=== Retriever Statistics ===")
vector_stats = vector_retriever.get_stats()
print(f"Vector Retriever: {vector_stats}")

if 'graph_retriever' in globals():
    graph_stats = graph_retriever.get_stats()
    print(f"Graph Retriever: {graph_stats}")
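
One property of `asyncio.gather` worth knowing when reading the results above: it returns results in the order the awaitables were passed, regardless of which finishes first. The stand-alone sketch below uses a hypothetical `fake_retrieve` coroutine with artificial delays in place of a real retriever. It calls `asyncio.run`, which suits a script; inside a notebook you would `await main()` instead.

```python
import asyncio


async def fake_retrieve(query: str, delay: float) -> str:
    """Stand-in for an async retriever call; sleeps to simulate I/O."""
    await asyncio.sleep(delay)
    return f"docs for {query!r}"


async def main() -> list[str]:
    # The slow query is listed first but finishes last.
    queries = [("slow query", 0.05), ("fast query", 0.01)]
    return await asyncio.gather(*(fake_retrieve(q, d) for q, d in queries))


gathered = asyncio.run(main())
print(gathered)
```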
