In [None]:
# Copyright (c) 2025 Microsoft Corporation.

import sys

# Insert the directory three levels above the notebook at index 1 of the module
# search path (presumably the repository root, so the local `benchmark_qed`
# package can be imported without installation — TODO confirm).
sys.path.insert(1, "../../../")

In [None]:
import nest_asyncio

# Patch asyncio so that an already-running event loop can be re-entered.
# NOTE(review): presumably required because the notebook kernel runs its own
# loop while the library code below drives coroutines — confirm it is still needed.
nest_asyncio.apply()

In [None]:
# Load the `dotenv` IPython extension, then run its magic to read a .env file
# (presumably where OPENAI_API_KEY is defined — verify against your local setup).
%load_ext dotenv
%dotenv

In [None]:
import json
import logging
import os

import tiktoken

from pydantic import SecretStr
from pathlib import Path
import pandas as pd

from benchmark_qed.autod.data_processor.embedding import TextEmbedder
from benchmark_qed.autod.io.text_unit import load_text_units

from benchmark_qed.config.llm_config import LLMConfig, LLMProvider
from benchmark_qed.llm.factory import ModelFactory

from benchmark_qed.autoe.retrieval_scores.relevance_assessment.bing_rater import BingRelevanceRater
from benchmark_qed.autoq.data_model.question import Question

# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)

# `logging.getLogger` always returns a Logger instance (never None), so no
# guard is required. Raise the "httpx" logger's threshold to ERROR so that its
# lower-severity records are dropped.
logging.getLogger("httpx").setLevel(logging.ERROR)

In [None]:
# DATA CONFIGS

INPUT_DATA_PATH = Path("./example_retrieval")
OUTPUT_DATA_PATH = Path("./output/retrieval_scores")

# tokenizer used for chunking documents into text units
ENCODING_MODEL = "o200k_base"

# MODEL CONFIGS
# The API key is read from the environment. Warn as soon as it is missing so
# the problem is visible here rather than surfacing later as a failure of the
# first embedding/chat request.
_openai_api_key = os.getenv("OPENAI_API_KEY", "")
if not _openai_api_key:
    logging.warning(
        "OPENAI_API_KEY is not set; embedding and chat model calls are likely to fail."
    )
API_KEY = SecretStr(_openai_api_key)
EMBEDDING_MODEL = "text-embedding-3-large"
LLM_MODEL = "gpt-4.1"
LLM_PARAMS = {
    "temperature": 0.0,
    "seed": 42,
}  # adjust this based on your model. For example, some reasoning models do not support temperature settings

# Configuration for the embedding model used to embed text units.
EMBEDDING_LLM_CONFIG = LLMConfig(
    model=EMBEDDING_MODEL,
    api_key=API_KEY,
    llm_provider=LLMProvider.OpenAIEmbedding,
)

# Configuration for the chat model handed to the relevance rater below.
COMPLETION_LLM_CONFIG = LLMConfig(
    model=LLM_MODEL,
    api_key=API_KEY,
    llm_provider=LLMProvider.OpenAIChat,
    call_args=LLM_PARAMS,
)

text_embedder = TextEmbedder(
    ModelFactory.create_embedding_model(EMBEDDING_LLM_CONFIG)
)
llm = ModelFactory.create_chat_model(model_config=COMPLETION_LLM_CONFIG)

token_encoder = tiktoken.get_encoding(ENCODING_MODEL)

# METRICS CONFIGS
RAG_METHODS = ["lazygraphrag", "vector_rag"]
QUESTION_SETS = ["activity_local", "activity_global"]
NUM_CLUSTERS = 10  # set to None to enable auto tuning, which will be slow
NUM_QUESTIONS = 10  # set to None to compute for all questions
SEMANTIC_REPRESENTATIVES = 15  # Increase this to reduce classification error when generating reference context for comparison
CENTROID_REPRESENTATIVES = 5  # Increase this to reduce classification error when generating reference context for comparison
# Passed as `relevance_threshold` to the precision, recall and fidelity scoring calls.
RELEVANCE_THRESHOLD = 2

relevance_rater = BingRelevanceRater(
    llm_client=llm,
    llm_config=COMPLETION_LLM_CONFIG,
    concurrent_requests=32,  # Lower concurrency for cluster generation
)

## Generate Reference Context 

For each query, retrieve the relevant clusters and the most relevant chunks within each cluster. These serve as the reference context against which each RAG method's retrieval performance is evaluated.

In [None]:
from benchmark_qed.autoe.retrieval_scores.reference_gen.cluster_relevance import (
    ClusterRelevanceRater,
)
from benchmark_qed.autoe.retrieval_scores.reference_gen.cluster_relevance import save_cluster_references_to_json
from benchmark_qed.autoq.data_model.question import Question

In [None]:
# Load text units from parquet file
text_df = pd.read_parquet(INPUT_DATA_PATH / "text_units.parquet")
if 'short_id' not in text_df.columns:
    text_df['short_id'] = text_df.index.astype(str)

corpus = load_text_units(text_df)
print(f"Loaded {len(corpus)} text units")

# embed text units if needed
# skip this if you already have embeddings in your corpus
print(f"Embedding {len(corpus)} text units")
corpus = await text_embedder.embed_batch(
    text_units=corpus,
    batch_size=32,
)
print(f"Embedded {len(corpus)} text units")

# Create cluster relevance rater with text units data
cluster_rater = ClusterRelevanceRater(
    text_embedder=text_embedder,
    relevance_rater=relevance_rater,
    corpus=corpus,  # Will perform clustering once and reuse for all queries
    semantic_neighbors=SEMANTIC_REPRESENTATIVES,
    centroid_neighbors=CENTROID_REPRESENTATIVES,
    num_clusters=NUM_CLUSTERS, # set to None to tune number of clusters, but might be slow
)

print(f"Cluster relevance rater initialized with {len(cluster_rater.clusters)} clusters")


for question_set in QUESTION_SETS:
    print(f"\nGenerating cluster references for question set: {question_set}")
    
    # Load questions from vector_rag retrieval results
    context_path = Path(INPUT_DATA_PATH / "vector_rag" / f"{question_set}.json")
    with open(context_path, "r") as f:
        retrieval_result_dicts = json.load(f)
    
    # Extract questions from retrieval results
    questions = [
        Question(id=result["question_id"], text=result["question_text"])
        for result in retrieval_result_dicts
    ]
    if NUM_QUESTIONS is not None:
        questions = questions[:NUM_QUESTIONS]

    print(f"Loaded {len(questions)} questions")
    
    # Generate cluster references using batch assessment
    batch_results = await cluster_rater.assess_batch(questions)
    
    print(f"Generated cluster references for {len(batch_results)} questions")
    
    # Save batch results to JSON using the correct function name
    output_path = Path(OUTPUT_DATA_PATH / "cluster_references" / f"{question_set}_cluster_references.json")
    save_cluster_references_to_json(
        batch_results, 
        output_path,
        include_clusters=True,
        clusters=cluster_rater.clusters
    )

    print(f"Saved cluster references to {output_path}")

print("\nâœ“ Cluster reference generation completed for all question sets")

## Retrieval Relevance Score

In [None]:
from benchmark_qed.autoe.retrieval_scores.relevance_assessment.bing_rater import BingRelevanceRater
from benchmark_qed.autoe.data_model.retrieval_result import load_retrieval_results_from_dicts
from benchmark_qed.autoe.retrieval_scores.scoring.retrieval_relevance import assess_batch_relevance


In [None]:
for rag_method in RAG_METHODS:
    print(f"Evaluating RAG method: {rag_method}")

    for question_set in QUESTION_SETS:
        print(f" Evaluating question set: {question_set}")

        # load context from json file
        context_path = Path(INPUT_DATA_PATH / rag_method / f"{question_set}.json")
        with open(context_path, "r") as f:
            retrieval_result_dicts = json.load(f)

            retrieval_results = load_retrieval_results_from_dicts(
                data=retrieval_result_dicts, 
                context_id_key="source_id",
                context_text_key="text",
                auto_transform_context=True  # This will ensure short_id is generated
            )
            if NUM_QUESTIONS is not None:
                retrieval_results = retrieval_results[:NUM_QUESTIONS]

            relevance_results = await assess_batch_relevance(
                retrieval_results=retrieval_results,
                relevance_rater=relevance_rater
            )

            # save relevance results to json file
            output_path = Path(OUTPUT_DATA_PATH / rag_method / f"{question_set}_relevance.json")
            relevance_results.save_to_json(output_path)

## Calculate scores

### Precision

In [None]:
from benchmark_qed.autoe.retrieval_scores.scoring.precision import get_precision_summary
from benchmark_qed.autoe.retrieval_scores.scoring.retrieval_relevance import BatchRelevanceResult

# Compute a precision summary for every (RAG method, question set) pair from
# the saved relevance results and write it to "<question_set>_precision.json".
for rag_method in RAG_METHODS:
    for question_set in QUESTION_SETS:
        method_dir = OUTPUT_DATA_PATH / rag_method

        # load relevance results from json file
        relevance_results = BatchRelevanceResult.load_from_json(
            method_dir / f"{question_set}_relevance.json"
        )

        # compute precision summary
        precision_summary = get_precision_summary(
            relevance_results, relevance_threshold=RELEVANCE_THRESHOLD
        )

        # save precision summary to json file; ensure the directory exists,
        # matching how the recall and fidelity results are written.
        output_path = method_dir / f"{question_set}_precision.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(precision_summary, f, indent=2)


### Recall

In [None]:
from benchmark_qed.autoe.retrieval_scores.scoring.recall import calculate_recall
from benchmark_qed.autoe.retrieval_scores.reference_gen.cluster_relevance import load_cluster_references_from_json
from benchmark_qed.autoe.retrieval_scores.scoring.retrieval_relevance import BatchRelevanceResult


# Compute recall for every (RAG method, question set) pair by comparing the
# saved relevance results with the saved cluster references.
for rag_method in RAG_METHODS:
    for question_set in QUESTION_SETS:
        print(f"Calculating recall for {rag_method} - {question_set}")

        # Load relevance results (QueryRelevanceResult objects)
        relevance_results = BatchRelevanceResult.load_from_json(
            OUTPUT_DATA_PATH / rag_method / f"{question_set}_relevance.json"
        )

        # Load cluster references and clusters
        cluster_references_path = (
            OUTPUT_DATA_PATH / "cluster_references" / f"{question_set}_cluster_references.json"
        )
        cluster_references, clusters = load_cluster_references_from_json(cluster_references_path)

        print(f"  Loaded {len(relevance_results.results)} relevance results")
        print(f"  Loaded {len(cluster_references)} cluster references")

        # Calculate recall metrics with cluster classification error statistics
        recall_results = calculate_recall(
            query_relevance_results=relevance_results.results,
            retrieval_references=cluster_references,
            relevance_threshold=RELEVANCE_THRESHOLD,
            clusters=clusters,
            use_text_unit_short_id=True,
        )

        # Save recall results to JSON file. `default=str` stringifies any value
        # that the json module cannot serialise natively.
        output_path = OUTPUT_DATA_PATH / rag_method / f"{question_set}_recall.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(recall_results, f, indent=2, default=str)

        # The status marker was previously emitted as mis-decoded bytes.
        print(f"  ✅ Saved recall results to {output_path}")

### Fidelity

Fidelity measures how similar the distribution of relevant text units is between reference clusters and query relevance results. It uses Jensen-Shannon divergence to quantify the similarity between these distributions:

- **Fidelity = 1.0 - JS_Divergence**  
- **Higher fidelity** = More similar distributions = Better retrieval performance
- **Lower fidelity** = Different distributions = Retrieval may be missing key clusters or focusing on wrong areas


In [None]:
from benchmark_qed.autoe.retrieval_scores.scoring.fidelity import calculate_fidelity
from benchmark_qed.autoe.retrieval_scores.reference_gen.cluster_relevance import load_cluster_references_from_json
from benchmark_qed.autoe.retrieval_scores.scoring.retrieval_relevance import BatchRelevanceResult


# Compute fidelity for every (RAG method, question set) pair by comparing the
# saved relevance results with the saved cluster references.
for rag_method in RAG_METHODS:
    for question_set in QUESTION_SETS:
        print(f"Calculating fidelity for {rag_method} - {question_set}")

        # Load relevance results (QueryRelevanceResult objects)
        relevance_results = BatchRelevanceResult.load_from_json(
            OUTPUT_DATA_PATH / rag_method / f"{question_set}_relevance.json"
        )

        # Load cluster references and clusters
        cluster_references_path = (
            OUTPUT_DATA_PATH / "cluster_references" / f"{question_set}_cluster_references.json"
        )
        cluster_references, clusters = load_cluster_references_from_json(cluster_references_path)

        print(f"  Loaded {len(relevance_results.results)} relevance results")
        print(f"  Loaded {len(cluster_references)} cluster references")

        # Calculate fidelity metrics using Jensen-Shannon divergence
        fidelity_results = calculate_fidelity(
            query_relevance_results=relevance_results.results,
            retrieval_references=cluster_references,
            relevance_threshold=RELEVANCE_THRESHOLD,
            clusters=clusters,
            use_text_unit_short_id=True,
        )

        # Save fidelity results to JSON file. `default=str` stringifies any
        # value that the json module cannot serialise natively.
        output_path = OUTPUT_DATA_PATH / rag_method / f"{question_set}_fidelity.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(fidelity_results, f, indent=2, default=str)

        # The status markers here and below were previously emitted as
        # mis-decoded bytes.
        print(f"  ✅ Saved fidelity results to {output_path}")

print("\n✓ Fidelity calculation completed for all RAG methods and question sets")

## Summary Comparison

Compare precision, recall, and fidelity metrics across all RAG methods and question sets.

In [None]:
# Create comprehensive comparison across all metrics
import pandas as pd

# Placeholder for a metric that could not be read. NaN (rather than 0) keeps a
# missing result distinguishable from a genuine score of zero in the table/CSV.
_MISSING_METRIC = float("nan")


def _read_metrics(path, fields, section=None):
    """Read selected metrics from a JSON results file.

    Args:
        path: Location of the JSON file to read.
        fields: Mapping of output column name -> key to look up in the JSON object.
        section: Optional top-level key whose value holds the metrics; when
            None the metrics are read from the top-level object.

    Returns:
        A dict with one entry per column in ``fields`` (same order). A missing
        file or a missing metric key yields NaN for the affected columns.

    Raises:
        KeyError: If ``section`` is given but absent from the file.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except FileNotFoundError:
        return dict.fromkeys(fields, _MISSING_METRIC)
    if section is not None:
        payload = payload[section]
    return {column: payload.get(key, _MISSING_METRIC) for column, key in fields.items()}


comparison_data = []

for rag_method in RAG_METHODS:
    for question_set in QUESTION_SETS:
        method_dir = OUTPUT_DATA_PATH / rag_method
        row_data = {
            "RAG_Method": rag_method,
            "Question_Set": question_set,
        }

        # Precision: binary and graded values live under the "summary" object.
        row_data.update(
            _read_metrics(
                method_dir / f"{question_set}_precision.json",
                {
                    "Binary_Precision": "macro_averaged_binary_precision",
                    "Graded_Precision": "macro_averaged_graded_precision",
                },
                section="summary",
            )
        )

        # Recall: read from the top level of the recall output.
        row_data.update(
            _read_metrics(
                method_dir / f"{question_set}_recall.json",
                {
                    "Recall": "macro_averaged_recall",
                    "Cluster_Classification_Error": "macro_averaged_classification_error",
                },
            )
        )

        # Fidelity: read from the top level of the fidelity output.
        row_data.update(
            _read_metrics(
                method_dir / f"{question_set}_fidelity.json",
                {
                    "Fidelity": "macro_averaged_fidelity",
                    "JS_Divergence": "macro_averaged_js_divergence",
                },
            )
        )

        comparison_data.append(row_data)

# Create comparison DataFrame (one row per RAG method / question set pair)
comparison_df = pd.DataFrame(comparison_data)

print("📊 Metrics Comparison")
print("=" * 80)
print(comparison_df.round(4).to_string(index=False))

# Save comparison to CSV for further analysis; make sure the output directory
# exists so this cell also works when it is the first one to write there.
comparison_output_path = OUTPUT_DATA_PATH / "metrics_comparison.csv"
comparison_output_path.parent.mkdir(parents=True, exist_ok=True)
comparison_df.to_csv(comparison_output_path, index=False)
print(f"\n💾 Saved comparison to {comparison_output_path}")
