In [None]:
# Install RapidFire AI only when it cannot already be imported in this runtime.
# NOTE: `rapidfireai init --evals` runs only on the install path; it is skipped
# whenever the package is already importable.
try:
    import rapidfireai
    print("‚úÖ rapidfireai already installed")
except ImportError:
    # IPython shell escapes: install the package, then run its `init --evals` command.
    !pip install rapidfireai  # Takes 1 min
    !rapidfireai init --evals # Takes 1 min

In [4]:
import os
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'

from rapidfireai import Experiment
from rapidfireai.evals.automl import List, RFLangChainRagSpec, RFvLLMModelConfig, RFPromptManager, RFGridSearch
import re, json
from typing import List as listtype, Dict, Any

# NB: If you get "AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'" from Colab, just rerun this cell

### Clone the dataset repository and make it the working directory for the data-processing steps below

In [None]:
# Fetch the notebook's dataset repository and switch the working directory into it.
# NOTE: both commands are relative to the current working directory, so re-running
# this cell from inside the clone creates (and enters) a second, nested clone.
!git clone https://github.com/sasubillis/rag-arguana-rapidfire.git
%cd rag-arguana-rapidfire

### Dataset sanity checks

RAG eval pipelines assume that IDs have a consistent type across all dataset files.
If some of the following IDs are strings while others are integers, joins between the files either silently match nothing or fail outright:

queries.jsonl -> query_id
corpus.jsonl -> _id
qrels.tsv -> query_id, corpus_id

RapidFire expects integer IDs (because it hashes, indexes, and joins on them)

#### Dataset normalization contract
This notebook produces a canonical set of dataset files (`*.final.*`) that are guaranteed to use integer identifiers.

If the source dataset already satisfies this constraint, files are copied unchanged. Otherwise, deterministic ID normalization is applied.

In [None]:
from pathlib import Path
import json
import pandas as pd
import shutil

def prepare_final_dataset(
    dataset_dir: Path,
    corpus_file="corpus.jsonl",
    queries_file="queries.jsonl",
    qrels_file="qrels.tsv",
):
    """
    Produces canonical files inside ``dataset_dir``:
      - corpus.final.jsonl
      - queries.final.jsonl
      - qrels.final.tsv

    If every corpus ``_id`` and every query ``query_id`` is already an integer,
    the three originals are copied unchanged.
    Otherwise each ID is replaced by the 0-based position of its record in the
    corresponding JSONL file, and the qrels are rewritten with the same mapping.

    Args:
        dataset_dir: Directory holding the input files; outputs are written here too.
        corpus_file: JSONL file name; each record must carry an ``_id`` key.
        queries_file: JSONL file name; each record must carry a ``query_id`` key.
        qrels_file: Headerless TSV with columns query_id, corpus_id, relevance.

    Returns:
        None. Results are written to disk and a short report is printed.

    Raises:
        ValueError: If the corpus or queries file has no records, or if the
            qrels reference an ID that is missing from the corpus/queries.
    """

    dataset_dir = Path(dataset_dir)

    corpus_path = dataset_dir / corpus_file
    queries_path = dataset_dir / queries_file
    qrels_path = dataset_dir / qrels_file

    final_corpus = dataset_dir / "corpus.final.jsonl"
    final_queries = dataset_dir / "queries.final.jsonl"
    final_qrels  = dataset_dir / "qrels.final.tsv"

    def _read_jsonl(path):
        # Explicit UTF-8 so the result does not depend on the platform default;
        # blank lines (e.g. a trailing newline) are skipped instead of crashing json.loads.
        with open(path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    # --------------------
    # Load inputs
    # --------------------
    corpus = _read_jsonl(corpus_path)
    queries = _read_jsonl(queries_path)

    if not corpus or not queries:
        raise ValueError(" Corpus and queries files must each contain at least one record.")

    # IDs are read as text: with dtype inference pandas would turn "10" or "001"
    # into integers, which could never match the string keys coming from the JSONL files.
    qrels = pd.read_csv(
        qrels_path,
        sep="\t",
        header=None,
        names=["query_id", "corpus_id", "relevance"],
        dtype={"query_id": str, "corpus_id": str},
    )

    # --------------------
    # Detect ID types
    # --------------------
    # The report shows the first record's type; the decision below checks every record.
    corpus_id_type = type(corpus[0].get("_id"))
    query_id_type  = type(queries[0].get("query_id"))

    print("üîç ID type check")
    print(f"  corpus _id: {corpus_id_type}")
    print(f"  query query_id: {query_id_type}")

    corpus_ids_are_int = all(type(doc.get("_id")) is int for doc in corpus)
    query_ids_are_int = all(type(q.get("query_id")) is int for q in queries)

    # --------------------
    # Case A: already normalized -> copy
    # --------------------
    if corpus_ids_are_int and query_ids_are_int:
        shutil.copy(corpus_path, final_corpus)
        shutil.copy(queries_path, final_queries)
        shutil.copy(qrels_path, final_qrels)
        print("\n IDs already integers.")
        print("‚û° Copied originals to:")
        print("   - corpus.final.jsonl")
        print("   - queries.final.jsonl")
        print("   - qrels.final.tsv")
        return

    # --------------------
    # Case B: normalize -> write finals
    # --------------------
    print("\n String IDs detected. Normalizing deterministically...")

    # Build deterministic maps: original ID (as text) -> position in the file.
    # Keys are stringified so they line up with the text IDs read from the qrels,
    # including the mixed case where only one of the two files uses integer IDs.
    corpus_id_map = {str(doc["_id"]): i for i, doc in enumerate(corpus)}
    query_id_map  = {str(q["query_id"]): i for i, q in enumerate(queries)}

    # Rewrite corpus
    for doc in corpus:
        doc["_id"] = corpus_id_map[str(doc["_id"])]

    # Rewrite queries
    for q in queries:
        q["query_id"] = query_id_map[str(q["query_id"])]

    # Rewrite qrels; IDs without a mapping become NaN and are rejected below
    qrels["query_id"]  = qrels["query_id"].map(query_id_map)
    qrels["corpus_id"] = qrels["corpus_id"].map(corpus_id_map)

    if qrels.isnull().any().any():
        raise ValueError(" Qrels reference IDs not present in corpus/queries.")

    # Every ID mapped, so the columns can be stored as plain integers
    qrels["query_id"] = qrels["query_id"].astype("int64")
    qrels["corpus_id"] = qrels["corpus_id"].astype("int64")

    # Write canonical finals
    with open(final_corpus, "w", encoding="utf-8") as f:
        for doc in corpus:
            f.write(json.dumps(doc) + "\n")

    with open(final_queries, "w", encoding="utf-8") as f:
        for q in queries:
            f.write(json.dumps(q) + "\n")

    qrels.to_csv(final_qrels, sep="\t", index=False, header=False)

    print("\n Normalization complete.")
    print("‚û° Wrote canonical files:")
    print("   - corpus.final.jsonl")
    print("   - queries.final.jsonl")
    print("   - qrels.final.tsv")


from pathlib import Path

# Build the canonical *.final.* files for the dataset at ./datasets/arguana,
# resolved against the current working directory.
DATASET_DIR = Path.cwd() / "datasets" / "arguana"
prepare_final_dataset(DATASET_DIR)


In [None]:
from datasets import load_dataset
import pandas as pd
import json, random
from pathlib import Path

# ----------------
# Resolve the dataset folder relative to the notebook's working directory
# ----------------
PROJECT_ROOT = Path.cwd()
dataset_dir = PROJECT_ROOT / "datasets" / "arguana"

# ----------------
# Queries: read the canonical queries.final.jsonl as a dataset
# ----------------
arguana_dataset = load_dataset(
    "json", data_files=str(dataset_dir / "queries.final.jsonl"), split="train"
)

# Later cells read the "query" column, so rename "text" when the file uses that name
if "text" in arguana_dataset.column_names:
    arguana_dataset = arguana_dataset.rename_columns({"text": "query"})

# ----------------
# Relevance judgments: headerless TSV, column names supplied here
# ----------------
qrels = pd.read_csv(
    dataset_dir / "qrels.final.tsv",
    sep="\t",
    header=None,
    names=["query_id", "corpus_id", "relevance"],
)

# ----------------
# Joint down-sampling of queries, qrels and corpus
# ----------------
sample_fraction = 0.01   # set to 1.0 for full eval
rseed = 1
random.seed(rseed)

# 1) Shuffle with a fixed seed and keep the first `sample_size` queries (never fewer than one)
sample_size = max(1, int(len(arguana_dataset) * sample_fraction))
arguana_dataset = arguana_dataset.shuffle(seed=rseed).select(range(sample_size))

# The canonical files use integer IDs, so plain set membership works below
query_ids = set(arguana_dataset["query_id"])

# 2) Keep only the judgments that belong to a sampled query
keep_rows = qrels["query_id"].isin(query_ids)
qrels_filtered = qrels[keep_rows]
relevant_corpus_ids = set(qrels_filtered["corpus_id"].tolist())

print(f"Using {len(arguana_dataset)} queries")
print(f"Found {len(relevant_corpus_ids)} relevant documents for these queries")

# ----------------
# 3) Keep only the corpus documents referenced by the remaining judgments
# ----------------
input_file = dataset_dir / "corpus.final.jsonl"
output_file = dataset_dir / "corpus_sampled.jsonl"

all_corpus = []
with open(input_file, "r") as f:
    for line in f:
        all_corpus.append(json.loads(line))

sampled_corpus = list(
    filter(lambda doc: doc["_id"] in relevant_corpus_ids, all_corpus)
)

with open(output_file, "w") as f:
    f.writelines(json.dumps(doc) + "\n" for doc in sampled_corpus)

print(f"Sampled {len(sampled_corpus)} documents from {len(all_corpus)} total")
print(f"Saved to: {output_file}")
print(f"Filtered qrels to {len(qrels_filtered)} relevance judgments")

# From here on `qrels` only holds judgments for the sampled queries
qrels = qrels_filtered.reset_index(drop=True)


In [None]:
# Experiment handle in "evals" mode; later cells use it to run the evals,
# locate the log file, and end the experiment.
experiment = Experiment(experiment_name="exp1-arguana-rag-colab", mode="evals")

In [None]:
from langchain_community.document_loaders import DirectoryLoader, JSONLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_classic.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

# Per-Actor batch size for hardware efficiency
batch_size = 50

# 3 chunk sizes x 1 reranking top-n = 3 combinations in total
rag_gpu = RFLangChainRagSpec(
    document_loader=DirectoryLoader(
        # corpus_sampled.jsonl is written directly into dataset_dir, and dataset_dir
        # already ends in ".../datasets/arguana"; appending another "arguana" segment
        # would point the loader at a directory that does not contain the file.
        path=str(dataset_dir),
        glob="corpus_sampled.jsonl",
        loader_cls=JSONLoader,
        loader_kwargs={
            "jq_schema": ".",
            "content_key": "text",
            "metadata_func": lambda record, metadata: {
                "corpus_id": int(record.get("_id"))
            },  # store the document id
            "json_lines": True,
            "text_content": False,
        },
        sample_seed=42,
    ),
    # 3 chunking strategies with different chunk sizes (256 / 128 / 64 tokens)
    text_splitter=List([
            RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                encoding_name="gpt2", chunk_size=256, chunk_overlap=32
            ),
            RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                encoding_name="gpt2", chunk_size=128, chunk_overlap=32
            ),
            RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                encoding_name="gpt2", chunk_size=64, chunk_overlap=32
            ),
        ],
    ),
    embedding_cls=HuggingFaceEmbeddings,
    embedding_kwargs={
        "model_name": "sentence-transformers/all-MiniLM-L6-v2",
        "model_kwargs": {"device": "cuda:0"},
        "encode_kwargs": {"normalize_embeddings": True, "batch_size": batch_size},
    },
    vector_store=None,  # uses FAISS by default
    search_type="similarity",
    search_kwargs={"k": 8},
    # A single reranking top-n value for now; switch to the commented list to sweep 2 and 5
    reranker_cls=CrossEncoderReranker,
    reranker_kwargs={
        "model_name": "cross-encoder/ms-marco-MiniLM-L6-v2",
        "model_kwargs": {"device": "cpu"},
        "top_n": List([2]),
        #"top_n": List([2, 5]),
    },
    enable_gpu_search=True,
)

In [9]:
def sample_preprocess_fn(
    batch: Dict[str, listtype], rag: RFLangChainRagSpec, prompt_manager: RFPromptManager
) -> Dict[str, listtype]:
    """Turn a batch of queries into chat prompts enriched with retrieved context.

    For every query in ``batch["query"]`` the RAG spec retrieves documents; the
    function records their ``corpus_id`` metadata, serializes them into one
    context string per query, and builds a system + user message pair.
    ``batch["query_id"]`` is converted to ``int`` in place. ``prompt_manager``
    is accepted but not used.

    Returns a dict with "prompts", "retrieved_documents", and every key of ``batch``.
    """

    # NOTE(review): the system prompt talks about financial knowledge although this
    # notebook evaluates on the ArguAna dataset; confirm the wording is intended.
    INSTRUCTIONS = "Utilize your financial knowledge, give your answer or opinion to the input question or subject matter."

    # One retrieval call for the whole batch; yields one list of documents per query
    all_context = rag.get_context(batch_queries=batch["query"], serialize=False)

    # Document ids per query, in the order the retriever returned them
    retrieved_documents = []
    for docs in all_context:
        retrieved_documents.append([doc.metadata["corpus_id"] for doc in docs])

    # One context string per query, using the spec's own serialization
    serialized_context = rag.serialize_documents(all_context)
    batch["query_id"] = list(map(int, batch["query_id"]))

    prompts = []
    for question, context in zip(batch["query"], serialized_context):
        user_message = {
            "role": "user",
            "content": f"Here is some relevant context:\n{context}. \nNow answer the following question using the context provided earlier:\n{question}",
        }
        prompts.append([{"role": "system", "content": INSTRUCTIONS}, user_message])

    # Batch keys come last, so they win if a name collides with the two keys above
    return {
        "prompts": prompts,
        "retrieved_documents": retrieved_documents,
        **batch,
    }


def sample_postprocess_fn(batch: Dict[str, listtype]) -> Dict[str, listtype]:
    """Attach the ground-truth corpus ids to every query of a generated batch.

    Looks up each ``batch["query_id"]`` in the module-level ``qrels`` frame and
    stores the matching ``corpus_id`` values under "ground_truth_documents".
    The batch is modified in place and returned.
    """
    ground_truth = []
    for query_id in batch["query_id"]:
        judged = qrels.loc[qrels["query_id"] == query_id, "corpus_id"]
        ground_truth.append(judged.tolist())

    batch["ground_truth_documents"] = ground_truth
    return batch

In [10]:
import math


def compute_ndcg_at_k(retrieved_docs, expected_docs: set, k=5):
    """Compute binary-relevance NDCG@k for a single query.

    Args:
        retrieved_docs: Iterable of retrieved document ids. Its iteration order
            is treated as the ranking, so pass an ordered sequence for a
            rank-meaningful score. Repeated ids count once, at their first position.
        expected_docs: Set of relevant document ids.
        k: Rank cut-off.

    Returns:
        DCG@k divided by the ideal DCG@k, or 0.0 when there are no relevant
        documents (ideal DCG of zero).
    """
    # De-duplicate while keeping order; otherwise one relevant id appearing
    # several times could push DCG above the ideal value.
    ranked = list(dict.fromkeys(retrieved_docs))[:k]
    gains = [1 if doc in expected_docs else 0 for doc in ranked]
    dcg = sum(gain / math.log2(rank + 2) for rank, gain in enumerate(gains))

    # Ideal ranking: all relevant documents first, capped at k. The ideal gain
    # must use the same scale as the actual gain (1), so a perfect ranking scores 1.0.
    ideal_length = min(k, len(expected_docs))
    idcg = sum(1 / math.log2(rank + 2) for rank in range(ideal_length))

    return dcg / idcg if idcg > 0 else 0.0


def compute_rr(retrieved_docs: set, expected_docs: set):
    """Return the reciprocal rank of the first relevant retrieved document.

    Positions are 1-based and follow the iteration order of ``retrieved_docs``.
    Returns 0 when no retrieved document is in ``expected_docs``.
    """
    first_hit = next(
        (
            position
            for position, doc in enumerate(retrieved_docs, start=1)
            if doc in expected_docs
        ),
        None,
    )
    if first_hit is None:
        return 0
    return 1 / first_hit


def sample_compute_metrics_fn(batch: Dict[str, listtype]) -> Dict[str, Dict[str, Any]]:
    """Compute per-batch retrieval metrics from retrieved vs. ground-truth documents.

    Reads ``batch["query"]`` (for the query count), ``batch["retrieved_documents"]``
    and ``batch["ground_truth_documents"]``. Precision, recall and F1 are computed
    on the sets of ids; NDCG@5 and reciprocal rank are computed on the retrieved
    ids in their retrieval order.

    Returns a dict mapping "Total", "Precision", "Recall", "F1 Score", "NDCG@5"
    and "MRR" to ``{"value": ...}``; the metric values are averages over the
    batch, or 0.0 for an empty batch.
    """

    precisions, recalls, f1_scores, ndcgs, rrs = [], [], [], [], []
    total_queries = len(batch["query"])

    for pred, gt in zip(batch["retrieved_documents"], batch["ground_truth_documents"]):
        expected_set = set(gt)
        # Keep the retrieval order for the rank-based metrics. Converting to a set
        # first would discard the ranking that NDCG and reciprocal rank measure.
        # Repeated ids collapse to their first (best) position.
        ranked_docs = list(dict.fromkeys(pred))
        retrieved_set = set(ranked_docs)

        true_positives = len(expected_set & retrieved_set)
        precision = true_positives / len(retrieved_set) if len(retrieved_set) > 0 else 0
        recall = true_positives / len(expected_set) if len(expected_set) > 0 else 0
        f1 = (
            2 * precision * recall / (precision + recall)
            if (precision + recall) > 0
            else 0
        )

        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1)
        ndcgs.append(compute_ndcg_at_k(ranked_docs, expected_set, k=5))
        rrs.append(compute_rr(ranked_docs, expected_set))

    def _mean(values):
        # An empty batch yields 0.0 instead of a ZeroDivisionError
        return sum(values) / total_queries if total_queries > 0 else 0.0

    return {
        "Total": {"value": total_queries},
        "Precision": {"value": _mean(precisions)},
        "Recall": {"value": _mean(recalls)},
        "F1 Score": {"value": _mean(f1_scores)},
        "NDCG@5": {"value": _mean(ndcgs)},
        "MRR": {"value": _mean(rrs)},
    }


def sample_accumulate_metrics_fn(
    aggregated_metrics: Dict[str, listtype],
) -> Dict[str, Dict[str, Any]]:
    """Combine per-batch metrics into overall metrics.

    ``aggregated_metrics`` maps each metric name to a list of per-batch
    ``{"value": ...}`` entries. "Total" holds the per-batch query counts, which
    are used as weights so each metric becomes a query-weighted mean.

    Returns "Total" plus one entry per metric carrying "value",
    "is_algebraic": True and "value_range": (0, 1). When no queries were
    accumulated, every metric value is 0.0.
    """

    num_queries_per_batch = [m["value"] for m in aggregated_metrics["Total"]]
    total_queries = sum(num_queries_per_batch)
    algebraic_metrics = ["Precision", "Recall", "F1 Score", "NDCG@5", "MRR"]

    def _weighted_mean(metric):
        # Guard against dividing by zero when there are no batches or only empty ones
        if total_queries == 0:
            return 0.0
        weighted_sum = sum(
            m["value"] * queries
            for m, queries in zip(aggregated_metrics[metric], num_queries_per_batch)
        )
        return weighted_sum / total_queries

    accumulated = {"Total": {"value": total_queries}}
    for metric in algebraic_metrics:
        accumulated[metric] = {
            "value": _weighted_mean(metric),
            "is_algebraic": True,
            "value_range": (0, 1),
        }
    return accumulated

In [11]:
# Generator definition: one vLLM model whose retrieval side is the `rag_gpu` spec.
vllm_config1 = RFvLLMModelConfig(
    model_config={
        "model": "Qwen/Qwen2.5-0.5B-Instruct",
        "dtype": "half",
        "gpu_memory_utilization": 0.25,
        "tensor_parallel_size": 1,
        "distributed_executor_backend": "mp",
        "enable_chunked_prefill": False,
        "enable_prefix_caching": False,
        "max_model_len": 3000,
        "disable_log_stats": True,  # Disable vLLM progress logging
        "enforce_eager": True,
        "disable_custom_all_reduce": True,
    },
    sampling_params={
        "temperature": 0.8,
        "top_p": 0.95,
        "max_tokens": 128,
    },
    rag=rag_gpu,
    prompt_manager=None,
)

# NOTE: this rebinds the notebook-level `batch_size` (50 in the RAG spec cell);
# re-running the RAG spec cell after this one resets it to 50.
batch_size = 3 # Smaller batch size for generation
config_set = {
    # Only 1 generator; the knob lists inside rag_gpu (3 chunk sizes x 1 top_n)
    # presumably expand it into 3 full configs under grid search
    "vllm_config": vllm_config1,
    "batch_size": batch_size,
    "preprocess_fn": sample_preprocess_fn,
    "postprocess_fn": sample_postprocess_fn,
    "compute_metrics_fn": sample_compute_metrics_fn,
    "accumulate_metrics_fn": sample_accumulate_metrics_fn,
    "online_strategy_kwargs": {
        "strategy_name": "normal",
        "confidence_level": 0.95,
        "use_fpc": True,
    },
}

In [15]:
# Simple grid search across all config combinations: 3 total (3 chunkers x 1 reranker top_n)
config_group = RFGridSearch(config_set)

In [None]:
# Display the Ray dashboard in the Colab notebook
# (embeds kernel port 8855 as an iframe; `google.colab` is only importable on Colab)
from google.colab import output
output.serve_kernel_port_as_iframe(8855)

In [None]:
# Launch evals of all RAG configs in the config_group with swap granularity of 4 chunks
# (num_shards=4)
# NB: If your machine has more than 1 GPU, set num_actors to that number
# `results` is consumed by the next cell as a mapping run_id -> (_, metrics_dict).
results = experiment.run_evals(
    config_group=config_group,
    dataset=arguana_dataset,
    num_actors=1,
    num_shards=4,
    seed=42,
)

In [None]:
# Convert results dict to DataFrame: one row per run
def _flatten_run(run_id, metrics_dict):
    """Build one table row: metric entries shaped like {"value": x} collapse to x."""
    row = {}
    for name, entry in {**metrics_dict, 'run_id': run_id}.items():
        if isinstance(entry, dict) and 'value' in entry:
            row[name] = entry['value']
        else:
            row[name] = entry
    return row


results_df = pd.DataFrame(
    [_flatten_run(run_id, metrics_dict) for run_id, (_, metrics_dict) in results.items()]
)

results_df

In [None]:
from google.colab import output
from IPython.display import display, HTML

# Render a button in the cell output; the experiment stays alive until it is clicked.
display(HTML('''
<button id="continue-btn" style="padding: 10px 20px; font-size: 16px;">Click to End Experiment</button>
'''))

# eval_js blocks until the Promise resolves
# (the Promise resolves in the button's click handler, which also disables the button)
output.eval_js('''
new Promise((resolve) => {
    document.getElementById("continue-btn").onclick = () => {
        document.getElementById("continue-btn").disabled = true;
        document.getElementById("continue-btn").innerText = "Continuing...";
        resolve("clicked");
    };
})
''')

# Actually end the experiment after the button is clicked
experiment.end()
print("Done!")

In [None]:
# Locate this experiment's log file and show its tail
log_file = experiment.get_log_file_path()

print(f"üìÑ Log File: {log_file}")
print()

if not log_file.exists():
    print(f"‚ùå Log file not found: {log_file}")
else:
    separator = "=" * 80
    print(separator)
    print(f"Last 30 lines of {log_file.name}:")
    print(separator)
    # Read the file, keep only the final 30 lines, and print them without trailing whitespace
    with open(log_file, 'r', encoding='utf-8') as f:
        tail = f.readlines()[-30:]
    for entry in tail:
        print(entry.rstrip())

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Select only the relevant columns
# (assumes results_df exposes "chunk_size" and "Processing Time" next to the
# metric columns - TODO confirm against the run_evals output)
plot_df = results_df[[
    "chunk_size",
    "Precision",
    "Recall",
    "F1 Score",
    "NDCG@5",
    "MRR",
    "Processing Time"
]].copy()

# Sort by chunk size for clean plots
plot_df = plot_df.sort_values("chunk_size")

# Create subplots: a 2 x 3 grid, flattened so it can be zipped with the 6 metrics
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
axes = axes.flatten()

# (column name, panel title) pairs, one per subplot
metrics = [
    ("Precision", "Precision vs Chunk Size"),
    ("Recall", "Recall vs Chunk Size"),
    ("F1 Score", "F1 Score vs Chunk Size"),
    ("NDCG@5", "NDCG@5 vs Chunk Size"),
    ("MRR", "MRR vs Chunk Size"),
    ("Processing Time", "Processing Time vs Chunk Size (seconds)")
]

for ax, (metric, title) in zip(axes, metrics):
    ax.plot(plot_df["chunk_size"], plot_df[metric], marker="o")
    ax.set_title(title)
    ax.set_xlabel("Chunk Size")
    ax.set_ylabel(metric)
    ax.set_xticks(plot_df["chunk_size"])   # one tick per chunk size present in the results
    ax.grid(True)
    # Dashed reference line at chunk size 64, the smallest splitter size configured above
    ax.axvline(x=64, linestyle="--", alpha=0.5)

plt.tight_layout()
plt.show()
