## Monitoring your RAG applications using RAGAS


In the following example, we'll plug different retrieval and augmented generation strategies into our RAG app and evaluate them using [RAGAS](https://docs.ragas.io/en/stable/concepts/index.html).

First, make sure you've followed the setup directions in the README. Then install LlamaIndex, which we'll use to build our RAG workflow, and load your environment variables from the `.env` file.


In [None]:
from dotenv import load_dotenv

# Load settings from the .env file one directory up, overriding existing variables
load_dotenv(dotenv_path="../.env", override=True)

### Prepare your knowledge base


Let's load the raw Markdown documents from a local folder and convert them to LlamaIndex nodes, which represent chunks of our source Markdown documents.


In [None]:
import os

from llama_index.core import Document, VectorStoreIndex
from llama_index.core.node_parser import MarkdownNodeParser
from tqdm import tqdm

folder_path = "mdfiles"  # Replace with the actual path if different

raw_doc_texts = []

# Iterate through all files in the specified folder
for filename in os.listdir(folder_path):
    if filename.endswith(".md"):
        file_path = os.path.join(folder_path, filename)
        with open(file_path, "r", encoding="utf-8") as file:
            markdown_content = file.read()
        raw_doc_texts.append(
            Document(text=markdown_content, metadata={"filename": filename})
        )

parser = MarkdownNodeParser()
base_nodes = parser.get_nodes_from_documents(raw_doc_texts)

### Create a baseline retriever


Next, initialize a baseline retriever that fetches the top-k raw text nodes based on embedding similarity to an input query.
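
To make "top-k by embedding similarity" concrete, here is a minimal standard-library sketch. It assumes cosine similarity as the ranking function, and the helper names (`cosine_similarity`, `top_k_chunks`) and the 2-d vectors are made up for illustration; the actual retriever below delegates this work to LlamaIndex.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k_chunks(query_vec, chunk_vecs, k):
    """Return the ids of the k chunks whose vectors are most similar to the query."""
    ranked = sorted(
        chunk_vecs,
        key=lambda chunk_id: cosine_similarity(query_vec, chunk_vecs[chunk_id]),
        reverse=True,
    )
    return ranked[:k]


# Toy 2-d "embeddings" (illustrative values, not real model output).
toy_chunks = {
    "intro": [1.0, 0.0],
    "setup": [0.7, 0.7],
    "faq": [0.0, 1.0],
}
top_two = top_k_chunks([1.0, 0.1], toy_chunks, k=2)
print(top_two)
```

With these toy vectors the query points mostly along the first axis, so `intro` ranks first and `setup` second.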


In [None]:
from llama_index.embeddings.bedrock import BedrockEmbedding

model = BedrockEmbedding(
    model_name="cohere.embed-multilingual-v3",
    credentials_profile_name="myprofile",
    region_name="eu-west-3",
)

In [None]:
# Sanity check: embed a short string to confirm the embedding model is reachable
model.get_text_embedding("hello world!")

In [None]:
TOP_K = 2
base_index = VectorStoreIndex(base_nodes, embed_model=model)
base_retriever = base_index.as_retriever(similarity_top_k=TOP_K)

Initialize a response synthesizer to help generate the answer to a question based on retrieved context documents.


In [None]:
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.core import get_response_synthesizer
from llama_index.llms.bedrock.base import Bedrock

llm = Bedrock(
    model="anthropic.claude-3-haiku-20240307-v1:0",
    profile_name="myprofile",
    region_name="eu-west-3",
    temperature=0,
    max_tokens=3000,
)

response_synthesizer_compact = get_response_synthesizer(
    response_mode=ResponseMode.COMPACT, llm=llm
)

Let's define a `retrieve_nodes` function that uses the `base_retriever` to fetch the most relevant context documents given a query. We'll also define an `ask_docs` workflow that combines the retrieval and augmented generation steps to return a final answer for a given query.

To instrument the retrieval step, we:

- Decorate the `retrieve_nodes` step with the `retrieval` decorator.
- Annotate the span's `input_data` as the input query.
- Annotate the span's `output_data` as a list of dictionaries, each of which represents a single chunk.
- Annotate the span's `metadata` with our `top_k` setting.
- Tag our retrieval step with the retriever we are using.

Note that we also return the result of `LLMObs.export_span()` at the end of the `ask_docs` function. We'll need the exported span for later when we submit evaluation results to Datadog.
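
The instrumentation steps listed above can be pictured with a small stand-in. This is a sketch that uses only the standard library and is not the Datadog SDK: `traced_retrieval`, `SPANS`, `retrieve_chunks`, and the span dictionary layout are hypothetical names chosen for illustration.

```python
import functools

# Hypothetical in-memory span store; a real tracer would export spans instead.
SPANS = []


def traced_retrieval(retriever_name, top_k):
    """Hypothetical stand-in for a retrieval decorator (not the Datadog API)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(query, *args, **kwargs):
            chunks = func(query, *args, **kwargs)
            SPANS.append(
                {
                    "kind": "retrieval",
                    "input_data": query,  # the input query
                    "output_data": [  # one dictionary per retrieved chunk
                        {"text": c["text"], "score": c["score"]} for c in chunks
                    ],
                    "metadata": {"top_k": top_k},
                    "tags": {"retriever": retriever_name},
                }
            )
            return chunks

        return wrapper

    return decorator


@traced_retrieval(retriever_name="base", top_k=2)
def retrieve_chunks(query):
    # Toy retrieval: returns the first two canned chunks regardless of the query.
    corpus = [
        {"text": "alpha", "score": 0.9},
        {"text": "beta", "score": 0.4},
        {"text": "gamma", "score": 0.1},
    ]
    return corpus[:2]


chunks = retrieve_chunks("what is a span?")
print(SPANS[-1])
```

Keeping the annotation inside a decorator means the retrieval function itself stays unchanged, which is the same separation the decorator-based approach described above aims for.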


In [None]:
def retrieve_nodes(query, retriever=base_retriever):
    nodes = retriever.retrieve(query)
    return nodes


def ask_docs(
    query, retriever=base_retriever, response_synthesizer=response_synthesizer_compact
):
    nodes = retrieve_nodes(query, retriever=retriever)
    response = response_synthesizer.synthesize(query, nodes=nodes)
    return response

Our RAG workflow is ready! Try a question about LLM Observability. What do you think of the answer quality?


In [None]:
STARTER_QUESTION = "What is AWS Bedrock and what are its features?"

answer = ask_docs(STARTER_QUESTION, retriever=base_retriever)

print("Answer: {}".format(answer))
print("Context: {}".format([reference.text for reference in answer.source_nodes]))

### Recursive retriever


Now let's implement a recursive retriever and plug that into our `ask_docs` workflow.

A recursive retriever first builds a graph of small chunks that have references to larger parent chunks. At query-time, smaller chunks are retrieved first, and then we follow references to bigger chunks. This enhances the context we pass to the augmented generation step. For more information on recursive retrieval, see LlamaIndex's [recursive retrieval](https://docs.llamaindex.ai/en/stable/examples/retrievers/recursive_retriever_nodes/) guide.

Since our raw documents are in Markdown, there's already an implicit parent-child relationship between different text chunks. LlamaIndex provides helpful utility functions to automatically parse these relationships and form an index that is searchable using their `RecursiveRetriever` module.
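
The small-chunk-to-parent lookup can be sketched without any library. The example below is a simplified illustration, not LlamaIndex's implementation: it scores chunks by word overlap instead of embeddings, and `parents`, `children`, `word_overlap`, and `recursive_retrieve` are made-up names.

```python
# Parent chunks keyed by id (toy data).
parents = {
    "p1": "Full section on installing the SDK, including prerequisites and setup.",
    "p2": "Full section on tracing spans, with decorator examples.",
}

# Small child chunks, each holding a reference to its parent.
children = [
    {"text": "install the SDK", "parent_id": "p1"},
    {"text": "prerequisites", "parent_id": "p1"},
    {"text": "tracing spans", "parent_id": "p2"},
]


def word_overlap(query, text):
    """Toy relevance score: number of shared lowercase words."""
    return len(set(query.lower().split()) & set(text.lower().split()))


def recursive_retrieve(query, k=2):
    """Match small chunks first, then return their de-duplicated parent chunks."""
    ranked = sorted(
        children, key=lambda c: word_overlap(query, c["text"]), reverse=True
    )
    parent_ids = []
    for child in ranked[:k]:
        if child["parent_id"] not in parent_ids:
            parent_ids.append(child["parent_id"])
    return [parents[pid] for pid in parent_ids]


contexts = recursive_retrieve("how do I install the SDK", k=2)
print(contexts)
```

Here both of the top two child chunks point to the same parent, so the generation step receives one larger section rather than two small fragments.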


In [None]:
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import IndexNode
from llama_index.core.retrievers import RecursiveRetriever


sub_chunk_sizes = [256, 512]
sub_node_parsers = [
    SentenceSplitter(chunk_size=c, chunk_overlap=20) for c in sub_chunk_sizes
]

all_nodes = []
for base_node in tqdm(base_nodes):
    for n in sub_node_parsers:
        sub_nodes = n.get_nodes_from_documents([base_node])
        sub_inodes = [
            IndexNode.from_text_node(sn, base_node.node_id) for sn in sub_nodes
        ]
        all_nodes.extend(sub_inodes)

    # Also index the base node itself so it can be matched directly
    original_node = IndexNode.from_text_node(base_node, base_node.node_id)
    all_nodes.append(original_node)

print("Nodes Created")
all_nodes_dict = {n.node_id: n for n in all_nodes}

print("Creating Vector Store")
vector_index_chunk = VectorStoreIndex(all_nodes, embed_model=model)


print("Creating Retriever")
vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=TOP_K)

recursive_retriever = RecursiveRetriever(
    "vector",
    retriever_dict={"vector": vector_retriever_chunk},
    node_dict=all_nodes_dict,
    verbose=True,
)

Let's see if the answer improved from our earlier step...


In [None]:
answer = ask_docs(STARTER_QUESTION, retriever=recursive_retriever)

print("Answer: {}".format(answer))
print("Context: {}".format([reference.text for reference in answer.source_nodes]))

How does the context differ for our two different retrieval strategies, and ultimately, which response do you think is better?


### RAGAS Setup

Suppose you wanted to deploy both the baseline retriever and recursive retriever and evaluate how well each retrieval strategy is doing in a production environment. Our LLM Observability SDK enables this through the `submit_evaluation` function.

As an example, we'll use the RAGAS open source library to evaluate our RAG workflow. It's powered by LLM-assisted evaluations that measure the performance of your retrievals, augmented generation, and RAG workflow end-to-end.


First, we'll define a list of questions we'll ask our RAG app. Some RAGAS evaluations also require a ground truth answer for each question, so we'll define those as well.


In [None]:
eval_questions = [
    "How do I get started?",
    "I have a complex chatbot, what root span should I use to represent this bot?",
    "I have a summarization LLM service with some simple pre-and-post processing steps, what root span should I use to represent this bot?",
    "I don't want to manually instrument my app. Can I still use LLM Observability?",
    "What's the ml app tag?",
    "How can I enable user session tracking?",
]

eval_ground_truths = [
    "To get started with LLM Observability, you can build a simple example with the Quickstart, or follow the guide for instrumenting your LLM application. Make sure to grab your Datadog API Key.",
    "You should use an agent root span to represent your complex chatbot.",
    "You should use a workflow root span to represent your summarization service.",
    "LLM Observability has supported integrations for openai, bedrock, and langchain and these libraries will automatically be traced",
    "The name of your LLM application, service, or project, under which all traces and spans are grouped. This helps distinguish between different applications or experiments.",
    "When starting a root span for a new trace or span in a new process, specify the session_id argument with the string ID of the underlying user session. You can also set the session_id field when submitting spans via API.",
]

Import the RAGAS metrics.


In [None]:
from ragas.metrics import (
    Faithfulness,
    ResponseRelevancy,
    LLMContextPrecisionWithReference,
)

We'll need to enrich each of the RAGAS metrics with some metadata that will be relevant when we submit results to Datadog.

We'll split the RAGAS metrics into two categories: `prod` and `dev`.

`prod` evaluations don't require ground truths to compute the final score, meaning they can be run continuously against production data, while `dev` evaluations require a ground truth.

We also specify that the metric type is `score`, which tells Datadog that the evaluation metric's value is a continuous float.
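
The practical effect of this split is that a sample without a ground truth can only be scored by the first category. A minimal sketch of that selection rule follows; `metric_registry` and `runnable_metrics` are hypothetical names, and the metric keys are placeholders rather than the exact strings RAGAS assigns.

```python
# Hypothetical registry mirroring the category and metric type metadata.
metric_registry = {
    "faithfulness": {"category": "prod", "metric_type": "score"},
    "answer_relevancy": {"category": "prod", "metric_type": "score"},
    "context_precision": {"category": "dev", "metric_type": "score"},
}


def runnable_metrics(registry, has_ground_truth):
    """Return the metric names that can be computed for a sample.

    `prod` metrics always qualify; `dev` metrics need a ground truth.
    """
    return [
        name
        for name, config in registry.items()
        if config["category"] == "prod" or has_ground_truth
    ]


production_metrics = runnable_metrics(metric_registry, has_ground_truth=False)
dev_metrics = runnable_metrics(metric_registry, has_ground_truth=True)
print(production_metrics)
print(dev_metrics)
```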


In [None]:
from typing import Callable, TypedDict

import boto3
from langchain_aws import BedrockEmbeddings, ChatBedrockConverse
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import (
    Faithfulness,
    LLMContextPrecisionWithReference,
    ResponseRelevancy,
)

session = boto3.Session(
    profile_name="myprofile",
    region_name="eu-west-3",
)

bedrock_llm = ChatBedrockConverse(
    client=session.client("bedrock-runtime"),
    region_name="eu-west-3",
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    base_url="https://bedrock-runtime.eu-west-3.amazonaws.com",
    temperature=0.5,
)

bedrock_embeddings = BedrockEmbeddings(
    client=session.client("bedrock-runtime"),
    model_id="cohere.embed-multilingual-v3",
    region_name="eu-west-3",
)


bedrock_llm = LangchainLLMWrapper(bedrock_llm)
bedrock_embeddings = LangchainEmbeddingsWrapper(bedrock_embeddings)


class RagasMetric(TypedDict):
    function: Callable
    category: str
    metric_type: str


ragas_metrics = {
    Faithfulness.name: RagasMetric(
        function=Faithfulness(llm=bedrock_llm), category="prod", metric_type="score"
    ),
    ResponseRelevancy.name: RagasMetric(
        function=ResponseRelevancy(llm=bedrock_llm),
        category="prod",
        metric_type="score",
    ),
    LLMContextPrecisionWithReference.name: RagasMetric(
        function=LLMContextPrecisionWithReference(llm=bedrock_llm),
        category="dev",
        metric_type="score",
    ),
}

Define an `EvaluationData` class in which we'll save our inference results for later evaluation. We want to keep track of the question, answer, and contexts as inputs to the RAGAS evaluations.

We also track

1. An exported span so we can tie each evaluation to a specific run of our RAG workflow
2. Tags on our evaluations


In [None]:
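class EvaluationData(TypedDict):
    question: str
    answer: str
    ground_truth: str  # reference answer, required by `dev` metrics
    contexts: list[str]
    tags: dict[str, object]  # retriever, response_mode, and top_k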

The following `run_simulation` function will take a list of evaluation questions and run our RAG app using the specified RAG configuration.


In [None]:
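def run_simulation(
    questions,
    ground_truths,
    retrievers=(base_retriever, recursive_retriever),
    response_modes=("compact",),
):
    simulation_results = []

    for mode in response_modes:
        # Only the compact synthesizer is configured in this notebook, so every
        # mode currently maps to it; `mode` is still recorded in the tags below.
        response_synthesizer = response_synthesizer_compact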

        for retrieval_strategy in retrievers:

            for question, ground_truth in tqdm(
                zip(questions, ground_truths), total=len(questions)
            ):

                answer = ask_docs(
                    question,
                    retriever=retrieval_strategy,
                    response_synthesizer=response_synthesizer,
                )

                simulation_results.append(
                    EvaluationData(
                        question=question,
                        answer=str(answer),
                        ground_truth=ground_truth,
                        contexts=[r.text for r in answer.source_nodes],
                        tags={
                            "retriever": (
                                "recursive"
                                if retrieval_strategy == recursive_retriever
                                else "base"
                            ),
                            "response_mode": mode,
                            "top_k": TOP_K,
                        },
                    )
                )
    return simulation_results

Get the evaluation results using both our baseline and recursive retriever.


In [None]:
evaluation_data = run_simulation(
    eval_questions, eval_ground_truths, retrievers=[base_retriever, recursive_retriever]
)

It's time to run the RAGAS evaluations and submit the results to Datadog.

We use the `submit_evaluation` function to send custom evaluation metric data to Datadog.

1. Since each evaluation is tied to a span, we take the exported span returned from the earlier function call and pass it into `submit_evaluation`.
2. You have to specify the metric type as `score` or `categorical` for each metric you submit to Datadog. So far, all the RAGAS metrics we've used are `score` metrics. However, RAGAS [aspect critiques](https://docs.ragas.io/en/stable/concepts/metrics/critique.html) would be submitted as `categorical` evaluation metrics.
3. We also tag our evaluation metric with some metadata about the RAG strategy and metric category.
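
The three points above amount to a small record per evaluation: a label, a metric type, a value of the matching kind, and tags. The sketch below validates such a record before it would be sent. It is not the SDK's `submit_evaluation`; `build_evaluation` is a hypothetical helper, and it assumes that `score` values are numeric and `categorical` values are strings.

```python
def build_evaluation(label, metric_type, value, tags):
    """Hypothetical helper: validate one evaluation record before submitting it."""
    if metric_type == "score":
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError("score metrics need a numeric value")
        value = float(value)
    elif metric_type == "categorical":
        if not isinstance(value, str):
            raise TypeError("categorical metrics need a string value")
    else:
        raise ValueError(f"unknown metric_type: {metric_type!r}")
    return {
        "label": label,
        "metric_type": metric_type,
        "value": value,
        "tags": dict(tags),
    }


score_eval = build_evaluation(
    "faithfulness", "score", 0.83, {"retriever": "base", "category": "prod"}
)
critique_eval = build_evaluation(
    "harmfulness", "categorical", "pass", {"retriever": "base"}
)
print(score_eval)
print(critique_eval)
```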


In [None]:
import math
from datasets import Dataset
from ragas import evaluate
import pandas as pd


def run_ragas(evaluation_data, ragas_metrics):
    """
    Run Ragas evaluation and generate a comprehensive summary DataFrame.

    Args:
        evaluation_data (list): List of evaluation data dictionaries
        ragas_metrics (dict): Dictionary of Ragas metrics to evaluate

    Returns:
        tuple: A pair of pandas DataFrames (detailed metrics, aggregated metrics)
    """
    # Initialize lists to collect metric results
    results_list = []

    for span_data in tqdm(evaluation_data, desc="Running Ragas Evaluation"):
        # Prepare input dataset
        ragas_input = Dataset.from_dict(
            {
                "question": [span_data["question"]],
                "answer": [span_data["answer"]],
                "contexts": [span_data["contexts"]],
                "ground_truth": [span_data.get("ground_truth", "")],
            }
        )

        # Evaluate metrics
        results = evaluate(
            ragas_input,
            [metric["function"] for metric in ragas_metrics.values()],
            llm=bedrock_llm,
            embeddings=bedrock_embeddings,
        )

        # Convert results to dictionary
        results_df = results.to_pandas().to_dict("index")[0]

        # Prepare a dictionary to store metric results
        metrics_row = {
            "question": span_data["question"],
            "retriever": span_data["tags"]["retriever"],
            "response_mode": span_data["tags"]["response_mode"],
            "top_k": span_data["tags"]["top_k"],
        }

        # Collect non-NaN metrics
        for metric_name, metric_config in ragas_metrics.items():
            metric_val = results_df[metric_name]
            if not math.isnan(metric_val):
                metrics_row[metric_name] = metric_val
                metrics_row[f"{metric_name}_category"] = metric_config["category"]
                metrics_row[f"{metric_name}_type"] = metric_config["metric_type"]

        results_list.append(metrics_row)

    # Create a DataFrame from the collected results
    summary_df = pd.DataFrame(results_list)

    # Compute overall summary statistics
    aggregation_columns = ["retriever", "response_mode", "top_k"]
    metric_columns = [col for col in summary_df.columns if col in ragas_metrics.keys()]

    summary_stats = summary_df.groupby(aggregation_columns)[metric_columns].agg(
        ["mean", "std"]
    )

    return summary_df, summary_stats


# Optional: Enhanced display function for better readability
def display_ragas_results(summary_df, summary_stats):
    """
    Print Ragas evaluation results in a formatted manner.

    Args:
        summary_df (pd.DataFrame): Detailed metrics DataFrame
        summary_stats (pd.DataFrame): Aggregated metrics DataFrame
    """
    print("Detailed Metrics:")
    print(summary_df)
    print("\nAggregated Metrics:")
    print(summary_stats)

In [None]:
# Usage example:
full_metrics_df, aggregated_metrics_df = run_ragas(evaluation_data, ragas_metrics)
display_ragas_results(full_metrics_df, aggregated_metrics_df)
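
For intuition about the aggregated table, the per-retriever mean and standard deviation can be reproduced with the standard library alone. This is a sketch on made-up scores, not output from the run above; `rows` and `summarize` are illustrative names, and rows that lack a metric stand in for scores that came back as NaN and were skipped.

```python
from collections import defaultdict
from statistics import mean, stdev

# Made-up per-question scores, shaped like rows of the detailed DataFrame.
rows = [
    {"retriever": "base", "faithfulness": 0.5},
    {"retriever": "base", "faithfulness": 1.0},
    {"retriever": "recursive", "faithfulness": 0.75},
    {"retriever": "recursive", "faithfulness": 1.0},
    {"retriever": "recursive"},  # metric was never recorded for this question
]


def summarize(rows, group_key, metric):
    """Group rows by group_key and return (mean, sample std) of metric per group."""
    grouped = defaultdict(list)
    for row in rows:
        if metric in row:
            grouped[row[group_key]].append(row[metric])
    return {
        group: (mean(values), stdev(values) if len(values) > 1 else float("nan"))
        for group, values in grouped.items()
    }


summary = summarize(rows, "retriever", "faithfulness")
for group, (avg, spread) in summary.items():
    print(f"{group}: mean={avg:.3f} std={spread:.3f}")
```

With only a handful of questions per retriever, the standard deviation is large relative to the difference in means, so small gaps between strategies should be read with caution.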