# Retrieval Analysis and Equivalence Evaluation

This notebook analyzes the performance of a retrieval-augmented generation (RAG) agent on a set of QA tasks. It includes:
- Loading and mapping gold document IDs to datastore document IDs
- Running retrieval queries and collecting retrieved and attributed document IDs
- Evaluating retrieval metrics (recall, precision, nDCG, etc.)
- Using an LLM to evaluate the equivalence of agent responses to gold answers, with robust JSON enforcement and validation

In [1]:
import json
import pandas as pd
import numpy as np
import math
import asyncio
import csv
import tempfile
import os
import time
from typing import Dict, List, Tuple, Optional
from datasets import load_dataset

# RAG evaluation
from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import NonLLMContextRecall, NonLLMContextPrecisionWithReference

# API clients
from contextual import ContextualAI
import anthropic

# Data validation
from pydantic import BaseModel, ValidationError

## Load Data

We load the annotated QA dataset and the mapping file that links gold document IDs to internal document IDs used by the retrieval system.

This annotated data comes from the RAG QA Arena benchmark: https://github.com/awslabs/rag-qa-arena

In [None]:
#df = pd.read_json('/Users/rajivshah/Code/rag-qa-arena/data/annotations_fiqa_with_citation.jsonl', lines=True)
#df

In [None]:
dataset = load_dataset("rajistics/rag-qa-arena")
df = dataset['fiqa'].to_pandas()
df.head()

## Map Gold Document IDs to Internal Document IDs

To compare retrieval results with ground truth, we map the gold document IDs to the internal document IDs used by the Contextual RAG datastore.

In [None]:
mapping = pd.read_csv('hf_document_mapping.csv')
mapping

In [5]:
# Create a mapping dictionary from original_id to datastore_document_id
id_map = dict(zip(mapping['original_id'], mapping['datastore_document_id']))

# Function to map a list of gold_doc_ids to their corresponding datastore_document_ids
def map_doc_ids(gold_ids):
    return [id_map.get(gid) for gid in gold_ids if gid in id_map]

# Apply the function to the gold_doc_ids column
df['datastore_document_ids'] = df['gold_doc_ids'].apply(map_doc_ids)
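
As a quick check of the mapping logic, the sketch below applies the same filter-and-map step to made-up IDs (the IDs and the `map_doc_ids_toy` name are hypothetical, not taken from the dataset). Gold IDs with no entry in the mapping are dropped silently, so a row can end up with fewer datastore IDs than gold IDs, or with none at all.

```python
# Hypothetical IDs for illustration only; real values come from hf_document_mapping.csv.
toy_id_map = {101: "ds-aaa", 102: "ds-bbb"}

def map_doc_ids_toy(gold_ids, id_map):
    # Same logic as map_doc_ids: keep only gold IDs that appear in the mapping.
    return [id_map[gid] for gid in gold_ids if gid in id_map]

gold_ids = [101, 999, 102]
mapped = map_doc_ids_toy(gold_ids, toy_id_map)
print(mapped)  # ['ds-aaa', 'ds-bbb'] (999 has no mapping and is dropped)

# Listing the unmapped IDs shows how much ground truth is lost in the mapping.
unmapped = [gid for gid in gold_ids if gid not in toy_id_map]
print(unmapped)  # [999]
```

Rows whose mapped list comes back empty are scored as 0.0 later in the notebook, so it may be worth counting them before interpreting the averages.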

In [None]:
df

## Run Retrieval Queries

We define functions to query the retrieval system for each question, collecting both the retrieved document IDs and the document IDs actually cited in the agent's answer (attributions).

In [None]:
client = ContextualAI(api_key="key-")
agent_id = "7a5f50ff-d205-4033-a7e5-529531230cb5"

### 🔍 Function Overview: `run_query(user_input)`

This function runs a retrieval-augmented generation (RAG) query against a custom agent and returns three results:

1. **`retrieved_chunks`** – All retrieved chunks, each with its document ID, in retrieval order.
2. **`attribution_doc_ids`** – A deduplicated list of document IDs that were actually cited in the model's final answer (i.e., the attributions), mapped from content-level IDs.
3. **`response`** – The generated response from the model based on the retrievals.

This helps distinguish between *what was retrieved* and *what was used* in the final response, enabling fine-grained evaluation of retrieval and grounding quality.

In [7]:
def run_query(user_input):
    """
    Run the retrieval agent for the given user_input.
    Returns:
        - retrieved_chunks: list of dicts with 'doc_id', in retrieval order
        - attribution_doc_ids: list of unique document IDs from attributions
        - response: text of the agent's generated answer
    """
    query_result = client.agents.query.create(
        agent_id=agent_id,
        messages=[{
            "content": user_input,
            "role": "user"
        }],
        include_retrieval_content_text=True,
        retrievals_only=False
    )

    # Extract mapping from content_id to doc_id
    content_to_doc = {
        rc.content_id: rc.doc_id
        for rc in query_result.retrieval_contents
    }

    # Extract retrieved_chunks
    retrieved_chunks = [
        {"doc_id": rc.doc_id}
        for rc in query_result.retrieval_contents
    ]

    response = query_result.message.content

    # Get document IDs referenced in attributions
    attribution_doc_ids = set()
    seen_attr = set()
    for attr in query_result.attributions:
        key = tuple(attr.content_ids)
        if key not in seen_attr:
            seen_attr.add(key)
            for cid in attr.content_ids:
                doc_id = content_to_doc.get(cid)
                if doc_id:
                    attribution_doc_ids.add(doc_id)

    return retrieved_chunks, list(attribution_doc_ids), response

def get_doc_ids(row):
    retrieved, attributed, response = run_query(row['question'])
    # Flatten retrieved to just doc_id strings
    retrieved_ids = [r['doc_id'] for r in retrieved]
    return pd.Series({
        'retrievals': retrieved_ids,
        'attributions': attributed,        
        'response' : response,
    })
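
To make the difference between retrieved and attributed IDs concrete, here is a minimal sketch that uses mock objects in place of a live query result. The attribute names (`content_id`, `doc_id`, `content_ids`) mirror the ones used in `run_query`; the values are invented.

```python
from types import SimpleNamespace

# Mock objects standing in for the query result; values are invented.
retrieval_contents = [
    SimpleNamespace(content_id="c1", doc_id="docA"),
    SimpleNamespace(content_id="c2", doc_id="docA"),
    SimpleNamespace(content_id="c3", doc_id="docB"),
    SimpleNamespace(content_id="c4", doc_id="docC"),
]
attributions = [
    SimpleNamespace(content_ids=["c1", "c2"]),
    SimpleNamespace(content_ids=["c3"]),
]

content_to_doc = {rc.content_id: rc.doc_id for rc in retrieval_contents}

# Retrieved IDs keep rank order and may repeat (two chunks come from docA).
retrieved_ids = [rc.doc_id for rc in retrieval_contents]

# Attributed IDs are deduplicated; sorting makes the output deterministic.
attributed_ids = sorted(
    {
        content_to_doc[cid]
        for attr in attributions
        for cid in attr.content_ids
        if cid in content_to_doc
    }
)

print(retrieved_ids)   # ['docA', 'docA', 'docB', 'docC']
print(attributed_ids)  # ['docA', 'docB']
```

Note that `run_query` returns `list(attribution_doc_ids)` built from a set, so the order of attributed IDs carries no ranking information. Rank-sensitive metrics such as nDCG or reciprocal rank are therefore only meaningful on the retrieved list.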



Run a test to make sure your retrievals are aligned with the gold document IDs.

In [None]:
print(run_query(df['question'][1]))
print(df['datastore_document_ids'][1])
print(df['gold_doc_ids'][1])

In [None]:
# Smoke test on the first 10 rows; the remaining rows get NaN in the new columns
df[['retrievals', 'attributions','response']] = df.head(10).apply(get_doc_ids, axis=1)
df

## Sample and Save Results

To speed up evaluation, we sample 200 questions and run the retrieval and attribution pipeline, saving the results for further analysis.
It can take about 40 minutes to query 200 questions.

In [10]:
# Sample 200 rows reproducibly
df_sample_200 = df.sample(n=200, random_state=42).copy()

# Apply the function
df_sample_200[['retrievals', 'attributions','response']] = df_sample_200.apply(get_doc_ids, axis=1)

In [11]:
df_sample_200.to_csv('fiqa_sample_200_with_retrievals.csv', index=False)

In [None]:
df_sample_200 = pd.read_csv('fiqa_sample_200_with_retrievals.csv')
df_sample_200

## Equivalence Evaluation with LLM Tool Call

We use an LLM (Anthropic Claude) with a tool schema to robustly evaluate whether the agent's response is equivalent to the gold answer. The tool call enforces a strict JSON output, which is then validated with Pydantic for reliability.

In [None]:
anthropic_client = anthropic.Anthropic(api_key="sk-ant-") #update with your key

In [10]:
from pydantic import BaseModel, ValidationError
from typing import Optional

class EquivalenceResult(BaseModel):
    score: Optional[float]
    rationale: str

In [14]:
equivalence_tool = {
    "name": "evaluate_equivalence",
    "description": "Evaluate if the agent response is equivalent to the gold response.",
    "input_schema": {
        "type": "object",
        "properties": {
            "score": {
                "type": "number",
                "enum": [0.0, 1.0],
                "description": "1.0 if equivalent, 0.0 if not equivalent"
            },
            "rationale": {
                "type": "string",
                "description": "One sentence justification for the score"
            }
        },
        "required": ["score", "rationale"]
    }
}

In [15]:
equivalence_prompt = """You are an expert that can identify similar responses. You will be given a set of questions, a gold response that correctly answers the questions and an agent response that is generated by a system. We are evaluating the gold response against the agent response.

You are to compare the gold response with the agent response and rationalize whether the agent response is equivalent to the gold response.

When testing equivalence, consider the following:
- If the agent response is exactly the same as or very similar to the gold response, then they are equivalent.
- If the agent response contains all of the information in the gold response and some additional information, then they are equivalent.
- If the agent response contains an alternative good response that is grounded in the knowledge, then they are equivalent.
- Do not bias towards verbose or succinct responses, the verbosity of agent response does not matter as long as it comprehensively includes all the information from the gold response.

Given the question, gold response and agent response, come up with a step by step plan to analyse if the agent response is equivalent to the gold response. If you consider the agent response as a good alternative to the gold, you MUST justify your decision by citing the knowledge in your rationale. Think through this plan and end your output with a json.
"""
output_format = """
IMPORTANT: Format your evaluation as a JSON object with two keys:
{
    "rationale": "[Your one sentence justification of why the responses are equivalent or not]",
    "score": [1.0 or 0.0]
}

Where 1.0 means "equivalent" and 0.0 means "not equivalent"

IMPORTANT: Only output the JSON object, and nothing else.
"""

In [16]:
def get_equivalence_score_and_rationale(question, gold, agent, prompt_template):
    prompt = prompt_template + f"""
Question: {question}

Gold response: {gold}

Agent response: {agent}
""" + output_format
    
    response = anthropic_client.messages.create(
        model="claude-3-7-sonnet-latest",
        temperature=0,
        max_tokens=512,
        tools=[equivalence_tool],
        tool_choice={"type": "tool", "name": "evaluate_equivalence"},
        messages=[{"role": "user", "content": prompt}]
    )
    try:
        if response.content[0].type == "tool_use":
            result_dict = response.content[0].input
            result = EquivalenceResult(**result_dict)
            return result.score, result.rationale
        else:
            return None, f"Unexpected response type: {response.content[0].type}"
    except Exception as e:
        return None, f"Error parsing response: {e}\nRaw: {response.content[0]}"

## Apply Equivalence Evaluation

We apply the equivalence evaluation to each row in our sample, storing both the equivalence score (1.0 for equivalent, 0.0 for not) and a rationale.

In [None]:
def apply_equivalence(row):
    score, rationale = get_equivalence_score_and_rationale(
        row['question'],
        row['answer'],
        row['response'],
        equivalence_prompt
    )
    return pd.Series({'equivalence_score': score, 'equivalence_rationale': rationale})

df_sample_200[['equivalence_score', 'equivalence_rationale']] = df_sample_200.apply(apply_equivalence, axis=1)
df_sample_200

In [18]:
df_sample_200.to_csv('fiqa_sample_200_with_equivalence.csv', index=False)

In [None]:
df_sample_200 = pd.read_csv('fiqa_sample_200_with_equivalence.csv')
df_sample_200

## Get Comparison Against LFRQA / Human Answer

We use an LLM (Anthropic Claude) to evaluate whether the agent's response is preferable to the gold answer written by a human.


In [14]:
comparison_prompt = """  system_prompt: |
    We will show you a query and a pair of answers to the query. You need to provide your preference over this pair of answers.

    First, try your best to determine whether the information in an answer can help truthfully answer the query. Then rate your preference based on Helpfulness and Truthfulness.
    - Helpfulness: information that is helpful/relevant to answer the query. An ideal answer consists of only information that is helpful/relevant to answer the query.
    - Truthfulness: information that you believe is correct to answer the query. By our definition, truthful information should be helpful information. If you find it difficult to determine the truthfulness of some information, consider it untruthful. Often time, this is due to not enough context provided in the answer. Another source of untruthfulness is when conflicting information presented, and the answer does not reconcile them in a coherent way.

    <rubric>
    Here is how you judge (in the order of importance),
    - If one answer has all truthful information while the other has some untruthful information, prefer the all truthful one.
    - If both have some untruthful information, prefer the one with less untruthful information.
    - If both have all truthful information, prefer the one with more truthful or helpful information.
    - If two answers look equally good, or it is too hard to judge using the 3 cases above, then you are our "not sure" which one is better.
    </rubric>

    ### Examples:

    **Example 1**
    User:
    <query>
    difference between publicly and publically.
    </query>

    <answer 1>
    Both 'publicly' and 'publically' bear no difference in meaning, as they are essentially alternative spellings of the same concept. Publicly is more widely used, but the existence of 'publically' in reputable sources like the OED means it cannot be dismissed as simply incorrect. Some opinions hold that 'publicly' is the older irregular form, still preached by a lot of grammars, and 'publically,' on the other hand, is the newer and regular form.
    </answer 1>

    <answer 2>
    There is no difference in meaning between 'publicly' and 'publically'; they are alternative spellings of the same word.
    </answer 2>

    Assistant:
    <thinking>Both <answer 1> and <answer 2> are truthful. However, <answer 1> provides more truthful information as the context to compare the two terms. Therefore, <answer 1> is better.</thinking>
    <rating>1</rating>

    **Example 2**
    User:
    <query>
    what did European/American historical cooks do with the egg whites?
    </query>

    <answer 1>
    Historical European and American cooks used egg whites for making egg white omelettes and egg white pasta, as well as for stiffening clothing, similar to how starch is used today.
    </answer 1>

    <answer 2>
    Egg whites have found their place in various non-culinary applications throughout history, such as in the clarification of beer and wine, in the conservation of books through bookbinding and gilding, and in makeup as an ancient form of nail polish. They were also utilized historically as a stiffening agent for clothing, similar to how starch is used today. The culinary landscape was not left untouched, with egg whites making their way into recipes for omelettes and pastas as early as the 15th century.
    </answer 2>

    Assistant:
    <thinking>Both <answer 1> and <answer 2> provide several usages of egg whites for European/American cooks. <answer 2> clearly provides more options with fully explained details. Therefore, <answer 2> is better.</thinking>
    <rating>2</rating>

    **Example 3**
    User:
    <query>
    should utf-16 be considered harmful?
    </query>

    <answer 1>
    The question of whether UTF-16 should be considered harmful is subject to differing opinions. One perspective suggests that UTF-16 is harmful due to a historical misconception about character encoding and recommends UTF-8 as the superior choice for various text interfaces. Another viewpoint argues that UTF-16 is not inherently harmful, emphasizing its utility in specific scenarios where it serves as a compromise between simplicity and compactness. The choice between UTF-16 and other encodings like UTF-8 depends on the specific requirements of the application, such as compatibility with ASCII or the need to efficiently encode certain character sets.
    </answer 1>

    <answer 2>
    UTF-16 should not be considered harmful. However, contrasting views argue that UTF-16 should indeed be considered harmful. Some argue that the very reason UTF-16 exists is because some time ago there used to be a misguided belief that WideChar is going to be what UCS-4 now is. Additionally, the harmfulness of UTF-16 is tied to issues with exercising code.
    </answer 2>

    Assistant:
    <thinking>Both <answer 1> and <answer 2> reconcile the two conflicting views with detailed explanation. I am not sure which one is better.</thinking>
    <rating>0</rating>
"""

user_prompt = """
    Query is in the <query></query> tags. Answer 1 is in <answer 1></answer 1>, and Answer 2 is in <answer 2></answer 2>.

    <query>
    {question}
    </query>

    <answer 1>
    {response1}
    </answer 1>

    <answer 2>
    {response2}
    </answer 2>

    Review the rubric in <rubric> tags,
    - if you prefer <answer 1>, output 1.
    - if you prefer <answer 2>, output 2.
    - if you are not sure, output 0.

    First, think step by step, put your thinking in <thinking></thinking> tags. Your thinking must be shorter than 50 words. Then, provide your rating inside <rating></rating> tags. Remember your rating should be 0 if you are not sure, and your rating must be either 0, 1, or 2.

    Return a single JSON object with two keys: `"rating"` containing your score and `"evaluation"` containing your thinking, but remove the tags.
"""

In [None]:
def get_preference_rating(question, response1, response2):
    # Fill in the user prompt with the actual question and responses
    prompt = user_prompt.format(
        question=question,
        response1=response1,
        response2=response2
    )
    messages = [
        {"role": "user", "content": prompt}
    ]
    # Call the LLM; the rubric and few-shot examples are sent as the system prompt
    # (previously comparison_prompt was built into the prompt and then overwritten)
    response = anthropic_client.messages.create(
        model="claude-3-7-sonnet-latest",
        temperature=0,
        max_tokens=512,
        system=comparison_prompt,
        messages=messages
    )
    # Parse the output (should be a JSON object)
    try:
        # If the model returns the JSON as a string, parse it
        output = response.content[0].text if hasattr(response.content[0], "text") else response.content[0]
        result = json.loads(output)
        return result["rating"], result["evaluation"]
    except Exception as e:
        return None, f"Error parsing response: {e}\nRaw: {response.content[0]}"
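
Because this call does not force a tool, the model may wrap the JSON in extra text, and `json.loads` on the raw reply then fails. Below is a minimal sketch of a more forgiving parser, assuming the reply contains a single JSON object. `extract_json_object` is a hypothetical helper, not part of the notebook or the Anthropic SDK, and the reply string is invented.

```python
import json

def extract_json_object(text):
    """Parse the span from the first '{' to the last '}' as JSON, or return None."""
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None

# Invented reply with chatter around the JSON object.
reply = 'Here is my evaluation: {"rating": 2, "evaluation": "Answer 2 is more complete."} Hope this helps.'
parsed = extract_json_object(reply)
print(parsed["rating"])                     # 2
print(extract_json_object("no json here"))  # None
```

This does not handle replies that contain several separate JSON objects; forcing a tool call, as in the equivalence evaluation, avoids the problem entirely.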

In [25]:
def apply_preference(row):
    rating, evaluation = get_preference_rating(
        row['question'],
        row['answer'],
        row['response']
    )
    return pd.Series({'preference_rating': rating, 'preference_evaluation': evaluation})


In [None]:
df_sample_200[['preference_rating', 'preference_evaluation']] = df_sample_200.apply(apply_preference, axis=1)
df_sample_200

In [30]:
df_sample_200.to_csv('fiqa_sample_200_with_comparison.csv', index=False)

## Calculate Retrieval Metrics

We compute standard retrieval metrics (recall@k, precision@k, nDCG@k, etc.) for each question, using both retrieved and attributed document IDs.

In [None]:
df_sample_200

In [31]:
def recall_at_k(retrieved_ids, ground_truth_ids, k):
    retrieved_top_k = set(retrieved_ids[:k])
    return len(retrieved_top_k & ground_truth_ids) / len(ground_truth_ids) if ground_truth_ids else 0.0

def precision_at_k(retrieved_ids, ground_truth_ids, k):
    retrieved_top_k = retrieved_ids[:k]
    return sum(1 for cid in retrieved_top_k if cid in ground_truth_ids) / k if k else 0.0

def ndcg_at_k(retrieved_chunks, ground_truth_ids, k=10):
    top_k = retrieved_chunks[:k]
    relevances = [1 if chunk in ground_truth_ids else 0 for chunk in top_k]
    dcg = sum((2**rel - 1) / np.log2(idx + 2) for idx, rel in enumerate(relevances))
    ideal_relevances = sorted([1]*min(len(ground_truth_ids), k) + [0]*(k - min(len(ground_truth_ids), k)), reverse=True)
    idcg = sum((2**rel - 1) / np.log2(idx + 2) for idx, rel in enumerate(ideal_relevances))
    return dcg / idcg if idcg > 0 else 0.0

def idcg(matched_chunk_positions):
    total_relevant = len(matched_chunk_positions)
    if total_relevant == 0:
        return 0.0
    return sum(1 / math.log2(i + 2) for i in range(total_relevant))

def precision_at_r(retrieved_ids, ground_truth_ids):
    r = len(ground_truth_ids)
    retrieved_top_r = retrieved_ids[:r]
    return sum(1 for cid in retrieved_top_r if cid in ground_truth_ids) / r if r else 0.0

def hit_rate_at_k(retrieved_ids, ground_truth_ids, k):
    return int(any(cid in ground_truth_ids for cid in retrieved_ids[:k]))

def reciprocal_rank_at_k(retrieved_ids, ground_truth_ids, k):
    for idx, cid in enumerate(retrieved_ids[:k]):
        if cid in ground_truth_ids:
            return 1.0 / (idx + 1)
    return 0.0
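
A small worked example can help sanity-check these metrics. The sketch below re-implements compact versions of four of them (with binary relevance, `2**rel - 1` is just 0 or 1, so the gain is written as 1 directly) and applies them to invented document IDs.

```python
import math

def recall_at_k(retrieved, relevant, k):
    return len(set(retrieved[:k]) & relevant) / len(relevant) if relevant else 0.0

def precision_at_k(retrieved, relevant, k):
    return sum(1 for d in retrieved[:k] if d in relevant) / k if k else 0.0

def reciprocal_rank_at_k(retrieved, relevant, k):
    for idx, d in enumerate(retrieved[:k]):
        if d in relevant:
            return 1.0 / (idx + 1)
    return 0.0

def ndcg_at_k(retrieved, relevant, k):
    # Binary relevance: each relevant hit at rank i (0-based) contributes 1 / log2(i + 2).
    dcg = sum(1 / math.log2(i + 2) for i, d in enumerate(retrieved[:k]) if d in relevant)
    ideal = sum(1 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / ideal if ideal > 0 else 0.0

# Invented IDs: two of the three relevant documents are retrieved, at ranks 2 and 5.
retrieved = ["d7", "d1", "d3", "d9", "d2"]
relevant = {"d1", "d2", "d4"}
k = 5

recall = recall_at_k(retrieved, relevant, k)
precision = precision_at_k(retrieved, relevant, k)
rr = reciprocal_rank_at_k(retrieved, relevant, k)
ndcg = ndcg_at_k(retrieved, relevant, k)

print(round(recall, 3))     # 0.667 (2 of 3 relevant documents found)
print(round(precision, 3))  # 0.4   (2 of 5 retrieved documents are relevant)
print(round(rr, 3))         # 0.5   (first relevant document at rank 2)
print(round(ndcg, 3))       # 0.478
```

The nDCG value follows from DCG = 1/log2(3) + 1/log2(6) ≈ 1.018 and an ideal DCG of 1 + 1/log2(3) + 1/log2(4) ≈ 2.131.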

In [37]:
import ast

def safe_list(val):
    if isinstance(val, list):
        return val
    if isinstance(val, str):
        try:
            return ast.literal_eval(val)
        except Exception:
            return []
    return []
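
The reason `safe_list` is needed: once a list-valued column has been written to CSV and read back, each cell is a string rather than a list. The sketch below reproduces this with the standard `csv` module and invented values, mimicking a list cell that was saved as its string representation.

```python
import ast
import csv
import io

# Write a row whose second field is the string form of a Python list.
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["qid", "retrievals"])
writer.writerow([7, str(["docA", "docB"])])

# Read it back: the cell is now plain text.
buffer.seek(0)
rows = list(csv.DictReader(buffer))
raw = rows[0]["retrievals"]

print(type(raw).__name__)  # str
print(len(raw))            # 16 (characters, not 2 items)

# ast.literal_eval safely turns the text back into a list.
parsed = ast.literal_eval(raw)
print(parsed)              # ['docA', 'docB']
```

Any list column that skips this step is sliced character by character by expressions such as `retrieved_ids[:k]`, which silently yields zero matches.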

In [38]:
def evaluate_single_query(row, k=10, use_attributions=False):
    index_df = int(row['qid'])
    user_input = row["question"]
    answer = row["answer"]
    response = row["response"]
    equivalence_rationale = row["equivalence_rationale"]
    preference_rationale = row["preference_evaluation"]
    ground_truth_ids = set(safe_list(row["datastore_document_ids"]))
    retriever_ids = safe_list(row["retrievals"])

    if use_attributions:
        # Attributions are also stored as strings after the CSV round trip
        attributions_ids = safe_list(row["attributions"])
        retrieved_ids = attributions_ids
    else:
        retrieved_ids = retriever_ids

    if not ground_truth_ids:
        # handle the empty case, e.g., skip, or set all metrics to 0 or np.nan
        recall = precision = ndcg = icdg = precision_r = hit_rate = mrr = 0.0
    else:
        recall = recall_at_k(retrieved_ids, ground_truth_ids, k)
        precision = precision_at_k(retrieved_ids, ground_truth_ids, k)
        ndcg = ndcg_at_k(retrieved_ids, ground_truth_ids, k)
        icdg = idcg(set(range(min(len(ground_truth_ids), k))))
        precision_r = precision_at_r(retrieved_ids, ground_truth_ids)
        hit_rate = hit_rate_at_k(retrieved_ids, ground_truth_ids, k)
        mrr = reciprocal_rank_at_k(retrieved_ids, ground_truth_ids, k)

    # --- RAGAS metrics ---
    reference_contexts = list(ground_truth_ids)
    retrieved_contexts = retrieved_ids

    if not reference_contexts or not retrieved_contexts:
        recall_score, precision_score = 0.0, 0.0  # or np.nan, or skip this row
    else:
        sample = SingleTurnSample(
            user_input=user_input,
            reference_contexts=reference_contexts,
            retrieved_contexts=retrieved_contexts,
        )
        async def run_ragas_metrics(sample):
            context_recall = NonLLMContextRecall()
            context_precision = NonLLMContextPrecisionWithReference()
            recall_score = await context_recall.single_turn_ascore(sample)
            precision_score = await context_precision.single_turn_ascore(sample)
            return recall_score, precision_score
        recall_score, precision_score = asyncio.run(run_ragas_metrics(sample))


    return {
        "index": index_df,
        "user_input": user_input,
        "answer": answer,
        "response": response,
        "equivalence_rationale": equivalence_rationale,
        "preference_rationale": preference_rationale,
        "k": k,
        "recall@k": recall,
        "precision@k": precision,
        "precision@R": precision_r,
        "nDCG@k": float(ndcg),
        "iDCG@k": float(icdg),
        "hit_rate@k": hit_rate,
        "mrr@k": mrr,
        "ragas_context_recall": recall_score,
        "ragas_context_precision": precision_score,
        "ground_truth_ids": list(ground_truth_ids),
        "retriever_ids": retriever_ids,
       # "attributions": attributions_ids,
    }
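
One caveat about the `asyncio.run` call above: it raises `RuntimeError` when the current thread already has a running event loop, which is typically the case inside a Jupyter cell. The following is a sketch of one possible workaround, not the notebook's method. `run_coro_blocking` and `fake_metric` are hypothetical names, and `fake_metric` merely stands in for an async metric call.

```python
import asyncio
import concurrent.futures

def run_coro_blocking(coro):
    """Run a coroutine to completion whether or not a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread (plain script): asyncio.run works.
        return asyncio.run(coro)
    # A loop is already running (e.g. a notebook cell): run the coroutine on a
    # fresh loop in a worker thread and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

async def fake_metric(value):
    # Stand-in for an async metric call; simply doubles its input.
    await asyncio.sleep(0)
    return value * 2

print(run_coro_blocking(fake_metric(21)))  # 42
```

If the environment already patches the event loop to allow nested runs, the original `asyncio.run` call may work unchanged.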

## Aggregate and Summarize Results

We aggregate the results across all questions to report mean retrieval and equivalence metrics, providing an overall view of system performance.

In [None]:
# Pick the first example (row 0) from your DataFrame
row = df_sample_200.iloc[0]

# Run the evaluation for this single example
result = evaluate_single_query(row, k=10,use_attributions=False)

# Print the results
print(result)

In [None]:
results = []
for idx, row in df_sample_200.iterrows():
    try:
        result = evaluate_single_query(row, k=15, use_attributions=False)
        score = row.get('equivalence_score', None)
        preference_score = row.get('preference_rating', None)
        try:
            score = int(float(score)) if score is not None else None
        except Exception:
            score = None
        result['equivalence_score'] = score
        result['preference_rating'] = preference_score
        results.append(result)
    except Exception as e:
        # Record a placeholder so the row still counts toward the RAGAS means
        print(f"Error on row {idx}: {e}")
        results.append({
            "ragas_context_recall": 0.0,
            "ragas_context_precision": 0.0,
        })

metrics = [
    'recall@k',
    'precision@k',
    'precision@R',
    'nDCG@k',
    'iDCG@k',
    'ragas_context_recall',
    'ragas_context_precision',
    'equivalence_score',
    'preference_rating',
]

# Final save
final = pd.DataFrame(results)
final.to_csv("retrieval_eval_results_final.csv", index=False)

means = final[metrics].mean()
print(means)
print(final.shape)
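
The mean of `preference_rating` mixes a "not sure" code (0) with two preference codes (1 and 2), so the average is hard to interpret on its own. A tally per rating is easier to read. The sketch below uses invented ratings; the labels assume the argument order in `apply_preference`, where answer 1 is the gold answer and answer 2 is the agent response.

```python
from collections import Counter

# Invented ratings for illustration; None marks rows where parsing failed.
ratings = [1, 2, 2, 0, None, 2, 1]

valid = [r for r in ratings if r is not None]
counts = Counter(valid)
labels = {0: "not sure", 1: "answer 1 (gold)", 2: "answer 2 (agent)"}

summary = []
for rating in (0, 1, 2):
    share = counts[rating] / len(valid)
    summary.append(f"{labels[rating]}: {counts[rating]} ({share:.0%})")

print("\n".join(summary))
# not sure: 1 (17%)
# answer 1 (gold): 2 (33%)
# answer 2 (agent): 3 (50%)
```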

## Conclusion

This notebook provides a robust, end-to-end evaluation of a RAG system, including both retrieval quality and answer equivalence, with enforced structured outputs for reliability and reproducibility.