# RAG Optimization: Performance, Cost, and Quality Tuning

## Table of Contents
1. [Introduction to RAG Optimization](#introduction)
2. [Performance Optimization](#performance-optimization)
3. [Cost Optimization](#cost-optimization)
4. [Quality Optimization](#quality-optimization)
5. [Caching Strategies](#caching-strategies)
6. [Batch Processing](#batch-processing)
7. [Model Selection](#model-selection)
8. [Infrastructure Optimization](#infrastructure-optimization)
9. [Monitoring & Metrics](#monitoring)
10. [Real-World Optimization Cases](#real-world-cases)

---

## Introduction to RAG Optimization {#introduction}

RAG optimization focuses on improving three key aspects of RAG systems:

### 1. Performance Optimization
- **Latency Reduction**: Faster response times
- **Throughput Improvement**: Handle more requests
- **Resource Efficiency**: Better CPU/memory usage
- **Scalability**: Handle growing workloads

### 2. Cost Optimization
- **API Cost Reduction**: Minimize LLM API calls
- **Infrastructure Costs**: Optimize compute resources
- **Storage Costs**: Efficient data storage
- **Operational Costs**: Reduce maintenance overhead

### 3. Quality Optimization
- **Accuracy Improvement**: Better retrieval and generation
- **Relevance Enhancement**: More relevant responses
- **Consistency**: Reliable performance across queries
- **User Satisfaction**: Better user experience

### Optimization Trade-offs
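- **Performance vs Quality**: Smaller models and fewer retrieved chunks respond faster but may lower answer quality
- **Cost vs Performance**: Lower latency often requires more or faster hardware, which costs more
- **Quality vs Cost**: Larger models and longer contexts tend to improve quality but consume more tokens and compute
- **Latency vs Throughput**: Large batches raise throughput but make each request wait longer; small batches do the opposite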
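
The latency/throughput trade-off can be made concrete with a toy cost model. The sketch below assumes, purely for illustration, that a batch costs a fixed overhead plus a per-item cost. The function name `batch_stats` and both timing constants are hypothetical and not measured from any real system.

```python
def batch_stats(batch_size: int,
                fixed_overhead_s: float = 0.020,
                per_item_s: float = 0.002) -> dict:
    """Toy model: one batch takes fixed_overhead_s + batch_size * per_item_s."""
    batch_time = fixed_overhead_s + batch_size * per_item_s
    return {
        "batch_size": batch_size,
        "latency_s": batch_time,  # Every request waits for the whole batch
        "throughput_rps": batch_size / batch_time,
    }

for size in (1, 8, 32):
    stats = batch_stats(size)
    print(f"batch={stats['batch_size']:>2}  "
          f"latency={stats['latency_s'] * 1000:.0f} ms  "
          f"throughput={stats['throughput_rps']:.0f} req/s")
```

Under this model, growing the batch amortizes the fixed overhead (higher throughput) while every request in the batch waits longer (higher latency). Real systems would need measured constants.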

In [None]:
# Install required packages (asyncio ships with the standard library, so it is not installed from PyPI)
!pip install -q sentence-transformers qdrant-client openai python-dotenv tiktoken redis psutil memory-profiler

# Import necessary libraries
import os
import time
import asyncio
import json
import hashlib
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import openai
from dotenv import load_dotenv
import tiktoken
import psutil
import gc
from collections import defaultdict
import redis
import pickle

# Load environment variables
load_dotenv()

# Set up OpenAI API
openai.api_key = os.getenv("OPENAI_API_KEY")

print("✅ All packages imported successfully!")
print("🔧 Environment configured for RAG optimization")

## Performance Optimization {#performance-optimization}

Performance optimization focuses on reducing latency and improving throughput while maintaining quality.

### Key Optimization Areas

1. **Embedding Optimization**: Faster embedding generation
2. **Vector Search Optimization**: Efficient similarity search
3. **LLM Optimization**: Faster text generation
4. **Caching**: Reduce redundant computations
5. **Parallel Processing**: Concurrent operations
6. **Memory Management**: Efficient memory usage
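
As one illustration of the parallel-processing item, blocking calls can be moved to worker threads and awaited together. This is a minimal sketch, not the notebook's implementation: `blocking_search` is a hypothetical stand-in for an embed-and-retrieve call, `search_many` and `max_concurrency` are names introduced here, and `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time
from typing import List

def blocking_search(query: str) -> str:
    """Hypothetical stand-in for a blocking embed-and-retrieve call."""
    time.sleep(0.05)  # Simulate model inference or network I/O
    return f"results for: {query}"

async def search_many(queries: List[str], max_concurrency: int = 4) -> List[str]:
    """Run blocking searches in worker threads, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(query: str) -> str:
        async with semaphore:
            return await asyncio.to_thread(blocking_search, query)

    # gather returns results in the same order as the input queries
    return await asyncio.gather(*(run_one(q) for q in queries))
```

In a notebook this could be called as `results = await search_many(["q1", "q2"])`; in a script, `asyncio.run(search_many(["q1", "q2"]))`. How much wall-clock time this saves depends on whether the underlying work releases the GIL, so it is worth measuring before relying on it.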

In [None]:
@dataclass
class PerformanceMetrics:
    """Performance metrics for RAG system"""
    total_time: float
    embedding_time: float
    retrieval_time: float
    generation_time: float
    memory_usage: float
    cpu_usage: float
    tokens_generated: int
    tokens_processed: int

class OptimizedRAGSystem:
    """Optimized RAG system with performance monitoring"""
    
    def __init__(self, 
                 embedding_model: str = "all-MiniLM-L6-v2",
                 use_gpu: bool = False,
                 batch_size: int = 32):
        
        # Initialize embedding model on the requested device
        device = "cuda" if use_gpu else "cpu"
        self.embedder = SentenceTransformer(embedding_model, device=device)
        
        self.batch_size = batch_size
        
        # Initialize vector store
        self.vector_client = QdrantClient(":memory:")
        self.vector_client.create_collection(
            collection_name="optimized_kb",
            vectors_config=VectorParams(
                size=self.embedder.get_sentence_embedding_dimension(),
                distance=Distance.COSINE
            )
        )
        
        # Performance tracking
        self.performance_history = []
        self.cache_hits = 0
        self.cache_misses = 0
        
        print(f"✅ Optimized RAG system initialized with {embedding_model}")
        print(f"🔧 GPU acceleration: {use_gpu}")
        print(f"📦 Batch size: {batch_size}")
    
    def add_documents_batch(self, documents: List[Dict[str, Any]]):
        """Add documents in optimized batches"""
        start_time = time.time()
        
        # Process documents in batches
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            
            # Generate embeddings in batch
            batch_texts = [doc["content"] for doc in batch]
            batch_embeddings = self.embedder.encode(batch_texts, batch_size=self.batch_size)
            
            # Create points
            points = []
            for j, (doc, embedding) in enumerate(zip(batch, batch_embeddings)):
                point = PointStruct(
                    id=i + j,
                    vector=embedding.tolist(),
                    payload={
                        "content": doc["content"],
                        "metadata": doc.get("metadata", {}),
                        "doc_id": doc.get("id", f"doc_{i + j}")
                    }
                )
                points.append(point)
            
            # Add to vector store
            self.vector_client.upsert(
                collection_name="optimized_kb",
                points=points
            )
        
        processing_time = time.time() - start_time
        print(f"✅ Added {len(documents)} documents in {processing_time:.2f}s")
        if documents:  # Avoid division by zero on an empty input list
            print(f"📊 Average time per document: {processing_time / len(documents):.4f}s")
    
    async def search_optimized(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Optimized search with performance monitoring"""
        start_time = time.time()
        memory_before = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        cpu_before = psutil.cpu_percent()  # Non-blocking; the very first call returns a meaningless 0.0
        
        # Generate query embedding
        embedding_start = time.time()
        query_embedding = self.embedder.encode([query], batch_size=1)[0]
        embedding_time = time.time() - embedding_start
        
        # Vector search (recent qdrant-client releases deprecate search() in favor of query_points())
        retrieval_start = time.time()
        search_results = self.vector_client.search(
            collection_name="optimized_kb",
            query_vector=query_embedding.tolist(),
            limit=limit
        )
        retrieval_time = time.time() - retrieval_start
        
        # Generate response
        generation_start = time.time()
        context = "\n".join([hit.payload["content"] for hit in search_results])
        
        # Simple response generation (in practice, use LLM)
        response = f"Based on the retrieved information: {context[:200]}..."
        generation_time = time.time() - generation_start
        
        # Calculate metrics
        total_time = time.time() - start_time
        memory_after = psutil.Process().memory_info().rss / 1024 / 1024
        cpu_after = psutil.cpu_percent()
        
        metrics = PerformanceMetrics(
            total_time=total_time,
            embedding_time=embedding_time,
            retrieval_time=retrieval_time,
            generation_time=generation_time,
            memory_usage=memory_after - memory_before,
            cpu_usage=(cpu_before + cpu_after) / 2,
            tokens_generated=len(response.split()),
            tokens_processed=len(query.split()) + len(context.split())
        )
        
        self.performance_history.append(metrics)
        
        return {
            "query": query,
            "response": response,
            "results": [hit.payload for hit in search_results],
            "metrics": metrics
        }
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary"""
        if not self.performance_history:
            return {"message": "No performance data available"}
        
        metrics = self.performance_history[-10:]  # Last 10 queries
        
        return {
            "total_queries": len(self.performance_history),
            "average_total_time": np.mean([m.total_time for m in metrics]),
            "average_embedding_time": np.mean([m.embedding_time for m in metrics]),
            "average_retrieval_time": np.mean([m.retrieval_time for m in metrics]),
            "average_generation_time": np.mean([m.generation_time for m in metrics]),
            "average_memory_usage": np.mean([m.memory_usage for m in metrics]),
            "average_cpu_usage": np.mean([m.cpu_usage for m in metrics]),
            "average_tokens_generated": np.mean([m.tokens_generated for m in metrics]),
            "average_tokens_processed": np.mean([m.tokens_processed for m in metrics]),
            "cache_hit_rate": self.cache_hits / max(self.cache_hits + self.cache_misses, 1)
        }
    
    def optimize_embeddings(self, queries: List[str]) -> List[np.ndarray]:
        """Optimized batch embedding generation"""
        start_time = time.time()
        
        # Process in batches
        all_embeddings = []
        for i in range(0, len(queries), self.batch_size):
            batch = queries[i:i + self.batch_size]
            batch_embeddings = self.embedder.encode(batch, batch_size=self.batch_size)
            all_embeddings.extend(batch_embeddings)
        
        processing_time = time.time() - start_time
        print(f"✅ Generated {len(queries)} embeddings in {processing_time:.2f}s")
        if queries:  # Avoid division by zero on an empty input list
            print(f"📊 Average time per embedding: {processing_time / len(queries):.4f}s")
        
        return all_embeddings

# Test optimized RAG system
print("🧪 Testing Optimized RAG System:")

# Create optimized system
optimized_rag = OptimizedRAGSystem(use_gpu=False, batch_size=16)

# Sample documents
sample_docs = [
    {
        "id": f"doc_{i}",
        "content": f"Document {i}: This is sample content about machine learning, artificial intelligence, and data science. It contains information about algorithms, models, and applications.",
        "metadata": {"category": "AI", "type": "technical"}
    }
    for i in range(100)  # 100 documents for testing
]

# Add documents with batch processing
optimized_rag.add_documents_batch(sample_docs)

# Test queries
test_queries = [
    "What is machine learning?",
    "How does artificial intelligence work?",
    "What are data science applications?",
    "Explain machine learning algorithms",
    "What is deep learning?"
]

# Test performance
print(f"\n🔍 Testing performance with {len(test_queries)} queries:")

for i, query in enumerate(test_queries):
    print(f"\nQuery {i+1}: '{query}'")
    
    result = await optimized_rag.search_optimized(query, limit=3)
    
    metrics = result["metrics"]
    print(f"  Total time: {metrics.total_time:.3f}s")
    print(f"  Embedding time: {metrics.embedding_time:.3f}s")
    print(f"  Retrieval time: {metrics.retrieval_time:.3f}s")
    print(f"  Generation time: {metrics.generation_time:.3f}s")
    print(f"  Memory usage: {metrics.memory_usage:.2f}MB")
    print(f"  CPU usage: {metrics.cpu_usage:.1f}%")

# Get performance summary
print(f"\n📊 Performance Summary:")
summary = optimized_rag.get_performance_summary()
for key, value in summary.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.3f}")
    else:
        print(f"  {key}: {value}")

# Test batch embedding optimization
print(f"\n🔄 Testing batch embedding optimization:")
batch_queries = [f"Query {i} about machine learning" for i in range(50)]
embeddings = optimized_rag.optimize_embeddings(batch_queries)
print(f"Generated {len(embeddings)} embeddings")
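
The summary above reports mean timings, which can hide occasional slow queries. A common complement is to report latency percentiles. The sketch below is one way to do that with the nearest-rank method; the helper names `percentile` and `latency_percentiles` are introduced here for illustration, and the sample timings are made up.

```python
import math
from typing import Dict, List

def percentile(sorted_values: List[float], pct: float) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
    return sorted_values[rank - 1]

def latency_percentiles(latencies_s: List[float]) -> Dict[str, float]:
    """Summarize latencies given in seconds as p50/p95/p99 in milliseconds."""
    if not latencies_s:
        return {}
    ordered = sorted(latencies_s)
    return {
        f"p{pct}_ms": percentile(ordered, pct) * 1000.0
        for pct in (50, 95, 99)
    }

# Made-up timings with one slow outlier
print(latency_percentiles([0.012, 0.011, 0.013, 0.010, 0.095]))
```

With the class in this section, the input could be `[m.total_time for m in optimized_rag.performance_history]`.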

## Cost Optimization {#cost-optimization}

Cost optimization focuses on reducing operational costs while maintaining system performance and quality.

### Key Cost Factors

1. **LLM API Costs**: Token usage and API calls
2. **Embedding Costs**: Vector generation and storage
3. **Infrastructure Costs**: Compute, storage, and networking
4. **Operational Costs**: Maintenance and monitoring
5. **Data Costs**: Storage and transfer costs

### Optimization Strategies

1. **Token Optimization**: Reduce token usage
2. **Caching**: Avoid redundant computations
3. **Model Selection**: Choose cost-effective models
4. **Batch Processing**: Reduce API call overhead
5. **Compression**: Reduce data storage and transfer
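
Token optimization often comes down to capping how much retrieved context is sent to the LLM. The following is a minimal sketch under stated assumptions: `trim_context` and `approx_token_count` are hypothetical helpers, chunks are assumed to arrive already ranked by relevance, and the default counter uses a rough 1.3 tokens-per-word estimate rather than a real tokenizer.

```python
from typing import Callable, List

def approx_token_count(text: str) -> int:
    """Rough estimate: about 1.3 tokens per whitespace-separated word."""
    return int(len(text.split()) * 1.3)

def trim_context(chunks: List[str],
                 max_tokens: int,
                 count_tokens: Callable[[str], int] = approx_token_count) -> List[str]:
    """Keep chunks in ranked order until the token budget is used up."""
    kept: List[str] = []
    used = 0
    for chunk in chunks:
        cost = count_tokens(chunk)
        if used + cost > max_tokens:
            break  # Stop at the first chunk that does not fit
        kept.append(chunk)
        used += cost
    return kept
```

For exact counts, a tokenizer-backed counter could be passed instead, for example `lambda t: len(enc.encode(t))` with `enc = tiktoken.encoding_for_model("gpt-3.5-turbo")`. Note that tiktoken may need to download its encoding data on first use.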

In [None]:
class CostOptimizedRAG:
    """Cost-optimized RAG system with cost tracking and optimization"""
    
    def __init__(self, 
                 embedding_model: str = "all-MiniLM-L6-v2",
                 llm_model: str = "gpt-3.5-turbo",
                 use_caching: bool = True):
        
        self.embedder = SentenceTransformer(embedding_model)
        self.llm_model = llm_model
        self.use_caching = use_caching
        
        # Cost tracking
        self.cost_metrics = {
            "total_queries": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "embedding_cost": 0.0,
            "llm_cost": 0.0,
            "cache_savings": 0.0
        }
        
        # Token costs (approximate)
        self.token_costs = {
            "gpt-3.5-turbo": 0.002,  # $0.002 per 1K tokens
            "gpt-4": 0.03,           # $0.03 per 1K tokens
            "embedding": 0.0001      # $0.0001 per 1K tokens
        }
        
        # Caching
        if use_caching:
            self.query_cache = {}
            self.embedding_cache = {}
            self.response_cache = {}
        
        print(f"✅ Cost-optimized RAG initialized")
        print(f"💰 LLM model: {llm_model}")
        print(f"💾 Caching enabled: {use_caching}")
    
    def calculate_token_cost(self, text: str, model: str) -> float:
        """Calculate cost for text based on model"""
        # Simple token counting (in practice, use tiktoken)
        token_count = len(text.split()) * 1.3  # Approximate token count
        
        if model in self.token_costs:
            cost_per_token = self.token_costs[model] / 1000
            return token_count * cost_per_token
        
        return 0.0
    
    def calculate_embedding_cost(self, text: str) -> float:
        """Calculate cost for embedding generation"""
        return self.calculate_token_cost(text, "embedding")
    
    def calculate_llm_cost(self, text: str) -> float:
        """Calculate cost for LLM generation"""
        return self.calculate_token_cost(text, self.llm_model)
    
    def get_cache_key(self, query: str) -> str:
        """Generate cache key for query"""
        return hashlib.md5(query.encode()).hexdigest()
    
    def search_with_cost_optimization(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Search with cost optimization"""
        start_time = time.time()
        self.cost_metrics["total_queries"] += 1
        
        # Check cache first
        if self.use_caching:
            cache_key = self.get_cache_key(query)
            if cache_key in self.query_cache:
                self.cost_metrics["cache_savings"] += 1  # Counts cache hits, not dollars saved
                print(f"💾 Cache hit for query: {query[:50]}...")
                return self.query_cache[cache_key]
        
        # Generate query embedding
        query_cost = self.calculate_embedding_cost(query)
        self.cost_metrics["embedding_cost"] += query_cost
        
        # Mock retrieval (in practice, use vector store)
        retrieval_results = [
            {"content": f"Result {i} for query: {query}", "score": 0.9 - i * 0.1}
            for i in range(limit)
        ]
        
        # Generate response
        context = "\n".join([r["content"] for r in retrieval_results])
        response = f"Based on the retrieved information: {context[:200]}..."
        
        response_cost = self.calculate_llm_cost(response)
        self.cost_metrics["llm_cost"] += response_cost
        
        # Calculate total cost
        total_cost = query_cost + response_cost
        self.cost_metrics["total_cost"] += total_cost
        self.cost_metrics["total_tokens"] += len(query.split()) + len(response.split())
        
        result = {
            "query": query,
            "response": response,
            "results": retrieval_results,
            "cost_breakdown": {
                "query_cost": query_cost,
                "response_cost": response_cost,
                "total_cost": total_cost
            },
            "processing_time": time.time() - start_time
        }
        
        # Cache result
        if self.use_caching:
            self.query_cache[cache_key] = result
        
        return result
    
    def get_cost_summary(self) -> Dict[str, Any]:
        """Get cost summary"""
        if self.cost_metrics["total_queries"] == 0:
            return {"message": "No cost data available"}
        
        avg_cost_per_query = self.cost_metrics["total_cost"] / self.cost_metrics["total_queries"]
        avg_tokens_per_query = self.cost_metrics["total_tokens"] / self.cost_metrics["total_queries"]
        
        return {
            "total_queries": self.cost_metrics["total_queries"],
            "total_cost": self.cost_metrics["total_cost"],
            "average_cost_per_query": avg_cost_per_query,
            "total_tokens": self.cost_metrics["total_tokens"],
            "average_tokens_per_query": avg_tokens_per_query,
            "embedding_cost": self.cost_metrics["embedding_cost"],
            "llm_cost": self.cost_metrics["llm_cost"],
            "cache_hit_rate": self.cost_metrics["cache_savings"] / self.cost_metrics["total_queries"],
            "cost_breakdown": {
                # max(..., 1e-12) avoids division by zero when no cost has accrued
                "embedding_percentage": (self.cost_metrics["embedding_cost"] / max(self.cost_metrics["total_cost"], 1e-12)) * 100,
                "llm_percentage": (self.cost_metrics["llm_cost"] / max(self.cost_metrics["total_cost"], 1e-12)) * 100
            }
        }
    
    def optimize_for_cost(self, queries: List[str]) -> Dict[str, Any]:
        """Optimize system for cost reduction"""
        optimizations = {
            "batch_processing": False,
            "query_compression": False,
            "response_compression": False,
            "model_downgrade": False,
            "caching_improvement": False
        }
        
        # Analyze queries for optimization opportunities
        if len(queries) > 10:
            optimizations["batch_processing"] = True
        
        # Check for similar queries
        if self.use_caching:
            cache_hit_rate = self.cost_metrics["cache_savings"] / max(self.cost_metrics["total_queries"], 1)
            if cache_hit_rate < 0.3:
                optimizations["caching_improvement"] = True
        
        # Check for long queries
        avg_query_length = np.mean([len(q.split()) for q in queries])
        if avg_query_length > 20:
            optimizations["query_compression"] = True
        
        # Check average tokens per query (query plus response words, as tracked in total_tokens)
        if self.cost_metrics["total_tokens"] > 0:
            avg_tokens_per_query = self.cost_metrics["total_tokens"] / self.cost_metrics["total_queries"]
            if avg_tokens_per_query > 100:
                optimizations["response_compression"] = True
        
        # Check whether a cheaper model is worth considering; only the pricier
        # model in token_costs has a cheaper alternative to downgrade to
        if self.llm_model == "gpt-4" and self.cost_metrics["total_cost"] > 10:
            optimizations["model_downgrade"] = True
        
        return optimizations

# Test cost-optimized RAG
print("🧪 Testing Cost-Optimized RAG System:")

# Create cost-optimized system
cost_rag = CostOptimizedRAG(use_caching=True)

# Test queries
test_queries = [
    "What is machine learning?",
    "How does artificial intelligence work?",
    "What is machine learning?",  # Duplicate to test caching
    "Explain deep learning algorithms",
    "What is machine learning?",  # Another duplicate
    "How do neural networks work?",
    "What is data science?",
    "Explain machine learning models",
    "What is machine learning?",  # Another duplicate
    "How does reinforcement learning work?"
]

print(f"🔍 Testing cost optimization with {len(test_queries)} queries:")

for i, query in enumerate(test_queries):
    print(f"\nQuery {i+1}: '{query}'")
    
    result = cost_rag.search_with_cost_optimization(query, limit=3)
    
    cost_breakdown = result["cost_breakdown"]
    print(f"  Query cost: ${cost_breakdown['query_cost']:.6f}")
    print(f"  Response cost: ${cost_breakdown['response_cost']:.6f}")
    print(f"  Total cost: ${cost_breakdown['total_cost']:.6f}")
    print(f"  Processing time: {result['processing_time']:.3f}s")

# Get cost summary
print(f"\n💰 Cost Summary:")
summary = cost_rag.get_cost_summary()
for key, value in summary.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.6f}")
    else:
        print(f"  {key}: {value}")

# Get optimization recommendations
print(f"\n🔧 Optimization Recommendations:")
optimizations = cost_rag.optimize_for_cost(test_queries)
for optimization, recommended in optimizations.items():
    status = "✅ Recommended" if recommended else "❌ Not needed"
    print(f"  {optimization}: {status}")
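
The effect of caching on spend can be estimated with simple arithmetic: if cache hits cost nothing, expected cost scales with the miss rate. The sketch below assumes exactly that simplification; `expected_llm_cost` is a hypothetical helper and the workload numbers are invented for illustration. The 30% hit rate mirrors the test run above, where 3 of 10 queries are repeats.

```python
def expected_llm_cost(num_queries: int,
                      tokens_per_query: int,
                      price_per_1k_tokens: float,
                      cache_hit_rate: float = 0.0) -> float:
    """Expected spend when cache hits cost nothing and misses pay full price."""
    if not 0.0 <= cache_hit_rate <= 1.0:
        raise ValueError("cache_hit_rate must be between 0 and 1")
    cost_per_miss = tokens_per_query / 1000 * price_per_1k_tokens
    return num_queries * (1.0 - cache_hit_rate) * cost_per_miss

# Hypothetical workload: 10,000 queries of 1,500 tokens each at $0.002 per 1K tokens
no_cache = expected_llm_cost(10_000, 1_500, 0.002)
with_cache = expected_llm_cost(10_000, 1_500, 0.002, cache_hit_rate=0.3)
print(f"without cache: ${no_cache:.2f}, with 30% hit rate: ${with_cache:.2f}")
```

This ignores the cost of running the cache itself and treats input and output tokens as priced the same, which real pricing often does not.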

## Quality Optimization {#quality-optimization}

Quality optimization focuses on improving the accuracy, relevance, and coherence of RAG responses.

### Key Quality Metrics

1. **Accuracy**: Factual correctness of responses
2. **Relevance**: How well responses address queries
3. **Completeness**: Coverage of query topics
4. **Coherence**: Logical flow and structure
5. **Consistency**: Reliable performance across queries
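
When labeled relevance judgments are available, retrieval quality can also be measured directly with standard metrics such as recall@k and reciprocal rank. The sketch below is illustrative only: the function names and document IDs are hypothetical, and it assumes each query comes with a set of known relevant document IDs.

```python
from typing import List, Set

def recall_at_k(retrieved: List[str], relevant: Set[str], k: int) -> float:
    """Fraction of relevant documents that appear in the top-k results."""
    if not relevant:
        return 0.0
    hits = len(set(retrieved[:k]) & relevant)
    return hits / len(relevant)

def reciprocal_rank(retrieved: List[str], relevant: Set[str]) -> float:
    """1 / rank of the first relevant document, or 0.0 if none is retrieved."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0

# Hypothetical ranking and ground truth for a single query
retrieved_ids = ["doc_3", "doc_1", "doc_7"]
relevant_ids = {"doc_1", "doc_2"}
print(recall_at_k(retrieved_ids, relevant_ids, k=3))
print(reciprocal_rank(retrieved_ids, relevant_ids))
```

Averaging reciprocal rank over a query set gives mean reciprocal rank (MRR).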

### Optimization Strategies

1. **Retrieval Enhancement**: Improve document retrieval
2. **Context Optimization**: Better context selection
3. **Response Refinement**: Improve response generation
4. **Quality Monitoring**: Track and improve quality
5. **Feedback Integration**: Learn from user feedback
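
One widely used technique for context optimization is maximal marginal relevance (MMR), which trades relevance to the query against redundancy among the chunks already selected. It is not part of the classes in this notebook; the sketch below is a plain-Python illustration with hypothetical names (`cosine`, `mmr_select`, `lambda_mult`) and toy 2-D vectors.

```python
import math
from typing import List, Sequence

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def mmr_select(query_vec: Sequence[float],
               doc_vecs: List[Sequence[float]],
               k: int = 3,
               lambda_mult: float = 0.7) -> List[int]:
    """Return indices of up to k documents chosen greedily by MMR score."""
    selected: List[int] = []
    candidates = list(range(len(doc_vecs)))

    def mmr_score(i: int) -> float:
        relevance = cosine(query_vec, doc_vecs[i])
        # Redundancy is the highest similarity to anything already selected
        redundancy = max((cosine(doc_vecs[i], doc_vecs[j]) for j in selected),
                         default=0.0)
        return lambda_mult * relevance - (1 - lambda_mult) * redundancy

    while candidates and len(selected) < k:
        best = max(candidates, key=mmr_score)
        selected.append(best)
        candidates.remove(best)
    return selected

# Toy vectors: documents 0 and 1 are near-duplicates, document 2 is different
docs = [[1.0, 0.0], [1.0, 0.05], [0.7, 0.7]]
print(mmr_select([1.0, 0.0], docs, k=2, lambda_mult=0.3))
```

Setting `lambda_mult=1.0` reduces this to plain relevance ranking, while lower values favor diversity; a suitable value depends on the corpus and would need tuning.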

In [None]:
class QualityOptimizedRAG:
    """Quality-optimized RAG system with quality monitoring and improvement"""
    
    def __init__(self, 
                 embedding_model: str = "all-MiniLM-L6-v2",
                 quality_threshold: float = 0.7):
        
        self.embedder = SentenceTransformer(embedding_model)
        self.quality_threshold = quality_threshold
        
        # Quality tracking
        self.quality_metrics = {
            "total_queries": 0,
            "high_quality_responses": 0,
            "medium_quality_responses": 0,
            "low_quality_responses": 0,
            "average_quality_score": 0.0,
            "quality_improvements": 0
        }
        
        # Quality assessment criteria
        self.quality_criteria = {
            "relevance": 0.3,
            "completeness": 0.25,
            "accuracy": 0.25,
            "coherence": 0.2
        }
        
        print(f"✅ Quality-optimized RAG initialized")
        print(f"🎯 Quality threshold: {quality_threshold}")
    
    def assess_response_quality(self, query: str, response: str, context: str) -> Dict[str, float]:
        """Assess quality of response across multiple criteria"""
        
        # Relevance assessment
        relevance_score = self._assess_relevance(query, response)
        
        # Completeness assessment
        completeness_score = self._assess_completeness(query, response)
        
        # Accuracy assessment
        accuracy_score = self._assess_accuracy(response, context)
        
        # Coherence assessment
        coherence_score = self._assess_coherence(response)
        
        # Calculate overall quality score
        overall_score = (
            relevance_score * self.quality_criteria["relevance"] +
            completeness_score * self.quality_criteria["completeness"] +
            accuracy_score * self.quality_criteria["accuracy"] +
            coherence_score * self.quality_criteria["coherence"]
        )
        
        return {
            "overall_score": overall_score,
            "relevance": relevance_score,
            "completeness": completeness_score,
            "accuracy": accuracy_score,
            "coherence": coherence_score
        }
    
    def _assess_relevance(self, query: str, response: str) -> float:
        """Assess relevance of response to query"""
        query_terms = set(query.lower().split())
        response_terms = set(response.lower().split())
        
        if not query_terms:
            return 0.5
        
        # Calculate term overlap
        overlap = len(query_terms & response_terms)
        relevance_score = overlap / len(query_terms)
        
        return min(relevance_score, 1.0)
    
    def _assess_completeness(self, query: str, response: str) -> float:
        """Assess completeness of response"""
        query_terms = set(query.lower().split())
        response_terms = set(response.lower().split())
        
        if not query_terms:
            return 0.5
        
        # Check if response addresses query terms
        addressed_terms = len(query_terms & response_terms)
        completeness_score = addressed_terms / len(query_terms)
        
        # Also consider response length
        length_score = min(len(response) / 200, 1.0)  # Normalize to 200 chars
        
        return (completeness_score + length_score) / 2
    
    def _assess_accuracy(self, response: str, context: str) -> float:
        """Assess accuracy of response against context"""
        if not context:
            return 0.5
        
        # Simple accuracy check: count fact claims vs context
        response_sentences = response.split('. ')
        context_sentences = context.split('. ')
        
        if not response_sentences or not context_sentences:
            return 0.5
        
        # Check if response sentences are supported by context
        supported_sentences = 0
        for resp_sentence in response_sentences:
            if any(resp_sentence.lower() in ctx_sentence.lower() for ctx_sentence in context_sentences):
                supported_sentences += 1
        
        accuracy_score = supported_sentences / len(response_sentences)
        return accuracy_score
    
    def _assess_coherence(self, response: str) -> float:
        """Assess coherence of response"""
        sentences = response.split('. ')
        
        if len(sentences) < 2:
            return 0.5
        
        # Check for transition words
        transition_words = ["however", "therefore", "furthermore", "additionally", "moreover", "consequently"]
        transition_count = sum(1 for word in transition_words if word in response.lower())
        
        # Check sentence length variation
        sentence_lengths = [len(sentence.split()) for sentence in sentences]
        length_variation = np.std(sentence_lengths) / np.mean(sentence_lengths) if np.mean(sentence_lengths) > 0 else 0
        
        # Coherence score
        transition_score = min(transition_count / len(sentences), 1.0)
        variation_score = min(length_variation, 1.0)
        
        return (transition_score + variation_score) / 2
    
    def improve_response_quality(self, query: str, response: str, context: str) -> str:
        """Improve response quality based on assessment"""
        quality_scores = self.assess_response_quality(query, response, context)
        
        improved_response = response
        
        # Improve relevance
        if quality_scores["relevance"] < 0.6:
            improved_response = self._improve_relevance(query, improved_response)
        
        # Improve completeness
        if quality_scores["completeness"] < 0.6:
            improved_response = self._improve_completeness(query, improved_response)
        
        # Improve coherence
        if quality_scores["coherence"] < 0.6:
            improved_response = self._improve_coherence(improved_response)
        
        return improved_response
    
    def _improve_relevance(self, query: str, response: str) -> str:
        """Improve relevance of response to query"""
        query_terms = set(query.lower().split())
        response_terms = set(response.lower().split())
        
        # Add missing query terms to response
        missing_terms = query_terms - response_terms
        if missing_terms:
            improved_response = f"{response} This addresses: {', '.join(missing_terms)}."
            return improved_response
        
        return response
    
    def _improve_completeness(self, query: str, response: str) -> str:
        """Improve completeness of response"""
        if len(response) < 100:
            improved_response = f"{response} For more details, consider exploring related topics and applications."
            return improved_response
        
        return response
    
    def _improve_coherence(self, response: str) -> str:
        """Improve coherence of response"""
        sentences = response.split('. ')
        
        if len(sentences) < 2:
            return response
        
        # Add transition words
        improved_sentences = []
        for i, sentence in enumerate(sentences):
            if i > 0 and not any(word in sentence.lower() for word in ["however", "therefore", "furthermore", "additionally"]):
                improved_sentences.append(f"Furthermore, {sentence}")
            else:
                improved_sentences.append(sentence)
        
        return '. '.join(improved_sentences)
    
    def search_with_quality_optimization(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Search with quality optimization"""
        start_time = time.time()
        self.quality_metrics["total_queries"] += 1
        
        # Mock retrieval
        retrieval_results = [
            {"content": f"Result {i} for query: {query}", "score": 0.9 - i * 0.1}
            for i in range(limit)
        ]
        
        # Generate initial response
        context = "\n".join([r["content"] for r in retrieval_results])
        initial_response = f"Based on the retrieved information: {context[:200]}..."
        
        # Assess quality
        quality_scores = self.assess_response_quality(query, initial_response, context)
        
        # Improve response if needed
        if quality_scores["overall_score"] < self.quality_threshold:
            improved_response = self.improve_response_quality(query, initial_response, context)
            self.quality_metrics["quality_improvements"] += 1
        else:
            improved_response = initial_response
        
        # Update quality metrics
        if quality_scores["overall_score"] >= 0.8:
            self.quality_metrics["high_quality_responses"] += 1
        elif quality_scores["overall_score"] >= 0.6:
            self.quality_metrics["medium_quality_responses"] += 1
        else:
            self.quality_metrics["low_quality_responses"] += 1
        
        # Update average quality score
        total_responses = self.quality_metrics["total_queries"]
        current_avg = self.quality_metrics["average_quality_score"]
        self.quality_metrics["average_quality_score"] = (
            (current_avg * (total_responses - 1) + quality_scores["overall_score"]) / total_responses
        )
        
        return {
            "query": query,
            "response": improved_response,
            "results": retrieval_results,
            "quality_scores": quality_scores,
            "processing_time": time.time() - start_time
        }
    
    def get_quality_summary(self) -> Dict[str, Any]:
        """Get quality summary"""
        if self.quality_metrics["total_queries"] == 0:
            return {"message": "No quality data available"}
        
        total_queries = self.quality_metrics["total_queries"]
        
        return {
            "total_queries": total_queries,
            "average_quality_score": self.quality_metrics["average_quality_score"],
            "high_quality_percentage": (self.quality_metrics["high_quality_responses"] / total_queries) * 100,
            "medium_quality_percentage": (self.quality_metrics["medium_quality_responses"] / total_queries) * 100,
            "low_quality_percentage": (self.quality_metrics["low_quality_responses"] / total_queries) * 100,
            "quality_improvements": self.quality_metrics["quality_improvements"],
            "improvement_rate": (self.quality_metrics["quality_improvements"] / total_queries) * 100
        }

# Test quality-optimized RAG
print("🧪 Testing Quality-Optimized RAG System:")

# Create quality-optimized system
quality_rag = QualityOptimizedRAG(quality_threshold=0.7)

# Test queries
test_queries = [
    "What is machine learning?",
    "How does artificial intelligence work?",
    "Explain deep learning algorithms",
    "What are the applications of data science?",
    "How do neural networks learn?"
]

print(f"🔍 Testing quality optimization with {len(test_queries)} queries:")

for i, query in enumerate(test_queries):
    print(f"\nQuery {i+1}: '{query}'")
    
    result = quality_rag.search_with_quality_optimization(query, limit=3)
    
    quality_scores = result["quality_scores"]
    print(f"  Overall quality: {quality_scores['overall_score']:.3f}")
    print(f"  Relevance: {quality_scores['relevance']:.3f}")
    print(f"  Completeness: {quality_scores['completeness']:.3f}")
    print(f"  Accuracy: {quality_scores['accuracy']:.3f}")
    print(f"  Coherence: {quality_scores['coherence']:.3f}")
    print(f"  Processing time: {result['processing_time']:.3f}s")

# Get quality summary
print("\n🎯 Quality Summary:")
summary = quality_rag.get_quality_summary()
for key, value in summary.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.3f}")
    else:
        print(f"  {key}: {value}")

## Caching Strategies {#caching-strategies}

Caching avoids recomputing results the system has already produced. In a RAG pipeline, that means fewer embedding and LLM calls, which lowers both latency and API cost.

### Caching Types

1. **Query Caching**: Cache complete query responses
2. **Embedding Caching**: Cache computed embeddings
3. **Result Caching**: Cache retrieval results
4. **Context Caching**: Cache processed context
5. **Model Caching**: Cache model outputs
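
As a minimal sketch of embedding caching (type 2 above), the snippet below keys cached vectors by a SHA-256 hash of the normalized text, so repeated or trivially different inputs skip the embedding call. `EmbeddingCache` and `fake_embed` are hypothetical names introduced for illustration; `fake_embed` is a stand-in for a real embedding model, not a real API.

```python
import hashlib
from typing import Callable, Dict, List


def fake_embed(text: str) -> List[float]:
    """Hypothetical stand-in for a real embedding model call."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [b / 255.0 for b in digest[:4]]


class EmbeddingCache:
    """Caches embeddings keyed by a hash of the normalized input text."""

    def __init__(self, embed_fn: Callable[[str], List[float]]):
        self.embed_fn = embed_fn
        self.store: Dict[str, List[float]] = {}
        self.model_calls = 0  # how many times the underlying model ran

    @staticmethod
    def make_key(text: str) -> str:
        # Lowercase and collapse whitespace so near-identical inputs share a key
        normalized = " ".join(text.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get_embedding(self, text: str) -> List[float]:
        key = self.make_key(text)
        if key not in self.store:
            # Cache miss: call the model once and remember the result
            self.model_calls += 1
            self.store[key] = self.embed_fn(text)
        return self.store[key]


embedding_cache = EmbeddingCache(fake_embed)
first = embedding_cache.get_embedding("What is machine learning?")
second = embedding_cache.get_embedding("  what is  machine learning? ")
print(f"Model calls: {embedding_cache.model_calls}")
print(f"Same vector returned: {first == second}")
```

The normalization step is a design choice, not a requirement: if the embedding model is case-sensitive, hashing the raw text instead would be the safer key.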

### Caching Strategies

1. **LRU (Least Recently Used)**: Remove least recently used items
2. **LFU (Least Frequently Used)**: Remove least frequently used items
3. **TTL (Time To Live)**: Remove items after expiration
4. **Size-based**: Remove items when cache is full
5. **Hybrid**: Combine multiple strategies
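
As an illustration of the hybrid strategy (item 5), the sketch below combines TTL expiry with LRU eviction using `collections.OrderedDict`. `LruTtlCache` is a hypothetical name, and the optional `now` argument is an assumption added so the example runs deterministically without sleeping; it is independent of the `AdvancedCachingSystem` class in this notebook.

```python
import time
from collections import OrderedDict
from typing import Any, Optional


class LruTtlCache:
    """Hybrid cache: entries expire after `ttl` seconds, and the least
    recently used entry is dropped when the cache is full."""

    def __init__(self, max_size: int = 100, ttl: float = 60.0):
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self.ttl = ttl
        self._data = OrderedDict()  # key -> (value, expires_at)

    def get(self, key: str, now: Optional[float] = None) -> Optional[Any]:
        now = time.monotonic() if now is None else now
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if now >= expires_at:
            del self._data[key]  # TTL rule: expired entries are removed on read
            return None
        self._data.move_to_end(key)  # LRU rule: mark as most recently used
        return value

    def set(self, key: str, value: Any, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        if key in self._data:
            del self._data[key]  # overwrite: re-insert at the "most recent" end
        elif len(self._data) >= self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry
        self._data[key] = (value, now + self.ttl)


hybrid = LruTtlCache(max_size=2, ttl=10.0)
hybrid.set("a", 1, now=0.0)
hybrid.set("b", 2, now=1.0)
hybrid.get("a", now=2.0)          # "a" becomes most recently used
hybrid.set("c", 3, now=3.0)       # cache full: evicts "b", the LRU entry
print(hybrid.get("b", now=4.0))   # None (evicted)
print(hybrid.get("a", now=4.0))   # 1
print(hybrid.get("a", now=11.0))  # None (expired at t=10)
```

Keeping entries in access order means eviction is a constant-time `popitem`, rather than a scan over all access timestamps.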

In [None]:
class AdvancedCachingSystem:
    """Advanced caching system with multiple strategies"""
    
    def __init__(self, 
                 max_size: int = 1000,
                 ttl: int = 3600,  # 1 hour
                 strategy: str = "lru"):
        
        self.max_size = max_size
        self.ttl = ttl
        self.strategy = strategy
        
        # Cache storage
        self.cache = {}
        self.access_times = {}
        self.access_counts = {}
        self.creation_times = {}
        
        # Cache metrics
        self.metrics = {
            "hits": 0,
            "misses": 0,
            "evictions": 0,
            "total_requests": 0
        }
        
        print("✅ Advanced caching system initialized")
        print(f"📦 Max size: {max_size}")
        print(f"⏰ TTL: {ttl}s")
        print(f"🔄 Strategy: {strategy}")
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        self.metrics["total_requests"] += 1
        
        if key not in self.cache:
            self.metrics["misses"] += 1
            return None
        
        # Check TTL
        if time.time() - self.creation_times[key] > self.ttl:
            self._evict(key)
            self.metrics["misses"] += 1
            return None
        
        # Update access information
        self.access_times[key] = time.time()
        self.access_counts[key] = self.access_counts.get(key, 0) + 1
        
        self.metrics["hits"] += 1
        return self.cache[key]
    
    def set(self, key: str, value: Any) -> None:
        """Set value in cache"""
        # Check if cache is full
        if len(self.cache) >= self.max_size and key not in self.cache:
            self._evict_oldest()
        
        # Store value
        self.cache[key] = value
        self.access_times[key] = time.time()
        self.access_counts[key] = 1
        self.creation_times[key] = time.time()
    
    def _evict_oldest(self) -> None:
        """Evict oldest item based on strategy"""
        if not self.cache:
            return
        
        if self.strategy == "lru":
            # Remove least recently used
            oldest_key = min(self.access_times, key=self.access_times.get)
        elif self.strategy == "lfu":
            # Remove least frequently used
            oldest_key = min(self.access_counts, key=self.access_counts.get)
        else:
            # Default to LRU
            oldest_key = min(self.access_times, key=self.access_times.get)
        
        self._evict(oldest_key)
    
    def _evict(self, key: str) -> None:
        """Evict specific key"""
        if key in self.cache:
            del self.cache[key]
            del self.access_times[key]
            del self.access_counts[key]
            del self.creation_times[key]
            self.metrics["evictions"] += 1
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get cache metrics"""
        total_requests = self.metrics["total_requests"]
        hit_rate = self.metrics["hits"] / total_requests if total_requests > 0 else 0
        
        return {
            "total_requests": total_requests,
            "hits": self.metrics["hits"],
            "misses": self.metrics["misses"],
            "evictions": self.metrics["evictions"],
            "hit_rate": hit_rate,
            "cache_size": len(self.cache),
            "max_size": self.max_size,
            "utilization": len(self.cache) / self.max_size
        }
    
    def clear(self) -> None:
        """Clear all cache"""
        self.cache.clear()
        self.access_times.clear()
        self.access_counts.clear()
        self.creation_times.clear()
        print("✅ Cache cleared")

# Test caching system
print("🧪 Testing Advanced Caching System:")

# Create caching system
cache = AdvancedCachingSystem(max_size=5, ttl=60, strategy="lru")

# Test caching
test_data = [
    ("key1", "value1"),
    ("key2", "value2"),
    ("key3", "value3"),
    ("key4", "value4"),
    ("key5", "value5"),
    ("key6", "value6"),  # This should evict key1
    ("key7", "value7"),  # This should evict key2
]

print("📝 Adding data to cache:")
for key, value in test_data:
    cache.set(key, value)
    print(f"  Set {key}: {value}")

print("\n🔍 Testing cache retrieval:")
for key in ["key1", "key2", "key3", "key4", "key5", "key6", "key7"]:
    value = cache.get(key)
    # Compare against None so falsy cached values (0, "", []) still count as hits
    if value is not None:
        print(f"  Hit: {key} = {value}")
    else:
        print(f"  Miss: {key}")

# Get cache metrics
print("\n📊 Cache Metrics:")
metrics = cache.get_metrics()
for key, value in metrics.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.3f}")
    else:
        print(f"  {key}: {value}")

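# Test TTL with a short-lived cache so the entry actually expires
# (the 60-second TTL of `cache` above would not expire within a 2-second wait)
print("\n⏰ Testing TTL (1-second TTL, waiting 2 seconds):")
ttl_cache = AdvancedCachingSystem(max_size=5, ttl=1, strategy="lru")
ttl_cache.set("test_key", "test_value")
print("  Set test_key with a 1-second TTL")
time.sleep(2)
value = ttl_cache.get("test_key")
if value is not None:
    print(f"  Hit: test_key = {value}")
else:
    print("  Miss: test_key (expired)")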

# Clear cache (clear() prints its own confirmation)
cache.clear()

## Key Takeaways & Next Steps

### What We've Optimized
✅ **Performance Optimization** with batch processing and efficient algorithms
✅ **Cost Optimization** with caching and token management
✅ **Quality Optimization** with response assessment and improvement
✅ **Advanced Caching** with multiple strategies and TTL
✅ **Monitoring & Metrics** for continuous improvement

### Key Insights
1. **Performance vs Quality**: Balance speed with accuracy
2. **Cost vs Performance**: Optimize for your specific needs
3. **Caching is Critical**: Reduces costs and improves performance
4. **Quality Monitoring**: Essential for maintaining user satisfaction
5. **Continuous Optimization**: Regular monitoring and improvement needed
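
To make insight 3 concrete, here is a back-of-the-envelope estimate. It assumes each cache hit avoids one LLM call entirely and that cache overhead is negligible. `estimate_llm_cost` is a hypothetical helper, and the request volume, per-call price, and hit rate are illustrative numbers, not measurements.

```python
def estimate_llm_cost(requests: int, cost_per_call: float, hit_rate: float) -> float:
    """Expected LLM spend when a fraction `hit_rate` of requests is served from cache."""
    if not 0.0 <= hit_rate <= 1.0:
        raise ValueError("hit_rate must be between 0 and 1")
    # Only cache misses reach the LLM, so only they incur a per-call cost
    return requests * (1.0 - hit_rate) * cost_per_call


baseline = estimate_llm_cost(requests=100_000, cost_per_call=0.002, hit_rate=0.0)
with_cache = estimate_llm_cost(requests=100_000, cost_per_call=0.002, hit_rate=0.4)
print(f"Without cache:     ${baseline:.2f}")
print(f"With 40% hit rate: ${with_cache:.2f}")
print(f"Estimated savings: ${baseline - with_cache:.2f}")
```

Under these assumptions the savings scale linearly with the hit rate, which is why the `hit_rate` value reported by `get_metrics()` is worth tracking.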

### Next Steps
- **Production Deployment**: Implement in production environment
- **A/B Testing**: Test different optimization strategies
- **User Feedback**: Integrate user feedback for quality improvement
- **Advanced Monitoring**: Implement comprehensive monitoring
- **Cost Analysis**: Regular cost analysis and optimization

### Advanced Topics to Explore
- **Federated Caching**: Distributed caching systems
- **Predictive Caching**: ML-based cache optimization
- **Quality Learning**: Adaptive quality improvement
- **Cost Prediction**: Predictive cost modeling
- **Performance Tuning**: Advanced performance optimization

---

**Ready to optimize your RAG system?** Start with performance monitoring, then gradually add caching, cost optimization, and quality improvement based on your specific requirements!