<div align="center">
<a href="https://rapidfire.ai/"><img src="https://raw.githubusercontent.com/RapidFireAI/rapidfireai/main/docs/images/RapidFire - Blue bug -white text.svg" width="115"></a>
<a href="https://discord.gg/6vSTtncKNN"><img src="https://raw.githubusercontent.com/RapidFireAI/rapidfireai/main/docs/images/discord-button.svg" width="145"></a>
<a href="https://oss-docs.rapidfire.ai/"><img src="https://raw.githubusercontent.com/RapidFireAI/rapidfireai/main/docs/images/documentation-button.svg" width="125"></a>
<br/>
Join Discord if you need help + ⭐ <i>Star us on <a href="https://github.com/RapidFireAI/rapidfireai">GitHub</a></i> ⭐
<br/>
To install RapidFire AI on your own machine, see the <a href="https://oss-docs.rapidfire.ai/en/latest/walkthrough.html">Install and Get Started</a> guide in our docs.
</div>

### RapidFire AI RAG/Context Engineering Tutorial Use Case: SciFact Q&A Chatbot

In [None]:
# Read the key with getpass instead of input(): input() echoes what is typed into
# the cell output, which would leave the secret visible in the notebook.
from getpass import getpass

# strip() drops whitespace/newlines that often come along when a key is pasted
OPENAI_API_KEY = getpass("Enter your OpenAI API key: ").strip()

In [None]:
from rapidfireai import Experiment
from rapidfireai.evals.automl import (
    List,
    RFLangChainRagSpec,
    RFOpenAIAPIModelConfig,
    RFPromptManager,
    RFGridSearch,
)
import re, json
from typing import List as listtype, Dict, Any

##### ⚠️ API Cost Considerations
This notebook runs 4 configurations concurrently on a downsampled dataset of 256 examples.
Estimated Costs:
- Current run (downsampled): \$5 
- Full set: \$45

> 💡 **Tip:** Monitor your API usage to avoid unexpected charges.

### Load Dataset and Rename Columns

In [None]:
import pandas as pd
from datasets import Dataset

# Dataset directory is now in tutorial_notebooks/evals/datasets
data = []
# Explicit UTF-8 so decoding does not depend on the platform's default locale encoding
with open("datasets/scifact/queries.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        if line.strip():  # tolerate blank lines (e.g. a trailing newline at end of file)
            data.append(json.loads(line))

# Derive one label per query from its metadata annotations
for d in data:
    if d["metadata"]:
        for info in d["metadata"].values():
            tags = set([meta["label"] for meta in info])
            # All annotations of one metadata entry must agree on the label
            assert len(tags) == 1
            d["label"] = tags.pop()  # SUPPORT or CONTRADICT
    else:
        # No annotations at all for this query
        d["label"] = "NOINFO"

# Column-oriented view of the queries, renamed to the names used in the rest of the notebook
scifact_columns = {
    "query": [d["text"] for d in data],
    "query_id": [d["_id"] for d in data],
    "label": [d["label"] for d in data],
}
# Seeded shuffle + fixed-size selection gives a reproducible 256-example subset
scifact_dataset = Dataset.from_dict(scifact_columns).shuffle(seed=42).select(range(256))

# Relevance judgements: which corpus documents are relevant to which query
qrels = pd.read_csv("datasets/scifact/qrels.tsv", sep="\t")
qrels = qrels.rename(
    columns={"query-id": "query_id", "corpus-id": "corpus_id", "score": "relevance"}
)
qrels.head()

### Create Experiment

In [None]:
# Experiment handle; the run_evals(), end() and get_log_file_path() calls below all go through it
experiment = Experiment(experiment_name="exp1-scifact-full-evaluation", mode="evals")

### Define Partial Multi-Config Knobs for LangChain part of RAG Pipeline using RapidFire AI Wrapper APIs

In [None]:
from langchain_community.document_loaders import DirectoryLoader, JSONLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_classic.retrievers.document_compressors import CrossEncoderReranker
from langchain_openai import OpenAIEmbeddings
from typing import Dict

# Handed to the eval run through config_set["batch_size"]
batch_size = 32


def metadata_func(record: Dict, metadata: Dict):
    """Copy the document id and title of a corpus record into its metadata.

    Args:
        record: One parsed corpus record; its "_id" is converted with int()
            and its "title" is copied as-is (None when the key is absent).
        metadata: Metadata dict to extend; it is modified in place.

    Returns:
        The same ``metadata`` object, now holding "corpus_id" and "title".
    """
    metadata.update(
        {
            "corpus_id": int(record.get("_id")),
            "title": record.get("title"),
        }
    )
    return metadata


def custom_template(doc: Document) -> str:
    """Render a document as its title, a colon and a space, then its page content."""
    title = doc.metadata["title"]
    body = doc.page_content
    return f"{title}: {body}"


# CPU-based RAG

# Corpus loader: reads datasets/scifact/corpus.jsonl through JSONLoader
corpus_loader = DirectoryLoader(
    path="datasets/scifact/",
    glob="corpus.jsonl",
    loader_cls=JSONLoader,
    loader_kwargs={
        "jq_schema": ".",
        "content_key": "text",
        "metadata_func": metadata_func,  # store the document id
        "json_lines": True,
        "text_content": False,
    },
    sample_seed=1337,
)

# Token-based chunking of the loaded documents
chunk_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="gpt2", chunk_size=512, chunk_overlap=32
)

# Cross-encoder reranker settings, pinned to the CPU
cross_encoder_kwargs = {
    "model_name": "cross-encoder/ms-marco-MiniLM-L6-v2",
    "model_kwargs": {"device": "cpu"},
    "top_n": 5,
}

rag_cpu = RFLangChainRagSpec(
    document_loader=corpus_loader,
    text_splitter=chunk_splitter,
    embedding_cls=OpenAIEmbeddings,
    embedding_kwargs={"model": "text-embedding-3-small", "api_key": OPENAI_API_KEY},
    vector_store=None,  # uses FAISS by default
    search_type=List(["similarity", "mmr"]),  # 2 different search types
    search_kwargs={"k": 10},
    reranker_cls=CrossEncoderReranker,
    reranker_kwargs=cross_encoder_kwargs,
    enable_gpu_search=False,
    document_template=custom_template,
)

### Define Data Processing and Postprocessing Functions

In [None]:
# System prompt used by sample_preprocess_fn for every request. It asks the model
# to reason first and then finish with "#### <LABEL>", which is the marker that
# extract_solution() searches for in the generated text.
INSTRUCTIONS = """
You are a helpful assistant that can verify scientific claims. You will be given a scientific claim and a list of documents that are potentially relevant to the claim. Your job is to determine whether the claim is supported, contradicted, or not addressed by the evidence. You will do so by responding with one of the following options:

- SUPPORT: If the evidence supports the claim.
- CONTRADICT: If the evidence contradicts the claim.
- NOINFO: If the evidence does not provide enough information to determine whether the claim is supported or contradicted.

You will output your final answer after reasoning through the evidence. The final answer should be one of the three options and should be formatted as follows:

Reasoning for the answer #### ANSWER

Here is an example:

claim: High cardiopulmonary fitness causes increased mortality rate.

evidence:
One consequence of inactivity, low cardiorespiratory fitness, is an established risk factor for cardiovascular disease (CVD) morbidity and mortality, but the prevalence of cardiorespiratory fitness has not been quantified in representative US population samples.

Cardiosphere-derived cells transplanted into chick embryos migrated to the truncus arteriosus and cardiac outflow tract and contributed to dorsal root ganglia, spinal nerves, and aortic smooth muscle cells. Lineage studies using double transgenic mice encoding protein 0\u2013Cre/Floxed-EGFP revealed undifferentiated and differentiated neural crest-derived cells in the fetal myocardium

Patients undergoing dialysis have a substantially increased risk of cardiovascular mortality and morbidity. Although several trials have shown the cardiovascular benefits of lowering blood pressure in the general population, there is uncertainty about the efficacy and tolerability of reducing blood pressure in patients on dialysis

Response: The evidence suggests that low cardiorespiratory fitness is a known risk factor for cardiovascular disease and therefore the claim is contradicted. #### CONTRADICT
"""

In [None]:
def sample_preprocess_fn(
    batch: Dict[str, listtype], rag: RFLangChainRagSpec, prompt_manager: RFPromptManager
) -> Dict[str, listtype]:
    """Build the chat prompts for a batch of queries from their retrieved context.

    Args:
        batch: Column-oriented batch with at least "query" and "query_id".
            Its "query_id" column is overwritten in place with int values.
        rag: RAG spec used to fetch and serialize the context of each query.
        prompt_manager: Accepted for interface compatibility; not used here.

    Returns:
        A dict with "prompts" (one system + user message pair per query) and
        "retrieved_documents" (the "corpus_id" of every retrieved document,
        per query), followed by all columns of ``batch``.
    """
    claims = batch["query"]
    docs_per_query = rag.get_context(batch_queries=claims, serialize=False)

    # Keep the ids of what was retrieved so the metrics step can score retrieval
    retrieved_documents = []
    for docs in docs_per_query:
        retrieved_documents.append([doc.metadata["corpus_id"] for doc in docs])

    evidence_per_query = rag.serialize_documents(docs_per_query)
    batch["query_id"] = list(map(int, batch["query_id"]))

    prompts = []
    for question, context in zip(claims, evidence_per_query):
        user_message = f"\nClaim:\n{question}. \nEvidence:\n{context}. \nYour response:"
        prompts.append(
            [
                {"role": "system", "content": INSTRUCTIONS},
                {"role": "user", "content": user_message},
            ]
        )

    return {
        "prompts": prompts,
        "retrieved_documents": retrieved_documents,
        **batch,
    }


def extract_solution(answer):
    """Extract the final label that follows the "####" marker in a model response.

    Args:
        answer: Generated text, expected to contain "#### <LABEL>" where the
            label is SUPPORT, CONTRADICT or NOINFO (matched case-insensitively).

    Returns:
        The upper-cased label of the first match, or "INVALID" when no marker
        followed by a known label is found or when ``answer`` is not a string
        (for example ``None`` for a missing generation).
    """
    # A missing/non-text generation must count as an invalid answer instead of
    # crashing the whole batch inside re.search.
    if not isinstance(answer, str):
        return "INVALID"
    solution = re.search(r"####\s*(SUPPORT|CONTRADICT|NOINFO)", answer, re.IGNORECASE)
    if solution is None:
        return "INVALID"
    return solution.group(1).upper()


def sample_postprocess_fn(batch: Dict[str, listtype]) -> Dict[str, listtype]:
    """Add ground-truth documents and parsed answers to a generated batch.

    The batch is modified in place and returned:
      * "ground_truth_documents": for each "query_id", the "corpus_id" values
        of the matching rows in the module-level ``qrels`` table.
      * "answer": the label parsed from each "generated_text" entry.
    """
    # Ground truth lookup; could live in the preprocess step too but is kept here for clarity
    ground_truth = []
    for query_id in batch["query_id"]:
        matching_rows = qrels[qrels["query_id"] == query_id]
        ground_truth.append(matching_rows["corpus_id"].tolist())
    batch["ground_truth_documents"] = ground_truth

    batch["answer"] = list(map(extract_solution, batch["generated_text"]))
    return batch

### Define Custom Eval Metrics Functions

In [None]:
import math


def compute_ndcg_at_k(retrieved_docs, expected_docs, k=3):
    """Compute binary-relevance NDCG@k for a single query.

    Args:
        retrieved_docs: Retrieved document ids, best first. Pass an ordered
            sequence of unique ids: the discount depends on position, so an
            unordered collection such as a set yields an arbitrary score.
        expected_docs: Collection of relevant document ids.
        k: Rank cutoff.

    Returns:
        DCG of the first ``k`` retrieved ids divided by the DCG of an ideal
        ranking, or 0.0 when the ideal DCG is zero (no expected documents).
    """
    top_k = list(retrieved_docs)[:k]
    # Gain is 1 for a relevant document and 0 otherwise; rank i is discounted by log2(i + 2)
    dcg = sum(
        1 / math.log2(rank + 2)
        for rank, doc in enumerate(top_k)
        if doc in expected_docs
    )

    # Ideal ranking: as many relevant documents as fit in k, all at the top.
    # It must use the same gain (1) as the DCG above, otherwise a perfect
    # ranking could never reach a score of 1.0.
    ideal_hits = min(k, len(expected_docs))
    idcg = sum(1 / math.log2(rank + 2) for rank in range(ideal_hits))

    return dcg / idcg if idcg > 0 else 0.0


def compute_rr(retrieved_docs: set, expected_docs: set):
    """Reciprocal rank of the first relevant document for a single query.

    Positions are counted from 1 in the iteration order of ``retrieved_docs``.
    Returns 1 / position of the first id found in ``expected_docs``, or 0
    when none of the retrieved ids is relevant.
    """
    return next(
        (
            1 / position
            for position, doc in enumerate(retrieved_docs, start=1)
            if doc in expected_docs
        ),
        0,
    )

def compute_accuracy(predictions, ground_truth):
    """Label prediction accuracy: SUPPORT, CONTRADICT, NOINFO

    Args:
        predictions: Predicted labels.
        ground_truth: Reference labels, compared pairwise with ``predictions``.

    Returns:
        Fraction of predictions equal to their reference label, or 0.0 for an
        empty ``predictions`` (instead of raising ZeroDivisionError).
    """
    total = len(predictions)
    if total == 0:
        return 0.0
    correct = sum(1 for pred, gt in zip(predictions, ground_truth) if pred == gt)
    return correct / total

def sample_compute_metrics_fn(batch: Dict[str, listtype]) -> Dict[str, Dict[str, Any]]:
    """Function to compute all eval metrics based on retrievals and/or generations

    Retrieval metrics are computed per query on the first 3 retrieved ids and
    averaged over the batch; accuracy compares "answer" against "label".

    Args:
        batch: Batch with "query", "retrieved_documents",
            "ground_truth_documents", "answer" and "label" columns.

    Returns:
        Mapping of metric name to ``{"value": ...}`` for "Total", "Precision",
        "Recall", "F1 Score", "NDCG@3", "MRR" and "Accuracy".
    """
    top_k = 3
    precisions, recalls, f1_scores, ndcgs, rrs = [], [], [], [], []
    total_queries = len(batch["query"])

    for pred, gt in zip(batch["retrieved_documents"], batch["ground_truth_documents"]):
        expected_set = set(gt)
        # Order-preserving de-duplication of the top-k ids (the same id may
        # appear more than once). NDCG and reciprocal rank depend on position,
        # so they need this ordered list; a set would discard the retrieval order.
        ranked_docs = list(dict.fromkeys(pred[:top_k]))
        retrieved_set = set(ranked_docs)

        true_positives = len(expected_set.intersection(retrieved_set))
        precision = true_positives / len(retrieved_set) if len(retrieved_set) > 0 else 0
        recall = true_positives / len(expected_set) if len(expected_set) > 0 else 0
        f1 = (
            2 * precision * recall / (precision + recall)
            if (precision + recall) > 0
            else 0
        )

        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1)
        ndcgs.append(compute_ndcg_at_k(ranked_docs, expected_set, k=top_k))
        rrs.append(compute_rr(ranked_docs, expected_set))

    accuracy = compute_accuracy(batch["answer"], batch["label"])

    return {
        "Total": {"value": total_queries},
        "Precision": {"value": sum(precisions) / total_queries},
        "Recall": {"value": sum(recalls) / total_queries},
        "F1 Score": {"value": sum(f1_scores) / total_queries},
        "NDCG@3": {"value": sum(ndcgs) / total_queries},
        "MRR": {"value": sum(rrs) / total_queries},
        "Accuracy": {"value": accuracy}
    }


def sample_accumulate_metrics_fn(
    aggregated_metrics: Dict[str, listtype],
) -> Dict[str, Dict[str, Any]]:
    """Combine per-batch metrics into overall values.

    Args:
        aggregated_metrics: For every metric name, the list of per-batch
            ``{"value": ...}`` entries; "Total" holds the batch sizes.

    Returns:
        "Total" as the summed query count, and every other metric as the
        average of its per-batch values weighted by batch size, tagged with
        ``is_algebraic`` and a ``value_range`` of (0, 1).
    """
    batch_sizes = [entry["value"] for entry in aggregated_metrics["Total"]]
    total_queries = sum(batch_sizes)

    combined = {"Total": {"value": total_queries}}
    for metric in ("Precision", "Recall", "F1 Score", "NDCG@3", "MRR", "Accuracy"):
        # Undo the per-batch averaging by re-weighting each value with its batch size
        weighted_sum = sum(
            entry["value"] * size
            for entry, size in zip(aggregated_metrics[metric], batch_sizes)
        )
        combined[metric] = {
            "value": weighted_sum / total_queries,
            "is_algebraic": True,
            "value_range": (0, 1),
        }
    return combined

### Define Partial Multi-Config Knobs for OpenAI Generator part of RAG Pipeline using RapidFire AI Wrapper APIs

In [None]:
# 2 openai configs with different sizes of generator models and different reasoning levels
# NOTE: rpm_limit (requests per minute) and tpm_limit (tokens per minute) need to be
# set based on your account tier and the specific model used.
gpt5_mini_settings = {
    "model": "gpt-5-mini",
    "max_completion_tokens": 4096,
    "reasoning_effort": "high",
}
gpt4o_settings = {
    "model": "gpt-4o",
    "max_completion_tokens": 1024,
}

openai_config1 = RFOpenAIAPIModelConfig(
    client_config={"api_key": OPENAI_API_KEY, "max_retries": 2},
    model_config=gpt5_mini_settings,
    rpm_limit=10_000,
    tpm_limit=10_000_000,
    rag=rag_cpu,
    prompt_manager=None,
)

openai_config2 = RFOpenAIAPIModelConfig(
    client_config={"api_key": OPENAI_API_KEY, "max_retries": 2},
    model_config=gpt4o_settings,
    rpm_limit=10_000,
    tpm_limit=2_000_000,
    rag=rag_cpu,
    prompt_manager=None,
)


# Generator knob: each entry represents 2 configs (its RAG spec has 2 search types)
generator_choices = List([openai_config1, openai_config2])

# Settings for the online aggregation strategy
online_strategy = {
    "strategy_name": "normal",
    "confidence_level": 0.95,
    "use_fpc": True,
}

config_set = {
    "openai_config": generator_choices,
    "batch_size": batch_size,
    "preprocess_fn": sample_preprocess_fn,
    "postprocess_fn": sample_postprocess_fn,
    "compute_metrics_fn": sample_compute_metrics_fn,
    "accumulate_metrics_fn": sample_accumulate_metrics_fn,
    "online_strategy_kwargs": online_strategy,
}

### Create Config Group

In [None]:
# Simple grid search across all sets of config knob values = 4 combinations in total
# (2 generator configs x 2 retriever search types)
config_group = RFGridSearch(config_set)

### Run Multi-Config Evals

In [None]:
# Launch evals of all RAG configs in the config_group with swap granularity of 4 chunks
# (presumably the 4 chunks correspond to num_shards=4 -- confirm against the RapidFire AI docs)
# The returned results are turned into a DataFrame in the "View Results" cell.
results = experiment.run_evals(
    config_group=config_group,
    dataset=scifact_dataset,
    num_actors=2,
    num_shards=4,
    seed=42,
)

### View Results

In [None]:
# Convert results dict to DataFrame
def _flatten_run_metrics(run_id, metrics_dict):
    """Build one table row: every metric reduced to its 'value', plus the run id."""
    row = {}
    for key, entry in {**metrics_dict, 'run_id': run_id}.items():
        if isinstance(entry, dict) and 'value' in entry:
            row[key] = entry['value']
        else:
            row[key] = entry
    return row


results_df = pd.DataFrame(
    [
        _flatten_run_metrics(run_id, metrics_dict)
        for run_id, (_, metrics_dict) in results.items()
    ]
)

results_df

### End Experiment

In [None]:
# Finish the experiment that was created earlier in this notebook
experiment.end()

### View RapidFire AI Log Files

In [None]:
# Get the experiment-specific log file
log_file = experiment.get_log_file_path()

# The emoji in the two messages below were mis-encoded (UTF-8 bytes shown as
# Mac Roman characters); they are restored to the intended symbols.
print(f"📄 Log File: {log_file}")
print()

if log_file.exists():
    print("=" * 80)
    print(f"Last 30 lines of {log_file.name}:")
    print("=" * 80)
    with open(log_file, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        # Only the tail of the log is shown to keep the cell output short
        for line in lines[-30:]:
            print(line.rstrip())
else:
    print(f"❌ Log file not found: {log_file}")