# RAG Evaluation Experiment Loop

This notebook evaluates Redbox's knowledge retention.

__This allows us to loop through a CSV of experiments in order to systematically change prompts and measure the results.__


### 1. Import required packages

In [None]:
from uuid import UUID
import json
import pandas as pd
import pickle
from dataclasses import asdict
from pathlib import Path
import jsonlines
from elasticsearch import Elasticsearch

from redbox.models import Settings
from redbox.models.settings import ElasticLocalSettings
from redbox.models import Settings

from langchain.globals import set_verbose

from elasticsearch import Elasticsearch
from langchain_community.chat_models import ChatLiteLLM
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import ConfigurableField

from redbox.models import Settings
from redbox.models.file import UUID

# Turn off LangChain's global verbose flag to keep cell output readable.
set_verbose(False)

from dotenv import find_dotenv, load_dotenv

# Load variables from the .env file located by find_dotenv(); the return value
# is bound to `_` because it is not needed.
_ = load_dotenv(find_dotenv())

# Show full column contents when displaying DataFrames (no truncation).
pd.set_option("display.max_colwidth", None)

# Settings with the MinIO and Elasticsearch hosts overridden to localhost.
ENV = Settings(minio_host="localhost", elastic=ElasticLocalSettings(host="localhost"))

### 2. Set evaluation data version

In [None]:
# Version of the evaluation data; it selects the data directory and is part of
# the Elasticsearch index name built below.
DATA_VERSION = "0.2.3"


### 3. Set paths and global variables

In [None]:
# Repository root: two levels above the directory the notebook is run from.
ROOT = Path.cwd().parents[1]
EVALUATION_DIR = ROOT / "notebooks/evaluation"

# Versioned data directory and its sub-directories.
V_ROOT = EVALUATION_DIR / f"data/{DATA_VERSION}"
V_RAW = V_ROOT / "raw"
V_SYNTHETIC = V_ROOT / "synthetic"
V_CHUNKS = V_ROOT / "chunks"
V_RESULTS = V_ROOT / "results"
V_EMBEDDINGS = V_ROOT / "embeddings"

# Create every directory up front (no error if it already exists).
for _data_dir in (V_ROOT, V_RAW, V_SYNTHETIC, V_CHUNKS, V_RESULTS, V_EMBEDDINGS):
    _data_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Embedding model name taken from the settings; it also names the embeddings
# file that is loaded into the index.
MODEL = ENV.embedding_model
# Index name combines data version and model, lower-cased.
INDEX = f"{DATA_VERSION}-{MODEL}".lower()

In [None]:
# Fixed placeholder user UUID passed to the chain for every question.
USER_UUID = UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
# Clients built from the settings.
# NOTE(review): S3_CLIENT is not referenced anywhere else in the visible notebook.
S3_CLIENT = ENV.s3_client()
ES_CLIENT = ENV.elasticsearch_client()

### 4. Load embeddings into the index and get file UUIDs <a id="load-embeddings"></a>

In [None]:
# Drop any existing index with this name so it is rebuilt from scratch below.
# `ignore=[400, 404]` is presumably there so a missing index does not raise —
# confirm against the installed elasticsearch client version.
ES_CLIENT.indices.delete(index=INDEX, ignore=[400, 404])

In [None]:
def load_chunks_from_jsonl_to_index(file_path: Path, es_client: Elasticsearch, index: str) -> set:
    """Index every chunk in a JSON Lines file and collect the parent file UUIDs.

    Each record is written to ``index`` under the chunk's own ``uuid``.

    Args:
        file_path: Path of the ``.jsonl`` file holding one chunk per line.
        es_client: Elasticsearch client used to index the chunks.
        index: Name of the index to write to.

    Returns:
        The set of distinct ``parent_file_uuid`` values seen in the file.
    """
    file_uuids = set()

    with jsonlines.open(file_path, mode="r") as reader:
        for chunk_raw in reader:
            # The reader already decodes each line. A line may be a JSON string
            # that itself contains the serialised chunk (double-encoded) or a
            # plain JSON object; only the former needs a second decode.
            if isinstance(chunk_raw, (str, bytes, bytearray)):
                chunk = json.loads(chunk_raw)
            else:
                chunk = chunk_raw

            es_client.index(
                index=index,
                id=chunk["uuid"],
                body=chunk,
            )

            file_uuids.add(chunk["parent_file_uuid"])

    return file_uuids

In [None]:
# Index the pre-computed embeddings for the configured model and keep the set
# of parent file UUIDs; it is passed to the chain with every question.
FILE_UUIDS = load_chunks_from_jsonl_to_index(file_path=V_EMBEDDINGS / f"{MODEL}.jsonl", es_client=ES_CLIENT, index=INDEX)

### 5. Define class for getting RAG outputs and evaluating based on prompts in experiments 
Note: these can be made more efficient

In [None]:
from redbox.models import ChatRoute, Settings
from redbox.models.chain import ChainInput

from typing import Annotated
from fastapi import Depends
from tiktoken import Encoding
from langchain_core.runnables import Runnable, RunnableLambda, RunnablePassthrough
from langchain_core.vectorstores import VectorStoreRetriever
from operator import itemgetter
from langchain.schema import StrOutputParser

import os

# The core_api imports are done from two directories up. Always restore the
# original working directory afterwards, even if an import fails; otherwise a
# re-run of this cell (or any later relative path) would start from the wrong
# directory.
cwd = os.getcwd()
os.chdir('../../')
try:
    from core_api.src import dependencies
    from core_api.src.format import format_documents
    from core_api.src.runnables import make_chat_prompt_from_messages_runnable
    from langchain_community.chat_models import ChatLiteLLM
    from core_api.src.dependencies import get_tokeniser
    from core_api.src.retriever import ParameterisedElasticsearchRetriever
finally:
    os.chdir(cwd)

from deepeval.test_case import LLMTestCase
from deepeval.metrics import KnowledgeRetentionMetric
from deepeval.test_case import ConversationalTestCase

# Chat model shared by every experiment when generating answers.
LLM = ChatLiteLLM(
    model="gpt-4o",
    streaming=True,
)

class Experiment:
    """A single knowledge-retention evaluation experiment.

    An experiment is a named pair of prompts (system and question). It can
    generate answers for the synthetic retention questions with those prompts
    (``write_rag_results``) and then score the resulting conversation
    (``do_evaluation``). All files it writes are prefixed with the experiment
    name.
    """

    def __init__(self,
                 experiment_name: str,
                 retrieval_system_prompt: str,
                 retrieval_question_prompt: str
                 ):
        """Store the experiment name and the prompts used by the retrieval chain."""
        self.experiment_name = experiment_name
        self.retrieval_system_prompt = retrieval_system_prompt
        self.retrieval_question_prompt = retrieval_question_prompt
        # Populated by do_evaluation().
        self.eval_results = None

    @property
    def _synthetic_data_path(self) -> str:
        """Path of this experiment's generated-answers CSV (written and read below)."""
        return f"{V_SYNTHETIC}/{self.experiment_name}_retention_synthetic_data.csv"

    @property
    def _results_path(self) -> str:
        """Path of this experiment's evaluation-results CSV."""
        return f"{V_RESULTS}/{self.experiment_name}_retention_results.csv"

    # The Annotated[..., Depends(...)] metadata on the two builder methods is
    # not used in this notebook: get_rag_results() passes every argument
    # explicitly.
    def get_parameterised_retriever(self,
                                    env: Annotated[Settings, Depends(ENV)],
                                    es: Annotated[Elasticsearch, Depends(dependencies.get_elasticsearch_client)]
        ) -> BaseRetriever:
        """Creates an Elasticsearch retriever runnable.

        Runnable takes input of a dict keyed to question, file_uuids and user_uuid.

        Runnable returns a list of Chunks.
        """
        default_params = {
            "size": env.ai.rag_k,
            "num_candidates": env.ai.rag_num_candidates,
            "match_boost": 1,
            "knn_boost": 1,
            "similarity_threshold": 0,
        }

        return ParameterisedElasticsearchRetriever(
            es_client=es,
            index_name=INDEX,
            params=default_params,
            embedding_model=dependencies.get_embedding_model(env),
        ).configurable_fields(
            params=ConfigurableField(
                id="params", name="Retriever parameters", description="A dictionary of parameters to use for the retriever."
            )
        )

    def build_retrieval_chain(
        self,
        llm: Annotated[ChatLiteLLM, Depends(dependencies.get_llm)],
        retriever: Annotated[VectorStoreRetriever, Depends(dependencies.get_parameterised_retriever)],
        tokeniser: Annotated[Encoding, Depends(dependencies.get_tokeniser)],
        env: Annotated[Settings, Depends(dependencies.get_env)]
    ) -> Runnable:
        """Build the retrieval chain using this experiment's prompts.

        The chain retrieves documents, formats them, builds the chat prompt
        from the experiment's system/question prompts and calls the LLM.

        Returns:
            A runnable producing a dict with ``response`` (LLM answer as a
            string), ``source_documents`` and ``route_name``.
        """
        return (
            RunnablePassthrough.assign(documents=retriever)
            | RunnablePassthrough.assign(
                formatted_documents=(RunnablePassthrough() | itemgetter("documents") | format_documents)
            )
            | {
                "response": make_chat_prompt_from_messages_runnable(
                    system_prompt=self.retrieval_system_prompt,
                    question_prompt=self.retrieval_question_prompt,
                    # Leave room in the context window for the model's answer.
                    input_token_budget=env.ai.context_window_size - env.llm_max_tokens,
                    tokeniser=tokeniser,
                )
                | llm
                | StrOutputParser(),
                "source_documents": itemgetter("documents"),
                "route_name": RunnableLambda(lambda _: ChatRoute.search.value),
            }
        )

    def get_rag_results(self,
                        question,
                        history
                        ) -> dict:
        """Get the Redbox response for a given question.

        Args:
            question: The question to ask.
            history: Previous conversation turns, passed as ``chat_history``.

        Returns:
            A dict with ``output_text`` and ``text`` (both the LLM answer) and
            ``source_documents`` (retrieved chunks reduced to page content,
            page number and parent file UUID).
        """
        retriever = self.get_parameterised_retriever(es=ES_CLIENT, env=ENV)

        chain = self.build_retrieval_chain(llm=LLM,
                                           retriever=retriever,
                                           tokeniser=get_tokeniser(),
                                           env=ENV)

        response = chain.invoke(
            input=ChainInput(
                question=question,
                chat_history=history,
                file_uuids=FILE_UUIDS,
                user_uuid=USER_UUID,
            ).model_dump()
        )

        # Keep only the fields of each source document needed for evaluation.
        filtered_chunks = []
        for document in response['source_documents']:
            chunk = dict(document)
            filtered_chunks.append({
                'page_content': chunk['page_content'],
                'page_number': chunk['metadata']['page_number'],
                'parent_file_uuid': chunk['metadata']['parent_file_uuid'],
            })

        return {"output_text": response["response"],
                "source_documents": filtered_chunks,
                "text": response["response"]}

    def write_rag_results(self) -> None:
        """Answer every synthetic question in order and write the results as CSV.

        Questions are read from ``retention_synthetic_data.csv``. They are
        asked as one running conversation: each question and its answer are
        appended to the history passed with the next question. The output
        adds ``actual_output`` and ``retrieval_context`` columns and is written
        to ``<experiment_name>_retention_synthetic_data.csv``.
        """
        df = pd.read_csv(f"{V_SYNTHETIC}/retention_synthetic_data.csv")
        inputs = df["input"].tolist()

        df_function = df.copy()

        actual_output = []
        retrieval_context = []
        # The history starts with a single empty user turn.
        # NOTE(review): presumably the chain expects a non-empty history — confirm.
        history = [{"text":'', "role": "user"}]

        for question in inputs:
            data = self.get_rag_results(question=question,
                                        history=history)
            actual_output.append(data["output_text"])
            retrieval_context.append(data['source_documents'])

            # Extend the conversation so the next question sees this exchange.
            history.append({'text': question, "role": "user"})
            history.append({'text': data['text'], "role": "ai"})

        df_function["actual_output"] = actual_output
        df_function["retrieval_context"] = retrieval_context

        df_function.to_csv(self._synthetic_data_path, index=False)

    def do_evaluation(self) -> None:
        """Score this experiment's conversation for knowledge retention.

        Reads the answers written by ``write_rag_results`` for *this*
        experiment, treats the rows as the turns of one conversation, measures
        it with ``KnowledgeRetentionMetric`` and writes the score and reason
        to ``<experiment_name>_retention_results.csv``. The result is also
        stored on ``self.eval_results`` as a one-row DataFrame.
        """
        # Read the file this experiment wrote rather than a fixed experiment's
        # file, so each experiment is scored on its own answers.
        test_cases = pd.read_csv(self._synthetic_data_path)

        messages = [
            LLMTestCase(input=row.input, actual_output=row.actual_output)
            for _, row in test_cases.iterrows()
        ]

        dataset = ConversationalTestCase(messages=messages)

        metric = KnowledgeRetentionMetric(threshold=0.5,
                                          model="gpt-4o",
                                          include_reason=True)

        metric.measure(dataset)

        self.eval_results = pd.DataFrame([{'score': metric.score, 'reason': metric.reason}])
        self.eval_results.to_csv(self._results_path, index=False)

    def write_evaluation_results(self) -> None:
        """Flatten per-metric results and write them as CSV.

        Expects ``self.eval_results`` to be an iterable of dataclass instances
        that have ``metrics_metadata`` and ``success`` fields, where each
        metadata entry exposes ``metric``, ``score`` and ``reason``.

        NOTE(review): ``do_evaluation`` stores a DataFrame in
        ``self.eval_results``, which this method cannot consume as written;
        the call in the experiment loop is commented out. Confirm the intended
        input before re-enabling it.
        """
        metric_type = {
            "metric_name": ["Knowledge Retention"],
            "metric_type": ["retention"]}

        evaluation = (
            pd.DataFrame.from_records(asdict(result) for result in self.eval_results)
            .explode("metrics_metadata")
            .reset_index(drop=True)
            .assign(
                metric_name=lambda df: df.metrics_metadata.apply(getattr, args=["metric"]),
                score=lambda df: df.metrics_metadata.apply(getattr, args=["score"]),
                reason=lambda df: df.metrics_metadata.apply(getattr, args=["reason"]),
            )
            .merge(pd.DataFrame(metric_type), on="metric_name")
            .drop(columns=["success", "metrics_metadata"])
        )

        evaluation.to_csv(self._results_path, index=False)

### 6. Load CSV of experiments. See Google Drive folder 'experiment_parameters' for example.

In [None]:
# Experiment definitions: one row per experiment. The loop below reads the
# experiment_name, retrieval_system_prompt and retrieval_question_prompt columns.
experiment_parameters = pd.read_csv('data/experiment_parameters/prompt_engineering_experiment_data_v3.csv')

# Restrict the run to selected experiments; remove or edit this filter to run others.
experiment_parameters = experiment_parameters[(experiment_parameters.experiment_name == 'original_prompt')]

experiment_parameters.head()

### 7. Loop through experiments and pass parameters to Experiment class, returning the concatenated evaluation results for each experiment.

In [None]:
# Run every selected experiment: generate answers with its prompts, then score them.
for index, row in experiment_parameters.iterrows():

    expt = Experiment(experiment_name = row["experiment_name"],
                      retrieval_system_prompt = row["retrieval_system_prompt"],
                      retrieval_question_prompt = row["retrieval_question_prompt"])
    
    expt.write_rag_results()
    expt.do_evaluation()
    # Disabled: do_evaluation() already writes the results CSV.
    # expt.write_evaluation_results()

### 8. Load and visualise results
Note: there are some complexities that could require additional analysis, such as the uncertainty associated with each individual LLM judge score and the wide range of scores (0 to 1 for some metrics)

In [None]:
%config InlineBackend.figure_format = 'retina'
import scipy.stats as stats
import seaborn as sns
import numpy as np

# One results DataFrame per experiment, concatenated below.
experiments = []

# Optional baseline results; uncomment to include them in the comparison.
# baseline = pd.read_csv(f"{V_RESULTS}/baseline.csv")
# baseline['experiment_name'] = 'baseline'
# experiments.append(baseline)

# Comment out if you only want to view baseline statistics
# Populate with experiment names
# NOTE(review): this reads `<name>_generation_eval_results.csv`, whereas the
# Experiment class in this notebook writes `<name>_retention_results.csv`.
# Confirm which results file is intended here.
experiment_names = ['original_prompt', 'unhelpful_prompt']
for experiment_name in experiment_names:
    experiment = pd.read_csv(f"{V_RESULTS}/{experiment_name}_generation_eval_results.csv")
    # Tag every row with the experiment it came from.
    experiment['experiment_name'] = experiment_name
    experiments.append(experiment)

experiments_df = pd.concat(experiments)

def empirical_ci(df: pd.DataFrame,
                 *,
                 threshold: float = 0.5,
                 confidence: float = 0.95,
                 ) -> pd.DataFrame:
    """Aggregate scores per experiment and metric, with t-based confidence intervals.

    Args:
        df: Results with ``experiment_name``, ``metric_name`` and ``score`` columns.
        threshold: Scores strictly above this value count towards
            ``pct_above_threshold``.
        confidence: Confidence level of the interval around the mean.

    Returns:
        One row per (experiment_name, metric_name) with ``mean``, ``std``,
        ``sem``, ``pct_above_threshold`` (whole percent), ``min``, ``max``,
        ``count``, ``ci_low`` and ``ci_high``. Rows are sorted by experiment
        name, then by the order in which metric names first appear in ``df``.
    """

    def pct_above_threshold(x):
        # Scale to percent before rounding so the result is an exact whole
        # number (rounding the fraction first and multiplying afterwards
        # yields values such as 28.999999999999996). Returns a scalar, as
        # aggregation functions are expected to.
        return round((x > threshold).mean() * 100, 0)

    df_grouped = (
        df.groupby(["experiment_name", "metric_name"])["score"]
        .agg(["mean", "std", "sem", pct_above_threshold, "min", "max", "count"])
        .reset_index()
    )

    # Student's t interval around each group mean, with count - 1 degrees of freedom.
    ci_low, ci_high = stats.t.interval(
        confidence=confidence,
        df=df_grouped["count"] - 1,
        loc=df_grouped["mean"],
        scale=df_grouped["sem"],
    )
    df_grouped["ci_low"] = ci_low
    df_grouped["ci_high"] = ci_high

    # Sort metric names by input dataframe order rather than alphabetically.
    df_grouped["metric_name"] = pd.Categorical(df_grouped["metric_name"],
                                               df["metric_name"].unique())

    return df_grouped.sort_values(["experiment_name", "metric_name"])

# Note that the confidence intervals in sns.barplot are calculated by bootstrapping.
# See empirical_ci() above for the empirical confidence interval calculation.
sns.barplot(experiments_df, x="score", y="metric_name", hue='experiment_name', errorbar=('ci', 95))

# Aggregate per experiment and metric, save the table, and display it as the cell output.
experiment_metrics = empirical_ci(experiments_df)
experiment_metrics.to_csv(f"{V_RESULTS}/eval_results_full.csv")
experiment_metrics

In [None]:
# Percentage of scores above the threshold, per metric and experiment.
# The palette lists two colours, matching the two experiment names loaded above.
sns.barplot(experiment_metrics, x="pct_above_threshold", y="metric_name", hue="experiment_name", palette = ['tab:blue', 'gray'])