In [1]:
# Load IPython's autoreload extension and set it to mode 2 for this kernel.
%load_ext autoreload
%autoreload 2

# Langfuse + Ragas

## Prepare the Data

In [2]:
from datasets import load_dataset

# Load the "ragas_eval" configuration of the FiQA dataset and keep only its
# 'baseline' split; later cells read rows from `fiqa_eval`.
fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")['baseline']
# Bare expression so the notebook displays the dataset summary.
fiqa_eval

Dataset({
    features: ['question', 'ground_truths', 'answer', 'contexts'],
    num_rows: 30
})

## Score with Trace

In [6]:
import json

# Parse the JSON file "../.envvals" (relative to the notebook's working
# directory) into `env_vals`.
# NOTE(review): `env_vals` is not referenced by any other cell visible in this
# notebook -- confirm whether the Langfuse settings were meant to come from it.
with open("../.envvals") as f:
    env_vals = json.load(f)

In [3]:
import os

# Langfuse connection settings.
# Values are read from the environment so that real keys never have to be
# written into (and committed with) the notebook. When a variable is unset the
# previous literal is used, so behaviour is unchanged on a machine without
# any LANGFUSE_* variables.
ENV_HOST = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
ENV_SECRET_KEY = os.environ.get("LANGFUSE_SECRET_KEY", "")
ENV_PUBLIC_KEY = os.environ.get("LANGFUSE_PUBLIC_KEY", "")

In [4]:
from langfuse import Langfuse
 
# Shared Langfuse client used by every later cell. The arguments are passed
# positionally in the order public key, secret key, host.
langfuse = Langfuse(ENV_PUBLIC_KEY, ENV_SECRET_KEY, ENV_HOST)

In [25]:
# Use the first FiQA example as the single record traced and scored below.
row = fiqa_eval[0]

In [28]:
from ragas.metrics import faithfulness, answer_relevancy

Score each trace as you create it with Langfuse. This could also be moved into the `TaskManager` so that scoring runs in a separate thread and is not a blocking call for the user.

**TODO: add scoring to `TaskManager`**

In [34]:
from langfuse.model import CreateTrace, CreateSpan, CreateGeneration, CreateEvent, CreateScore
 
# Record one "rag" trace with a span for each pipeline stage of `row`.
trace = langfuse.trace(CreateTrace(name="rag"))

retrieval_span = CreateSpan(
    name="retrieval",
    input={'query': row['question']},
    output={'chunks': row['contexts']},
)
trace.span(retrieval_span)

generation_span = CreateSpan(
    name="generation",
    input={'contexts': row['contexts']},
    output={'answer': row['answer']},
)
trace.span(generation_span)

# Faithfulness is scored from the question, the retrieved contexts and the answer.
faithfulness_inputs = {
    'question': row['question'],
    'contexts': row['contexts'],
    'answer': row['answer'],
}
faithfulness_score = faithfulness.score_single(faithfulness_inputs)

# Answer relevancy is initialised explicitly before scoring and only receives
# the question and the answer.
answer_relevancy.init_model()
answer_relevancy_inputs = {
    'question': row['question'],
    'answer': row['answer'],
}
answer_relevancy_score = answer_relevancy.score_single(answer_relevancy_inputs)

In [38]:
# Attach both ragas scores to the trace created above. The value of the last
# call is what the notebook displays as this cell's output.
trace.score(CreateScore(name='faithfulness', value=faithfulness_score))
trace.score(CreateScore(name='answer_relevancy', value=answer_relevancy_score))

<langfuse.client.StatefulClient at 0x7efd5192bf40>

# Scoring as batch

For `get_traces`, I think it would be useful to add something like this to the Langfuse SDK, similar to `get_generations`. Optionally, it should also accept a date window.

In [38]:
def get_traces(page=None, limit=None, name=None, user_id=None):
    """Collect traces from Langfuse, following pagination until a page is empty.

    Args:
        page: Page number to start from. ``None`` (the default) starts at
            page 1. Previously this argument was overwritten and ignored.
        limit: Page size, forwarded unchanged to ``langfuse.client.trace.list``.
        name: Trace-name filter, forwarded unchanged.
        user_id: User-id filter, forwarded unchanged.

    Returns:
        A list with the ``data`` items of every fetched page, in fetch order.
    """
    all_data = []
    # Honour the caller's starting page instead of unconditionally resetting it.
    current_page = 1 if page is None else page

    while True:
        response = langfuse.client.trace.list(
            name=name, page=current_page, limit=limit, user_id=user_id
        )
        # An empty page marks the end of the result set.
        if not response.data:
            break
        all_data.extend(response.data)
        current_page += 1

    return all_data

In [39]:
# Fetch every trace named 'rag' (the name given to the trace created earlier).
traces = get_traces(name='rag')

# Display how many traces were returned.
len(traces)

6

In [30]:
# score on a sample
from random import sample

# Column-oriented batch: index i of every list belongs to traces_sample[i].
evaluation_batch = {
    "question": [],
    "contexts": [],
    "answer": [],
}
# `sample` raises ValueError when asked for more items than exist, so cap the
# sample size at the number of available traces.
traces_sample = sample(traces, min(3, len(traces)))
for t in traces_sample:
    # Reset per trace: without this, a trace lacking a span would silently
    # reuse the values left over from the previous trace (or hit a NameError
    # on the first one).
    question = contexts = answer = None
    observations = [langfuse.client.observations.get(o) for o in t.observations]
    for o in observations:
        if o.name == 'retrieval':
            question = o.input['query']
            contexts = o.output['chunks']
        if o.name=='generation':
            answer = o.output['answer']
    # Fail loudly rather than skipping, so the batch stays index-aligned with
    # traces_sample.
    if question is None or contexts is None or answer is None:
        raise ValueError(
            f"trace {t.id} is missing 'retrieval' and/or 'generation' data"
        )
    evaluation_batch['question'].append(question)
    evaluation_batch['contexts'].append(contexts)
    evaluation_batch['answer'].append(answer)

In [31]:
# run ragas evaluate
from datasets import Dataset
from ragas import evaluate
# NOTE(review): these two metrics are also imported in an earlier cell.
from ragas.metrics import faithfulness, answer_relevancy

# Turn the column-oriented batch into a Dataset and score it with both metrics.
# `r` is displayed in the next cell and its `scores` are written back to
# Langfuse afterwards.
ds = Dataset.from_dict(evaluation_batch)
r = evaluate(ds, metrics=[faithfulness, answer_relevancy])

evaluating with [faithfulness]


100%|█████████████████████████████████████████████████████████████| 1/1 [00:37<00:00, 37.81s/it]


evaluating with [answer_relevancy]


100%|█████████████████████████████████████████████████████████████| 1/1 [00:08<00:00,  8.44s/it]


In [35]:
# Display the evaluation result returned by `evaluate`.
r

{'ragas_score': 0.9880, 'faithfulness': 1.0000, 'answer_relevancy': 0.9763}

In [37]:
# return computed scores
from langfuse.model import InitialScore

# Write each metric value back to Langfuse as a score on its trace.
# Assumes r.scores[idx] is the score row for traces_sample[idx].
for idx, sampled_trace in enumerate(traces_sample):
    row_scores = r.scores[idx]
    for metric_name in row_scores:
        langfuse.score(
            InitialScore(
                name=metric_name,
                value=row_scores[metric_name],
                traceId=sampled_trace.id,
            )
        )