Imports

In [1]:
# --- dependencies  ---
from dotenv import load_dotenv
import os, json, math, statistics, uuid, time
from pathlib import Path
from typing import List, Dict, Any, Tuple
from collections import defaultdict, OrderedDict
from functools import lru_cache

# LangChain + community
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document

from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.load import dumps, loads

# Groq LLM
from langchain_groq import ChatGroq


from operator import itemgetter


Loads common libs.

load_dotenv pulls env vars from .env.

Path for file paths; typing for type hints.

defaultdict/OrderedDict help with scoring & result ordering.

lru_cache caches function outputs (for speed).

PDF loader → turns PDFs into Documents.

Splitter → breaks docs into chunks for vector search.

OllamaEmbeddings → local embedding model.

FAISS → vector index for similarity search.

Prompt templating and string parsing for LLM I/O.

dumps/loads used for (de)duping docs by serializing.

The Groq-hosted LLM wrapper (generator + judge).

#  ENV & CONFIG

In [2]:
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")


DATA_DIR = "data"
INDEX_DIR = "faiss_index"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
TOP_K = 4                
N_QUERIES = 5            

# Models
GEN_MODEL = "llama3-8b-8192"        
JUDGE_MODEL = "llama3-8b-8192"   


Loads the API key at runtime; asserts it’s present.
Tuning knobs: where PDFs are, how to chunk, how many chunks to retrieve (TOP_K), and how many reformulated queries to generate.

LOAD DOCS & BUILD / LOAD INDEX

In [3]:
def load_corpus_and_index() -> Tuple[List[Document], FAISS]:
    pdf_paths = list(Path(DATA_DIR).glob("*.pdf"))
    docs: List[Document] = []
    for path in pdf_paths:
        loader = PyPDFLoader(str(path))
        docs.extend(loader.load())

    splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    chunks = splitter.split_documents(docs)
    print(f"Total Chunks: {len(chunks)}")

    embedding = OllamaEmbeddings(model="nomic-embed-text")
    if not Path(INDEX_DIR).exists():
        db = FAISS.from_documents(chunks, embedding=embedding)
        db.save_local(INDEX_DIR)

    db = FAISS.load_local(INDEX_DIR, embeddings=embedding, allow_dangerous_deserialization=True)
    return chunks, db

corpus_chunks, faiss_db = load_corpus_and_index()
retriever = faiss_db.as_retriever(search_kwargs={"k": TOP_K})
embedding = OllamaEmbeddings(model="nomic-embed-text")

Total Chunks: 23


  embedding = OllamaEmbeddings(model="nomic-embed-text")


Defines a function to read all PDFs from data/ and turn each page into Documents. 
Splits docs into overlapping chunks (improves retrieval granularity).
Creates embeddings and builds a FAISS index if it doesn’t already exist, then saves it.
Loads (or just created) FAISS index; returns both the chunk list and the FAISS store.
Calls load_corpus_and_index() and sets up a retriever wrapper (always fetches k=TOP_K chunks).

Also instantiates a (reusable) embedding model for other computations (e.g., eval metric).

An index is needed to do vector similarity search; this block ensures it exists and exposes a simple retriever for later.

2) LLMs (generator + judge)

In [4]:
llm = ChatGroq(groq_api_key=GROQ_API_KEY, model_name=GEN_MODEL, temperature=0.0)
judge_llm = ChatGroq(groq_api_key=GROQ_API_KEY, model_name=JUDGE_MODEL, temperature=0.0)

Two model roles: generator (answers questions) and judge (scores answers).
Two LLM clients: one to generate answers; another to score them.

temperature=0.0 for deterministic-ish outputs and stable scoring.

3) UTILS (caching, formatting)

In [None]:
@lru_cache(maxsize=4096)
def embed_query_cached(text: str) -> List[float]:
    return embedding.embed_query(text)

def embed_documents(texts: List[str]) -> List[List[float]]:
    # small manual cache to reduce duplicate doc embeddings
    cache: Dict[str, List[float]] = {}
    out = []
    for t in texts:
        if t not in cache:
            cache[t] = embedding.embed_query(t)  # for FAISS, doc/query vector in same space
        out.append(cache[t])
    return out

def cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x*y for x, y in zip(a, b))
    na = math.sqrt(sum(x*x for x in a))
    nb = math.sqrt(sum(y*y for y in b))
    return 0.0 if na == 0 or nb == 0 else dot / (na * nb)

def docs_to_context(docs: List[Document]) -> str:
    parts = []
    for i, d in enumerate(docs[:TOP_K]):
        meta = d.metadata or {}
        src = meta.get("source", "")
        page = meta.get("page", "")
        header = f"[DOC {i+1} | {Path(src).name if src else 'unknown'} | page {page}]"
        body = d.page_content or ""
        parts.append(f"{header}\n{body}")
    return "\n\n".join(parts)

def serialize_doc_id(doc: Document) -> str:
    # stable identity for fusion/dedup
    meta = doc.metadata or {}
    key = f"{meta.get('source','')}-{meta.get('page','')}-{hash(doc.page_content)}"
    return key

def dedupe_docs_preserve_order(docs: List[Document]) -> List[Document]:
    seen = set()
    out = []
    for d in docs:
        sid = serialize_doc_id(d)
        if sid not in seen:
            seen.add(sid)
            out.append(d)
    return out


Caches query embeddings so repeated calls don’t recompute (speeds up eval).
Embeds a list of texts with tiny per-call caching (reduces duplicate work within the call).
Plain cosine similarity for vectors (used in retrieval relevance metric).
Converts retrieved docs into a readable context string for the generator prompt (includes [DOC i | source | page] headers).
Creates a stable-ish key for a chunk to help dedup across different retrieval strategies.
Removes duplicate doc chunks while preserving order so as to keep top-ranked items first.

 4) QUERY TRANSLATION TEMPLATES

In [6]:
MULTI_Q_TEMPLATE = """Generate {n} distinct reformulations of this question to help vector retrieval.
Return one per line, no numbering.
Question: {question}"""

DECOMP_TEMPLATE = """Decompose the following question into 3-5 concrete sub-questions that, if answered, together answer the original.
Return one per line, no numbering.
Question: {question}"""

STEPBACK_TEMPLATE = """Rewrite the question into a more general, step-back abstraction that captures the core goal concisely.
Return only the abstracted question, no commentary.
Question: {question}"""

HYDE_TEMPLATE = """Write a short, factual hypothetical answer to the question below (even if you don't actually know).
Keep it 3-5 sentences, neutral tone, no sources.
Question: {question}"""

def gen_lines(prompt_tmpl: str, **kwargs) -> List[str]:
    p = ChatPromptTemplate.from_template(prompt_tmpl)
    chain = p | llm | StrOutputParser()
    raw = chain.invoke(kwargs)
    lines = [ln.strip() for ln in raw.split("\n") if ln.strip()]
    return lines

These strings prompt the LLM to produce alternative queries used by the different strategies.
gen_lines defines a helper to:

build a prompt from a template,

call the generator LLM,

split into one-line outputs (list of strings).

Used by all retrieval strategies that need multiple queries (multi, fusion, decomposition, step-back, hyde).

5) RETRIEVAL STRATEGIES

Each strategy returns a list of Documents (the top retrieval set). They’re later fed to the generator to answer.

In [7]:
def retrieve_base(question: str, k: int = TOP_K) -> List[Document]:
    return retriever.get_relevant_documents(question)[:k]

def retrieve_multi_query(question: str, n: int = N_QUERIES, k: int = TOP_K) -> List[Document]:
    alts = gen_lines(MULTI_Q_TEMPLATE, n=n, question=question)
    docs: List[Document] = []
    for q in alts:
        docs.extend(retriever.get_relevant_documents(q))
    return dedupe_docs_preserve_order(docs)[:max(k, len(docs))][:k]

def retrieve_rag_fusion(question: str, n: int = N_QUERIES, k: int = TOP_K, rrf_k:int=60) -> List[Document]:
    """Reciprocal Rank Fusion over results from multiple queries."""
    alts = gen_lines(MULTI_Q_TEMPLATE, n=n, question=question)
    rankings: List[List[Document]] = []
    for q in alts:
        rankings.append(retriever.get_relevant_documents(q))

    scores: Dict[str, float] = defaultdict(float)
    doc_by_id: Dict[str, Document] = {}
    for rank_list in rankings:
        for r, d in enumerate(rank_list, start=1):
            did = serialize_doc_id(d)
            scores[did] += 1.0 / (rrf_k + r)
            doc_by_id[did] = d

    # order by fused score
    ordered = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    fused_docs = [doc_by_id[did] for did, _ in ordered]
    return fused_docs[:k]

def retrieve_decomposition(question: str, k: int = TOP_K) -> List[Document]:
    subs = gen_lines(DECOMP_TEMPLATE, question=question)
    docs: List[Document] = []
    for sq in subs:
        docs.extend(retriever.get_relevant_documents(sq))
    # Prioritize documents appearing across many sub-queries
    counts: Dict[str, int] = defaultdict(int)
    by_id: Dict[str, Document] = {}
    for d in docs:
        did = serialize_doc_id(d)
        counts[did] += 1
        by_id[did] = d
    ordered = sorted(counts.items(), key=lambda x: (x[1], len(by_id[x[0]].page_content)), reverse=True)
    return [by_id[did] for did, _ in ordered][:k]

def retrieve_step_back(question: str, k: int = TOP_K) -> List[Document]:
    abstract_qs = gen_lines(STEPBACK_TEMPLATE, question=question)
    abstract = abstract_qs[0] if abstract_qs else question
    combined_docs = retriever.get_relevant_documents(question) + retriever.get_relevant_documents(abstract)
    return dedupe_docs_preserve_order(combined_docs)[:k]

def retrieve_hyde(question: str, k: int = TOP_K) -> List[Document]:
    hyp = gen_lines(HYDE_TEMPLATE, question=question)
    hyp_text = hyp[0] if hyp else question
    # Use hypothetical answer as retrieval query (optionally mix with original)
    docs = retriever.get_relevant_documents(hyp_text) + retriever.get_relevant_documents(question)
    return dedupe_docs_preserve_order(docs)[:k]

RETRIEVERS = OrderedDict({
    "multi_query": retrieve_multi_query,
    "rag_fusion": retrieve_rag_fusion,
    "decomposition": retrieve_decomposition,
    "step_back": retrieve_step_back,
    "hyde": retrieve_hyde,
})


Simple baseline .
Retrieve Multi- Query : Generate N query variants, retrieve for each, concat all hits, dedupe, then take top k.
Retrieve RAG Fusion: run multi queries, then apply Reciprocal Rank Fusion.
Each doc gets a higher score if it appears high across many lists; final top k returned.
Decomposition: break the question into sub-questions.Rank docs by how often they appear across sub-queries (tie-breaker: longer content).Return top k.
Step-back: generate a more abstract version of the question; retrieve for both the original and the abstract; merge & dedupe, then top k.
HyDE: generate a hypothetical answer first, use it as a query to retrieve, also add retrieval on original, merge & dedupe, top k.
A registry mapping names → functions.

RETRIEVERS is Used later by the evaluation loop to run all strategies. 


6) RAG ANSWER PROMPT

In [8]:
RAG_TEMPLATE = """Answer the following question based ONLY on this context. If the answer
is not contained in the context, say "I don't have enough information in the provided context."

{context}

Question: {question}
"""
rag_prompt = ChatPromptTemplate.from_template(RAG_TEMPLATE)

# Correct wiring: extract fields explicitly
rag_chain = (
    {"context": itemgetter("context"), "question": itemgetter("question")}
    | rag_prompt
    | llm
    | StrOutputParser()
)

def generate_answer(question: str, docs: List[Document]) -> Tuple[str, str]:
    context_str = docs_to_context(docs)
    answer = rag_chain.invoke({"context": context_str, "question": question})
    return answer, context_str


RAG_TEMPLATE: Defines the instruction the generator uses to answer from context only.
rag_chain is A LangChain runnable pipeline:
itemgetter extracts "context" and "question" fields from the input dict,
fills rag_prompt,
calls the generator (llm),
parses the output to a string.
generate_answer: Called in the evaluation loop:
Converts retrieved docs → context string,
Runs the RAG chain to get the model answer,
Returns both the answer and the context used.

7) EVAL: LLM-AS-JUDGE PROMPTS

In [9]:
CORRECTNESS_PROMPT = ChatPromptTemplate.from_template("""
You are grading an answer for correctness against a reference solution.

Question:
{question}

Model Answer:
{answer}

Reference Answer (the ground truth):
{reference}

Return strict JSON: {{"score": <1-5 integer>, "justification": "<=30 words"}}
Only output JSON.
""".strip())

GROUNDED_PROMPT = ChatPromptTemplate.from_template("""
You are grading how well the answer is supported by the provided context (groundedness).

Context:
{context}

Question:
{question}

Model Answer:
{answer}

Grade 1-5 (5 = fully supported, 1 = not supported). Penalize claims not in context.
Return JSON: {{"score": <1-5 integer>, "justification": "<=30 words"}}
Only output JSON.
""".strip())

RELEVANCE_PROMPT = ChatPromptTemplate.from_template("""
You are grading answer relevance to the question.

Question:
{question}

Model Answer:
{answer}

Grade 1-5 (5 = directly answers; 1 = off-topic).
Return JSON: {{"score": <1-5 integer>, "justification": "<=30 words"}}
Only output JSON.
""".strip())

These templates instruct the judge model on how to score.Each asks for strict JSON with a 1–5 score and a short justification.

 7)Judge Functions

In [10]:
def parse_json_safe(text: str) -> Dict[str, Any]:
    try:
        return json.loads(text.strip())
    except Exception:
        s = text.rfind("{"); e = text.rfind("}")
        if s != -1 and e != -1 and e > s:
            try:
                return json.loads(text[s:e+1])
            except Exception:
                pass
    return {"score": None, "justification": "parse_error"}

def judge_correctness(q: str, a: str, ref: str) -> Dict[str, Any]:
    out = CORRECTNESS_PROMPT | judge_llm | StrOutputParser()
    res = out.invoke({"question": q, "answer": a, "reference": ref})
    return parse_json_safe(res)

def judge_groundedness(q: str, a: str, ctx: str) -> Dict[str, Any]:
    out = GROUNDED_PROMPT | judge_llm | StrOutputParser()
    res = out.invoke({"question": q, "answer": a, "context": ctx})
    return parse_json_safe(res)

def judge_relevance(q: str, a: str) -> Dict[str, Any]:
    out = RELEVANCE_PROMPT | judge_llm | StrOutputParser()
    res = out.invoke({"question": q, "answer": a})
    return parse_json_safe(res)

def retrieval_relevance_embedding(question: str, docs: List[Document]) -> float:
    qv = embed_query_cached(question)
    if not docs:
        return 0.0
    texts = [d.page_content or "" for d in docs]
    dvs = embed_documents(texts)
    sims = [cosine(qv, dv) for dv in dvs]
    return float(sum(sims)/len(sims))

parse_json_safe: Tries to parse judge outputs as JSON; has a fallback that extracts the last {...} block if the model added extra text.
judge_correctness: Called per example: sends question, model answer, and reference answer to the judge.
judge_groundedness: Called per example: checks if the answer is supported by the retrieved context only.
judge_relevance: Called per example: evaluates how on-topic the answer is to the question.
retrieval_relevance_embedding: Embedding-based retrieval relevance metric in [0..1]: average cosine between question and retrieved chunks.
Called per example: gives a numeric signal of how well the retriever matched the question.




8) Dataset of Questions

In [11]:
monopoly_dataset = [
    {"question": "What happens when a player lands on an unowned property?",
     "answer": "The player may buy it from the Bank at its printed price. If they don’t buy it, the property is auctioned to the highest bidder."},
    {"question": "Can you collect rent while in Jail?",
     "answer": "Yes, a player may collect rent while in Jail."},
    {"question": "What are the three ways to get out of Jail?",
     "answer": "Rolling doubles, using a 'Get Out of Jail Free' card, or paying $50 before your next turn."},
    {"question": "What happens when you land on or pass GO?",
     "answer": "You collect $200 from the Bank."},
    {"question": "How is rent affected by owning all properties in a color group?",
     "answer": "Rent is doubled on unimproved properties when a player owns all properties in a color group."},
    {"question": "When can you build houses on your properties?",
     "answer": "You must own all properties in a color group and build evenly, one house at a time on each property."},
    {"question": "What happens if you don’t have enough cash to pay a debt?",
     "answer": "You must mortgage properties or sell houses/hotels to raise the funds. If you still can't pay, you’re bankrupt."},
    {"question": "Can a mortgaged property earn rent?",
     "answer": "No, rent cannot be collected on mortgaged properties."},
    {"question": "What is the cost of each house and hotel?",
     "answer": "Each house and hotel has a price listed on the property deed card."},
    {"question": "How do you win Monopoly?",
     "answer": "You win by being the last player remaining after all others have gone bankrupt."}
]


Boardgame Q/A set for evaluation.
Used by the eval loop for every method.

9) EVALUATION LOOP

In [12]:
def avg_safe(vals):
    vals = [v for v in vals if v is not None]
    return round(float(statistics.mean(vals)), 3) if vals else None

def eval_method(method_name: str, retrieve_fn, dataset: List[Dict[str, str]]) -> Dict[str, Any]:
    print(f"\n=== Evaluating method: {method_name} ===")
    results = []
    for i, item in enumerate(dataset, 1):
        q = item["question"]; ref = item["answer"]

        docs = retrieve_fn(q)
        ans, ctx = generate_answer(q, docs)

        corr = judge_correctness(q, ans, ref)
        grnd = judge_groundedness(q, ans, ctx)
        relv = judge_relevance(q, ans)
        retr_rel = retrieval_relevance_embedding(q, docs)

        rec = {
            "id": i,
            "method": method_name,
            "question": q,
            "reference_answer": ref,
            "model_answer": ans,
            "correctness": corr,
            "groundedness": grnd,
            "relevance": relv,
            "retrieval_relevance_embedding": retr_rel,
            "retrieved_context_preview": ctx[:1200],
        }
        results.append(rec)

        print(f"[{i}] Correctness: {corr.get('score')} | Grounded: {grnd.get('score')} | "
              f"Relevance: {relv.get('score')} | RetrRel: {retr_rel:.3f}")

    summary = {
        "method": method_name,
        "n": len(results),
        "avg_correctness_1to5": avg_safe([r["correctness"].get("score") for r in results]),
        "avg_groundedness_1to5": avg_safe([r["groundedness"].get("score") for r in results]),
        "avg_relevance_1to5":   avg_safe([r["relevance"].get("score")   for r in results]),
        "avg_retrieval_relevance_0to1": avg_safe([r["retrieval_relevance_embedding"] for r in results]),
        "generator_model": GEN_MODEL,
        "judge_model": JUDGE_MODEL,
    }
    return {"summary": summary, "results": results}


Helper to average scores safely (ignores None).
eval_method: Defines the core evaluation for one method:
    Calls that method’s retriever function (retrieve_fn) to get docs.
    Calls generate_answer to produce the model’s answer from those docs.
    Calls all judge functions to score the answer.
    Builds a result record (keeps a preview of context).
    Prints a per-item score line.
    Computes averages across the dataset and returns both the summary and results.

10) Run evaluation

In [13]:
def main():
    ts = time.strftime("%Y%m%d-%H%M%S")
    all_summaries = []
    out_dir = Path(f"rag_eval_{ts}")
    out_dir.mkdir(parents=True, exist_ok=True)

    for name, fn in RETRIEVERS.items():
        eval_out = eval_method(name, fn, monopoly_dataset)
        summary = eval_out["summary"]; results = eval_out["results"]
        all_summaries.append(summary)

        # Save per-method artifacts
        with (out_dir / f"{name}_results.jsonl").open("w", encoding="utf-8") as f:
            for r in results:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")

        with (out_dir / f"{name}_summary.json").open("w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

    # Combined summary
    combined = {
        "methods": all_summaries,
        "best_by_correctness": max(all_summaries, key=lambda s: s["avg_correctness_1to5"] or 0.0),
        "best_by_groundedness": max(all_summaries, key=lambda s: s["avg_groundedness_1to5"] or 0.0),
        "best_by_relevance":   max(all_summaries, key=lambda s: s["avg_relevance_1to5"] or 0.0),
        "best_by_retrieval_relevance": max(all_summaries, key=lambda s: s["avg_retrieval_relevance_0to1"] or 0.0),
    }
    with (out_dir / "combined_summary.json").open("w", encoding="utf-8") as f:
        json.dump(combined, f, indent=2, ensure_ascii=False)

    print("\n=== Combined Summary ===")
    print(json.dumps(combined, indent=2))

Calls eval_method once per retrieval strategy in the registry, accumulates per-method summaries.
Writes detailed per-item results to JSONL and a per-method summary JSON.
Aggregates all method summaries, picks best per metric, writes a combined summary JSON, and prints it.

11) Run

Load PDFs → chunk → embed → index (load_corpus_and_index), create a retriever.

Define five retrieval strategies that transform the question (multi, fusion, decomposition, step-back, hyde) → return top k chunks.

RAG answer pipeline (rag_chain) takes {context, question} → model answer.

Judge metrics: correctness vs reference; groundedness vs context; relevance vs question; retrieval relevance by cosine.

Evaluate each strategy over every Q/A, print & save per-item results, per-method summary, and a combined “best of” summary.

In [14]:
if __name__ == "__main__":
    main()


=== Evaluating method: multi_query ===


  docs.extend(retriever.get_relevant_documents(q))


[1] Correctness: 2 | Grounded: 1 | Relevance: 2 | RetrRel: 0.693
[2] Correctness: 2 | Grounded: 1 | Relevance: 2 | RetrRel: 0.566
[3] Correctness: 2 | Grounded: 1 | Relevance: 1 | RetrRel: 0.519
[4] Correctness: 1 | Grounded: 1 | Relevance: 5 | RetrRel: 0.504
[5] Correctness: 4 | Grounded: 5 | Relevance: 5 | RetrRel: 0.631
[6] Correctness: 2 | Grounded: 2 | Relevance: 2 | RetrRel: 0.674
[7] Correctness: 2 | Grounded: None | Relevance: 1 | RetrRel: 0.596
[8] Correctness: 4 | Grounded: 5 | Relevance: 5 | RetrRel: 0.661
[9] Correctness: 1 | Grounded: 1 | Relevance: 5 | RetrRel: 0.564
[10] Correctness: 2 | Grounded: 1 | Relevance: 1 | RetrRel: 0.601

=== Evaluating method: rag_fusion ===
[1] Correctness: 2 | Grounded: 2 | Relevance: 2 | RetrRel: 0.711
[2] Correctness: 4 | Grounded: 5 | Relevance: 5 | RetrRel: 0.638
[3] Correctness: 4 | Grounded: 5 | Relevance: 5 | RetrRel: 0.668
[4] Correctness: 4 | Grounded: None | Relevance: 5 | RetrRel: 0.588
[5] Correctness: 4 | Grounded: 5 | Relevance