In [1]:
# Run this cell first if you are running this in a Jupyter notebook.
# Later cells call `loop.run_until_complete(...)` on the current event loop;
# presumably this patch is what allows that while the notebook's own loop is
# already running — NOTE(review): confirm against the nest_asyncio docs.
import nest_asyncio

nest_asyncio.apply()

In [2]:
import os
from dotenv import load_dotenv

# Load settings from a .env file.
# NOTE(review): presumably this supplies the OpenAI credentials needed by the
# ChatOpenAI / OpenAIEmbeddings clients created further down — confirm.
load_dotenv()

True

In [5]:
import opik

# Configure the Opik SDK for a local (self-hosted) deployment.
# NOTE(review): the URL below is specific to the author's environment; replace
# it with the address of your own Opik instance before running.
opik.configure(use_local=True, url="http://trainingvm:5173")

OPIK: Existing Opik clients will not use updated values for "url", "api_key", "workspace".


In [6]:
# Import the metric
from ragas.metrics import AnswerRelevancy

# Import some additional dependencies
from langchain_openai.chat_models import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper

# Initialize the Ragas metric.
# The LangChain chat model and embeddings are created with their default
# settings and wrapped in the Ragas adapters before being handed to the metric.
llm = LangchainLLMWrapper(ChatOpenAI())
emb = LangchainEmbeddingsWrapper(OpenAIEmbeddings())

# This metric instance is reused by the scoring cells below.
answer_relevancy_metric = AnswerRelevancy(llm=llm, embeddings=emb)

In [7]:
import asyncio
from ragas.integrations.opik import OpikTracer
from ragas.dataset_schema import SingleTurnSample
import os

# Set the Opik project name through the environment.
# NOTE(review): presumably this is the project the traces below are logged to — confirm.
os.environ["OPIK_PROJECT_NAME"] = "ragas-integration"


# Define the scoring function
def compute_metric(metric, row):
    """Score one sample with a Ragas metric, passing an Opik tracer as callback.

    Args:
        metric: Object exposing an async ``single_turn_ascore(sample, callbacks=...)``.
        row: Mapping whose items are passed as keyword arguments to
            ``SingleTurnSample``.

    Returns:
        Whatever ``metric.single_turn_ascore`` resolves to.
    """
    sample = SingleTurnSample(**row)
    tracer = OpikTracer(tags=["ragas"])

    async def _score_sample():
        # The sample, metric and tracer are captured from the enclosing scope
        return await metric.single_turn_ascore(sample, callbacks=[tracer])

    # Drive the coroutine to completion on the current event loop
    event_loop = asyncio.get_event_loop()
    return event_loop.run_until_complete(_score_sample())


# Score a simple example.
# compute_metric passes these keys as keyword arguments to SingleTurnSample.
row = {
    "user_input": "What is the capital of France?",
    "response": "Paris",
    "retrieved_contexts": ["Paris is the capital of France.", "Paris is in France."],
}

score = compute_metric(answer_relevancy_metric, row)
print("Answer Relevancy score:", score)

Answer Relevancy score: 1.0000000000000002


In [8]:
from opik import track, opik_context


@track
def retrieve_contexts(question):
    """Return the contexts for ``question``.

    Retrieval is stubbed out: the same two hard-coded passages are returned
    regardless of the question.
    """
    hard_coded_contexts = [
        "Paris is the capital of France.",
        "Paris is in France.",
    ]
    return hard_coded_contexts


@track
def answer_question(question, contexts):
    """Return the answer to ``question``.

    Answer generation is stubbed out: the hard-coded answer is returned and
    both arguments are ignored.
    """
    hard_coded_answer = "Paris"
    return hard_coded_answer


@track(name="Compute Ragas metric score", capture_input=False)
def compute_rag_score(answer_relevancy_metric, question, answer, contexts):
    """Score one question/answer/contexts triple with the given metric.

    The three values are packed under the ``user_input``, ``response`` and
    ``retrieved_contexts`` keys and handed to ``compute_metric``, whose result
    is returned unchanged.
    """
    sample_fields = dict(
        user_input=question,
        response=answer,
        retrieved_contexts=contexts,
    )
    return compute_metric(answer_relevancy_metric, sample_fields)


@track
def rag_pipeline(question):
    """Run retrieval, answering and scoring for ``question``; return the answer.

    The answer-relevancy score, rounded to 4 decimals, is attached to the
    current Opik trace as a feedback score named ``answer_relevancy``.
    """
    retrieved = retrieve_contexts(question)
    generated_answer = answer_question(question, retrieved)

    relevancy = compute_rag_score(
        answer_relevancy_metric, question, generated_answer, retrieved
    )

    # Record the score on the trace of this pipeline call
    feedback = {"name": "answer_relevancy", "value": round(relevancy, 4)}
    opik_context.update_current_trace(feedback_scores=[feedback])

    return generated_answer


# Run the tracked pipeline once on a sample question; the cell displays the returned answer.
rag_pipeline("What is the capital of France?")

'Paris'

In [9]:
from datasets import load_dataset
from opik.evaluation.metrics import base_metric, score_result
import opik


opik_client = opik.Opik()

# Create a small dataset
fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")

# Keep the first three rows of the "baseline" split and derive the
# user_input / reference / retrieved_contexts fields from the source columns.
# NOTE(review): evaluation_task below reads the original "question", "answer"
# and "contexts" keys, so presumably `map` keeps the source columns alongside
# the new ones — confirm.
hf_dataset = fiqa_eval["baseline"].select(range(3))
dataset_items = hf_dataset.map(
    lambda x: {
        "user_input": x["question"],
        "reference": x["ground_truths"][0],
        "retrieved_contexts": x["contexts"],
    }
)
# Fetch the Opik dataset by name (creating it if needed) and insert the rows
dataset = opik_client.get_or_create_dataset("ragas-demo-dataset")
dataset.insert(dataset_items)


# Create an evaluation task
def evaluation_task(x):
    """Map a dataset item onto the field names used for scoring.

    Args:
        x: Mapping with at least the ``question``, ``answer`` and ``contexts`` keys.

    Returns:
        A dict with ``user_input``, ``response`` and ``retrieved_contexts``
        taken from those keys, in that order.

    Raises:
        KeyError: If one of the source keys is missing from ``x``.
    """
    # (output field, source key) pairs, in output order
    field_sources = (
        ("user_input", "question"),
        ("response", "answer"),
        ("retrieved_contexts", "contexts"),
    )
    return {target: x[source] for target, source in field_sources}


# Create scoring metric wrapper
class AnswerRelevancyWrapper(base_metric.BaseMetric):
    """Expose a Ragas single-turn metric through the Opik metric interface.

    Attributes are set directly in ``__init__``:
        name: Name reported on every ``ScoreResult``.
        metric: The wrapped Ragas metric; must provide an async
            ``single_turn_ascore(sample)``.
    """

    def __init__(self, metric):
        self.name = "answer_relevancy_metric"
        self.metric = metric

    async def get_score(self, row):
        """Score one sample.

        Args:
            row: Mapping whose items are passed as keyword arguments to
                ``SingleTurnSample``.

        Returns:
            The value produced by the wrapped metric's ``single_turn_ascore``.
        """
        row = SingleTurnSample(**row)
        score = await self.metric.single_turn_ascore(row)
        return score

    def score(self, user_input, response, **ignored_kwargs):
        """Score a single ``user_input`` / ``response`` pair.

        Args:
            user_input: The question of the item being evaluated.
            response: The answer of the item being evaluated.
            **ignored_kwargs: Any other fields supplied by the caller; unused.

        Returns:
            A ``ScoreResult`` carrying the metric value and this wrapper's name.
        """
        # Build the sample from this call's own arguments. Reading a
        # module-level `row` instead would score whatever example another cell
        # left behind (or raise NameError on a fresh kernel), not this item.
        row = {"user_input": user_input, "response": response}

        # Run the async function using the current event loop
        loop = asyncio.get_event_loop()

        result = loop.run_until_complete(self.get_score(row))

        return score_result.ScoreResult(value=result, name=self.name)


scoring_metric = AnswerRelevancyWrapper(answer_relevancy_metric)
# Run the Opik evaluation on the dataset with evaluation_task as the task and
# the wrapped Ragas metric as the only scoring metric.
# NOTE(review): task_threads=1 presumably keeps execution single-threaded
# because the metric drives the current event loop — confirm.
opik.evaluation.evaluate(
    dataset,
    evaluation_task,
    scoring_metrics=[scoring_metric],
    task_threads=1,
)

Generating baseline split: 100%|██████████| 30/30 [00:00<00:00, 2404.35 examples/s]
Map: 100%|██████████| 3/3 [00:00<00:00, 102.97 examples/s]
OPIK: Created a "ragas-demo-dataset" dataset at http://trainingvm:5173/api/v1/session/redirect/datasets/?dataset_id=0195c56f-80a2-7680-9d9a-72b70cf3a842&path=aHR0cDovL3RyYWluaW5ndm06NTE3My9hcGkv.
Evaluation: 100%|██████████| 3/3 [00:10<00:00,  3.39s/it]


EvaluationResult(experiment_id='0195c56f-810b-7cf0-935b-21b12b95ef99', experiment_name='silky_kingfisher_2564', test_results=[TestResult(test_case=TestCase(trace_id='0195c56f-81a3-7adb-8353-0ee44ee54c51', dataset_item_id='0195c56f-80f5-7d53-82c3-8a326a1ec43c', scoring_inputs={'reference': "You're confusing a lot of things here. Company B LLC will have it's sales run under Company A LLC, and cease operating as a separate entity These two are contradicting each other. If B LLC ceases to exist - it is not going to have it's sales run under A LLC, since there will be no sales to run for a non-existent company. What happens is that you merge B LLC into A LLC, and then convert A LLC into S Corp. So you're cancelling the EIN for B LLC, you're cancelling the EIN for A LLC - because both entities cease to exist. You then create a EIN for A Corp, which is the converted A LLC, and you create a DBA where A Corp DBA B Shop. You then go to the bank and open the account for A Corp DBA B Shop with the

In [10]:
from datasets import load_dataset
from ragas.metrics import context_precision, answer_relevancy, faithfulness
from ragas import evaluate

fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")

# Reformat the dataset to match the schema expected by the Ragas evaluate function.
# Note: this rebinds `dataset`, which earlier in the notebook referred to the
# Opik dataset; from here on it is the 3-row slice of the "baseline" split.
dataset = fiqa_eval["baseline"].select(range(3))

dataset = dataset.map(
    lambda x: {
        "user_input": x["question"],
        "reference": x["ground_truths"][0],
        "retrieved_contexts": x["contexts"],
    }
)

# Tracer passed to evaluate() as a callback, tagged so this run can be told
# apart from the single-sample traces created with tags=["ragas"].
opik_tracer_eval = OpikTracer(tags=["ragas_eval"], metadata={"evaluation_run": True})

# Evaluate the three metrics over the whole dataset in one call
result = evaluate(
    dataset,
    metrics=[context_precision, faithfulness, answer_relevancy],
    callbacks=[opik_tracer_eval],
)

print(result)

Evaluating: 100%|██████████| 9/9 [00:18<00:00,  2.07s/it]


{'context_precision': 1.0000, 'faithfulness': 0.6857, 'answer_relevancy': 0.9608}
