# Vector Database for Production RAG Systems

This notebook demonstrates how to build and manage vector databases for production RAG applications.

## Learning Objectives
- Understand vector database architectures
- Learn to manage multiple vector database backends
- Implement production-ready vector operations
- Handle large-scale document indexing
- Monitor and maintain vector databases


In [None]:
# Import required libraries
import sys
import os
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import time
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Add src to path
sys.path.append('../src')

# Import our vector database manager
from vector_db.vector_manager import VectorDatabaseManager, create_vector_database
from data.preprocess_data import TextPreprocessor
from config import DATA_DIR

print("Libraries imported successfully!")


## 1. Vector Database Backends Comparison

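This section loads the chunks produced by earlier notebooks and prepares a 100-document sample. The same sample is then indexed in each backend so the results are comparable.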
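Before comparing backends, it can help to see the operation they all provide: given a query embedding, return the stored embeddings most similar to it. The following is a minimal brute-force sketch in plain Python. The `toy_index` layout and the function names are illustrative assumptions, not part of `VectorDatabaseManager`.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # treat zero vectors as dissimilar to everything
    return dot / (norm_a * norm_b)

def brute_force_search(query_vec, index, top_k=3):
    """Score every stored vector against the query and keep the best top_k.

    `index` maps a document id to its embedding (hypothetical layout).
    """
    scored = [(doc_id, cosine_similarity(query_vec, vec)) for doc_id, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]

# Tiny 3-dimensional "embeddings" purely for illustration
toy_index = {
    "chunk_0": [1.0, 0.0, 0.0],
    "chunk_1": [0.7, 0.7, 0.0],
    "chunk_2": [0.0, 0.0, 1.0],
}
hits = brute_force_search([1.0, 0.1, 0.0], toy_index, top_k=2)
print(hits)
```

Brute force costs one similarity computation per stored vector per query. Dedicated backends exist largely to make this step faster or to add persistence and filtering around it.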


In [None]:
# Load sample data
chunks_path = DATA_DIR / "processed" / "all_chunks.json"
if chunks_path.exists():
    with open(chunks_path, 'r') as f:
        chunks_data = json.load(f)
    print(f"Loaded {len(chunks_data)} chunks from processed data")
else:
    print("No processed chunks found. Please run previous notebooks first.")
    chunks_data = []


In [None]:
# Prepare sample documents for testing
sample_docs = []
for i, chunk in enumerate(chunks_data[:100]):  # Use first 100 chunks
    sample_docs.append({
        "id": f"chunk_{i}",
        "text": chunk.get("text", ""),
        "metadata": {
            "source": chunk.get("source", "unknown"),
            "chunk_id": i,
            "length": len(chunk.get("text", "")),
            "timestamp": datetime.now().isoformat()
        }
    })

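print(f"Prepared {len(sample_docs)} sample documents")
# Guard against an empty list: the previous cell falls back to chunks_data = []
if sample_docs:
    print(f"Sample document: {sample_docs[0]['text'][:100]}...")
else:
    print("No sample documents available; later cells need processed chunks to run.")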


## 2. ChromaDB - Local Vector Database

ChromaDB is an open-source embedding database that can run in-process with on-disk persistence. That makes it a good fit for local development and small to medium-scale applications.


In [None]:
# Initialize ChromaDB
print("Initializing ChromaDB...")
chroma_manager = VectorDatabaseManager(
    backend="chromadb",
    collection_name="production_docs",
    embedding_model="BAAI/bge-base-en-v1.5"
)

# Add documents
print("Adding documents to ChromaDB...")
start_time = time.time()

texts = [doc["text"] for doc in sample_docs]
metadatas = [doc["metadata"] for doc in sample_docs]
ids = [doc["id"] for doc in sample_docs]

result = chroma_manager.add_documents(texts, metadatas, ids)
chroma_time = time.time() - start_time

print(f"ChromaDB Results: {result}")
print(f"Time taken: {chroma_time:.2f} seconds")
print(f"Documents per second: {len(sample_docs)/chroma_time:.2f}")


In [None]:
# Test ChromaDB search
print("Testing ChromaDB search...")
queries = [
    "What is machine learning?",
    "How does deep learning work?",
    "Explain artificial intelligence",
    "What are neural networks?"
]

for query in queries:
    print(f"\nQuery: {query}")
    results = chroma_manager.search(query, top_k=3)
    for i, result in enumerate(results):
        print(f"  {i+1}. {result['document'][:100]}... (score: {result['score']:.3f})")


## 3. FAISS - High-Performance Vector Search

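FAISS (Facebook AI Similarity Search) is an open-source library from Meta AI for efficient similarity search and clustering of dense vectors. It is an index library rather than a full database, so persistence and metadata handling are left to the caller.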
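FAISS flat indexes score vectors by inner product (`IndexFlatIP`) or L2 distance (`IndexFlatL2`). A common way to get cosine similarity from an inner-product index is to L2-normalize vectors before adding and querying. The sketch below checks that identity in plain Python. Whether `VectorDatabaseManager` normalizes embeddings is not shown in this notebook, so treat this as background rather than a description of its behavior.

```python
import math

def l2_normalize(vec):
    """Scale a vector to unit length; zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)

def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))

a = [3.0, 4.0]
b = [4.0, 3.0]

# Cosine similarity computed directly from its definition
cosine = inner_product(a, b) / (math.hypot(*a) * math.hypot(*b))

# Inner product of the unit-length versions of the same vectors
ip_normalized = inner_product(l2_normalize(a), l2_normalize(b))

print(round(cosine, 6), round(ip_normalized, 6))
```

Both numbers agree, which is why scores from a normalized inner-product index can be read as cosine similarities.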


In [None]:
# Initialize FAISS
print("Initializing FAISS...")
faiss_manager = VectorDatabaseManager(
    backend="faiss",
    collection_name="production_docs_faiss",
    embedding_model="BAAI/bge-base-en-v1.5"
)

# Add documents
print("Adding documents to FAISS...")
start_time = time.time()

result = faiss_manager.add_documents(texts, metadatas, ids)
faiss_time = time.time() - start_time

print(f"FAISS Results: {result}")
print(f"Time taken: {faiss_time:.2f} seconds")
print(f"Documents per second: {len(sample_docs)/faiss_time:.2f}")


In [None]:
# Test FAISS search
print("Testing FAISS search...")
for query in queries:
    print(f"\nQuery: {query}")
    results = faiss_manager.search(query, top_k=3)
    for i, result in enumerate(results):
        print(f"  {i+1}. {result['document'][:100]}... (score: {result['score']:.3f})")


## 4. Performance Comparison

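This section compares indexing time and throughput for ChromaDB and FAISS on the same sample. With only 100 documents the timings are noisy, so treat them as a smoke test rather than a benchmark.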
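A single timed run is sensitive to warm-up effects such as model loading and caches. One way to get steadier numbers is to repeat the measurement and report the median, as in this sketch. `fake_index` is a stand-in workload and the helper names are assumptions for illustration.

```python
import statistics
import time

def time_call(fn, *args, repeats=5, **kwargs):
    """Run fn several times and return the wall-clock duration of each run in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic clock intended for interval timing
        fn(*args, **kwargs)
        durations.append(time.perf_counter() - start)
    return durations

def summarize(durations, n_items):
    """Reduce repeated timings to a median, a best case, and a throughput figure."""
    median = statistics.median(durations)
    return {
        "median_s": median,
        "best_s": min(durations),
        "items_per_s": n_items / median if median > 0 else float("inf"),
    }

def fake_index(texts):
    # Stand-in for an indexing call; replace with a real workload when benchmarking
    return [len(t) for t in texts]

texts = ["example chunk"] * 1000
durations = time_call(fake_index, texts, repeats=5)
summary = summarize(durations, n_items=len(texts))
print(summary)
```

Repeating a real `add_documents` call against the same collection would insert the same IDs several times, so for indexing benchmarks each repeat would need a fresh collection.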


In [None]:
# Performance comparison
performance_data = {
    'Backend': ['ChromaDB', 'FAISS'],
    'Indexing Time (s)': [chroma_time, faiss_time],
    'Docs per Second': [len(sample_docs)/chroma_time, len(sample_docs)/faiss_time]
}

df_performance = pd.DataFrame(performance_data)
print("Performance Comparison:")
print(df_performance.to_string(index=False))

# Visualization
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Indexing time comparison
ax1.bar(df_performance['Backend'], df_performance['Indexing Time (s)'], color=['skyblue', 'lightcoral'])
ax1.set_title('Indexing Time Comparison')
ax1.set_ylabel('Time (seconds)')

# Documents per second comparison
ax2.bar(df_performance['Backend'], df_performance['Docs per Second'], color=['skyblue', 'lightcoral'])
ax2.set_title('Indexing Speed Comparison')
ax2.set_ylabel('Documents per Second')

plt.tight_layout()
plt.show()


## 5. Search Performance Testing

Measure query latency for each backend by repeating the same set of queries several times.
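Mean and standard deviation can hide slow outliers, which is why production latency is often reported as percentiles. The sketch below computes p50, p95, and p99 from a list of per-query timings using the standard library. The sample latencies are made up for illustration.

```python
import statistics

def latency_percentiles(latencies_s):
    """Return p50/p95/p99 in milliseconds, interpolating between observed samples."""
    if len(latencies_s) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; cuts[i - 1] is the i-th percentile
    cuts = statistics.quantiles(latencies_s, n=100, method="inclusive")
    return {
        "p50_ms": cuts[49] * 1000,
        "p95_ms": cuts[94] * 1000,
        "p99_ms": cuts[98] * 1000,
    }

# Synthetic example: most queries are fast, a few are ten times slower
sample_latencies = [0.010] * 95 + [0.100] * 5
percentiles = latency_percentiles(sample_latencies)
print(percentiles)
```

The `times` list collected inside a function such as `test_search_performance` could be passed to this helper unchanged.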


In [None]:
# Search performance test
def test_search_performance(manager, queries, top_k=5, iterations=10):
    """Test search performance for a given manager."""
    times = []
    
    for _ in range(iterations):
        for query in queries:
            start_time = time.perf_counter()  # monotonic, high-resolution clock for short intervals
            results = manager.search(query, top_k=top_k)
            search_time = time.perf_counter() - start_time
            times.append(search_time)
    
    return {
        'avg_time': np.mean(times),
        'std_time': np.std(times),
        'min_time': np.min(times),
        'max_time': np.max(times)
    }

# Test both backends
print("Testing search performance...")

chroma_perf = test_search_performance(chroma_manager, queries)
faiss_perf = test_search_performance(faiss_manager, queries)

print("\nSearch Performance Results:")
print(f"ChromaDB - Avg: {chroma_perf['avg_time']:.4f}s, Std: {chroma_perf['std_time']:.4f}s")
print(f"FAISS - Avg: {faiss_perf['avg_time']:.4f}s, Std: {faiss_perf['std_time']:.4f}s")


In [None]:
# Visualize search performance
search_data = {
    'Backend': ['ChromaDB', 'FAISS'],
    'Avg Search Time (ms)': [chroma_perf['avg_time']*1000, faiss_perf['avg_time']*1000],
    'Std Dev (ms)': [chroma_perf['std_time']*1000, faiss_perf['std_time']*1000]
}

df_search = pd.DataFrame(search_data)

plt.figure(figsize=(10, 6))
x = np.arange(len(df_search))
width = 0.35

plt.bar(x - width/2, df_search['Avg Search Time (ms)'], width, 
        label='Average Time', color='skyblue', alpha=0.7)
plt.bar(x + width/2, df_search['Std Dev (ms)'], width,
        label='Standard Deviation', color='lightcoral', alpha=0.7)

plt.xlabel('Backend')
plt.ylabel('Time (milliseconds)')
plt.title('Search Performance Comparison')
plt.xticks(x, df_search['Backend'])
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()


## 6. Database Management Operations

This section covers routine management operations: collection statistics, document retrieval by ID, and metadata-filtered search.
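When a backend has no native metadata filtering, a common workaround is to fetch more candidates than needed and filter them afterwards. The sketch below assumes results shaped like the ones printed in this notebook (dicts with `document`, `score`, and `metadata` keys). The helper names are hypothetical.

```python
def matches(metadata, filter_dict):
    """True when every key in filter_dict appears in metadata with an equal value."""
    return all(metadata.get(key) == value for key, value in filter_dict.items())

def post_filter(results, filter_dict, top_k):
    """Keep the first top_k results that satisfy the filter.

    Assumes `results` is already sorted by descending score.
    """
    kept = [r for r in results if matches(r.get("metadata", {}), filter_dict)]
    return kept[:top_k]

# Pretend these came back from an over-fetched search (e.g. top_k * 4 candidates)
candidates = [
    {"document": "Intro to ML", "score": 0.91, "metadata": {"source": "wikipedia"}},
    {"document": "Blog post on ML", "score": 0.88, "metadata": {"source": "blog"}},
    {"document": "Neural networks", "score": 0.80, "metadata": {"source": "wikipedia"}},
    {"document": "Decision trees", "score": 0.75, "metadata": {"source": "wikipedia"}},
]
filtered = post_filter(candidates, {"source": "wikipedia"}, top_k=2)
print([r["document"] for r in filtered])
```

Post-filtering can return fewer than `top_k` results when the filter is selective, so the over-fetch factor is a trade-off between recall and query cost.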


In [None]:
# Get database statistics
print("ChromaDB Statistics:")
chroma_stats = chroma_manager.get_stats()
for key, value in chroma_stats.items():
    print(f"  {key}: {value}")

print("\nFAISS Statistics:")
faiss_stats = faiss_manager.get_stats()
for key, value in faiss_stats.items():
    print(f"  {key}: {value}")


In [None]:
# Test document retrieval by ID
print("Testing document retrieval by ID...")
test_id = "chunk_0"

chroma_doc = chroma_manager.get_document(test_id)
faiss_doc = faiss_manager.get_document(test_id)

print(f"\nChromaDB Document {test_id}:")
if chroma_doc:
    print(f"  Text: {chroma_doc['document'][:100]}...")
    print(f"  Metadata: {chroma_doc['metadata']}")
else:
    print("  Document not found")

print(f"\nFAISS Document {test_id}:")
if faiss_doc:
    print(f"  Text: {faiss_doc['document'][:100]}...")
    print(f"  Metadata: {faiss_doc['metadata']}")
else:
    print("  Document not found")


In [None]:
# Test metadata filtering
print("Testing metadata filtering...")

# Filter by source
filtered_results = chroma_manager.search(
    "machine learning", 
    top_k=3,
    filter_dict={"source": "wikipedia"}
)

print(f"\nFiltered results (source='wikipedia'): {len(filtered_results)} documents")
for i, result in enumerate(filtered_results):
    print(f"  {i+1}. {result['document'][:80]}... (score: {result['score']:.3f})")
    print(f"      Metadata: {result['metadata']}")


## 7. Production Deployment Considerations

Two considerations are demonstrated here: backing up the database and measuring memory usage.
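For backends that persist to a directory on disk, one simple backup approach is a timestamped copy of that directory, so older snapshots are never overwritten. This is a sketch under the assumption that the store is not being written to during the copy. The directory names are made up, and how `backup_database` works internally is not shown in this notebook.

```python
import shutil
import tempfile
from datetime import datetime, timezone
from pathlib import Path

def snapshot_directory(source_dir, backup_root, label):
    """Copy source_dir into backup_root/<label>_<UTC timestamp> and return the new path."""
    source_dir = Path(source_dir)
    if not source_dir.is_dir():
        raise FileNotFoundError(f"Nothing to back up at {source_dir}")
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
    target = Path(backup_root) / f"{label}_{stamp}"
    shutil.copytree(source_dir, target)  # creates missing parent directories
    return target

# Demonstration against a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "chroma_store"
    source.mkdir()
    (source / "index.bin").write_bytes(b"demo")

    snapshot = snapshot_directory(source, Path(tmp) / "backups", "chromadb")
    copied = (snapshot / "index.bin").read_bytes()
    snapshot_name = snapshot.name
    print(snapshot_name)
```

Including the backend name in the snapshot label keeps backups from different backends apart even when they share one backup root.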


In [None]:
# Database backup example
print("Testing database backup...")
backup_root = DATA_DIR / "vector_db" / "backup"

# Use one directory per backend so the second backup cannot overwrite the first
chroma_backup_ok = chroma_manager.backup_database(str(backup_root / "chromadb"))
print(f"ChromaDB backup successful: {chroma_backup_ok}")

faiss_backup_ok = faiss_manager.backup_database(str(backup_root / "faiss"))
print(f"FAISS backup successful: {faiss_backup_ok}")


In [None]:
# Memory usage analysis
import psutil
import tracemalloc

def analyze_memory_usage(manager, operation_name):
    """Analyze memory usage for a given operation."""
    tracemalloc.start()
    
    # Get initial memory
    process = psutil.Process()
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB
    
    # Perform operation
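    if operation_name == "search":
        manager.search("machine learning", top_k=10)
    elif operation_name == "stats":
        manager.get_stats()
    else:
        # Fail loudly instead of silently measuring nothing
        tracemalloc.stop()
        raise ValueError(f"Unknown operation: {operation_name!r}")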
    
    # Get final memory
    final_memory = process.memory_info().rss / 1024 / 1024  # MB
    
    # Get tracemalloc info
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    
    return {
        "initial_memory_mb": initial_memory,
        "final_memory_mb": final_memory,
        "memory_delta_mb": final_memory - initial_memory,
        "tracemalloc_current_mb": current / 1024 / 1024,
        "tracemalloc_peak_mb": peak / 1024 / 1024
    }

# Analyze memory usage
print("Memory Usage Analysis:")
print("\nChromaDB:")
chroma_memory = analyze_memory_usage(chroma_manager, "search")
for key, value in chroma_memory.items():
    print(f"  {key}: {value:.2f} MB")

print("\nFAISS:")
faiss_memory = analyze_memory_usage(faiss_manager, "search")
for key, value in faiss_memory.items():
    print(f"  {key}: {value:.2f} MB")


## 8. Production Best Practices

Key recommendations for production vector database deployment.
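Two habits that tend to help with large-scale indexing are adding documents in fixed-size batches and deriving IDs from content so that re-running an ingest does not create new IDs for the same text. The sketch below shows both with the standard library. The helper names and the batch size are illustrative assumptions.

```python
import hashlib

def content_id(text, prefix="doc"):
    """Derive a stable ID from the text so the same chunk always maps to the same ID."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}_{digest}"

def batched(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

texts = [f"chunk number {i}" for i in range(250)]
ids = [content_id(t) for t in texts]
batches = list(batched(texts, batch_size=100))

print([len(b) for b in batches])
print(ids[0])
```

Each batch could then be passed to a call such as `add_documents`. Whether that call overwrites or rejects an ID that already exists depends on the backend and is not covered in this notebook.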


In [None]:
# Production recommendations
recommendations = {
    "ChromaDB": {
        "Best For": "Local development, small to medium scale (< 1M docs)",
        "Pros": [
            "Easy to set up and use",
            "Built-in metadata filtering",
            "Persistent storage",
            "Good for prototyping"
        ],
        "Cons": [
            "Limited scalability",
            "Single-node only",
            "Memory intensive for large datasets"
        ],
        "Use Cases": [
            "Development and testing",
            "Small production applications",
            "Rapid prototyping"
        ]
    },
    "FAISS": {
        "Best For": "High-performance search, large scale (> 1M docs)",
        "Pros": [
            "Extremely fast search",
            "Memory efficient",
            "Multiple index types",
            "GPU acceleration support"
        ],
        "Cons": [
            "No built-in metadata filtering",
            "More complex setup",
            "Requires manual persistence management"
        ],
        "Use Cases": [
            "Large-scale production systems",
            "High-throughput applications",
            "Research and experimentation"
        ]
    },
    "Pinecone": {
        "Best For": "Cloud-native, fully managed solutions",
        "Pros": [
            "Fully managed service",
            "Automatic scaling",
            "Built-in metadata filtering",
            "High availability"
        ],
        "Cons": [
            "Cost for large datasets",
            "Vendor lock-in",
            "Requires internet connection"
        ],
        "Use Cases": [
            "Production applications",
            "Multi-tenant systems",
            "Global deployments"
        ]
    }
}

print("Production Vector Database Recommendations:")
print("=" * 50)

for backend, info in recommendations.items():
    print(f"\n{backend}:")
    print(f"  Best For: {info['Best For']}")
    print(f"  Pros: {', '.join(info['Pros'])}")
    print(f"  Cons: {', '.join(info['Cons'])}")
    print(f"  Use Cases: {', '.join(info['Use Cases'])}")


In [None]:
# Performance summary
print("\nPerformance Summary:")
print("=" * 30)

print(f"\nChromaDB:")
print(f"  Indexing Speed: {len(sample_docs)/chroma_time:.2f} docs/sec")
print(f"  Search Speed: {chroma_perf['avg_time']*1000:.2f} ms/query")
print(f"  Process RSS after search: {chroma_memory['final_memory_mb']:.2f} MB (whole process, not per backend)")
print(f"  Total Documents: {chroma_stats.get('total_documents', 'n/a')}")

print(f"\nFAISS:")
print(f"  Indexing Speed: {len(sample_docs)/faiss_time:.2f} docs/sec")
print(f"  Search Speed: {faiss_perf['avg_time']*1000:.2f} ms/query")
print(f"  Process RSS after search: {faiss_memory['final_memory_mb']:.2f} MB (whole process, not per backend)")
print(f"  Total Documents: {faiss_stats.get('total_documents', 'n/a')}")


## 9. Integration with RAG System

Now let's integrate the vector database with our RAG system.
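The step between retrieval and generation is assembling the retrieved passages into a prompt that fits the model's context budget. The following sketch numbers each passage and truncates once a character budget is used up. The prompt wording, the `max_context_chars` default, and the function name are assumptions for illustration. A real system would usually count tokens rather than characters.

```python
def build_prompt(question, retrieved_docs, max_context_chars=1500):
    """Assemble a prompt from retrieved passages without exceeding a character budget.

    `retrieved_docs` is a list of dicts with a "document" key, best match first.
    """
    parts = []
    used = 0
    for rank, doc in enumerate(retrieved_docs, start=1):
        remaining = max_context_chars - used
        if remaining <= 0:
            break  # budget exhausted; drop lower-ranked passages
        snippet = doc["document"].strip()[:remaining]
        parts.append(f"[{rank}] {snippet}")
        used += len(snippet)
    context = "\n".join(parts)
    return (
        "Answer the question using only the numbered context passages.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )

docs = [
    {"document": "z" * 1000},
    {"document": "k" * 1000},
    {"document": "dropped because the budget is already used"},
]
prompt = build_prompt("What is FAISS?", docs, max_context_chars=1500)
print(len(prompt))
```

Numbering the passages lets the model, or a later post-processing step, cite which passage supported an answer.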


In [None]:
# Create a production-ready RAG system with vector database
from typing import Any, Dict, List  # required by the type hints used in the class below

class ProductionRAGSystem:
    """Production RAG system with vector database integration."""
    
    def __init__(self, vector_db_manager, llm_model=None):
        self.vector_db = vector_db_manager
        self.llm_model = llm_model
        
    def query(self, question: str, top_k: int = 5, use_reranking: bool = True) -> Dict[str, Any]:
        """Query the RAG system."""
        # Retrieve relevant documents
        retrieved_docs = self.vector_db.search(question, top_k=top_k)
        
        # Simple reranking (in production, use a proper reranker)
        if use_reranking and len(retrieved_docs) > 1:
            # Simple keyword-based reranking
            question_keywords = set(question.lower().split())
            for doc in retrieved_docs:
                doc_keywords = set(doc['document'].lower().split())
                keyword_overlap = len(question_keywords.intersection(doc_keywords))
                doc['rerank_score'] = doc['score'] + (keyword_overlap * 0.1)
            
            retrieved_docs.sort(key=lambda x: x['rerank_score'], reverse=True)
        
        # Generate response (simplified)
        context = " ".join([doc['document'] for doc in retrieved_docs[:3]])
        
        if self.llm_model:
            # Placeholder: a real implementation would call self.llm_model with the question and context
            response = f"Based on the context: {context[:200]}..."
        else:
            # Simple response generation
            response = f"I found {len(retrieved_docs)} relevant documents. Here's the most relevant information: {context[:200]}..."
        
        return {
            "question": question,
            "response": response,
            "retrieved_documents": retrieved_docs,
            "num_documents": len(retrieved_docs)
        }
    
    def add_documents(self, documents: List[Dict]) -> Dict[str, Any]:
        """Add documents to the RAG system."""
        texts = [doc.get("text", "") for doc in documents]
        metadatas = [doc.get("metadata", {}) for doc in documents]
        ids = [doc.get("id", f"doc_{i}") for i, doc in enumerate(documents)]
        
        return self.vector_db.add_documents(texts, metadatas, ids)
    
    def get_stats(self) -> Dict[str, Any]:
        """Get system statistics."""
        return self.vector_db.get_stats()

# Initialize production RAG system
print("Initializing Production RAG System...")
production_rag = ProductionRAGSystem(chroma_manager)

print("Production RAG System initialized successfully!")
print(f"Database stats: {production_rag.get_stats()}")


In [None]:
# Test the production RAG system
test_questions = [
    "What is machine learning?",
    "How does deep learning work?",
    "Explain artificial intelligence",
    "What are the applications of AI?"
]

print("Testing Production RAG System:")
print("=" * 40)

for question in test_questions:
    print(f"\nQuestion: {question}")
    result = production_rag.query(question, top_k=3)
    print(f"Response: {result['response']}")
    print(f"Retrieved {result['num_documents']} documents")
    
    # Show top document
    if result['retrieved_documents']:
        top_doc = result['retrieved_documents'][0]
        print(f"Top document: {top_doc['document'][:100]}... (score: {top_doc['score']:.3f})")


## Summary

This notebook demonstrated:

1. **Vector Database Backends**: ChromaDB and FAISS hands-on, with Pinecone covered in the recommendations
2. **Performance Comparison**: Indexing and search performance
3. **Production Features**: Backup, database statistics, and memory usage analysis
4. **RAG Integration**: Complete production RAG system
5. **Best Practices**: Recommendations for different use cases

### Key Takeaways:
- **ChromaDB**: Best for development and small-scale production
- **FAISS**: Best for high-performance, large-scale applications
- **Pinecone**: Best for cloud-native, fully managed solutions
- **Production**: Always consider scalability, monitoring, and backup strategies

### Next Steps for Learners:
1. **Run Notebook 10** to understand vector database concepts
2. **Compare backends** to see performance differences
3. **Test the web app** to see production integration
4. **Experiment with metadata filtering** for advanced use cases
5. **Scale up** with larger datasets to see performance impact

This vector database setup provides a foundation for RAG applications that start as a prototype and later grow into larger deployments.
