# RAG

Implement a baseline RAG (retrieval-augmented generation) module in DSPy.
Given a question, retrieve the top-k most relevant documents from a collection of HTML documents, then pass them as context to an LLM.

Refer to https://dspy.ai/tutorials/rag/. 


In [1]:
import dspy
from sentence_transformers import SentenceTransformer

# Load an extremely efficient local model for retrieval.
# Use the GPU when one is available; otherwise fall back to CPU instead of
# failing at load time on machines without CUDA.
import torch

model = SentenceTransformer(
    "sentence-transformers/static-retrieval-mrl-en-v1",
    device="cuda" if torch.cuda.is_available() else "cpu",
)

# Create an embedder using the model's encode method
embedder = dspy.Embedder(model.encode)

# Traverse a directory and read html files - extract text from the html files
import os
from bs4 import BeautifulSoup

def read_html_files(directory, sources_root="../PragmatiCQA-sources"):
    """
    Read the visible text of every ``.html`` file directly inside a directory.

    ``directory`` may be:
      - an absolute path,
      - a path relative to the current working directory, or
      - a topic/folder name, resolved as ``sources_root/directory`` when no
        such relative path exists.

    Files are processed in sorted filename order so the returned corpus (and
    therefore any index-based retrieval results) is deterministic across runs
    and platforms.

    Returns:
        A list with one ``BeautifulSoup(...).get_text()`` string per readable
        file. Returns [] (after printing a message) if the resolved path does
        not exist or is not a directory. Files that cannot be read are skipped
        with a warning.
    """
    texts = []

    # Absolute paths and existing relative paths are used as given; anything
    # else is treated as a topic folder under the default sources root.
    if os.path.isabs(directory) or os.path.exists(directory):
        path = directory
    else:
        path = os.path.join(sources_root, directory)
    path = os.path.abspath(path)

    # Not raising here so notebook flow is uninterrupted; caller can handle [].
    if not os.path.exists(path):
        print(f"Directory not found: {path}")
        return texts
    if not os.path.isdir(path):
        print(f"Not a directory: {path}")
        return texts

    # sorted(): os.listdir order is arbitrary and platform-dependent.
    for filename in sorted(os.listdir(path)):
        if not filename.endswith(".html"):
            continue
        file_path = os.path.join(path, filename)
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                soup = BeautifulSoup(file, 'html.parser')
                texts.append(soup.get_text())
        except Exception as e:
            # Best-effort: skip problematic files but report minimal info.
            print(f"Warning: could not read {file_path}: {e}")
    return texts

In [2]:
# Provide either an absolute path, a relative path, or a topic folder name.
# Examples:
#   directory = '/full/path/to/PragmatiCQA-sources/The Legend of Zelda'
#   directory = '../PragmatiCQA-sources/The Legend of Zelda'
#   directory = 'The Legend of Zelda'           # treated as topic name under sources_root
directory = "../PragmatiCQA-sources/The Legend of Zelda"
corpus = read_html_files(directory)

# read_html_files returns [] when nothing could be loaded; report either outcome.
if corpus:
    print(f"Loaded {len(corpus)} documents. Will encode them below.")
else:
    print(f"No documents loaded from: {os.path.abspath(directory)}")

Loaded 406 documents. Will encode them below.


In [3]:
# Parameters for the retriever
max_characters = 10000  # for truncating >99th percentile of documents
topk_docs_to_retrieve = 5  # number of documents to retrieve per search query

# Apply the truncation before embedding. Previously max_characters was defined
# but never used, so full-length documents were embedded and returned as context.
retrieval_corpus = [doc[:max_characters] for doc in corpus]

search = dspy.retrievers.Embeddings(embedder=embedder, corpus=retrieval_corpus, k=topk_docs_to_retrieve)



In [4]:
# Language model used by every DSPy module configured below.
# Alternative (points at a local Ollama endpoint):
#   lm = dspy.LM('ollama_chat/devstral', api_base='http://localhost:11434', api_key='')
lm = dspy.LM('xai/grok-3-mini')
dspy.configure(lm=lm)

In [5]:
class RAG(dspy.Module):
    """Retrieve passages for a question, then answer with a ChainOfThought predictor.

    Args:
        retriever: optional callable ``retriever(question)`` returning an object
            with a ``.passages`` list of strings. When None (the default), the
            notebook-level ``search`` retriever is looked up at call time, which
            preserves the original behavior.
    """

    def __init__(self, retriever=None):
        # Initialise dspy.Module base state before adding sub-modules.
        super().__init__()
        self.retriever = retriever
        self.respond = dspy.ChainOfThought('context, question -> response')

    def forward(self, question):
        """Return the ChainOfThought prediction for ``question`` given retrieved context."""
        retrieve = self.retriever if self.retriever is not None else search
        context = retrieve(question).passages
        return self.respond(context=context, question=question)


rag = RAG()

In [6]:
# Example query: ask a broad question and show only the generated response field.
answer = rag(question="What is the main plot of The Legend of Zelda?")
print(answer.response)

The main plot of The Legend of Zelda revolves around a young hero named Link who must save the kingdom of Hyrule from the evil Ganon, the Prince of Darkness. Ganon steals the Triforce of Power and seeks to conquer Hyrule, prompting Princess Zelda to break the Triforce of Wisdom into eight fragments and hide them across the land. Zelda sends her nursemaid, Impa, to find a brave warrior to stop Ganon. Link embarks on a quest to collect the fragments, explore dungeons, battle enemies, and ultimately confront Ganon to rescue Princess Zelda and restore peace to Hyrule.


In [7]:
q = 'What year did the Legend of Zelda come out?'

# Short factual question; print only the generated response field.
print(rag(question=q).response)

The Legend of Zelda was first released in 1986.


Part 4.3 — Traditional QA baseline (start)

Plan:
- Use the existing retriever `search` to obtain retrieved passages for a question.
- Use a Hugging Face extractive QA pipeline ('distilbert/distilbert-base-cased-distilled-squad', falling back to 'deepset/roberta-base-squad2' if it cannot be loaded) to answer the question given:
  1) Literal spans (from dataset),
  2) Pragmatic spans (from dataset),
  3) Retrieved context (from `search`).
- Evaluate these three configurations with dspy.evaluate.SemanticF1 on the first question of each conversation.

In [None]:
# 4.3 Part 1: Traditional QA Baseline Implementation
from transformers import pipeline
import json, os
from tqdm.notebook import tqdm
import pandas as pd
import dspy
from dspy.evaluate import SemanticF1

def setup_qa_pipeline(device=None):
    """Initialize an extractive question-answering pipeline.

    Candidate models are tried in order. For each candidate, the requested GPU
    is tried first (in float16) when CUDA is available and the index exists,
    and then the CPU (float32, since float16 is poorly supported on CPU). If
    every attempt fails, a plain CPU pipeline for the first candidate is built
    as a last resort.

    Args:
        device: 'cuda', 'cuda:N', a non-negative int GPU index, -1, 'cpu', or
            None. Anything that does not request CUDA means CPU.

    Returns:
        A transformers ``question-answering`` pipeline.

    Raises:
        Exception: whatever the final fallback raised, after printing the last
            candidate error.
    """
    import torch
    from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline as _pipeline

    # Reliable candidate models
    model_candidates = [
        'distilbert/distilbert-base-cased-distilled-squad',
        'deepset/roberta-base-squad2',
    ]

    # Normalize device intent into an optional CUDA device index
    # ('cuda' -> 0, 'cuda:1' -> 1, 2 -> 2).
    cuda_index = None
    if isinstance(device, str) and device.lower().startswith('cuda'):
        _, _, suffix = device.partition(':')
        cuda_index = int(suffix) if suffix.strip().isdigit() else 0
    elif isinstance(device, int) and device >= 0:
        cuda_index = int(device)

    # Only use the GPU if CUDA is actually available and that index exists.
    if cuda_index is not None and not (
        torch.cuda.is_available() and cuda_index < torch.cuda.device_count()
    ):
        print(f"Warning: CUDA device {cuda_index} unavailable; using CPU.")
        cuda_index = None

    # Device attempts, best first: requested GPU (fp16), then CPU (fp32).
    attempts = ([cuda_index] if cuda_index is not None else []) + [-1]

    last_err = None
    for model_name in model_candidates:
        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        except Exception as e:
            last_err = e
            print(f"Warning: could not load tokenizer for {model_name}: {e}")
            continue
        for device_id in attempts:
            try:
                model = AutoModelForQuestionAnswering.from_pretrained(model_name)
                if device_id >= 0:
                    model = model.half().to(f'cuda:{device_id}')
                model.eval()
                return _pipeline('question-answering', model=model, tokenizer=tokenizer, device=device_id)
            except Exception as e:
                # Retry the same model on the next device (e.g. GPU -> CPU)
                # before moving on to the next candidate.
                last_err = e
                where = f'cuda:{device_id}' if device_id >= 0 else 'cpu'
                print(f"Warning: could not set up {model_name} on {where}: {e}")

    # Final fallback: let transformers download and create a CPU pipeline from a known model id
    try:
        return _pipeline('question-answering', model=model_candidates[0], device=-1)
    except Exception as e:
        print('Error setting up QA pipeline: ', last_err)
        print('Final fallback failed: ', e)
        # Re-raise to make failure visible to the caller
        raise

def load_validation_data(filename="val.jsonl", dataset_dir="../PragmatiCQA/data"):
    """Load every record from a JSON-Lines dataset file.

    Each non-blank line of ``dataset_dir/filename`` is parsed as one JSON
    object. No filtering is done here; callers select the turns they need
    (e.g. the first question of each conversation).

    Returns:
        The list of parsed records, or [] (after printing the error) if the
        file cannot be opened/decoded or contains invalid JSON.
    """
    path = os.path.join(dataset_dir, filename)
    try:
        # Explicit UTF-8 so results do not depend on the platform's locale.
        with open(path, 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing newline) instead of failing the whole load.
            return [json.loads(line) for line in f if line.strip()]
    except (OSError, ValueError) as e:  # ValueError covers JSON and Unicode decode errors
        print(f"Error loading validation data: {e}")
        return []

def evaluate_qa_approaches(val_data, search, qa_pipeline, metric, max_samples=None):
    """
    Evaluate three extractive-QA context configurations on first questions.

    For each conversation in ``val_data``, the first QA turn is answered by
    ``qa_pipeline`` using, as context:
      1. the literal spans (``a_meta["literal_obj"][*]["text"]``),
      2. the pragmatic spans (``a_meta["pragmatic_obj"][*]["text"]``),
      3. the passages returned by ``search(question).passages``.
    Each answer is scored with ``metric(gold_example, prediction)``.

    Args:
        val_data: list of conversation dicts with a "qas" list whose items
            carry "q", "a" and (optionally) "a_meta".
        search: callable returning an object with a ``.passages`` list.
        qa_pipeline: question-answering pipeline called as
            ``qa_pipeline(question=..., context=...)``.
        metric: callable ``(example, prediction) -> score``.
        max_samples: evaluate only the first ``max_samples`` conversations;
            None means all (0 now means none).

    Returns:
        pandas.DataFrame, one row per evaluated conversation, with columns
        topic, question, gold, literal, pragmatic, retrieved (scores) and
        literal_answer, pragmatic_answer, retrieved_answer (predicted text).
        The columns are present even when no rows were produced.
    """
    approaches = ("literal", "pragmatic", "retrieved")
    columns = ["topic", "question", "gold", *approaches, *(f"{a}_answer" for a in approaches)]

    def get_answer(question, context):
        """Return the extracted answer span, or "" for empty context / pipeline failure."""
        if not context:
            return ""
        try:
            result = qa_pipeline(question=question, context=context)
        except Exception as e:
            print(f"QA pipeline error: {e}")
            return ""
        return result.get("answer", "") if isinstance(result, dict) else ""

    def join_spans(spans):
        """Concatenate the "text" of each span dict; tolerate a missing/None list."""
        return " ".join(s["text"] for s in (spans or [])).strip()

    results = []
    samples = val_data if max_samples is None else val_data[:max_samples]

    for doc in tqdm(samples, desc="Evaluating QA approaches"):
        qas = doc.get("qas")
        if not qas:
            continue

        # Only the first question of each conversation is evaluated.
        qa = qas[0]
        question = qa["q"]
        gold = qa["a"]
        a_meta = qa.get("a_meta") or {}

        try:
            retrieved_context = " ".join(search(question).passages).strip()
        except Exception as e:
            # Keep the sample: the literal/pragmatic results are still valid.
            print(f"Retrieval error for question: {question}\nError: {e}")
            retrieved_context = ""

        contexts = {
            "literal": join_spans(a_meta.get("literal_obj")),
            "pragmatic": join_spans(a_meta.get("pragmatic_obj")),
            "retrieved": retrieved_context,
        }

        gold_ex = dspy.Example(question=question, response=gold).with_inputs("question")
        row = {"topic": doc.get("topic", "unknown"), "question": question, "gold": gold}

        for approach, context in contexts.items():
            answer = get_answer(question, context)
            try:
                row[approach] = metric(gold_ex, dspy.Prediction(response=answer))
            except Exception as e:
                print(f"Scoring error for {approach}: {e}")
                row[approach] = 0.0
            # Stored under a separate key: previously answers and scores
            # shared a key and the answer text was overwritten by the score.
            row[f"{approach}_answer"] = answer

        results.append(row)

    return pd.DataFrame(results, columns=columns)

# Setup components
print("Setting up evaluation pipeline...")
qa_pipeline = setup_qa_pipeline(device='cuda')
metric = SemanticF1(decompositional=True)
val_data = load_validation_data()
num_samples = 100
if not val_data:
    print("No validation data loaded!")
else:
    print(f"Loaded {len(val_data)} validation documents")

    # Run evaluation
    print("\nEvaluating first questions from validation set...")
    results_df = evaluate_qa_approaches(val_data, search, qa_pipeline, metric, max_samples=num_samples)

    # Show aggregate results
    print("\nAggregate Results:")
    agg_scores = results_df[["literal", "pragmatic", "retrieved"]].mean()
    print("\nMean scores across all questions:")
    print(agg_scores)

    # Show sample predictions
    print("\nSample Predictions (first 3):")
    for _, row in results_df.head(3).iterrows():
        print("\nQuestion:", row["question"])
        print("Gold:", row["gold"][:200], "..." if len(row["gold"]) > 200 else "")
        for approach, label in (("literal", "Literal"), ("pragmatic", "Pragmatic"), ("retrieved", "Retrieved")):
            # The "<approach>" column holds the score; the answer text lives in
            # "<approach>_answer" when the evaluator provides it.
            answer_text = row.get(f"{approach}_answer", row[approach])
            print(f"{label} ({row[approach]:.2f}):", answer_text)
        print("-" * 80)

    # Save results only when an evaluation actually ran; previously this ran
    # unconditionally and raised NameError when no validation data loaded.
    results_df.to_csv("traditional_qa_results.csv", index=False)
    print("\nFull results saved to traditional_qa_results.csv")

Setting up evaluation pipeline...


Device set to use cuda:0


Loaded 179 validation documents

Evaluating first questions from validation set...


Evaluating QA approaches:   0%|          | 0/100 [00:00<?, ?it/s]

You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset
