# Initial Framework RAG Model Support

This notebook shows how the developer framework supports RAG models. It introduces a new `RAGModel` class, a pipeline model made up of three component models: `EmbeddingModel`, `RetrievalModel` and `GenerationModel`. This lets developers test each component model individually as well as the entire end-to-end pipeline.

## Prerequisites

In [None]:
%pip install -q qdrant-client

In [None]:
# load openai api key
import os

from dotenv import load_dotenv
load_dotenv()

if 'OPENAI_API_KEY' not in os.environ:
    raise ValueError('OPENAI_API_KEY is not set')

## Dataset Loader

In [None]:
# load documents
import os
from csv import DictReader
from uuid import uuid4

import pandas as pd


column_map = {"RFP_Question": "question", "RFP_Answer": "ground_truth"}


def load_documents(prefix):
    documents = []
    root_dir = "datasets/rag/"
    for file in os.listdir(root_dir):
        if file.startswith(prefix) and file.endswith(".csv"):
            # use csv dict reader to load the csv file
            with open(os.path.join(root_dir, file)) as f:
                reader = DictReader(f)
                for row in reader:
                    # add a unique id to the row
                    row["id"] = str(uuid4())
                    documents.append(row)

    df = pd.DataFrame(documents)
    df = df[["id", "RFP_Question", "RFP_Answer"]]
    df.rename(columns=column_map, inplace=True)

    return df

def load_dataset_split(limit=None):
    df = load_documents("rfp_existing_questions")

    if limit:
        df = df.head(limit)

    # split the dataset into a "train" - which gets inserted into the vector store
    # and a "test" - which is used to evaluate the search results
    train_df = df.sample(frac=0.8)
    test_df = df.drop(train_df.index)

    return train_df, test_df

## Embedding Model Selection

First, let's set up our embedding model and run some tests to make sure it's working well.

In [None]:
from openai import OpenAI

from validmind.models import EmbeddingModel

client = OpenAI()


def embed(question):
    """Returns a text embedding for the given text"""
    return (
        client.embeddings.create(
            input=question,
            model="text-embedding-3-small",
        )
        .data[0]
        .embedding
    )


vm_embedder = EmbeddingModel(input_id="embedding_model", predict_fn=embed)

Let's take a look at what's going on above. The `EmbeddingModel` we just instantiated is a subclass of `FunctionModel`. A `FunctionModel` lets you pass a `predict_fn` that is called to compute "predictions" for an input. By default, the `FunctionModel` looks at the `predict_fn`'s signature and passes in the columns whose names match the argument names. So for the model above, it looks in the input dataframe for a column named `question` and passes that into the function.

Once the `predict_fn` returns the prediction, it is stored in the column specified by the `predict_col` property. For `EmbeddingModel` instances this defaults to `'embedding'`. Other model types may set their own default, but it can always be customized by passing `predict_col` as an argument to any `VMModel` class or to the `vm.init_model()` function. This column name matters for two reasons: predictions are cached in the ValidMind dataset that gets passed into tests, and the `RAGModel` uses it to store intermediate predictions before they are passed to the next model in the pipeline.
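To make the signature-matching idea concrete, here is a minimal sketch in plain Python. It is not the ValidMind implementation: `infer_args` and `fake_embed` are hypothetical names used only for illustration, and the real `FunctionModel` may resolve arguments differently.

```python
import inspect


def infer_args(predict_fn, row):
    """Pick the values from `row` whose keys match `predict_fn`'s parameter names."""
    params = inspect.signature(predict_fn).parameters
    return {name: row[name] for name in params if name in row}


def fake_embed(question):
    # stand-in for a real embedding call: returns a tiny deterministic vector
    return [float(len(question)), float(question.count(" "))]


row = {"id": "abc", "question": "What is RAG?", "ground_truth": "..."}

# only the `question` column is selected, because it is the only parameter name
args = infer_args(fake_embed, row)

# the prediction is stored under the column that plays the role of `predict_col`
row["embedding"] = fake_embed(**args)
```

Columns that do not appear in the function signature (`id` and `ground_truth` here) are simply ignored.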

RAG Pipeline:
`input` -> `EmbeddingModel()` -> `RetrievalModel()` -> `GenerationModel()` -> `output`

Below we'll see more about how to customize the `predict_col` and more advanced ways to use `predict_args` to pass parameters to the `predict_fn`.

In [None]:
vm_embedder.predict_col

Let's create our test dataset so we can run it through our different models.

In [None]:
import validmind as vm

train_df, test_df = load_dataset_split()

vm_test_ds = vm.init_dataset(
    test_df,
    text_column="question", # some NLP and Embedding tests which work with text data require a `text_column` to be specified
    target_column="ground_truth",
    __log=False,
)

vm_test_ds.df.head()

Great, now let's run our embeddings model on the test dataframe and save the results back into the dataframe.

> Normally, you would not do this, but for the sake of demonstration we are going to call the `predict` method and manually pass in the dataframe. This would normally be done using `vm_test_ds.assign_predictions(embedding_model)` or, if calling the `RAGModel`, it would be done as part of the pipeline.

In [None]:
vm_test_ds.df[vm_embedder.predict_col] = vm_embedder.predict(vm_test_ds.df)
vm_test_ds.df.head()

Let's go ahead and run one of the ValidMind embeddings stability analysis tests to make sure our embeddings model is working properly.

In [None]:
from validmind.tests import run_test

result = run_test(
    "validmind.model_validation.embeddings.StabilityAnalysisRandomNoise",
    inputs={"model": vm_embedder, "dataset": vm_test_ds},
    params={"probability": 0.3},
)

## Setup Vector Store

#### Generate embeddings for the questions

In [None]:
train_df[vm_embedder.predict_col] = vm_embedder.predict(train_df)
train_df.head()

#### Insert embeddings and questions into Vector DB

In [None]:
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

qdrant = QdrantClient(":memory:")
qdrant.recreate_collection(
    "rfp_rag_collection",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)
qdrant.upsert(
    "rfp_rag_collection",
    points=[
        PointStruct(
            id=row["id"],
            vector=row[vm_embedder.predict_col],
            payload={"question": row["question"], "ground_truth": row["ground_truth"]},
        )
        for _, row in train_df.iterrows()
    ],
)

## Setup Retrieval Model

In [None]:
from validmind.models import RetrievalModel


def retrieve(query_embedding, limit=10):
    contexts = []

    for result in qdrant.search(
        "rfp_rag_collection",
        query_vector=query_embedding,
        limit=limit,
    ):
        context = f"Q: {result.payload['question']}\n"
        context += f"A: {result.payload['ground_truth']}\n"

        contexts.append(context)

    return contexts


vm_retriever = RetrievalModel(
    input_id="retrieval_model",
    predict_fn=retrieve,  # function to call to retrieve the contexts
    predict_args={
        "query_embedding": vm_embedder.predict_col,
        "limit": lambda row: row["limit"] if "limit" in row else 10,
    },  # argument mapping for the predict_fn
    predict_col="retrieved_contexts",  # column name where the retrieved contexts will be stored
)

Notice how we are passing the `predict_args` dictionary, whose keys match the argument names of the `retrieve()` function. This is how you can customize the inputs. In the above case, there are two arguments, `query_embedding` and `limit`. For `query_embedding`, we set the value to the name of the column where the embeddings are stored, which here is the `predict_col` of the `EmbeddingModel`. For `limit`, we pass a lambda function that takes the row (a dictionary of the columns) and returns the value of the `limit` column if it exists, and 10 otherwise. This would actually be handled automatically, since the `limit` argument of the `retrieve()` function already has a default, but we set it explicitly here to demonstrate dynamic arguments.
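The two kinds of `predict_args` values described above (a column name, or a callable that receives the row) can be sketched as follows. This is an illustration under assumptions, not the library's code: `resolve_args` is a hypothetical helper, and the framework's internal argument building may differ in its details.

```python
def resolve_args(predict_args, row):
    """Build keyword arguments for a predict function from a single row."""
    resolved = {}
    for name, source in predict_args.items():
        if callable(source):
            # dynamic argument: compute the value from the whole row
            resolved[name] = source(row)
        else:
            # static mapping: treat the value as a column name
            resolved[name] = row[source]
    return resolved


predict_args = {
    "query_embedding": "embedding",
    "limit": lambda row: row["limit"] if "limit" in row else 10,
}

row_without_limit = {"question": "q1", "embedding": [0.1, 0.2]}
row_with_limit = {"question": "q2", "embedding": [0.3, 0.4], "limit": 3}

args_default = resolve_args(predict_args, row_without_limit)
args_custom = resolve_args(predict_args, row_with_limit)
```

In this sketch, a row without a `limit` column falls back to 10, while a row that carries its own `limit` overrides it.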

Let's see how the arguments are built from a row:

In [None]:
# this is called internally whenever the `model.predict()` method is called
inputs = vm_retriever._get_args(vm_test_ds.df.to_dict(orient="records")[0])

# let's print out the inputs and their types
print("Inputs for `retrieve()` from the first row of our test dataset:")
print("\n".join([f"{k}: {type(v)}" for k, v in inputs.items()]))

In [None]:
vm_test_ds.df[vm_retriever.predict_col] = vm_retriever.predict(vm_test_ds.df)
vm_test_ds.df.head()

## Setup Generation Model

In [None]:
from validmind.models import GenerationModel

system_prompt = """
You are an expert RFP AI assistant.
You are tasked with answering new RFP questions based on existing RFP questions and answers.
You will be provided with the existing RFP questions and answer pairs that are the most relevant to the new RFP question.
After that you will be provided with a new RFP question.
You will generate an answer and respond only with the answer.
Ignore your pre-existing knowledge and answer the question based on the provided context.
""".strip()


def generate(question, contexts):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": "\n\n".join(contexts)},
            {"role": "user", "content": question},
        ],
    )

    return response.choices[0].message.content

vm_generator = GenerationModel(
    input_id="generation_model",
    predict_fn=generate,
    predict_args={
        "contexts": vm_retriever.predict_col,
        # question will automatically be inferred from the `generate()` function signature
    }
)

In [None]:
vm_test_ds.df[vm_generator.predict_col] = vm_generator.predict(vm_test_ds.df)
vm_test_ds.df.head()

## Setup RAG Model (Pipeline of "Component" Models)

Now that we have our individual models set up, let's create a `RAGModel` instance that will chain them together and give us a single model that can be evaluated end-to-end.

In [None]:
from validmind.models import RAGModel

vm_rag_model = RAGModel(
    embedder=vm_embedder,
    retriever=vm_retriever,
    generator=vm_generator,
    input_id="rag_pipeline",
)

Let's run the test dataset through the entire pipeline. It will overwrite the current predictions that we generated from the individual models, but the key here is that calling `predict` on the `RAGModel` will run the entire pipeline and store the intermediate predictions in the dataframe.

In [None]:
result_df = vm_rag_model.predict(vm_test_ds.df)
result_df.head()
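Conceptually, the pipeline call above runs each component in order and writes its output into its own column, so that the next component can read it. The sketch below illustrates that chaining with plain dictionaries. The `run_pipeline` helper, the stand-in step functions and the `answer` column name are all hypothetical, and the actual `RAGModel` may work differently internally.

```python
def run_pipeline(steps, rows):
    """Run each step over every row, storing its output under the step's column."""
    results = [dict(row) for row in rows]  # copy so the input rows are untouched
    for predict_col, fn in steps:
        for row in results:
            row[predict_col] = fn(row)
    return results


# each step is (output column, function of the row); later steps read earlier columns
steps = [
    ("embedding", lambda row: [float(len(row["question"]))]),
    ("retrieved_contexts", lambda row: [f"context for vector {row['embedding']}"]),
    ("answer", lambda row: f"answer using {len(row['retrieved_contexts'])} context(s)"),
]

rows = run_pipeline(steps, [{"question": "What is RAG?"}])
```

Every intermediate prediction stays in the row, which is what makes it possible to test the embedding, retrieval and generation outputs separately after a single end-to-end run.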

## Experiment with some RAGAS Metrics

Below I am experimenting to see how the RAGAS metrics can work with the `RAGModel` instance. This is a proof of concept, not a full implementation of the RAGAS metrics. We'll want to make this work in a more general way, so that the columns can be properly mapped from the user-provided `predict_col` (or the default `predict_col`) to the column names that RAGAS expects, i.e. `question`, `contexts`, `answer` and `ground_truth`.
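One possible shape for that mapping is sketched below with plain dictionaries. This is an assumption about a future design, not existing framework behavior: `map_to_ragas` and the `generated_answer` column name are hypothetical.

```python
RAGAS_COLUMNS = ("question", "contexts", "answer", "ground_truth")


def map_to_ragas(rows, column_map):
    """Rename columns per `column_map` and check the RAGAS columns are present."""
    mapped = [{column_map.get(key, key): value for key, value in row.items()} for row in rows]
    for row in mapped:
        missing = [col for col in RAGAS_COLUMNS if col not in row]
        if missing:
            raise KeyError(f"missing RAGAS columns: {missing}")
    return mapped


rows = [
    {
        "question": "What is RAG?",
        "retrieved_contexts": ["Q: ...\nA: ...\n"],
        "generated_answer": "Retrieval-augmented generation.",
        "ground_truth": "Retrieval-augmented generation.",
    }
]

# hypothetical mapping from pipeline column names to the names RAGAS expects
ragas_rows = map_to_ragas(
    rows,
    {"retrieved_contexts": "contexts", "generated_answer": "answer"},
)
```

Failing loudly on a missing column keeps a wrong or incomplete mapping from silently producing meaningless metric scores.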

In [None]:
vm_ragas_ds = vm.init_dataset(result_df, __log=False)

In [None]:
import plotly.express as px

def plot_distribution(scores):
    # plot distribution of scores (0-1) from ragas metric
    # scores is a list of floats
    fig = px.histogram(x=scores, nbins=10)
    fig.show()

In [None]:
import warnings

warnings.filterwarnings("ignore")

In [None]:
result = run_test(
    "validmind.model_validation.ragas.AnswerSimilarity",
    inputs={"dataset": vm_ragas_ds},
    show=False,
)
plot_distribution(result.metric.summary.results[0].data)

In [None]:
result = run_test(
    "validmind.model_validation.ragas.ContextEntityRecall",
    inputs={"dataset": vm_ragas_ds},
    show=False,
)
plot_distribution(result.metric.summary.results[0].data)

In [None]:
result = run_test(
    "validmind.model_validation.ragas.ContextPrecision",
    inputs={"dataset": vm_ragas_ds},
    show=False,
)
plot_distribution(result.metric.summary.results[0].data)

In [None]:
result = run_test(
    "validmind.model_validation.ragas.ContextRelevancy",
    inputs={"dataset": vm_ragas_ds},
    show=False,
)
plot_distribution(result.metric.summary.results[0].data)