In [1]:
"""
Enterprise Conversational RAG System
- Low-latency (<15s on CPU)
- Modular design for easy component replacement
- Comprehensive logging for performance monitoring
- Hybrid retrieval (BM25 + Embeddings)
"""

import os
import time
import logging
import json
from typing import List, Dict, Any, Tuple, Optional
from dataclasses import dataclass
import torch
from tqdm import tqdm

In [2]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("rag_system.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("rag_system")

In [3]:
# Performance tracking
@dataclass
class LatencyTracker:
    start_time: float = 0
    
    def start(self):
        self.start_time = time.time()
        return self
    
    def end(self, operation_name: str) -> float:
        duration = time.time() - self.start_time
        logger.info(f"LATENCY: {operation_name} took {duration:.4f} seconds")
        return duration

##### Document Processing

In [4]:
class DocumentProcessor:
    """Handles document loading and extraction from various formats"""
    
    def __init__(self):
        try:
            from unstructured.partition.pdf import partition_pdf
            self.partition_pdf = partition_pdf
        except ImportError:
            logger.error("unstructured package not installed. Install with: pip install unstructured pdf2image pytesseract")
            raise
    
    def process_pdf(self, file_path: str) -> str:
        """Extract text from PDF files"""
        tracker = LatencyTracker().start()
        
        try:
            elements = self.partition_pdf(file_path)
            text = "\n\n".join([str(element) for element in elements])
            
            tracker.end(f"PDF processing ({os.path.basename(file_path)})")
            return text
        except Exception as e:
            logger.error(f"Error processing PDF {file_path}: {str(e)}")
            raise

##### Text Splitting

In [5]:
class TextSplitter:
    """Handles document chunking using various strategies"""
    
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        try:
            from langchain_text_splitters import RecursiveCharacterTextSplitter
            self.splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                separators=["\n\n", "\n", ". ", " ", ""]
            )
        except ImportError:
            logger.error("langchain_text_splitters not installed. Install with: pip install langchain-text-splitters")
            raise
    
    def split_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Split text into chunks with metadata"""
        tracker = LatencyTracker().start()
        
        if metadata is None:
            metadata = {}
            
        chunks = self.splitter.create_documents([text], [metadata])
        chunks_with_metadata = []
        
        for i, chunk in enumerate(chunks):
            chunks_with_metadata.append({
                "id": f"{metadata.get('source', 'doc')}_{i}",
                "text": chunk.page_content,
                "metadata": {**chunk.metadata, "chunk_id": i}
            })
        
        tracker.end(f"Text splitting ({len(chunks_with_metadata)} chunks)")
        return chunks_with_metadata

##### EMBEDDING MODEL

In [6]:
class EmbeddingModel:
    """Handles text embedding using Sentence Transformers"""
    
    def __init__(self, model_name: str = "BAAI/bge-small-en-v1.5"):
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Using device: {self.device} for embeddings")
        
        try:
            from sentence_transformers import SentenceTransformer
            tracker = LatencyTracker().start()
            self.model = SentenceTransformer(model_name, device=self.device)
            tracker.end(f"Loading embedding model {model_name}")
        except ImportError:
            logger.error("sentence_transformers not installed. Install with: pip install sentence-transformers")
            raise
    
    def embed_texts(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of texts"""
        tracker = LatencyTracker().start()
        
        # Use batching for efficiency
        batch_size = 32
        all_embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            with torch.no_grad():
                embeddings = self.model.encode(batch, convert_to_tensor=True)
                embeddings = embeddings.cpu().numpy().tolist()
                all_embeddings.extend(embeddings)
        
        tracker.end(f"Embedding {len(texts)} texts")
        return all_embeddings

    def embed_query(self, query: str) -> List[float]:
        """Generate embedding for a single query"""
        tracker = LatencyTracker().start()
        
        with torch.no_grad():
            embedding = self.model.encode(query)
            
        if isinstance(embedding, torch.Tensor):
            embedding = embedding.cpu().numpy().tolist()
            
        tracker.end("Query embedding")
        return embedding

##### VECTOR DATABASE

In [7]:
class VectorStore:
    """Handles vector storage and retrieval using Qdrant"""
    
    def __init__(self, collection_name: str = "documents", vector_size: int = 384):
        self.collection_name = collection_name
        self.vector_size = vector_size
        
        try:
            from qdrant_client import QdrantClient
            from qdrant_client.http import models
            
            # Use local Qdrant instance (can be replaced with cloud URL)
            self.client = QdrantClient(":memory:")  # In-memory for testing; use location="./qdrant_data" for persistence
            
            # Create collection if it doesn't exist
            self.client.recreate_collection(
                collection_name=self.collection_name,
                vectors_config=models.VectorParams(
                    size=self.vector_size,
                    distance=models.Distance.COSINE
                )
            )
            
            logger.info(f"Initialized Qdrant collection: {collection_name}")
            
        except ImportError:
            logger.error("qdrant_client not installed. Install with: pip install qdrant-client")
            raise
    
    def add_documents(self, documents: List[Dict[str, Any]], embeddings: List[List[float]]):
        """Add documents and their embeddings to the vector store"""
        tracker = LatencyTracker().start()
        
        from qdrant_client.http import models
        
        # Prepare points for batch upload
        points = []
        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            points.append(models.PointStruct(
                id=i,
                vector=embedding,
                payload={
                    "text": doc["text"],
                    "metadata": doc["metadata"]
                }
            ))
        
        # Upload in batch
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )
        
        tracker.end(f"Adding {len(documents)} documents to vector store")
    
    def semantic_search(self, query_embedding: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
        """Retrieve documents using semantic similarity"""
        tracker = LatencyTracker().start()
        
        search_result = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=top_k
        )
        
        results = []
        for res in search_result:
            results.append({
                "id": res.id,
                "text": res.payload["text"],
                "metadata": res.payload["metadata"],
                "score": res.score
            })
        
        tracker.end(f"Semantic search (top-{top_k})")
        return results

##### BM25 LEXICAL SEARCH

In [8]:
class BM25Retriever:
    """Implements BM25 for lexical search"""
    
    def __init__(self):
        try:
            from rank_bm25 import BM25Okapi
            self.BM25Okapi = BM25Okapi
            self.bm25 = None
            self.documents = []
            self.tokenized_corpus = []
        except ImportError:
            logger.error("rank_bm25 not installed. Install with: pip install rank-bm25")
            raise
    
    def add_documents(self, documents: List[Dict[str, Any]]):
        """Index documents for BM25 retrieval"""
        tracker = LatencyTracker().start()
        
        self.documents = documents
        
        # Tokenize and index documents
        import nltk
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt', quiet=True)
        
        from nltk.tokenize import word_tokenize
        
        self.tokenized_corpus = [word_tokenize(doc["text"].lower()) for doc in documents]
        self.bm25 = self.BM25Okapi(self.tokenized_corpus)
        
        tracker.end(f"Indexing {len(documents)} documents for BM25")
    
    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Retrieve documents using BM25 lexical similarity"""
        tracker = LatencyTracker().start()
        
        from nltk.tokenize import word_tokenize
        tokenized_query = word_tokenize(query.lower())
        
        # Get BM25 scores
        doc_scores = self.bm25.get_scores(tokenized_query)
        
        # Get top_k documents
        top_indices = doc_scores.argsort()[-top_k:][::-1]
        
        results = []
        for i in top_indices:
            results.append({
                "id": i,
                "text": self.documents[i]["text"],
                "metadata": self.documents[i]["metadata"],
                "score": float(doc_scores[i])  # Convert numpy float to Python float
            })
        
        tracker.end(f"BM25 search (top-{top_k})")
        return results

##### HYBRID RETRIEVAL

In [9]:
class HybridRetriever:
    """Implements hybrid retrieval combining BM25 and embedding search"""
    
    def __init__(
        self, 
        vector_store: VectorStore, 
        bm25_retriever: BM25Retriever,
        embedding_model: EmbeddingModel,
        bm25_weight: float = 0.4,
        semantic_weight: float = 0.6,
    ):
        self.vector_store = vector_store
        self.bm25_retriever = bm25_retriever
        self.embedding_model = embedding_model
        self.bm25_weight = bm25_weight
        self.semantic_weight = semantic_weight
    
    def retrieve(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Perform hybrid retrieval"""
        tracker = LatencyTracker().start()
        
        # Get query embedding for semantic search
        query_embedding = self.embedding_model.embed_query(query)
        
        # Retrieve from both sources (get more results than needed for re-ranking)
        retrieval_k = min(top_k * 2, 10)  # Get more docs for re-ranking
        semantic_results = self.vector_store.semantic_search(query_embedding, retrieval_k)
        bm25_results = self.bm25_retriever.search(query, retrieval_k)
        
        # Normalize scores from both retrievers (min-max normalization)
        def normalize_scores(results):
            scores = [r["score"] for r in results]
            if not scores:
                return results
            min_score = min(scores)
            max_score = max(scores)
            score_range = max_score - min_score
            
            if score_range > 0:
                for r in results:
                    r["normalized_score"] = (r["score"] - min_score) / score_range
            else:
                for r in results:
                    r["normalized_score"] = 1.0
                    
            return results
        
        semantic_results = normalize_scores(semantic_results)
        bm25_results = normalize_scores(bm25_results)
        
        # Create a unified document pool with combined scores
        doc_map = {}
        
        # Add semantic results
        for doc in semantic_results:
            doc_id = doc["id"]
            doc_map[doc_id] = {
                "id": doc_id,
                "text": doc["text"],
                "metadata": doc["metadata"],
                "semantic_score": doc["normalized_score"] * self.semantic_weight,
                "bm25_score": 0,  # Will be updated if in BM25 results
                "combined_score": doc["normalized_score"] * self.semantic_weight
            }
        
        # Add/update with BM25 results
        for doc in bm25_results:
            doc_id = doc["id"]
            if doc_id in doc_map:
                # Update existing entry
                doc_map[doc_id]["bm25_score"] = doc["normalized_score"] * self.bm25_weight
                doc_map[doc_id]["combined_score"] += doc["normalized_score"] * self.bm25_weight
            else:
                # New entry
                doc_map[doc_id] = {
                    "id": doc_id,
                    "text": doc["text"],
                    "metadata": doc["metadata"],
                    "semantic_score": 0,  # Not in semantic results
                    "bm25_score": doc["normalized_score"] * self.bm25_weight,
                    "combined_score": doc["normalized_score"] * self.bm25_weight
                }
        
        # Sort by combined score and take top_k
        ranked_results = sorted(
            doc_map.values(), 
            key=lambda x: x["combined_score"], 
            reverse=True
        )[:top_k]
        
        tracker.end(f"Hybrid retrieval (top-{top_k})")
        return ranked_results

##### LLM INTERFACE

In [17]:
class LLMInterface:
    """Interface for interacting with LLMs"""
    
    def __init__(self, model_name: str = "mistral-7b-instruct-v0.2.Q4_K_M.gguf"):
        self.model_name = model_name
        
        # Determine if we're using a local GGUF model or an API
        if model_name.endswith(".gguf"):
            self._init_local_llm(model_name)
        else:
            raise ValueError("Currently only supporting local GGUF models. Use a .gguf file.")
    
    def _init_local_llm(self, model_name: str):
        """Initialize a local LLM using llama.cpp Python bindings"""
        try:
            from llama_cpp import Llama
            
            # Check if model exists in the models directory
            model_dir = os.path.join(os.getcwd(), "models")
            model_path = os.path.join(model_dir, model_name)
            
            if not os.path.exists(model_path):
                logger.warning(f"Model file not found at {model_path}. Please download it first.")
                logger.info(f"You can download Mistral 7B Instruct GGUF from: https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF")
                model_path = model_name  # Try with just the name in case it's in PATH
            
            tracker = LatencyTracker().start()
            
            # Load model with optimal parameters for CPU inference
            self.llm = Llama(
                model_path=model_path,
                n_ctx=4096,         # Context window
                n_threads=os.cpu_count() or 4,  # Use all CPU cores
                n_batch=512,        # Batch size for prompt processing
                use_mlock=True      # Keep model in RAM
            )
            
            tracker.end(f"Loading LLM model {model_name}")
            
        except ImportError:
            logger.error("llama-cpp-python not installed. Install with: pip install llama-cpp-python")
            raise
    
    def generate_standalone_question(self, current_question: str, chat_history: List[Dict[str, str]]) -> str:
        """Generate a standalone question from a follow-up question and chat history"""
        tracker = LatencyTracker().start()
        
        # Format chat history for context
        history_text = ""
        for turn in chat_history[-3:]:  # Just use last 3 turns for efficiency
            if "user" in turn:
                history_text += f"User: {turn['user']}\n"
            if "assistant" in turn:
                history_text += f"Assistant: {turn['assistant']}\n"
        
        # Create prompt
        prompt = f"""[INST] Given the following conversation history and a follow-up question, rephrase the follow-up question to be a standalone question that includes all necessary context. Your task is to create a complete, standalone question.

Conversation History:
{history_text}

Follow-up Question: {current_question}

Standalone Question: [/INST]"""
        
        # Generate response
        response = self.llm(
            prompt,
            max_tokens=256,
            temperature=0.1,
            stop=["</s>"]
        )
        
        standalone_question = response["choices"][0]["text"].strip()
        
        tracker.end("Generating standalone question")
        logger.info(f"Follow-up: '{current_question}' -> Standalone: '{standalone_question}'")
        
        return standalone_question
    
    def generate_answer(self, query: str, retrieved_docs: List[Dict[str, Any]]) -> str:
        """Generate an answer based on the query and retrieved documents"""
        tracker = LatencyTracker().start()
        
        # Format retrieved documents
        context = ""
        for i, doc in enumerate(retrieved_docs, 1):
            source = doc["metadata"].get("source", "Unknown")
            context += f"\nDocument {i} (Source: {source}):\n{doc['text']}\n"
        
        # Create prompt 
        prompt = f"""[INST] Answer the following question based only on the provided documents. If the documents don't contain the necessary information to answer the question, say "I don't have enough information to answer this question."

Documents:
{context}

Question: {query}

Answer: [/INST]"""
        
        # Generate response
        response = self.llm(
            prompt,
            max_tokens=512,
            temperature=0.3,
            stop=["</s>"]
        )
        
        answer = response["choices"][0]["text"].strip()
        print(context)
        
        tracker.end("Generating answer")
        return answer

##### RAG SYSTEM

In [11]:
class RAGSystem:
    """Main RAG system that orchestrates the entire process"""
    
    def __init__(self, top_k: int = 3):
        self.top_k = top_k
        
        # Initialize components
        self.document_processor = DocumentProcessor()
        self.text_splitter = TextSplitter(chunk_size=512, chunk_overlap=50)
        self.embedding_model = EmbeddingModel(model_name="BAAI/bge-small-en-v1.5")
        self.vector_store = VectorStore(
            collection_name="documents",
            vector_size=384  # BGE-small embedding dimension
        )
        self.bm25_retriever = BM25Retriever()
        self.hybrid_retriever = HybridRetriever(
            vector_store=self.vector_store,
            bm25_retriever=self.bm25_retriever,
            embedding_model=self.embedding_model,
            bm25_weight=0.4,
            semantic_weight=0.6,
        )
        self.llm = LLMInterface(model_name="mistral-7b-instruct-v0.2.Q4_K_M.gguf")
        
        # Chat history for conversational context
        self.chat_history = []
    
    def ingest_documents(self, file_paths: List[str]):
        """Process and index documents"""
        total_tracker = LatencyTracker().start()
        all_chunks = []
        
        # Process each document
        for file_path in file_paths:
            if file_path.lower().endswith(".pdf"):
                text = self.document_processor.process_pdf(file_path)
                chunks = self.text_splitter.split_text(
                    text,
                    metadata={"source": os.path.basename(file_path)}
                )
                all_chunks.extend(chunks)
            else:
                logger.warning(f"Unsupported file type: {file_path}")
        
        # Extract texts for embedding
        texts = [chunk["text"] for chunk in all_chunks]
        
        # Generate embeddings
        embeddings = self.embedding_model.embed_texts(texts)
        
        # Index in both vector store and BM25
        self.vector_store.add_documents(all_chunks, embeddings)
        self.bm25_retriever.add_documents(all_chunks)
        
        total_tracker.end(f"Ingesting {len(file_paths)} documents ({len(all_chunks)} chunks)")
        return len(all_chunks)
    
    def query(self, question: str) -> Dict[str, Any]:
        """Process a query and return results"""
        total_tracker = LatencyTracker().start()
        
        # Handle follow-up questions
        if self.chat_history:
            standalone_question = self.llm.generate_standalone_question(
                question, 
                self.chat_history
            )
        else:
            standalone_question = question
        
        # Retrieve relevant documents
        retrieved_docs = self.hybrid_retriever.retrieve(
            standalone_question, 
            top_k=self.top_k
        )
        
        # Generate answer
        answer = self.llm.generate_answer(standalone_question, retrieved_docs)
        
        # Update chat history
        self.chat_history.append({"user": question})
        self.chat_history.append({"assistant": answer})
        
        # Prepare sources information for citation
        sources = []
        for doc in retrieved_docs:
            source = doc["metadata"].get("source", "Unknown")
            if source not in [s["name"] for s in sources]:
                sources.append({
                    "name": source,
                    "score": doc["combined_score"]
                })
        
        result = {
            "question": question,
            "standalone_question": standalone_question,
            "answer": answer,
            "sources": sources,
            "latency": total_tracker.end("Total query processing")
        }
        
        return result

##### EVALUATION METRICS

In [12]:
class RAGEvaluator:
    """Evaluates RAG system performance"""
    
    def __init__(self, rag_system: RAGSystem):
        self.rag_system = rag_system
        
        try:
            import nltk
            try:
                nltk.data.find('tokenizers/punkt')
                nltk.data.find('metrics/bleu_ref.txt')
            except LookupError:
                nltk.download('punkt', quiet=True)
                nltk.download('bleu_ref', quiet=True)
                
            import rouge
            self.rouge = rouge.Rouge()
            
        except ImportError:
            logger.error("Evaluation packages not installed. Install with: pip install nltk rouge-score")
            raise
    
    def evaluate_retrieval(self, query: str, relevant_doc_ids: List[str], retrieved_docs: List[Dict[str, Any]]) -> Dict[str, float]:
        """Evaluate retrieval performance using precision, recall, f1"""
        retrieved_ids = [str(doc["id"]) for doc in retrieved_docs]
        
        # Calculate metrics
        true_positives = len(set(retrieved_ids).intersection(set(relevant_doc_ids)))
        precision = true_positives / len(retrieved_ids) if retrieved_ids else 0
        recall = true_positives / len(relevant_doc_ids) if relevant_doc_ids else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        return {
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
        }
    
    def evaluate_generation(self, generated_answer: str, reference_answer: str) -> Dict[str, float]:
        """Evaluate answer generation using BLEU and ROUGE"""
        import nltk
        from nltk.translate.bleu_score import sentence_bleu
        
        # Prepare for BLEU
        reference_tokens = [nltk.word_tokenize(reference_answer.lower())]
        generated_tokens = nltk.word_tokenize(generated_answer.lower())
        
        # Calculate BLEU score
        bleu_score = sentence_bleu(reference_tokens, generated_tokens)
        
        # Calculate ROUGE scores
        try:
            rouge_scores = self.rouge.get_scores(generated_answer, reference_answer)[0]
        except:
            rouge_scores = {"rouge-1": {"f": 0}, "rouge-2": {"f": 0}, "rouge-l": {"f": 0}}
        
        return {
            "bleu": bleu_score,
            "rouge1_f1": rouge_scores["rouge-1"]["f"],
            "rouge2_f1": rouge_scores["rouge-2"]["f"],
            "rougeL_f1": rouge_scores["rouge-l"]["f"],
        }
    
    def evaluate_latency(self, result: Dict[str, Any]) -> Dict[str, float]:
        """Evaluate system latency"""
        return {
            "total_latency": result["latency"],
        }
    
    def run_benchmark(self, test_queries: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Run a benchmark on a set of test queries with reference answers"""
        results = {
            "retrieval": {
                "precision": [],
                "recall": [],
                "f1_score": [],
            },
            "generation": {
                "bleu": [],
                "rouge1_f1": [],
                "rouge2_f1": [],
                "rougeL_f1": [],
            },
            "latency": {
                "total_latency": [],
            }
        }
        
        for test in tqdm(test_queries, desc="Running benchmark"):
            query = test["query"]
            reference_answer = test["reference_answer"]
            relevant_doc_ids = test.get("relevant_doc_ids", [])
            
            # Run query
            result = self.rag_system.query(query)
            generated_answer = result["answer"]
            
            # We need to get the retrieved docs from the result for evaluation
            # This would require modifying the RAG system to return retrieved docs
            # For this example, we'll just use an empty list
            retrieved_docs = []  # Placeholder
            
            # Evaluate
            retrieval_metrics = self.evaluate_retrieval(query, relevant_doc_ids, retrieved_docs)
            generation_metrics = self.evaluate_generation(generated_answer, reference_answer)
            latency_metrics = self.evaluate_latency(result)
            
            # Collect results
            for k, v in retrieval_metrics.items():
                results["retrieval"][k].append(v)
            for k, v in generation_metrics.items():
                results["generation"][k].append(v)
            for k, v in latency_metrics.items():
                results["latency"][k].append(v)
        
        # Calculate averages
        summary = {
            "retrieval": {},
            "generation": {},
            "latency": {}
        }
        
        for category in results:
            for metric in results[category]:
                values = results[category][metric]
                summary[category][metric] = sum(values) / len(values) if values else 0
        
        return {
            "details": results,
            "summary": summary
        }

#### Main

In [18]:
def main():
    """Example usage of the RAG system"""
    
    # Initialize the system
    rag = RAGSystem(top_k=3)
    
    # Ingest documents (example paths)
    document_paths = [
        "Documents/EveriseHandbook.pdf",
    ]
    
    # Check if documents exist
    existing_docs = [path for path in document_paths if os.path.exists(path)]
    if not existing_docs:
        logger.warning(f"No documents found at the specified paths. Skipping ingestion.")
    else:
        rag.ingest_documents(existing_docs)
    
    # Interactive querying
    print("\n===== RAG Conversational System Demo =====")
    print("Type 'exit' to quit\n")
    
    while True:
        question = input("\nYour question: ")
        if question.lower() in ["exit", "quit"]:
            break
            
        print("\nProcessing...")
        
        try:
            result = rag.query(question)
            
            print(f"\nAnswer: {result['answer']}")
            print(f"\nSources:")
            for source in result['sources']:
                print(f"- {source['name']} (score: {source['score']:.3f})")
            print(f"\nLatency: {result['latency']:.3f} seconds")
            
        except Exception as e:
            logger.error(f"Error processing query: {str(e)}")
            print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()

2025-03-03 10:53:20,073 - rag_system - INFO - Using device: cpu for embeddings
2025-03-03 10:53:20,263 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: BAAI/bge-small-en-v1.5
2025-03-03 10:53:27,297 - rag_system - INFO - LATENCY: Loading embedding model BAAI/bge-small-en-v1.5 took 7.1858 seconds
  self.client.recreate_collection(
2025-03-03 10:53:27,429 - rag_system - INFO - Initialized Qdrant collection: documents
llama_model_loader: loaded meta data with 24 key-value pairs and 291 tensors from C:\Users\Magrawal\Downloads\Experiments\Humana-poc\models\mistral-7b-instruct-v0.2.Q4_K_M.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama
llama_model_loader: - kv   1:                               general.name str              = mistralai_mistral-7b-instruct-v0.2
llama_model_l


===== RAG Conversational System Demo =====
Type 'exit' to quit




Your question:  can you explain reporting policy?



Processing...


Batches: 100%|███████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  3.53it/s]
2025-03-03 10:57:04,610 - rag_system - INFO - LATENCY: Query embedding took 0.3131 seconds
  search_result = self.client.search(
2025-03-03 10:57:04,641 - rag_system - INFO - LATENCY: Semantic search (top-6) took 0.0295 seconds
2025-03-03 10:57:04,657 - rag_system - INFO - LATENCY: BM25 search (top-6) took 0.0000 seconds
2025-03-03 10:57:04,657 - rag_system - INFO - LATENCY: Hybrid retrieval (top-3) took 0.3604 seconds
llama_perf_context_print:        load time =   75446.88 ms
llama_perf_context_print: prompt eval time =   75445.33 ms /   492 tokens (  153.34 ms per token,     6.52 tokens per second)
llama_perf_context_print:        eval time =   28411.57 ms /   112 runs   (  253.67 ms per token,     3.94 tokens per second)
llama_perf_context_print:       total time =  103998.89 ms /   604 tokens
2025-03-03 10:58:48,682 - rag_system - INFO - LATENCY: Generating answer


Document 1 (Source: EveriseHandbook.pdf):
policies referred to herein, you have an obligation to report it. To make a report, follow the

section of the Code: “Reporting Potential Code Violations.”

How will I know if there is a problem?

The Code attempts to deal with the most common issues that you may encounter, but it

cannot address every question that may arise. When you’re not sure what to do, ask yourself

the following questions:



Is it illegal?



Does it feel like the wrong thing to do?

Document 2 (Source: EveriseHandbook.pdf):
False, misleading, or artificial entries should never be made in the books and records

of the company for any reason.

9.

Reports and Complaints

Internal reporting is critical to the company’s success, and it is both expected and

valued. You are required to be proactive and promptly report any suspected

violations of this ABC Policy, or any illegal or unethical behavior of which you become

4150-0260-1799, v. 4

31

aware, including, but not 


Your question:  exit


In [19]:
"""
Self-Evaluation Module for RAG System
- Automatically generates test queries
- Creates synthetic reference answers
- Evaluates system without external data
"""

import json
import time
import random
from typing import List, Dict, Any
import logging

logger = logging.getLogger("rag_system")

class RAGSelfEvaluator:
    """Self-evaluation for RAG systems without external datasets"""
    
    def __init__(self, rag_system):
        self.rag_system = rag_system
        
        try:
            import nltk
            try:
                nltk.data.find('tokenizers/punkt')
                nltk.data.find('metrics/bleu_ref.txt')
            except LookupError:
                nltk.download('punkt', quiet=True)
                nltk.download('bleu_ref', quiet=True)
                
            import rouge
            self.rouge = rouge.Rouge()
            
        except ImportError:
            logger.error("Evaluation packages not installed. Install with: pip install nltk rouge-score")
            raise
    
    def generate_test_queries(self, num_queries: int = 5) -> List[Dict[str, Any]]:
        """Generate test queries based on documents in the system"""
        # Get all documents from the system
        # This assumes we can access the documents - we'll extract from BM25 retriever
        if not hasattr(self.rag_system, 'bm25_retriever') or not self.rag_system.bm25_retriever.documents:
            logger.warning("No documents found in the system for query generation")
            return []
        
        all_docs = self.rag_system.bm25_retriever.documents
        if not all_docs:
            logger.warning("No documents found in the system for query generation")
            return []
            
        test_queries = []
        
        # Extract key sentences from random documents
        for _ in range(num_queries):
            # Pick a random document
            doc = random.choice(all_docs)
            doc_text = doc["text"]
            
            # Split into sentences
            import nltk
            sentences = nltk.sent_tokenize(doc_text)
            
            if not sentences:
                continue
                
            # Pick a random sentence as the base for our query
            sentence = random.choice(sentences)
            
            # Create a question from the sentence
            question_types = [
                "What is ", "How does ", "Can you explain ", 
                "Tell me about ", "Why is ", "What are "
            ]
            
            # Extract key terms (nouns) from the sentence
            words = nltk.word_tokenize(sentence)
            tagged = nltk.pos_tag(words)
            
            # Find nouns
            nouns = [word for word, tag in tagged if tag.startswith('NN')]
            
            if not nouns:
                # If no nouns, just use the whole sentence
                subject = sentence
            else:
                # Use a random noun as the subject
                subject = random.choice(nouns)
            
            # Generate the question
            question = random.choice(question_types) + subject + "?"
            
            # The source document will be our "relevant" document
            relevant_doc_id = doc["id"]
            
            # Use the sentence as our "reference answer"
            reference_answer = sentence
            
            test_queries.append({
                "query": question,
                "relevant_doc_ids": [str(relevant_doc_id)],
                "reference_answer": reference_answer,
                "source_doc": doc["id"],
                "source_text": doc_text[:200] + "..." # First 200 chars for reference
            })
        
        return test_queries
    
    def evaluate_query(self, query: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate a single query"""
        start_time = time.time()
        
        # Run the query
        result = self.rag_system.query(query["query"])
        
        # Calculate latency
        latency = time.time() - start_time
        
        # Calculate ROUGE scores
        try:
            rouge_scores = self.rouge.get_scores(result["answer"], query["reference_answer"])[0]
        except:
            rouge_scores = {"rouge-1": {"f": 0}, "rouge-2": {"f": 0}, "rouge-l": {"f": 0}}
        
        # Check if source document was retrieved
        # We can't directly check this without modifying the RAG system
        # So we'll check if any key phrases from the source appear in the answer
        source_text = query["source_text"]
        key_phrases = nltk.word_tokenize(source_text)[:20]  # First 20 tokens
        
        # Count how many key phrases appear in the answer
        answer_tokens = nltk.word_tokenize(result["answer"].lower())
        key_phrase_overlap = sum(1 for token in key_phrases if token.lower() in answer_tokens)
        retrieval_score = key_phrase_overlap / max(1, len(key_phrases))
        
        evaluation = {
            "query": query["query"],
            "reference_answer": query["reference_answer"],
            "generated_answer": result["answer"],
            "latency": latency,
            "rouge1_f1": rouge_scores["rouge-1"]["f"],
            "rouge2_f1": rouge_scores["rouge-2"]["f"], 
            "rougeL_f1": rouge_scores["rouge-l"]["f"],
            "retrieval_score": retrieval_score,
            "sources": result.get("sources", [])
        }
        
        return evaluation
    
    def run_self_evaluation(self, num_queries: int = 5) -> Dict[str, Any]:
        """Run a complete self-evaluation"""
        # Generate test queries
        logger.info(f"Generating {num_queries} test queries")
        test_queries = self.generate_test_queries(num_queries)
        
        if not test_queries:
            return {"error": "Could not generate test queries. No documents available."}
        
        # Evaluate each query
        results = []
        for query in test_queries:
            logger.info(f"Evaluating query: {query['query']}")
            result = self.evaluate_query(query)
            results.append(result)
        
        # Calculate average scores
        avg_latency = sum(r["latency"] for r in results) / len(results)
        avg_rouge1 = sum(r["rouge1_f1"] for r in results) / len(results)
        avg_rouge2 = sum(r["rouge2_f1"] for r in results) / len(results)
        avg_rougeL = sum(r["rougeL_f1"] for r in results) / len(results)
        avg_retrieval = sum(r["retrieval_score"] for r in results) / len(results)
        
        summary = {
            "num_queries": len(results),
            "avg_latency": avg_latency,
            "avg_rouge1_f1": avg_rouge1,
            "avg_rouge2_f1": avg_rouge2,
            "avg_rougeL_f1": avg_rougeL,
            "avg_retrieval_score": avg_retrieval,
            "results": results
        }
        
        # Log summary
        logger.info("Evaluation Summary:")
        logger.info(f"  Number of Queries: {summary['num_queries']}")
        logger.info(f"  Average Latency: {summary['avg_latency']:.2f} seconds")
        logger.info(f"  Average ROUGE-1 F1: {summary['avg_rouge1_f1']:.4f}")
        logger.info(f"  Average ROUGE-L F1: {summary['avg_rougeL_f1']:.4f}")
        logger.info(f"  Average Retrieval Score: {summary['avg_retrieval_score']:.4f}")
        
        return summary