# Evaluate Retrieval-Augmented Generation (RAG) pipelines with Ragas and Langfuse

In this notebook we'll explore ways to evaluate the quality of Retrieval-Augmented Generation (RAG) pipelines with open-source tools like [RAGAS](https://docs.ragas.io/en/v0.1.21/index.html), and use [Langfuse](https://langfuse.com/) to manage and trace the RAG pipelines with traces and spans. We will use the Bedrock knowledge base created in the previous lab and the RAG batch generation results to demonstrate offline evaluation and scoring, both at solution development time with open-source tools like Ragas and at run-time with checks built in to Bedrock itself.

### Prerequisites

#### Additional permissions for Amazon OpenSearch

To complete the manual Bedrock Knowledge Base setup steps in this notebook, your **AWS Console user/role** will need:

- [Permissions to work with Amazon OpenSearch vector collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-vector-search.html)
- Permission to **create IAM roles** and attach policies to them, including: `iam:AttachRolePolicy`, `iam:CreateRole`, `iam:DetachRolePolicy`, `iam:GetRole`, `iam:PassRole`, `iam:CreatePolicy`, `iam:CreatePolicyVersion`, and `iam:DeletePolicyVersion`.

> ℹ️ **Note:** In testing, we saw `NetworkError` issues when attempting to create Bedrock KBs using only the above-linked `aoss` policy statements. This was resolved by granting `aoss:*` on `*` instead, but you should consider reducing these permissions before using in production environments.

If you're in an instructor-led workshop using temporary accounts provided by AWS, this setup should already have been completed for you. If not, refer to the [AWS Console for Identity and Access Management (IAM)](https://console.aws.amazon.com/iam/home?#/home) to grant permissions to your user or role.

#### Setup and Python dependencies

In [None]:
%pip install datasets ragas llama_index python-dotenv langchain-aws boto3 --upgrade
%pip install langfuse==2.54.1 

Connect to self-hosted or cloud Langfuse environment.

In [2]:
# Define the environment variables
import os
# Replace the placeholders with your own values; never commit real keys to a shared notebook
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..." # Your Langfuse project secret key
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..." # Your Langfuse project public key
os.environ["LANGFUSE_HOST"] = "https://<your-langfuse-host>" # URL of your self-hosted or cloud Langfuse instance

See Langfuse documentation for more details: https://langfuse.com/docs
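
Keys typed directly into a notebook are easy to leak. As a minimal sketch, assuming the three `LANGFUSE_*` variables are supplied from outside the notebook (for example by the shell, or a `.env` file loaded with python-dotenv), the check below reports which ones are still missing. `missing_env_vars` is a hypothetical helper, not part of the Langfuse SDK:

```python
import os

REQUIRED_VARS = ("LANGFUSE_SECRET_KEY", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_HOST")


def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Set these variables before creating the Langfuse client:", ", ".join(missing))
```

Running this before `Langfuse()` gives a clearer message than a failed authentication check.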

### Bedrock Converse API
Next, let's import the libraries used in the rest of the notebook and set some configuration that we'll use later:

- An Amazon S3 `bucket_name`, required to store our document corpus.

- A folder prefix (`s3_prefix`) under the bucket where artifacts will be stored.

- The Bedrock Converse API setup that will be used to access the Foundation Models (FMs) on Amazon Bedrock.

In [None]:
import json

from typing import List, Dict, Optional, Any

from langfuse import Langfuse
from langfuse.client import PromptClient
from langfuse.decorators import observe, langfuse_context
from botocore.exceptions import ClientError

# External Dependencies:
import boto3  # General Python SDK for AWS (including Bedrock)
import pandas as pd  # For working with tabular data
from tqdm.notebook import tqdm  # Progress bars

botosess = boto3.Session(region_name="us-west-2")
region = botosess.region_name
account_id = boto3.client('sts').get_caller_identity()['Account']
bucket_name = f'eval-{account_id}-{region}'
s3_prefix = "bedrock-rag-eval"

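# Create the S3 bucket if it does not already exist.
# The client comes from the session so that it targets the same region as the LocationConstraint below.
s3 = botosess.client('s3')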
try:
    s3.head_bucket(Bucket=bucket_name)
    print(f"Bucket {bucket_name} exists")
except ClientError:
    print(f"Creating bucket {bucket_name}")
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region}
    )

# langfuse client
langfuse = Langfuse()
langfuse.auth_check()
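
The `create_bucket` call above always passes a `LocationConstraint`, which works for `us-west-2`. If you switch the notebook to `us-east-1`, Amazon S3 rejects an explicit `LocationConstraint` for that region, so the argument has to be left out. A small sketch of that branching follows; `create_bucket_kwargs` is a hypothetical helper name and the bucket name is illustrative:

```python
def create_bucket_kwargs(bucket_name: str, region: str) -> dict:
    """Build keyword arguments for boto3's s3.create_bucket for a given region."""
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the S3 default region and must not be sent as a LocationConstraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


print(create_bucket_kwargs("eval-123456789012-us-west-2", "us-west-2"))
```

It could then be used as `s3.create_bucket(**create_bucket_kwargs(bucket_name, region))`.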

In [None]:
# used to access Bedrock configuration
bedrock = boto3.client(
    service_name="bedrock",
    region_name="us-west-2"
)
 
# used to invoke the Bedrock Converse API
bedrock_runtime = boto3.client(
    service_name="bedrock-runtime",
    region_name="us-west-2"
)

bedrock_agent_runtime = boto3.client(
    service_name="bedrock-agent-runtime",
    region_name="us-west-2"
)

# List the inference profiles available in your account
models = bedrock.list_inference_profiles()
for model in models["inferenceProfileSummaries"]:
    print(model["inferenceProfileName"] + " - " + model["inferenceProfileId"])

#### Create the knowledge base

> ⚠️ ***Watch out:** This section includes steps you'll need to take manually, not just running the code cells!*

First, we'll need to upload the sample documents to Amazon S3. The bucket was already created (if it did not exist) in the setup cell above, so you can just run the code cell below:

In [None]:
corpus_s3uri = f"s3://{bucket_name}/{s3_prefix}/corpus"

print(f"Syncing corpus to:\n{corpus_s3uri}/")

!aws s3 sync --quiet ./datasets/corpus {corpus_s3uri}/

The simplest way to set up the actual Bedrock Knowledge Base for testing will be **manually through the AWS Console**:

▶️ First, **open** the [AWS Console for Amazon Bedrock](https://console.aws.amazon.com/bedrock/home?#/knowledge-bases) and select *Orchestration > Knowledge bases* from the left sidebar menu, as shown in the screenshot below:

> ⚠️ **Check** you're working in the correct *AWS Region* in the top right corner of the UI

![](images/bedrock-kbs/01-bedrock-kb-console.png "Screenshot of AWS Console for Amazon Bedrock Knowledge Bases, showing 'Create knowledge base' action button")

▶️ **Click** the *Create knowledge base* button and select *Knowledge Base with vector store*. In the screen that opens:

- For **knowledge base name**, enter `example-squad-kb`
- For **knowledge base description**, you can provide (something like) `Demo knowledge base for question answering evaluation`
- Leave the other settings as default (allow creating a new execution role, and no tags)
- Choose Amazon S3 as the data source (default)

Your configuration should be as shown below:

![](images/bedrock-kbs/02a-create-kb-basics.png "Screenshot of step 1 in Bedrock Knowledge Base creation workflow: with KB name, description, (create new) execution role, and (empty) tags configured. At the end of the form, a 'Next' button is visible.")

▶️ In the **Next** screen, you'll configure the S3 data source:

Leave the data source name as it is, then select the bucket and prefix you created in the previous step, and use the Amazon Bedrock default parser. The bucket name in the screenshot is just an example.

![](images/bedrock-kbs/02b-create-kb-data-source.png "Screenshot of Knowledge Base vector index settings including Cohere Embed Multilingual embedding model, and quick-create vector store. 'Next' button is visible.")

▶️ In the **Next** screen, you'll configure the vector index:

- For **embeddings model**, select `Cohere Embed Multilingual`

> ⚠️ **Check** in the [Amazon Bedrock Model Access console](https://console.aws.amazon.com/bedrock/home?#/modelaccess) that you've enabled access to this model in the current region.
>
> If needed, you should be able to select an alternative embedding model instead... But we haven't tested all options for this walkthrough.

- For **Vector database**, select `Quick create a new vector store`

You can find more information on this screen or in the [Amazon Bedrock Developer Guide](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-setup.html) about the different vector stores that Bedrock Knowledge Bases support. This default option will create a new [Amazon OpenSearch Serverless](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-overview.html) collection.

Leave other settings at their defaults as shown below, and you should be ready to proceed:

![](images/bedrock-kbs/02c-create-kb-index.png "Screenshot of Knowledge Base vector index settings including Cohere Embed Multilingual embedding model, and quick-create vector store. 'Next' button is visible.")

▶️ Click **Next** to review your configuration, and then **Create knowledge base** to complete the process.

> ⏰ It might take **a few minutes** for the creation to complete. A progress indicator banner should be visible if you scroll up. Alternatively in a separate tab, you could check the [Amazon OpenSearch Serverless Collections console](https://console.aws.amazon.com/aos/home?#opensearch/collections) - where you should see the underlying vector collection being created.

Once your Knowledge Base has been created successfully, you'll be directed to its detail screen as shown below:

![](images/bedrock-kbs/03-kb-detail-page.png "Detail screen for the created Amazon Bedrock Knowledge Base, showing creation success banner. Includes sections 'Knowledge base overview' (containing the KB ID, name, and other details); 'Tags' (empty); 'Data source' (one Amazon S3 data source listed); 'Embeddings model' (Cohere Embed); and an interactive 'Test knowledge base' chat sidebar on the right with a warning that some data sources have not been synced.")

As mentioned in the alert box shown above, your new knowledge base will not contain your documents until we **sync** the data source:

▶️ **Select** your S3 data source using the radio button to the left of its name in the data sources list, and **click the Sync button** above to start the sync.

The sync should only take a few seconds, after which your data source's *Status* will return to `Available`.

![](images/bedrock-kbs/04a-kb-data-source-after-sync.png "Screenshot of KB 'data source' section after running sync, with the data source selected and status showing as 'available'")

With the sync completed, your Knowledge Base should be ready to use.

Optionally, you can click through to your data source name to check the sync `Added` the 20 files as expected:

![](images/bedrock-kbs/04b-kb-data-sync-details.png "Data source details screen showing sync completed successfully with 20 files detected and added to the index, and 0 files failed").

#### Test out your Knowledge Base

Before we discuss evaluation at scale, let's run a couple of test queries to check the KB is working properly. Go back to the main page of the knowledge base:

![](images/bedrock-kbs/04c-kb-main-page.png "Screenshot of the main page of the knowledge base")

You can find the knowledge base ID at the top of the page (`Z746ERZP5X` in this example; yours will be different).

▶️ **Replace** the below placeholder with your knowledge base's unique ID, and run the cells below to continue:

In [10]:
knowledge_base_id = "TO FILL"  # Something like "55GUAMQYUT"


With the ID identified, you can use the Bedrock Agent Runtime `RetrieveAndGenerate` API (see the corresponding boto3 doc page for Python) to query your knowledge base.

As in the manual example, you'll also need to select which text generation model to use, which we've pre-populated below with the Amazon Nova Pro model. The model ARN must contain your own account ID; the cell below takes it from the `account_id` variable set earlier.

In [None]:
rag_resp = bedrock_agent_runtime.retrieve_and_generate(
    input={"text": "In what country is Normandy located?"},
    retrieveAndGenerateConfiguration={
        "knowledgeBaseConfiguration": {
            "knowledgeBaseId": knowledge_base_id,
            "modelArn": f"arn:aws:bedrock:us-west-2:{account_id}:inference-profile/us.amazon.nova-pro-v1:0",
        },
        "type": "KNOWLEDGE_BASE",
    },
    # Optional session ID can help improve results for follow-up questions:
    # sessionId='string'
)

print("Plain text response:")
print("--------------------")
print(rag_resp["output"]["text"], end="\n\n\n")

print("Full API output:")
print("----------------")
rag_resp

As shown in the full API response from the above cell, the `RetrieveAndGenerate` action provides:

- The final text answer
- The `retrievedReferences` from the search engine
- Specific `citations` localizing which references should be cited by different parts of the text answer
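
To make that structure concrete, here is a minimal sketch that walks a response shaped like the `RetrieveAndGenerate` output and pairs each cited part of the answer with the S3 URIs of its references. The `sample_resp` dictionary is a hand-written stand-in with illustrative values, not real API output, and `citation_sources` is a hypothetical helper:

```python
def citation_sources(resp: dict) -> list:
    """Pair each cited part of the answer with the S3 URIs of its references."""
    pairs = []
    for cite in resp.get("citations", []):
        part = cite.get("generatedResponsePart", {}).get("textResponsePart", {})
        uris = [
            ref["location"]["s3Location"]["uri"]
            for ref in cite.get("retrievedReferences", [])
        ]
        pairs.append((part.get("text", ""), uris))
    return pairs


# Hand-written stand-in for a real response (illustrative values only)
sample_resp = {
    "output": {"text": "Normandy is in France."},
    "citations": [
        {
            "generatedResponsePart": {
                "textResponsePart": {"text": "Normandy is in France."}
            },
            "retrievedReferences": [
                {
                    "content": {"text": "Normandy is a region in France."},
                    "location": {
                        "s3Location": {"uri": "s3://example-bucket/corpus/doc_1.txt"}
                    },
                }
            ],
        }
    ],
}

for text, uris in citation_sources(sample_resp):
    print(text, "<-", uris)
```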


It's also possible to run **only the retrieval** through the API, and skip the generative answer synthesis step - as shown below:

In [None]:
retrieve_resp = bedrock_agent_runtime.retrieve(
    knowledgeBaseId=knowledge_base_id,
    retrievalQuery={"text": "In what country is Normandy located?"},
)
print(json.dumps(retrieve_resp["retrievalResults"], indent=2))

## RAG evaluation example with dummy dataset

#### Load Dataset

For this example, we are going to use a dataset that has already been prepared by querying a RAG system and gathering its outputs. Later sections show how to fetch your own production data from Langfuse.

The dataset contains the following columns:

- `question`: list[str] - These are the questions your RAG pipeline will be evaluated on.

- `contexts`: list[list[str]] - The contexts which were passed into the LLM to answer the question.

- `answer`: list[str] - The answer generated from the RAG pipeline and given to the user.

- `ground_truths`: list[list[str]] - The ground truth answer to the questions. However, this can be ignored for online evaluations since we will not have access to ground-truth data in our case.

For the details of this dataset, please refer to [Exploding Gradients Dataset](https://huggingface.co/datasets/explodinggradients/fiqa/viewer/ragas_eval)
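
The column layout above is "one list per column", with `contexts` holding a list of strings per row. As a rough sketch of what a well-formed batch looks like, the hypothetical helper `check_eval_columns` below validates a hand-made toy batch (the rows are invented for illustration and are not taken from the FiQA dataset):

```python
def check_eval_columns(batch: dict) -> int:
    """Validate a column-oriented evaluation batch and return its row count."""
    required = ("question", "contexts", "answer")
    for col in required:
        if col not in batch:
            raise ValueError(f"missing column: {col}")
    lengths = {len(batch[col]) for col in required}
    if len(lengths) != 1:
        raise ValueError("columns have different lengths")
    for ctx in batch["contexts"]:
        # Each row's contexts must be a list of text chunks
        if not all(isinstance(chunk, str) for chunk in ctx):
            raise ValueError("each contexts entry must be a list of strings")
    return lengths.pop()


# Invented toy rows, for illustration only
toy_batch = {
    "question": ["What is an ETF?", "What is a bond?"],
    "contexts": [["An ETF is a fund traded on an exchange."], ["A bond is a debt security."]],
    "answer": ["A fund traded on an exchange.", "A debt security."],
}

print(check_eval_columns(toy_batch))
```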


In [None]:
from datasets import load_dataset
 
fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")['baseline']
fiqa_eval


### The eval metrics
We're going to measure the following aspects of a RAG system. These metrics are from the Ragas library:

- [Faithfulness](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/faithfulness/): This measures the factual consistency of the generated answer against the given context.
- [Response relevancy](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/answer_relevance/): The ResponseRelevancy metric measures how relevant a response is to the user input.
- [Context precision](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/context_precision/): Context Precision is a metric that evaluates whether all of the ground-truth relevant items present in the contexts are ranked high. Ideally all the relevant chunks must appear at the top ranks.

Check out the [RAGAS documentation](https://docs.ragas.io/en/stable/concepts/metrics/available_metrics/) to learn more about these metrics and how they work.
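
To give a feel for the arithmetic behind two of these metrics, here is a simplified sketch following the formulas described in the Ragas documentation: context precision averages precision@k over the ranks that hold a relevant chunk, and faithfulness is the share of answer claims supported by the context. In the library the relevance verdicts and claim checks come from an LLM judge; here they are supplied by hand, and both function names are hypothetical:

```python
def context_precision_at_k(verdicts: list) -> float:
    """Average precision@k over the ranks k that hold a relevant chunk.

    verdicts[i] is 1 if the chunk at rank i+1 was judged relevant, else 0.
    """
    relevant_so_far = 0
    weighted_sum = 0.0
    for rank, verdict in enumerate(verdicts, start=1):
        if verdict:
            relevant_so_far += 1
            weighted_sum += relevant_so_far / rank  # precision@rank
    return weighted_sum / relevant_so_far if relevant_so_far else 0.0


def faithfulness_ratio(supported_claims: int, total_claims: int) -> float:
    """Share of the answer's claims that the retrieved context supports."""
    return supported_claims / total_claims if total_claims else 0.0


# Relevant chunks at ranks 1 and 3 score lower than relevant chunks at ranks 1 and 2
print(context_precision_at_k([1, 0, 1]), context_precision_at_k([1, 1, 0]))
print(faithfulness_ratio(supported_claims=3, total_claims=4))
```

The first line illustrates why ranking matters: the same number of relevant chunks yields a higher score when they appear earlier.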

In [12]:
# import metrics
from ragas.metrics import (
    Faithfulness,
    ResponseRelevancy,
    LLMContextPrecisionWithoutReference,
)
 
# metrics you chose
metrics = [
    Faithfulness(),
    ResponseRelevancy(),
    LLMContextPrecisionWithoutReference(),
]

Now you have to initialize the metrics with the LLM and embeddings of your choice. In this example we use an Amazon Nova LLM and a Cohere Embed embedding model, both served through Amazon Bedrock.

In [13]:
from ragas.run_config import RunConfig
from ragas.metrics.base import MetricWithLLM, MetricWithEmbeddings


# util function to init Ragas Metrics
def init_ragas_metrics(metrics, llm, embedding):
    for metric in metrics:
        if isinstance(metric, MetricWithLLM):
            print(metric.name + " llm")
            metric.llm = llm
        if isinstance(metric, MetricWithEmbeddings):
            print(metric.name + " embedding")
            metric.embeddings = embedding
        run_config = RunConfig()
        metric.init(run_config)

In [None]:
from langchain_aws import ChatBedrockConverse
from langchain_aws import BedrockEmbeddings
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper

config = {
    "region_name": "us-west-2",  # E.g. "us-east-1"
    "llm": "us.amazon.nova-pro-v1:0",  # Or a Claude model, e.g. "anthropic.claude-3-5-sonnet-20241022-v2:0"
    "embeddings": "cohere.embed-english-v3",  # Or e.g. "amazon.titan-embed-text-v2:0"
    "temperature": 0.4,
}

evaluator_llm = LangchainLLMWrapper(ChatBedrockConverse(
    region_name=config["region_name"],
    base_url=f"https://bedrock-runtime.{config['region_name']}.amazonaws.com",
    model=config["llm"],
    temperature=config["temperature"],
))

evaluator_embeddings = LangchainEmbeddingsWrapper(BedrockEmbeddings(
    region_name=config["region_name"],
    model_id=config["embeddings"],
))


init_ragas_metrics(
    metrics,
    llm=evaluator_llm,
    embedding=evaluator_embeddings,
)

## Trace RAGAS eval results on Langfuse

You can use model-based evaluation with Ragas in two ways:
1. Score each trace: Run the evaluations for every trace. This gives you a much better idea of how each call to your RAG pipeline is performing, but be mindful of the cost.

2. Score as batch: Take a random sample of traces on a periodic basis and score them. This brings down the cost and gives you a rough estimate of your app's performance, but may miss important samples.

In this example, we will demonstrate both approaches, using both a prebuilt dataset and a live RAG pipeline with a Bedrock Knowledge Base.

#### Score with Trace

Let's take a small example of a single trace and see how you can score it with Ragas. We first define a utility function that scores a trace with the metrics you chose.

In [15]:
from ragas.dataset_schema import SingleTurnSample
 
async def score_with_ragas(query, chunks, answer):
    scores = {}
    for metric in metrics:
        sample = SingleTurnSample(
            user_input=query,
            retrieved_contexts=chunks,
            response=answer,
        )
        print(f"calculating {metric.name}")
        scores[metric.name] = await metric.single_turn_ascore(sample)
    return scores
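
The function above awaits the metrics one after another. Since each metric call is a coroutine, they could also be awaited together. The sketch below shows that pattern with `asyncio.gather`, using a `StubMetric` class as a stand-in that only mimics the `name` attribute and `single_turn_ascore` method used above (it is not a Ragas class, and whether concurrent calls help depends on your model's rate limits):

```python
import asyncio


class StubMetric:
    """Stand-in for a metric object: exposes `name` and an async scoring method."""

    def __init__(self, name, value):
        self.name = name
        self.value = value

    async def single_turn_ascore(self, sample):
        await asyncio.sleep(0)  # yield control, as a real LLM call would
        return self.value


async def score_concurrently(metrics, sample):
    """Await all metric coroutines together and map metric name -> score."""
    values = await asyncio.gather(*(m.single_turn_ascore(sample) for m in metrics))
    return {m.name: v for m, v in zip(metrics, values)}
```

In a notebook cell this would be called as `await score_concurrently(metrics, sample)`.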

You compute the scores with each request. Below we will go through a dummy application that does the following steps:

- gets a question from the user

- fetches context from the database or vector store that can be used to answer the question

- passes the question and the contexts to the LLM to generate the answer

All these steps are logged as spans in a single trace in Langfuse. You can read more about traces and spans, which are part of the low-level SDK, in the [Langfuse documentation](https://langfuse.com/docs/sdk/python/low-level-sdk).

In [None]:
# start a new trace when you get a question
row = fiqa_eval[0]
question = row['question']
trace = langfuse.trace(name = "rag")
 
# retrieve the relevant chunks
# chunks = get_similar_chunks(question)
contexts = row['contexts']
# pass it as span
trace.span(
    name = "retrieval", input={'question': question}, output={'contexts': contexts}
)
 
# use the LLM to generate an answer from the chunks
# answer = get_response_from_llm(question, chunks)
answer = row['answer']
trace.span(
    name = "generation", input={'question': question, 'contexts': contexts}, output={'answer': answer}
)

# compute scores for the question, context, answer tuple
ragas_scores = await score_with_ragas(question, contexts, answer)
ragas_scores

The trace is now recorded in Langfuse, but with no scores attached. You can check it in the Langfuse UI.
![](images/bedrock-kbs/04d-langfuse-single-eval-trace-no-score.png)
You can then attach the scores to the trace by running the following:

In [17]:
# send the scores
for m in metrics:
    trace.score(name=m.name, value=ragas_scores[m.name])


The scores are now attached to the trace:
![](images/bedrock-kbs/04e-langfuse-single-eval-trace-score.png)

### Evaluate Bedrock Knowledge Base RAG pipelines using RAGAS and Langfuse
We have already set up the Bedrock Knowledge Base in the first section. We will now **evaluate** the quality of its results against a test dataset, to help us **optimize** the configuration for high quality and low cost.

First, let's load the sample dataset of questions, reference answers, and their source documents (for details on how this dataset was prepared, see [this notebook on GitHub](https://github.com/aws-samples/llm-evaluation-methodology/blob/main/datasets/Prepare-SQuAD.ipynb)):


In [None]:
dataset_df = pd.read_json("datasets/qa.manifest.jsonl", lines=True)
dataset_df.head(10)

Records in this dataset include:

- (`doc`) The full text of the source document for this example
- (`doc_id`) A unique identifier for the source document
- (`question`) The user question to be asked
- (`question_id`) A unique identifier for the question
- (`answers`) A list of (possibly multiple) reference 'correct' answers, supported by the document

#### Run the knowledge base against our test set

As shown in [Ragas' API Reference](https://docs.ragas.io/en/latest/references/evaluation.html), records in Ragas evaluation datasets typically include:

- The `question` that was asked
- The `answer` the system generated
- The actual text `contexts` the answer was based on (i.e. snippets of document text retrieved by the search engine)
- The `ground_truth` answer(s)

Here we will integrate [Langfuse Tracking](https://langfuse.com/docs/tracing) into the RAG pipeline with the Langfuse Python SDK using the `@observe()` decorator.

We can run our example questions through the Bedrock KB RAG as shown below, to fetch the outputs ready to calculate metrics. The default model ARN is built from the `account_id` variable set earlier, so check that it holds your own account ID.


In [19]:
# First import tqdm explicitly
from tqdm.notebook import tqdm
# Then import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from langfuse.decorators import observe, langfuse_context
from datasets import Dataset  # For use with Ragas

@observe(as_type="generation", name="retrieve_generate_kb")
def retrieve_and_generate(
    question: str,
    kb_id: str,
    generate_model_arn: str = f"arn:aws:bedrock:us-west-2:{account_id}:inference-profile/us.amazon.nova-pro-v1:0",
    **kwargs,
):
    rag_resp = bedrock_agent_runtime.retrieve_and_generate(
        input={"text": question},
        retrieveAndGenerateConfiguration={
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": generate_model_arn,
            },
            "type": "KNOWLEDGE_BASE",
        },
    )
    answer = rag_resp["output"]["text"]
    # Fetch flat list of references from the nested citations->retrievedReferences:
    all_refs = [r for cite in rag_resp["citations"] for r in cite["retrievedReferences"]]
    contexts = [r["content"]["text"] for r in all_refs]
    ref_s3uris = [r["location"]["s3Location"]["uri"] for r in all_refs]
    # Map e.g. 's3://.../doc_id.txt' to 'doc_id':
    ref_ids = [uri.rpartition("/")[2].rpartition(".")[0] for uri in ref_s3uris]
    langfuse_context.update_current_observation(
        input={"question":question,
               "contexts": contexts},
        output=answer,
        model=generate_model_arn,
        tags=["dev"],
        metadata=kwargs
    )
    trace_id=langfuse_context.get_current_trace_id()
    return {
        "answer": answer,
        "retrieved_doc_ids": ref_ids,
        "retrieved_doc_texts": contexts,
        "trace_id": trace_id
    }



In [None]:
rag_generated_outputs = [
    retrieve_and_generate(question=rec.question, 
                          kb_id=knowledge_base_id, 
                          kwargs={"database":"Bedrock_kb",
                                    "kb_id": knowledge_base_id}
                    )
    for _, rec in dataset_df.iterrows()
]
rag_generated_outputs[0]

In [None]:
outputs_df = pd.DataFrame(rag_generated_outputs, columns=["answer", "retrieved_doc_ids", "retrieved_doc_texts", "trace_id"])
# Combine & clarify the column names for a nice tabular representation:
results_df = pd.concat((dataset_df, outputs_df), axis=1).rename(
    columns={
        "answer": "model_answer",
        "answers": "gt_answers",
        "doc_id": "gt_doc_id",
        "doc": "gt_doc_text",
    }
)
results_df.head()
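
With the ground-truth and retrieved document IDs side by side, a quick sanity check is possible before any LLM-based scoring: how often does the ground-truth document appear among the retrieved ones? The sketch below is an illustrative addition, not part of Ragas or the original pipeline. It assumes the IDs are comparable strings, and both helper names are hypothetical:

```python
def doc_id_from_s3_uri(uri: str) -> str:
    """Map e.g. 's3://bucket/prefix/doc_id.txt' to 'doc_id' (expects a file extension)."""
    return uri.rpartition("/")[2].rpartition(".")[0]


def retrieval_hit_rate(rows) -> float:
    """Fraction of (gt_doc_id, retrieved_doc_ids) pairs where the ground truth was retrieved."""
    rows = list(rows)
    if not rows:
        return 0.0
    hits = sum(1 for gt_id, retrieved in rows if gt_id in retrieved)
    return hits / len(rows)


# Invented IDs, for illustration only
toy_rows = [("doc_1", ["doc_1", "doc_7"]), ("doc_2", ["doc_9"])]
print(doc_id_from_s3_uri("s3://example-bucket/corpus/doc_1.txt"), retrieval_hit_rate(toy_rows))
```

On the dataframe above it could be called as `retrieval_hit_rate(zip(results_df["gt_doc_id"], results_df["retrieved_doc_ids"]))`.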

In [None]:
# to re-initiate the ragas metrics
init_ragas_metrics(
    metrics,
    llm=evaluator_llm,
    embedding=evaluator_embeddings,
)

In [24]:
from langfuse.decorators import observe, langfuse_context
from typing import Optional
from asyncio import run
@observe(as_type="generation", name="br_kb_rag")
def rag_pipeline(question,
                 user_id: Optional[str] = None,
                 session_id: Optional[str] = None,
                 kb_id: Optional[str] = None,
                 metrics: Optional[Any] = None
    ):

    generated_answer = retrieve_and_generate(question=question, 
                                             kb_id=kb_id, 
                                             kwargs={"database":"Bedrock_kb", 
                                                     "kb_id": knowledge_base_id}
                                            )
    contexts= generated_answer["retrieved_doc_texts"]
    answer= generated_answer["answer"]
    trace_id= generated_answer["trace_id"]

    score = run(score_with_ragas(question, contexts, answer=answer))
    langfuse_context.update_current_trace(
        user_id=user_id,
        session_id=session_id,
        tags=[kb_id],
    )
    for s in score:
        langfuse.score(name=s, value=score[s], trace_id=trace_id)
    return generated_answer

In [None]:
response = rag_pipeline(dataset_df.iloc[0]["question"], kb_id=knowledge_base_id)
response

### Scoring as batch

Scoring each production trace can be time-consuming and costly depending on your application architecture and traffic. In that case, it's better to start with batch scoring. Decide on the timespan each batch run should cover and the number of traces to sample from that time slice. Then create a dataset and call `ragas.evaluate` to analyze the results.

You can run this periodically to keep track of how the scores are changing across timeslices and figure out if there are any discrepancies.

We will evaluate the existing results generated previously by the `retrieve_and_generate()` function.

In [32]:
for index, interaction in results_df.head(10).iterrows():
    trace = langfuse.trace(name = "br_kb_rag_span")
    trace.span(
        name = "retrieval",
        input={"question": interaction["question"]},
        output={"contexts": interaction["retrieved_doc_texts"]},
        metadata={"comments":"offline batch update"}
    )
    trace.span(
        name = "generation",
        input={"question": interaction["question"], 'contexts': interaction["retrieved_doc_texts"]},
        output={"answer": interaction["model_answer"]},
        metadata={"comments":"offline batch update"}
    )

# Make sure the Langfuse SDK has sent all queued events before retrieving them in the next step
langfuse.flush()

Now that the results are uploaded to Langfuse, you can retrieve them as needed with this helper function.

In [33]:
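def get_traces(name=None, limit=None, user_id=None):
    """Fetch traces page by page; stop once `limit` traces are collected (no cap if None)."""
    all_data = []
    page = 1

    while True:
        response = langfuse.client.trace.list(
            name=name, page=page, user_id=user_id
        )
        if not response.data:
            break
        page += 1
        all_data.extend(response.data)
        # Guard against limit=None, which would raise a TypeError in the comparison
        if limit is not None and len(all_data) >= limit:
            break

    return all_data[:limit]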
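
The paging logic in `get_traces` can be tried out without a Langfuse server. The sketch below is a generic version of the same loop that takes any page-fetching callable; `collect_pages` and `fake_fetch` are hypothetical names, and the fake data stands in for API responses:

```python
def collect_pages(fetch_page, limit=None):
    """Collect items page by page until a page is empty or `limit` items are gathered."""
    items, page = [], 1
    while limit is None or len(items) < limit:
        batch = fetch_page(page)
        if not batch:
            break
        items.extend(batch)
        page += 1
    return items if limit is None else items[:limit]


# Fake paged source: three pages of two items each, then empty pages
fake_pages = {1: ["t1", "t2"], 2: ["t3", "t4"], 3: ["t5", "t6"]}


def fake_fetch(page):
    return fake_pages.get(page, [])


print(collect_pages(fake_fetch, limit=3))
```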

In [None]:
from random import sample

NUM_TRACES_TO_SAMPLE = 3
traces = get_traces(name='br_kb_rag_span', limit=5)
traces_sample = sample(traces, NUM_TRACES_TO_SAMPLE)

len(traces_sample)
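
Note that `random.sample` raises a `ValueError` if asked for more items than the population holds, which can happen when fewer traces exist than `NUM_TRACES_TO_SAMPLE`. A small defensive sketch follows; `sample_traces` is a hypothetical helper, and the optional seed only makes the draw repeatable:

```python
import random


def sample_traces(traces, k, seed=None):
    """Sample up to k traces without replacement; never asks for more than exist."""
    rng = random.Random(seed)
    traces = list(traces)
    return rng.sample(traces, min(k, len(traces)))


print(len(sample_traces(["trace-a", "trace-b"], k=3)))
```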

Now let's make a batch and score it. Ragas uses a Hugging Face `Dataset` object to build the dataset and run the evaluation. If you run this on your own production data, use the right keys to extract the question, contexts and answer from the trace.

In [37]:
# score on a sample
from random import sample

evaluation_batch = {
    "question": [],
    "contexts": [],
    "answer": [],
    "trace_id": [],
}

for t in traces_sample:
    observations = [langfuse.client.observations.get(o) for o in t.observations]
    for o in observations:
        if o.name == 'retrieval':
            question = o.input['question']
            contexts = o.output['contexts']
        if o.name=='generation':
            answer = o.output['answer']
    evaluation_batch['question'].append(question)
    evaluation_batch['contexts'].append(contexts)
    evaluation_batch['answer'].append(answer)
    evaluation_batch['trace_id'].append(t.id)
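
In the loop above, `question`, `contexts` and `answer` keep their values from the previous iteration if a trace lacks a `retrieval` or `generation` span, so a malformed trace would silently copy its neighbour's data. A more defensive extraction could look like the sketch below. `extract_rag_record` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for observation objects with `name`, `input` and `output` attributes:

```python
from types import SimpleNamespace


def extract_rag_record(observations):
    """Return (question, contexts, answer) for one trace, or None if a span is missing."""
    by_name = {o.name: o for o in observations}
    retrieval = by_name.get("retrieval")
    generation = by_name.get("generation")
    if retrieval is None or generation is None:
        return None
    return (
        retrieval.input["question"],
        retrieval.output["contexts"],
        generation.output["answer"],
    )


# Stand-in observations with invented values
toy_observations = [
    SimpleNamespace(name="retrieval", input={"question": "q1"}, output={"contexts": ["c1"]}),
    SimpleNamespace(name="generation", input={}, output={"answer": "a1"}),
]

print(extract_rag_record(toy_observations))
```

Traces for which the helper returns `None` can then be skipped instead of appended to the batch.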

In [None]:
# run ragas evaluate
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import Faithfulness, ResponseRelevancy

ds = Dataset.from_dict(evaluation_batch)
r = evaluate(ds, 
             llm=evaluator_llm, 
             embeddings=evaluator_embeddings, 
             metrics=[Faithfulness(), ResponseRelevancy()]
             )

And that is it! You can see the scores over a time period.

In [None]:
r

You can also push the scores back into Langfuse or use the exported pandas dataframe to run further analysis.

In [None]:
df = r.to_pandas()

# add the langfuse trace_id to the result dataframe
df["trace_id"] = ds["trace_id"]

df.head()

In [41]:
for _, row in df.iterrows():
    for metric_name in ["faithfulness", "answer_relevancy"]:
        langfuse.score(
            name=metric_name,
            value=row[metric_name],
            trace_id=row["trace_id"]
        )
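
The loop above assumes every metric value is a usable number. Assuming a value can come back missing or as NaN (for example if an evaluation call failed), it may be worth filtering before pushing scores. The sketch below works on plain dictionaries; `valid_scores` is a hypothetical helper and the rows are invented:

```python
import math


def valid_scores(rows, metric_names):
    """Yield (trace_id, metric_name, value) only for values that are real numbers."""
    for row in rows:
        for name in metric_names:
            value = row.get(name)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                continue  # skip missing or NaN scores
            yield row["trace_id"], name, float(value)


# Invented rows, for illustration only
toy_rows = [
    {"trace_id": "t1", "faithfulness": 0.9, "answer_relevancy": float("nan")},
    {"trace_id": "t2", "faithfulness": 1.0, "answer_relevancy": 0.8},
]

for trace_id, name, value in valid_scores(toy_rows, ["faithfulness", "answer_relevancy"]):
    print(trace_id, name, value)
```

With the dataframe above, the rows could be obtained with `df.to_dict("records")` and each yielded tuple passed to `langfuse.score`.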

You can now go back to the Langfuse console and check the updated scores in the traces.