### Implementation

In [1]:
"""
RAG Evaluation System for Board Game Manuals
============================================
A retrieval-augmented generation (RAG) system with coverage-based evaluation.

Key Features:
- PDF text extraction with normalization
- Ground truth Q&A annotation integration using Aho-Corasick pattern matching
- Coverage-based relevance scoring (measures how much of a relevant span is in a chunk)
- DCG/nDCG metrics for retrieval quality evaluation

Adapted from: "RAG + Langchain Python Project: Easy AI/Chat For Your Docs"
https://www.youtube.com/watch?v=tcqEUSNCn8I

Author: [Your Name]
"""

import tempfile
import re
import json
import copy
# import shutil
import logging
from pathlib import Path
from typing import List, Dict, Any, Tuple

import numpy as np
import ahocorasick
from dotenv import load_dotenv

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_community.vectorstores.utils import filter_complex_metadata

from langchain_classic.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from datasets import Dataset

# import gc

import numpy as np
from ragas import evaluate
from ragas.metrics import (
    answer_correctness,
    answer_relevancy,
    faithfulness,
    context_precision,
    context_recall,
)
from datasets import Dataset
from IPython.display import display

from itertools import product
import pandas as pd


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# =============================================================================
# LOGGING & CONFIGURATION
# =============================================================================

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables (OpenAI API key)
load_dotenv()


# =============================================================================
# CUSTOM EXCEPTIONS
# =============================================================================

class RAGEvaluationError(Exception):
    """Base exception for RAG evaluation system"""
    pass


class DocumentLoadError(RAGEvaluationError):
    """Raised when document loading fails"""
    pass


class AnnotationError(RAGEvaluationError):
    """Raised when Q&A annotation processing fails"""
    pass


class ChunkingError(RAGEvaluationError):
    """Raised when document chunking fails"""
    pass


class VectorStoreError(RAGEvaluationError):
    """Raised when vector store operations fail"""
    pass


class EvaluationError(RAGEvaluationError):
    """Raised when metric calculation fails"""
    pass

In [3]:
# =============================================================================
# DOCUMENT LOADING & PREPROCESSING
# =============================================================================

def normalize_text(text: str) -> str:
    """
    Normalize text to handle encoding inconsistencies between PDF and JSON.
    
    This is critical because:
    - PDFs may have curly quotes/apostrophes: "", '', '
    - JSON files typically use straight quotes: ", '
    - Mismatches break pattern matching for ground truth annotation
    
    Args:
        text: Raw text string
        
    Returns:
        Normalized text with standardized quotes and collapsed whitespace
    """
    # Convert curly quotes to straight quotes
    text = text.replace("‚Äú", '"').replace("‚Äù", '"')
    text = text.replace("‚Äò", "'").replace("‚Äô", "'")
    
    # Collapse all whitespace (newlines, tabs, multiple spaces) to single space
    text = re.sub(r"\s+", " ", text)
    
    return text.strip()


def load_documents(pdf_path: str) -> List[Document]:
    """
    Load PDF and clean text content.
    
    Why cleaning matters:
    - PDFs often have inconsistent spacing/newlines
    - Normalized text improves embedding quality
    - Standardized format makes pattern matching reliable
    
    Args:
        pdf_path: Path to the board game manual PDF
        
    Returns:
        List of Document objects (one per page) with cleaned text
        
    Raises:
        DocumentLoadError: If PDF cannot be loaded or is empty
    """
    try:
        # Validate file exists
        if not Path(pdf_path).exists():
            raise DocumentLoadError(f"PDF file not found: {pdf_path}")
        
        logger.info(f"Loading PDF from: {pdf_path}")
        loader = PyPDFLoader(pdf_path)
        page_docs = loader.load()
        
        if not page_docs:
            raise DocumentLoadError(f"No content extracted from PDF: {pdf_path}")
        
        logger.info(f"Loaded {len(page_docs)} pages from PDF")
        
        # Clean text and filter metadata
        for page_doc in page_docs:
            # Normalize whitespace
            clean_text = normalize_text(page_doc.page_content)
            page_doc.page_content = clean_text
            
            # Keep only essential metadata to avoid Chroma serialization issues
            allowed_keys = {"source", "page"}
            page_doc.metadata = {
                k: v for k, v in page_doc.metadata.items() 
                if k in allowed_keys
            }
        
        return page_docs
        
    except Exception as e:
        if isinstance(e, DocumentLoadError):
            raise
        logger.error(f"Unexpected error loading PDF: {str(e)}")
        raise DocumentLoadError(f"Failed to load PDF: {str(e)}") from e

In [4]:
# =============================================================================
# GROUND TRUTH ANNOTATION INTEGRATION
# =============================================================================

def load_json(json_path: str) -> Dict[str, Any]:
    """
    Load JSON file containing training Q&A pairs.
    
    Args:
        json_path: Path to JSON file
        
    Returns:
        Parsed JSON data
        
    Raises:
        AnnotationError: If file cannot be loaded or parsed
    """
    try:
        if not Path(json_path).exists():
            raise AnnotationError(f"JSON file not found: {json_path}")
        
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        
        logger.info(f"Loaded JSON from: {json_path}")
        return data
        
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON format: {str(e)}")
        raise AnnotationError(f"Failed to parse JSON: {str(e)}") from e
    except Exception as e:
        if isinstance(e, AnnotationError):
            raise
        logger.error(f"Unexpected error loading JSON: {str(e)}")
        raise AnnotationError(f"Failed to load JSON: {str(e)}") from e


def load_training_qa_to_docs(training_qas_path: str, page_docs: List[Document]) -> List[Document]:
    """
    Annotate documents with ground truth relevance spans using Aho-Corasick.
    
    Why Aho-Corasick?
    - Efficient multi-pattern matching: O(n + m + z) vs O(n*m) for naive search
    - n = document length, m = total pattern length, z = matches
    - Critical when searching 100+ patterns across large documents
    
    Process:
    1. Build automaton with all relevant chunks from training Q&A
    2. Scan each page once to find all matching spans
    3. Store span metadata (qa_id, page, start/end indices)
    
    Args:
        training_qas_path: Path to JSON with training Q&A pairs
        page_docs: List of Document objects from PDF
        
    Returns:
        Documents annotated with relevance_spans in metadata
        
    Raises:
        AnnotationError: If annotation process fails
    """
    try:
        training_data = load_json(training_qas_path)
        training_qas = training_data.get("training_qas", [])
        
        if not training_qas:
            logger.warning("No training Q&As found in JSON")
            return page_docs
        
        logger.info(f"Processing {len(training_qas)} training Q&A pairs")
        
        # Build Aho-Corasick automaton for efficient pattern matching
        automaton = ahocorasick.Automaton()
        
        for qa_idx, qa in enumerate(training_qas):
            qa["relevance_spans"] = []  # Initialize spans list
            
            for chunk_text in qa.get("relevant_chunks", []):
                chunk_text_normalized = normalize_text(chunk_text)
                
                # Store tuple: (qa_index, original_chunk_text)
                # qa_index allows us to map back to the question
                automaton.add_word(chunk_text_normalized, (qa_idx, chunk_text_normalized))
        
        automaton.make_automaton()  # Compile the automaton
        logger.info("Aho-Corasick automaton built successfully")
        
        # Search all pages for relevant spans
        total_spans = 0
        for page_doc in page_docs:
            page_text = normalize_text(page_doc.page_content)
            page_num = page_doc.metadata.get("page")
            page_doc.metadata["relevance_spans"] = []
            
            # Iterate through all matches in this page
            for end_idx, (qa_idx, chunk_text) in automaton.iter(page_text):
                start_idx = end_idx - len(chunk_text) + 1  # +1 because end_idx is inclusive
                
                span = {
                    "qa_id": training_qas[qa_idx]["id"],
                    "page": page_num,
                    "start": start_idx,
                    "end": end_idx + 1  # Make end exclusive for easier indexing
                }
                page_doc.metadata["relevance_spans"].append(span)
                total_spans += 1
        
        logger.info(f"Found {total_spans} relevance spans across all pages")
        return page_docs
        
    except Exception as e:
        if isinstance(e, AnnotationError):
            raise
        logger.error(f"Annotation failed: {str(e)}")
        raise AnnotationError(f"Failed to annotate documents: {str(e)}") from e

In [None]:
# =============================================================================
# DOCUMENT CHUNKING
# =============================================================================

def split_text(docs: List[Document], chunk_size: int = 300, chunk_overlap: int = 30) -> List[Document]:
    """
    Split documents into smaller chunks for embedding.
    
    Why chunk?
    - Embeddings work better on focused, semantic units
    - Smaller chunks = more precise retrieval
    - Overlap ensures we don't split important context
    
    Why these defaults?
    - chunk_size=300: ~75 tokens, good for rule-specific content
    - chunk_overlap=30: 10% overlap preserves context at boundaries
    
    Args:
        docs: List of Document objects
        chunk_size: Target size for each chunk (characters)
        chunk_overlap: Overlap between consecutive chunks
        
    Returns:
        List of chunk Documents with start_index in metadata
        
    Raises:
        ChunkingError: If chunking process fails
    """
    try:
        if chunk_size <= 0:
            raise ChunkingError(f"chunk_size must be positive, got {chunk_size}")
        
        if chunk_overlap < 0:
            raise ChunkingError(f"chunk_overlap cannot be negative, got {chunk_overlap}")
        
        if chunk_overlap >= chunk_size:
            raise ChunkingError(
                f"chunk_overlap ({chunk_overlap}) must be less than "
                f"chunk_size ({chunk_size})"
            )
        
        logger.info(f"Splitting documents with chunk_size={chunk_size}, overlap={chunk_overlap}")
        
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,  # Use character count
            add_start_index=True  # Critical: needed for coverage calculation
        )
        
        chunks = splitter.split_documents(docs)
        logger.info(f"Created {len(chunks)} chunks")
        
        return chunks
        
    except Exception as e:
        if isinstance(e, ChunkingError):
            raise
        logger.error(f"Chunking failed: {str(e)}")
        raise ChunkingError(f"Failed to split documents: {str(e)}") from e


# =============================================================================
# COVERAGE CALCULATION
# =============================================================================

def compute_overlap(span_start: int, span_end: int, chunk_start: int, chunk_end: int) -> int:
    """
    Compute character overlap between a relevance span and a chunk.
    
    Example:
        Span:  [10, 30)  (relevant text from annotation)
        Chunk: [20, 50)  (text chunk)
        Overlap: [20, 30) = 10 characters
    
    Args:
        span_start: Start index of relevance span
        span_end: End index of relevance span (exclusive)
        chunk_start: Start index of chunk
        chunk_end: End index of chunk (exclusive)
        
    Returns:
        Number of overlapping characters
    """
    overlap_start = max(span_start, chunk_start)
    overlap_end = min(span_end, chunk_end)
    return max(0, overlap_end - overlap_start)


def generate_relevant_chunks_with_coverage(chunks: List[Document]) -> List[Document]:
    """
    Calculate coverage scores for chunks containing ground truth spans.
    
    Coverage = (overlap_length / relevance_span_length)
    
    Why coverage?
    - Measures "how much of the relevant content is in this chunk"
    - Coverage=1.0: entire relevant span is in the chunk (perfect)
    - Coverage=0.5: only half the relevant content is present
    - Coverage=0.0: chunk doesn't contain relevant content
    
    This is better than binary relevance because:
    - Distinguishes between partial and complete matches
    - Handles cases where spans cross chunk boundaries
    - Provides granular relevance scores for nDCG calculation
    
    Args:
        chunks: List of Document chunks
        
    Returns:
        List of Documents containing only relevant chunks with coverage scores
        
    Raises:
        EvaluationError: If coverage calculation fails
    """
    try:
        relevant_chunks = []
        
        for chunk_idx, chunk in enumerate(chunks):
            chunk_start = chunk.metadata.get("start_index", 0)
            chunk_end = chunk_start + len(chunk.page_content)
            relevance_spans = chunk.metadata.get("relevance_spans", [])
            
            # Skip chunks without any ground truth annotations
            if not relevance_spans:
                continue
            
            # Create copy to avoid modifying original
            annotated_chunk = copy.deepcopy(chunk)
            annotated_chunk.metadata["coverage_per_query"] = []
            
            for span in relevance_spans:
                qa_id = span["qa_id"]
                
                # Calculate how much of the span overlaps with this chunk
                overlap_len = compute_overlap(
                    span["start"], span["end"], 
                    chunk_start, chunk_end
                )
                
                relevance_len = span["end"] - span["start"]
                
                # Avoid division by zero
                if relevance_len == 0:
                    logger.warning(f"Zero-length relevance span for qa_id={qa_id}")
                    continue
                
                coverage = overlap_len / relevance_len
                
                # Skip queries with no overlap
                if coverage == 0:
                    continue
                
                annotated_chunk.metadata["coverage_per_query"].append({
                    "qa_id": qa_id,
                    "coverage": coverage
                })
            
            # Only keep chunks that have at least one relevant query
            if annotated_chunk.metadata["coverage_per_query"]:
                annotated_chunk.metadata["chunk_id"] = chunk_idx
                relevant_chunks.append(annotated_chunk)
        
        logger.info(f"Found {len(relevant_chunks)} relevant chunks out of {len(chunks)} total")
        return relevant_chunks
        
    except Exception as e:
        logger.error(f"Coverage calculation failed: {str(e)}")
        raise EvaluationError(f"Failed to calculate coverage: {str(e)}") from e


# =============================================================================
# VECTOR STORE OPERATIONS
# =============================================================================

def prepare_chunks_for_chroma(chunks: List[Document]) -> List[Document]:
    """
    Filter complex metadata for Chroma compatibility.
    
    Why needed?
    - Chroma only supports simple types (str, int, float, bool)
    - Complex types (lists, dicts) cause serialization errors
    - We keep complex metadata in separate 'relevant_chunks' list
    
    Args:
        chunks: List of Document chunks
        
    Returns:
        Documents with filtered metadata safe for Chroma
        
    Raises:
        VectorStoreError: If metadata filtering fails
    """
    try:
        retrievable_docs = []
        
        for chunk_idx, chunk in enumerate(chunks):
            # Filter to simple metadata types
            filtered_doc = filter_complex_metadata([chunk])[0]
            
            # Add chunk_id for later lookup
            filtered_doc.metadata["chunk_id"] = chunk_idx
            
            retrievable_docs.append(filtered_doc)
        
        logger.info(f"Prepared {len(retrievable_docs)} chunks for Chroma")
        return retrievable_docs
        
    except Exception as e:
        logger.error(f"Metadata filtering failed: {str(e)}")
        raise VectorStoreError(f"Failed to prepare chunks: {str(e)}") from e



def save_to_chroma(chunks: List[Document], embedding_model: str = "text-embedding-ada-002") -> Chroma:
    """
    Create and persist Chroma vector store.
    
    Note: This clears existing database!
    - Ensures fresh embeddings
    - Avoids stale data issues
    - For production, consider incremental updates
    
    Args:
        chunks: List of Document chunks (with simple metadata)
        embedding_model: OpenAI embedding model name
        
    Returns:
        Initialized Chroma vector store
        
    Raises:
        VectorStoreError: If vector store creation fails
    """
    try:
        # Create an isolated temporary directory
        tmp_dir = tempfile.mkdtemp(prefix="chroma_eval_")

        # Initialize embedding model
        embeddings = OpenAIEmbeddings(model=embedding_model)

        # Create the Chroma vector store from documents
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embeddings,
            persist_directory=tmp_dir,
            collection_metadata={"hnsw:space": "l2"}
        )

        print(f"[INFO] Temporary Chroma DB created at: {tmp_dir}")
        return db, tmp_dir
        
    except Exception as e:
        logger.error(f"Vector store creation failed: {str(e)}")
        raise VectorStoreError(f"Failed to create vector store: {str(e)}") from e

def retrieve_top_k(
    db: Chroma, 
    query: str, 
    k: int = 3
) -> List[Tuple[str, str, int, float]]:
    """
    Retrieve top-k most similar chunks for a query.
    
    Returns:
        List of tuples: (source, content, chunk_id, relevance_score)
        
    Raises:
        VectorStoreError: If retrieval fails
    """
    try:
        if k <= 0:
            raise VectorStoreError(f"k must be positive, got {k}")
        
        logger.debug(f"Retrieving top-{k} chunks for query: {query[:50]}...")
        
        results = db.similarity_search_with_relevance_scores(query, k=k)
        
        formatted_results = [
            (
                doc.metadata.get("source", "unknown"),
                doc.page_content,
                doc.metadata.get("chunk_id", -1),
                score
            )
            for doc, score in results
        ]
        
        return formatted_results
        
    except Exception as e:
        logger.error(f"Retrieval failed: {str(e)}")
        raise VectorStoreError(f"Failed to retrieve documents: {str(e)}") from e


# =============================================================================
# EVALUATION METRICS
# =============================================================================

def dcg(relevance_scores: List[float]) -> float:
    """
    Calculate Discounted Cumulative Gain.
    
    Formula: DCG = Œ£(rel_i / log2(i + 2)) for i in range(len(scores))
    
    Why log2(i + 2)?
    - Position 0: log2(2) = 1 (no discount)
    - Position 1: log2(3) = 1.58 (small discount)
    - Position 2: log2(4) = 2 (larger discount)
    - Later positions are increasingly discounted
    
    Args:
        relevance_scores: List of relevance scores (coverage values)
        
    Returns:
        DCG score
        
    Raises:
        EvaluationError: If calculation fails
    """
    try:
        if not relevance_scores:
            return 0.0
        
        dcg_value = np.sum([
            rel / np.log2(idx + 2)
            for idx, rel in enumerate(relevance_scores)
        ])
        
        return float(dcg_value)
        
    except Exception as e:
        logger.error(f"DCG calculation failed: {str(e)}")
        raise EvaluationError(f"Failed to calculate DCG: {str(e)}") from e


def ndcg_at_k(relevance_scores: List[float]) -> float:
    """
    Calculate Normalized Discounted Cumulative Gain.
    
    nDCG = DCG / IDCG
    
    Why normalize?
    - Makes scores comparable across queries
    - Range: [0, 1] where 1 = perfect ranking
    - Accounts for different numbers of relevant items
    
    Args:
        relevance_scores: List of relevance scores
        
    Returns:
        nDCG score between 0 and 1
        
    Raises:
        EvaluationError: If calculation fails
    """
    try:
        if not relevance_scores:
            return 0.0
        
        # Calculate DCG with actual ranking
        dcg_value = dcg(relevance_scores)
        
        # Calculate ideal DCG (perfect ranking)
        ideal_scores = sorted(relevance_scores, reverse=True)
        idcg_value = dcg(ideal_scores)
        
        # Avoid division by zero
        if idcg_value == 0:
            logger.warning("IDCG is 0, returning nDCG=0")
            return 0.0
        
        ndcg_value = dcg_value / idcg_value
        return float(ndcg_value)
        
    except Exception as e:
        logger.error(f"nDCG calculation failed: {str(e)}")
        raise EvaluationError(f"Failed to calculate nDCG: {str(e)}") from e


def get_coverage(chunk_id: int, qa_id: str, relevant_chunks: List[Document]) -> float:
    """
    Retrieve coverage score for a specific chunk and query.
    
    This is a lookup function that connects:
    - Retrieved chunk (by chunk_id from vector search)
    - Query (by qa_id from evaluation set)
    - Ground truth coverage (pre-computed in relevant_chunks)
    
    Args:
        chunk_id: ID of the retrieved chunk
        qa_id: ID of the query being evaluated
        relevant_chunks: List of annotated chunks with coverage scores
        
    Returns:
        Coverage score (0-1), or 0 if not found
    """
    for chunk in relevant_chunks:
        if chunk.metadata.get("chunk_id") != chunk_id:
            continue
        
        for coverage_entry in chunk.metadata.get("coverage_per_query", []):
            if coverage_entry["qa_id"] == qa_id:
                return coverage_entry["coverage"]
    
    # Return 0 if chunk has no coverage for this query
    return 0.0


# =============================================================================
# MAIN EVALUATION PIPELINE
# =============================================================================

def evaluate_rag_system(pdf_path: str, training_qa_path: str,chunk_size: int = 300,chunk_overlap: int = 30,
    k: int = 3, embedding_model: str = "text-embedding-ada-002", similarity_search: str = "cosine") -> Dict[str, Any]:
    """
    Run complete RAG evaluation pipeline.
    
    Pipeline:
    1. Load and clean PDF
    2. Annotate with ground truth Q&A
    3. Chunk documents
    4. Calculate coverage for relevant chunks
    5. Create vector store
    6. For each query: retrieve top-k and calculate metrics
    7. Report average DCG and nDCG
    
    Args:
        pdf_path: Path to board game manual PDF
        training_qa_path: Path to training Q&A JSON
        chunk_size: Size of text chunks
        chunk_overlap: Overlap between chunks
        k: Number of documents to retrieve
        embedding_model: OpenAI embedding model
        
    Returns:
        Dictionary with evaluation results
        
    Raises:
        RAGEvaluationError: If any pipeline stage fails
    """
    try:
        logger.info("=" * 60)
        logger.info("Starting RAG Evaluation Pipeline")
        logger.info("=" * 60)
        
        # Step 1: Load PDF
        docs = load_documents(pdf_path)
        
        # Step 2: Annotate with ground truth
        docs_with_qa = load_training_qa_to_docs(training_qa_path, docs)
        
        # Step 3: Chunk documents
        chunks = split_text(docs_with_qa, chunk_size, chunk_overlap)
        
        # Step 4: Calculate coverage for relevant chunks
        relevant_chunks = generate_relevant_chunks_with_coverage(chunks)
        
        # Step 5: Prepare and store in vector DB
        chunks_for_chroma = prepare_chunks_for_chroma(chunks)
        db, tmp_dir  = save_to_chroma(chunks_for_chroma, embedding_model, similarity_search)

        # Step 6: Load evaluation queries
        qa_data = load_json(training_qa_path)
        evaluation_qas = qa_data.get("training_qas", [])
        
        if not evaluation_qas:
            raise EvaluationError("No evaluation queries found in JSON")
        
        logger.info(f"Evaluating on {len(evaluation_qas)} queries")
        
        # Step 7: Evaluate each query
        dcg_values = []
        ndcg_values = []
        query_results = []
        
        for qa in evaluation_qas:
            qa_id = qa.get("id")
            question = qa.get("question")
            gt_answer = qa.get("answer")
            
            if not question:
                logger.warning(f"Skipping query with missing question: {qa_id}")
                continue
            
            # Retrieve top-k chunks
            top_k_results = retrieve_top_k(db, question, k=k)
            top_k = []

            # Calculate coverage scores for retrieved chunks
            coverage_scores = []
            for source, content, chunk_id, similarity_score in top_k_results:
                print(f"Similarity score: {similarity_score}")
                coverage = get_coverage(chunk_id, qa_id, relevant_chunks)
                coverage_scores.append(coverage)
                top_k.append(content)
            
            # Calculate metrics
            query_dcg = dcg(coverage_scores)
            query_ndcg = ndcg_at_k(coverage_scores)
            
            dcg_values.append(query_dcg)
            ndcg_values.append(query_ndcg)
            
            query_results.append({
                "qa_id": qa_id,
                "question": question,
                "top_k_content": top_k,
                "gt_answer": gt_answer,
                "coverage_scores": coverage_scores,
                "dcg": query_dcg,
                "ndcg": query_ndcg
            })
        
        # Calculate averages
        avg_dcg = float(np.mean(dcg_values))
        avg_ndcg = float(np.mean(ndcg_values))
        
        logger.info("=" * 60)
        logger.info(f"Average DCG:  {avg_dcg:.4f}")
        logger.info(f"Average nDCG: {avg_ndcg:.4f}")
        logger.info("=" * 60)
        
        return {
            "avg_dcg": avg_dcg,
            "avg_ndcg": avg_ndcg,
            "num_queries": len(evaluation_qas),
            "k": k,
            "chunk_size": chunk_size,
            "chunk_overlap": chunk_overlap,
            "query_results": query_results
        }
        
    except Exception as e:
        if isinstance(e, RAGEvaluationError):
            raise
        logger.error(f"Pipeline failed: {str(e)}")
        raise RAGEvaluationError(f"Evaluation pipeline failed: {str(e)}") from e


PROMPT_TEMPLATE = """
Answer the question based only on the following context:

{context}

---

Answer the question based on the above context: {question}
"""

# Generate the answer by feeding the LLM with prompt
def generate_answer(question: str, context: List[str], isprintprompt: bool=False) :
    # Generate the prompt template with context and query
    context_text = "\n\n---\n\n".join(context)
    prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
    prompt = prompt_template.format(context=context_text, question=question)
    if isprintprompt:
        print(prompt)

    # Implement the LLM and feed it with the prompt
    model = ChatOpenAI(model="gpt-3.5-turbo")
    return model.invoke(prompt) 

# =============================================================================
# MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    # Configuration
    PDF_PATH = "data/BoardGamesRuleBook/CATAN.pdf"
    TRAINING_QA_PATH = "data/BoardGamesRuleBook/CATAN_train_small.json"
    CHUNK_SIZES = [300]
    CHUNK_OVERLAPS = [30]
    Ks = [3]
    
    retrieval_eval_results = []
    generation_eval_results = []

    # sync interface compatible with RAGas
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)  # Explicitly pass to model kwargs)  
    
    for CHUNK_SIZE, CHUNK_OVERLAP, k in product(CHUNK_SIZES, CHUNK_OVERLAPS, Ks):
        logger.info(f"Evaluating with parameters: chunk={CHUNK_SIZE}, overlap={CHUNK_OVERLAP}, top-k={k}")
        try:
            # Retrieval evaluation
            results = evaluate_rag_system(
                pdf_path=PDF_PATH,
                training_qa_path=TRAINING_QA_PATH,
                chunk_size=CHUNK_SIZE,
                chunk_overlap=CHUNK_OVERLAP,
                k=k
            )

            retrieval_eval_results.append({
                "chunk_size": CHUNK_SIZE,
                "overlap": CHUNK_OVERLAP,
                "top_k": k,
                **results,
            })

            # # Generation evaluation
            # evaluation_rows = []
            # for query_result in results.get("query_results"):
            #     question = query_result.get("question")
            #     top_k_content = query_result.get("top_k_content")
            #     gt_answer = query_result.get("gt_answer")

            #     answer = generate_answer(question, top_k_content)
            #     evaluation_rows.append({
            #                 "question": question,
            #                 "contexts": top_k_content,
            #                 "answer": answer.content if hasattr(answer, 'content') else str(answer),
            #                 "reference": gt_answer,
            #             })
            # ragas_eval_dataset = Dataset.from_list(evaluation_rows)

            # # Run evaluation
            # scores = evaluate(
            #     ragas_eval_dataset,
            #     metrics=[
            #         answer_correctness,
            #         answer_relevancy,
            #         faithfulness,
            #         context_precision,
            #         context_recall,
            #     ],
            #     llm=llm,  # pass the LLM explicitly
            # )

            # generation_eval_results.append({
            #     "chunk_size": CHUNK_SIZES,
            #     "chunk_overlap": CHUNK_OVERLAPS,
            #     "embedding_model": ["text-embedding-3-small"],
            #     "top_k": Ks,
            #     "answer_correctness_mean": np.mean(scores["answer_correctness"]),
            #     "answer_correctness_std": np.std(scores["answer_correctness"]),
            #     "answer_relevancy_mean": np.mean(scores["answer_relevancy"]),
            #     "answer_relevancy_std": np.std(scores["answer_relevancy"]),
            #     "faithfulness_mean": np.mean(scores["faithfulness"]),
            #     "faithfulness_std": np.std(scores["faithfulness"]),
            #     "context_precision_mean": np.mean(scores["context_precision"]),
            #     "context_precision_std": np.std(scores["context_precision"]),
            #     "context_recall_mean": np.mean(scores["context_recall"]),
            #     "context_recall_std": np.std(scores["context_recall"]),
            # })

            # Display results
            print("\n" + "=" * 60)
            print("EVALUATION RESULTS")
            print("=" * 60 + "\n")
            
        except RAGEvaluationError as e:
            logger.error(f"Evaluation failed: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            raise

    # Convert to DataFrame for easy comparison
    df = pd.DataFrame(retrieval_eval_results)
    df.to_csv("rag_retrieval_eval.csv", index=False)
    print("üìÅ Generation results saved to rag_retrieval_eval.csv")
    # # --- Step 4: Save and inspect ---
    # df = pd.DataFrame(generation_eval_results)
    # df.to_csv("rag_generation_eval.csv", index=False)
    # print("üìÅ Generation results saved to rag_generation_eval.csv")
    

2025-11-12 12:49:23,984 - INFO - Evaluating with parameters: chunk=300, overlap=30, top-k=3
2025-11-12 12:49:23,985 - INFO - Starting RAG Evaluation Pipeline
2025-11-12 12:49:23,986 - INFO - Loading PDF from: data/BoardGamesRuleBook/CATAN.pdf
2025-11-12 12:49:32,688 - INFO - Loaded 12 pages from PDF
2025-11-12 12:49:32,692 - INFO - Loaded JSON from: data/BoardGamesRuleBook/CATAN_train_small.json
2025-11-12 12:49:32,692 - INFO - Processing 10 training Q&A pairs
2025-11-12 12:49:32,693 - INFO - Aho-Corasick automaton built successfully
2025-11-12 12:49:32,695 - INFO - Found 11 relevance spans across all pages
2025-11-12 12:49:32,696 - INFO - Splitting documents with chunk_size=300, overlap=30
2025-11-12 12:49:32,705 - INFO - Created 100 chunks
2025-11-12 12:49:32,708 - INFO - Found 15 relevant chunks out of 100 total
2025-11-12 12:49:32,709 - INFO - Prepared 100 chunks for Chroma
2025-11-12 12:49:33,704 - INFO - Anonymized telemetry enabled. See                     https://docs.trychroma

[INFO] Temporary Chroma DB created at: C:\Users\khchu\AppData\Local\Temp\chroma_eval_ae8687qy
Similarity score: 0.8093101453211591
Similarity score: 0.7819016071926154
Similarity score: 0.7776557968939006


2025-11-12 12:49:36,480 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.7506463472739066
Similarity score: 0.7444863324829484
Similarity score: 0.7410483086827935


2025-11-12 12:49:36,865 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.7851456079756504
Similarity score: 0.7608682645901023
Similarity score: 0.7523307460746445


2025-11-12 12:49:37,196 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.7680616358128506
Similarity score: 0.756628017675391
Similarity score: 0.7335621854502885


2025-11-12 12:49:37,432 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.7891994183059737
Similarity score: 0.7829297374151901
Similarity score: 0.7814907175664827


2025-11-12 12:49:37,699 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.8073816109004219
Similarity score: 0.8071320594103889
Similarity score: 0.7885230667544951


2025-11-12 12:49:38,110 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.7399285722849802
Similarity score: 0.7357767283850206
Similarity score: 0.7291277680044875


2025-11-12 12:49:38,433 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-11-12 12:49:38,637 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


Similarity score: 0.7710219461585588
Similarity score: 0.7194587751939063
Similarity score: 0.7118914771311693




Similarity score: 0.7198285505693166
Similarity score: 0.7183152469001087
Similarity score: 0.7139555979635661


2025-11-12 12:49:39,175 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-11-12 12:49:39,187 - INFO - Average DCG:  0.6197
2025-11-12 12:49:39,188 - INFO - Average nDCG: 0.6182


Similarity score: 0.7988127772765206
Similarity score: 0.7784446594574791
Similarity score: 0.7753542417904178

EVALUATION RESULTS

üìÅ Generation results saved to rag_retrieval_eval.csv


In [6]:
display(df)

Unnamed: 0,chunk_size,overlap,top_k,avg_dcg,avg_ndcg,num_queries,k,chunk_overlap,query_results
0,300,30,3,0.619664,0.618158,10,3,30,"[{'qa_id': 'q1', 'question': 'Can you build a ..."
