# A Comprehensive Guide for Developing and Serving RAG Applications in Production (Part 1)

- **Blog post**: https://www.anyscale.com/blog
- **GitHub repository**: https://github.com/ray-project/llm-applications
- **Anyscale Endpoints**: https://endpoints.anyscale.com/
- **Ray documentation**: https://docs.ray.io/

In this guide, we will learn how to:
- 💻 Develop a retrieval augmented generation (RAG) based LLM application.
- 🚀 Scale the major components (embed, index, serve, etc.) in our application.
- ✅ Evaluate different configurations of our application to optimize for both per-component (ex. `retrieval_score`) and overall performance (`quality_score`).
- 🔀 Implement a hybrid routing approach that closes the gap between open-source and closed-source LLMs.
- 📦 Serve the application in a highlight available and scalable manner.

# Overview


Large language models (LLMs) have undoubtedly changed the way we interact with information. However, they come with their fair share of limitations as to what we can ask of them. For example, suppose we wanted to ask our LLM (ex. GPT-4) which two teams are going to play in the Superbowl next weekend? Because our LLMs are only aware of the information that they've been trained on, they will fall short when we require them to know information beyond that. Retrieval augmented generation (RAG) based LLM applications address this exact issue and extend the utility of LLMs and their generative reasoning abilities on our unique datasets.

In this guide, we're going to build an RAG based LLM application where we will incorporate external data sources to augment our LLM’s capabilities. Specifically, we will be building an assistant that can answer questions about [Ray](https://github.com/ray-project/ray). The goal here is to make it easier for developers to adopt Ray but also, as we'll see in this guide, to help improve our Ray documentation itself and provide a foundation for other LLM applications. Our application involves many moving pieces (embedding model, context parameters, the LLM itself, etc.) and so it's important that we experiment with different configurations to optimize for the best quality responses. But it's non-trivial to evaluate and quantitatively compare different configurations for a generative task.

**Note**: We'll be experimenting with different LLMs (OpenAI, Llama, etc.) in this guide. You will need [OpenAI credentials](https://platform.openai.com/account/api-keys) to access [ChatGPT models](https://platform.openai.com/docs/models/) and [Anyscale Endpoints](https://endpoints.anyscale.com/) to access OSS LLMs.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> overall application view.

## Set up

We're going to start by setting up our base imports, directories and initializing Ray with credentials. We'll be using [Ray](https://docs.ray.io/) to easily scale our workloads with minimal changes to our code.

In [1]:
import os
import ray

In [2]:
import sys; sys.path.append("..")
import warnings; warnings.filterwarnings("ignore")
from dotenv import load_dotenv; load_dotenv()

True

In [3]:
from rag.config import ROOT_DIR

We're going to start by initiailizing Ray with some required credentials for our application, such as, our [OpenAI](https://platform.openai.com/docs/models/gpt-4) (for ChatGPT models), [Anyscale Endpoints](https://endpoints.anyscale.com/) (for OSS LLMs like Llama-2) and database connection credentials.

In [4]:
# Credentials
ray.init(runtime_env={
    "env_vars": {
        "OPENAI_API_BASE": os.environ["OPENAI_API_BASE"],
        "OPENAI_API_KEY": os.environ["OPENAI_API_KEY"], 
        "ANYSCALE_API_BASE": os.environ["ANYSCALE_API_BASE"],
        "ANYSCALE_API_KEY": os.environ["ANYSCALE_API_KEY"],
        "DB_CONNECTION_STRING": os.environ["DB_CONNECTION_STRING"],
    },
    "working_dir": str(ROOT_DIR),
})

2023-09-10 23:34:33,380	INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 10.0.7.241:6379...
2023-09-10 23:34:33,433	INFO worker.py:1612 -- Connected to Ray cluster. View the dashboard at [1m[32mhttps://session-yn5cwtau135l5cajlbkzrdyqqp.i.anyscaleuserdata-staging.com [39m[22m
2023-09-10 23:34:33,513	INFO packaging.py:518 -- Creating a file package for local directory '/home/ray/ray-assistant/notebooks/..'.
2023-09-10 23:34:33,657	INFO packaging.py:346 -- Pushing file package 'gcs://_ray_pkg_94378f84ca5756e8.zip' (19.15MiB) to Ray cluster...
2023-09-10 23:34:33,711	INFO packaging.py:359 -- Successfully pushed file package 'gcs://_ray_pkg_94378f84ca5756e8.zip'.


0,1
Python version:,3.8.13
Ray version:,2.6.3
Dashboard:,http://session-yn5cwtau135l5cajlbkzrdyqqp.i.anyscaleuserdata-staging.com


We've also created some mappings for the different embedding and language models we'll be developing with in our application:

In [5]:
from rag.config import EMBEDDING_DIMENSIONS, MAX_CONTEXT_LENGTHS

In [6]:
# Embedding dimensions
EMBEDDING_DIMENSIONS

{'thenlper/gte-base': 768,
 'BAAI/bge-large-en': 1024,
 'text-embedding-ada-002': 1536}

In [7]:
# LLM context lengths
MAX_CONTEXT_LENGTHS

{'gpt-4': 8192,
 'gpt-3.5-turbo': 4096,
 'gpt-3.5-turbo-16k': 16384,
 'meta-llama/Llama-2-7b-chat-hf': 4096,
 'meta-llama/Llama-2-13b-chat-hf': 4096,
 'meta-llama/Llama-2-70b-chat-hf': 4096}

## Data

### Load data

In [8]:
from pathlib import Path
from rag.config import EFS_DIR

We need to first download the [Ray documentation](https://docs.ray.io/) to a directory:
```bash
export EFS_DIR=/desired/output/directory
wget -e robots=off --recursive --no-clobber --page-requisites \
  --html-extension --convert-links --restrict-file-names=windows \
  --domains docs.ray.io --no-parent --accept=html \
  -P $EFS_DIR https://docs.ray.io/en/master/
```

Then, we'll load the paths to our downloaded artifacts (html files) into a [Ray Dataset](https://docs.ray.io/en/latest/data/data.html) so that we can perform workloads on them at scale (ex. embed, index, etc.)

In [9]:
# Ray dataset
DOCS_DIR = Path(EFS_DIR, "docs.ray.io/en/master/")
ds = ray.data.from_items([{"path": path} for path in DOCS_DIR.rglob("*.html") if not path.is_dir()])
print(f"{ds.count()} documents")

3282 documents


### Sections

Now that we have a dataset of all the paths to the html files, we're going to develop some functions that can appropriately extract the text from these files. We want to do this in a generalized manner so that we can perform this extraction across all of our docs pages. Therefore, we identify the sections in our html page and then extract the text in between them. We save all of this into a list of dictionaries that map the text within a section to the specific url + section anchor id.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Example of sectionization process.

In [10]:
from rag.data import extract_sections

In [11]:
sample_html_fp = Path(EFS_DIR, "docs.ray.io/en/master/rllib/rllib-env.html")
extract_sections({"path": sample_html_fp})[0]

{'source': 'https://docs.ray.io/en/master/rllib/rllib-env.html#environments',
 'text': '\nEnvironments#\nRLlib works with several different types of environments, including Farama-Foundation Gymnasium, user-defined, multi-agent, and also batched environments.\nTip\nNot all environments work with all algorithms. Check out the algorithm overview for more information.\n'}

We can apply our extraction function to all the paths in our dataset by using [flat_map](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.flat_map.html). This will apply our `extract_section` function to each path in our dataset by utilizing all of our CPU workers in parallel.

In [12]:
# Extract sections
sections_ds = ds.flat_map(extract_sections)
sections_ds.count()

2023-09-10 23:34:41,263	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)]
2023-09-10 23:34:41,264	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-10 23:34:41,265	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

5727

### Chunk data

We now have a list of sections (with text and source of each section) but we shouldn't directly use this as context to our RAG application just yet. The text lengths of each section are all varied and many are quite large chunks. If were to use these large sections, then we'd be inserting a lot of noisy/unwanted context and because all LLMs have a maximum context length, we wouldn't be able to fit too many relevant contexts. Therefore, we're going to split the text within each section into smaller chunks. Intuitively, smaller chunks will encapsulate single/few concepts and will be less noisy compared to larger chunks. We're going to choose some typical text splitting values (ex. `chunk_size=100`) to create our chunks for now but we'll be experiments with a range of values later.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Sample chunking logic in action.

In [13]:
from functools import partial
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [14]:
# Text splitter
chunk_size = 300
chunk_overlap = 50
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=chunk_size,
    chunk_overlap=chunk_overlap,
    length_function=len)

In [15]:
# Chunk a sample section
sample_section = sections_ds.take(1)[0]
chunks = text_splitter.create_documents(
    texts=[sample_section["text"]], 
    metadatas=[{"source": sample_section["source"]}])
print (chunks[0])

2023-09-10 23:34:59,499	INFO dataset.py:2180 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.


page_content='ray.tune.TuneConfig.search_alg#\nTuneConfig.search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None#' metadata={'source': 'https://docs.ray.io/en/master/tune/api/doc/ray.tune.TuneConfig.search_alg.html#ray-tune-tuneconfig-search-alg'}


In [16]:
def chunk_section(section, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len)
    chunks = text_splitter.create_documents(
        texts=[section["text"]], 
        metadatas=[{"source": section["source"]}])
    return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]

In [17]:
# Scale chunking
chunks_ds = sections_ds.flat_map(partial(
    chunk_section, 
    chunk_size=chunk_size, 
    chunk_overlap=chunk_overlap))
print(f"{chunks_ds.count()} chunks")
chunks_ds.show(1)

2023-09-10 23:34:59,521	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)]
2023-09-10 23:34:59,522	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-10 23:34:59,523	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]



32276 chunks
{'text': 'ray.tune.search.basic_variant.BasicVariantGenerator.set_search_properties#\nBasicVariantGenerator.set_search_properties(metric: Optional[str], mode: Optional[str], config: Dict, **spec) → bool#\nPass search properties to search algorithm.', 'source': 'https://docs.ray.io/en/master/tune/api/doc/ray.tune.search.basic_variant.BasicVariantGenerator.set_search_properties.html#ray-tune-search-basic-variant-basicvariantgenerator-set-search-properties'}


### Embed data

Now that we've created small chunks from our dataset, we need a way to identify the most relevant ones to a given query. A very effective and quick method is to embed our data using a pretrained model and use the same model to embed the query. We can then compute the distance between all of the chunk embeddings and our query embedding to determine the top k chunks. There are many different pretrained models to choose from to embed our data but the most popular ones can be discovered through [HuggingFace's Massive Text Embedding Benchmark (MTEB)](https://huggingface.co/spaces/mteb/leaderboard) leadboard. These models were pretrained on very large text corpus through tasks such as next/masked token prediction that allows them to learn to represent subtokens in N dimensions and capture semantic relationships. We can leverage this to represent our data and make decisions such as the most relevant contexts to use to answer a given query. We're using Langchain's Embedding wrappers ([HuggingFaceEmbeddings](https://api.python.langchain.com/en/latest/embeddings/langchain.embeddings.huggingface.HuggingFaceEmbeddings.html) and [OpenAIEmbeddings](https://api.python.langchain.com/en/latest/embeddings/langchain.embeddings.openai.OpenAIEmbeddings.html)) to easily load the models and embed our document chunks.

**Note**: embeddings aren't the only way to determine the more relevant chunks. We could also use an LLM to decide! However, because LLMs are much larger than these embedding models and have maximum context lengths, it's better to use embeddings to retrieve the top k chunks. And then we could use LLMs on the fewer k chunks to determine the <k chunks to use as the context to answer our query. We could also use reranking (ex. [Cohere Rerank](https://txt.cohere.com/rerank/)) to further identify the most relevant chunks to use.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Represent a text chunk getting embedded.

In [18]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy

In [19]:
class EmbedChunks:
    def __init__(self, model_name):
        if model_name == "text-embedding-ada-002":
            self.embedding_model = OpenAIEmbeddings(
                model=model_name,
                openai_api_base=os.environ["OPENAI_API_BASE"],
                openai_api_key=os.environ["OPENAI_API_KEY"])
        else:
            self.embedding_model = HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={"device": "cuda"},
                encode_kwargs={"device": "cuda", "batch_size": 100})
    
    def __call__(self, batch):
        embeddings = self.embedding_model.embed_documents(batch["text"])
        return {"text": batch["text"], "source": batch["source"], "embeddings": embeddings}

Here we're able to embed our chunks at scale by using [map_batches](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html). All we had to do was define the `batch_size` and the compute to use (we're using two workers, each with 1 GPU).

In [20]:
# Embed chunks
embedding_model_name = "thenlper/gte-base"
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    fn_constructor_kwargs={"model_name": embedding_model_name},
    batch_size=100, 
    num_gpus=1,
    compute=ActorPoolStrategy(size=2))

In [None]:
# Sample
sample = embedded_chunks.take(1)
print ("embedding size:", len(sample[0]["embeddings"]))
print (sample[0]["text"])

### Index data

Now that we have our embedded chunks, we need to index (store) them somewhere so that we can retrieve them quickly for inference. While there are many popular vector database options, we're going to use [Postgres](https://www.postgresql.org/) for it's simplificty and performance. We'll create a table (`document`) and write the (`text`, `source`, `embedding`) triplets for each embedded chunk we have.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Show a triplet getting indexed in a vector DB.

In [21]:
import psycopg
from pgvector.psycopg import register_vector

In [None]:
%%bash
# Set up pgvector
bash ../setup-pgvector.sh

In [None]:
%%bash
# Drop table and load index
psql "$DB_CONNECTION_STRING" -c "DROP TABLE IF EXISTS document;"
sudo -u postgres psql -f ../migrations/vector-768.sql  # "thenlper/gte-base" dimension is 768
export SQL_DUMP_FP="/efs/shared_storage/goku/sql_dumps/gte-base_300_50.sql"
psql "$DB_CONNECTION_STRING" -f $SQL_DUMP_FP  # load
psql "$DB_CONNECTION_STRING" -c "SELECT count(*) FROM document;"  # num rows

If we don't have an index saved already, we can index the data and save it:

In [22]:
class StoreResults:
    def __call__(self, batch):
        with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
            register_vector(conn)
            with conn.cursor() as cur:
                for text, source, embedding in zip(batch["text"], batch["source"], batch["embeddings"]):
                    cur.execute("INSERT INTO document (text, source, embedding) VALUES (%s, %s, %s)", (text, source, embedding,),)
        return {}

In [26]:
# Index data
embedded_chunks.map_batches(
    StoreResults,
    batch_size=128,
    num_cpus=1,
    compute=ActorPoolStrategy(size=28),
).count()

2023-09-07 13:54:40,013	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(EmbedChunks)] -> ActorPoolMapOperator[MapBatches(StoreResults)]
2023-09-07 13:54:40,014	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-07 13:54:40,015	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-09-07 13:54:40,041	INFO actor_pool_map_operator.py:117 -- MapBatches(EmbedChunks): Waiting for 2 pool actors to start...
2023-09-07 13:54:58,943	INFO actor_pool_map_operator.py:117 -- MapBatches(StoreResults): Waiting for 28 pool actors to start...


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]



0

In [24]:
%%bash
# Save index
export SQL_DUMP_FP="/efs/shared_storage/goku/sql_dumps/gte-base_300_50.sql"
mkdir -p $(dirname "$SQL_DUMP_FP") && touch $SQL_DUMP_FP
sudo -u postgres pg_dump -c > $SQL_DUMP_FP  # save

## Retrieval

With our embedded chunks properly indexed in our vector database, we're ready to perform retrieval for a given query. We'll start by using the same embedding model we used to embed our text chunks to now embed the incoming query.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Show the query getting embedded and show the retrieval process.

In [23]:
import json
import numpy as np

In [24]:
# Embed query
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
query = "What is the default batch size for map_batches?"
embedding = np.array(embedding_model.embed_query(query))
len(embedding)

768

Then, we'll retrieve the top most revelant chunks by extracting the closest embedded chunks to our embedded query. We use cosine distance (`<=>`) but there are [many options](https://github.com/pgvector/pgvector#vector-operators) to choose from. Once we retrieve the top `num_chunks`, we can collect the text for each chunk and use it as context to generate a response.

In [25]:
# Get context
num_chunks = 5
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
    register_vector(conn)
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM document ORDER BY embedding <=> %s LIMIT %s", (embedding, num_chunks))
        rows = cur.fetchall()
        context = [{"text": row[1]} for row in rows]
        sources = [row[2] for row in rows]

In [26]:
for i, item in enumerate(context):
    print (sources[i])
    print (item["text"])
    print ()

https://docs.ray.io/en/master/data/api/doc/ray.data.Dataset.map_batches.html#ray-data-dataset-map-batches
to a given map task. Default batch_size is 4096 with “default”.
compute – Either “tasks” (default) to use Ray Tasks or an
ActorPoolStrategy to use an autoscaling actor pool.
batch_format – If "default" or "numpy", batches are
Dict[str, numpy.ndarray]. If "pandas", batches are
pandas.DataFrame.
zero_copy_batch – Whether fn should be provided zero-copy, read-only
batches. If this is True and no copy is required for the
batch_format conversion, the batch is a zero-copy, read-only

https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-size
Configuring batch size#
Increasing batch_size improves the performance of vectorized transformations like
NumPy functions and model inference. However, if your batch size is too large, your
program might run out of memory. If you encounter an out-of-memory error, decrease your
batch_size.
Note
The default batch size depends on y

## Generation

We can now use the context to generate a response from our LLM. Without this relevant context that we retreived, the LLM may not have been able to accurately answer our question. And as our data grows, we can just as easily embed and index any new data and be able to retrieve it to answer questions.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Show how retrieved context + query texts are fed into API.

In [27]:
import openai
import time

In [28]:
from rag.generate import prepare_response
from rag.utils import get_credentials

In [29]:
def generate_response(
    llm, temperature=0.0, stream=True,
    system_content="", assistant_content="", user_content="", 
    max_retries=3, retry_interval=60):
    """Generate response from an LLM."""
    retry_count = 0
    api_base, api_key = get_credentials(llm=llm)
    while retry_count < max_retries:
        try:
            response = openai.ChatCompletion.create(
                model=llm,
                temperature=temperature,
                stream=stream,
                api_base=api_base,
                api_key=api_key,
                messages=[
                    {"role": "system", "content": system_content},
                    {"role": "assistant", "content": assistant_content},
                    {"role": "user", "content": user_content},
                ],
            )
            return prepare_response(response=response, stream=stream)

        except Exception as e:
            print(f"Exception: {e}")
            time.sleep(retry_interval)  # default is per-minute rate limits
            retry_count += 1
    return ""

In [30]:
# Generate response
query = "What is the default batch size for map_batches?"
response = generate_response(
    llm="meta-llama/Llama-2-70b-chat-hf",
    temperature=0.0,
    stream=True,
    system_content="Answer the query using the context provided. Be succinct.",
    user_content=f"query: {query}, context: {context}")

The default batch size for map_batches is 4096.

### Agent

Let's combine the context retrieval and response generation together into a conventient query agent that we can use to easily generate our responses.

In [31]:
from rag.embed import get_embedding_model
from rag.generate import get_sources_and_context

In [32]:
class QueryAgent:
    def __init__(self, embedding_model_name="thenlper/gte-base",
                 llm="meta-llama/Llama-2-70b-chat-hf", temperature=0.0, 
                 max_context_length=4096, system_content="", assistant_content=""):
        
        # Embedding model
        self.embedding_model = get_embedding_model(
            embedding_model_name=embedding_model_name, 
            model_kwargs={"device": "cuda"}, 
            encode_kwargs={"device": "cuda", "batch_size": 100})
        
        # LLM
        self.llm = llm
        self.temperature = temperature
        self.context_length = max_context_length - len(system_content + assistant_content)
        self.system_content = system_content
        self.assistant_content = assistant_content

    def __call__(self, query, num_chunks=5, stream=True):
        # Get sources and context
        sources, context = get_sources_and_context(
            query=query,
            embedding_model=self.embedding_model,
            num_chunks=num_chunks)
            
        # Generate response
        user_content = f"query: {query}, context: {context}"
        answer = generate_response(
            llm=self.llm,
            temperature=self.temperature,
            stream=stream,
            system_content=self.system_content,
            assistant_content=self.assistant_content,
            user_content=user_content[: self.context_length])

        # Result
        result = {
            "question": query,
            "sources": sources,
            "answer": answer,
            "llm": self.llm,
        }
        return result

In [33]:
embedding_model_name="thenlper/gte-base"
llm="meta-llama/Llama-2-7b-chat-hf"

In [34]:
query = "What is the default batch size for map_batches?"
system_content = "Answer the query using the context provided. Be succinct."
agent = QueryAgent(
    embedding_model_name=embedding_model_name,
    llm=llm,
    max_context_length=MAX_CONTEXT_LENGTHS[llm],
    system_content=system_content)
result = agent(query=query)
print("\n\n", json.dumps(result, indent=2))

The default batch size for `map_batches` is 4096

 {
  "question": "What is the default batch size for map_batches?",
  "sources": [
    "https://docs.ray.io/en/master/data/api/doc/ray.data.Dataset.map_batches.html#ray-data-dataset-map-batches",
    "https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-size",
    "https://docs.ray.io/en/master/data/data-internals.html#execution-memory",
    "https://docs.ray.io/en/master/serve/advanced-guides/dyn-req-batch.html#tips-for-fine-tuning-batching-parameters",
    "https://docs.ray.io/en/master/data/examples/pytorch_resnet_batch_prediction.html#model-inference"
  ],
  "answer": "The default batch size for `map_batches` is 4096",
  "llm": "meta-llama/Llama-2-7b-chat-hf"
}


# Evaluation

So far, we've chosen typical/arbitrary values for the various parts of our RAG application. But if we were to change something, such as our chunking logic, embedding model, LLM, etc. how can we know that we have a better configuration than before. A generative task like this is very difficult to quantitatively assess and so we need to develop creative ways to do so. 

Because we have many moving parts in our application, we need to perform unit/component and end-to-end evaluation. Component-wise evaluation can involve evaluating our retrieval in isolation (is the best source in our set of retrieved chunks) and evaluating our LLMs response (given the best source, is the LLM able to produce a quality answer). As for end-to-end evaluation, we can assess the quality of the entire system (given all data, what is the quality of the response). 

We'll be asking our evaluator LLM to score the response between 1-5 using the context, however, we could also have it produce scores for other dimensions such as hallucination (is the generated answer using information only from the provided context), toxticity, etc.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Component and end-to-end evaluation.

In [35]:
# If running tests / small samples, set num_samples to <10
EXPERIMENTS_DIR = Path(ROOT_DIR, "experiments")
NUM_SAMPLES = None  # None = all samples

## Evaluator

We're going to start by determining our evaluator. Given a response to a query and relevant context, our evaluator should be a trusted way to score/assess the quality of the response. But before we can determine our evaluator, we need a dataset of questions and the source where the answer comes from. We can use this dataset to ask our different evaluators to provide an answer and then rate their answer (ex. score between 1-5). We can then inspect this dataset to determine if our evaluator is unbiased and has sound reasoning for the scores that are assigned.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Show the process of evaluator answers the question (given dataset of questions + best source) and how we inspect the results to determine the evaluator.

We'll start by manually creating our dataset. We have a list of user queries and the ideal source to answer the query [`datasets/eval-dataset-v1.jsonl`](https://github.com/ray-project/llm-applications/blob/main/datasets/eval-dataset-v1.jsonl). We will our LLM app above to generate reference answer for each query/source pair using `gpt-4`.

In [36]:
from bs4 import BeautifulSoup
from IPython.display import JSON, clear_output, display
from tqdm import tqdm
import urllib.parse

In [37]:
from rag.evaluate import extract_from_response
from rag.data import fetch_text

In [38]:
# Load dataset
with open(Path(ROOT_DIR, "datasets/eval-dataset-v1.jsonl"), "r") as f:
    data = [json.loads(item) for item in list(f)]

In [39]:
data[:5]

[{'question': 'I’m struggling a bit with Ray Data type conversions when I do map_batches. Any advice?',
  'source': 'https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-format'},
 {'question': 'How does autoscaling work in a Ray Serve application?',
  'source': 'https://docs.ray.io/en/master/serve/scaling-and-resource-allocation.html#autoscaling'},
 {'question': 'how do I get the address of a ray node',
  'source': 'https://docs.ray.io/en/master/ray-core/miscellaneous.html#node-information'},
 {'question': 'Does Ray support NCCL?',
  'source': 'https://docs.ray.io/en/master/ray-more-libs/ray-collective.html'},
 {'question': 'Is Ray integrated with DeepSpeed?',
  'source': 'https://docs.ray.io/en/master/ray-air/examples/gptj_deepspeed_fine_tuning.html#fine-tuning-the-model-with-ray-air-a-name-train-a'}]

In [40]:
# Sample
uri = "https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-format"
fetch_text(uri=uri)

'\nConfiguring batch format#\nRay Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By\ndefault, Ray Data represents batches as dicts of NumPy ndarrays.\nTo configure the batch type, specify batch_format in\nmap_batches(). You can return either format from your function.\n\n\n\nNumPy\nfrom typing import Dict\nimport numpy as np\nimport ray\n\ndef increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:\n    batch["image"] = np.clip(batch["image"] + 4, 0, 255)\n    return batch\n\nds = (\n    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")\n    .map_batches(increase_brightness, batch_format="numpy")\n)\n\n\n\n\n\npandas\nimport pandas as pd\nimport ray\n\ndef drop_nas(batch: pd.DataFrame) -> pd.DataFrame:\n    return batch.dropna()\n\nds = (\n    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")\n    .map_batches(drop_nas, batch_format="pandas")\n)\n\n\n\n\n'

In [41]:
# Content for inference
system_content = """
    Answer the query using the context provided. Be succinct.
    Then, you must {score} your response between 1 and 5.
    You must return your response in a line with only the score.
    Do not add any more details.
    On a separate line provide your {reasoning} for the score as well.
    Return your response following the exact format outlined below.
    Do not add or remove anything.
    And all of this must be in a valid JSON format.
    
    {"answer": answer,
     "score": score,
     "reasoning": reasoning}
    """
assistant_content = ""

In [42]:
class QueryAgentWithContext(QueryAgent):
    def __call__(self, query, context):
        user_content = f"query: {query}, context: {context}"
        response = generate_response(
            llm=self.llm,
            temperature=self.temperature,
            stream=True,
            system_content=self.system_content,
            assistant_content=self.assistant_content,
            user_content=user_content[: self.context_length])
        return response

In [43]:
def get_references(data, llm, temperature, system_content, assistant_content, num_samples=None):
    # Initialize agent
    agent = QueryAgentWithContext(
        llm=llm, 
        temperature=temperature,
        system_content=system_content,
        assistant_content=assistant_content)
    
    results = []
    for row in tqdm(data[:num_samples]):
        # Generate response
        query = row["question"]
        context = fetch_text(uri=row["source"])
        response = agent(query=query, context=context)

        # Extract from response
        answer, score, reasoning = extract_from_response(response=response)
        result = ({
                "question": query,
                "source": row["source"],
                "answer": answer,
                "score": score,
                "reasoning": reasoning,
            })
        results.append(result)
        clear_output(wait=True)
        display(JSON(json.dumps(result, indent=2)))
    return results

Let's generate reference responses with `gpt-4` as well:

In [47]:
# GPT-4
results = get_references(
    data=data, num_samples=num_samples, llm="gpt-4", temperature=0.0, 
    system_content=system_content, assistant_content=assistant_content)
print (np.mean([float(result["score"]) for result in results if result["score"]]))

<IPython.core.display.JSON object>

100%|██████████| 1/1 [00:12<00:00, 12.55s/it]

5.0





In [51]:
# Save to file
REFERENCES_FILE_PATH = Path(EXPERIMENTS_DIR, "references", "gpt-4.json")
REFERENCES_FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(REFERENCES_FILE_PATH, "w") as fp:
    json.dump(results, fp, indent=4)

We can use these responses to decide on a quality evaluator for our future experiments. This evaluator will be used to score answers for different experiment configuations and so we need to be able to trust their scores, reasoning, etc. We did the same with Llama-2-70b as an evaluator and we determined that it was not a good evaluator. For most answers the reasoning is not good, and the score is pretty random with lots of 4s. Therefore, our evaluator will be `gpt-4`.

In [44]:
EVALUATOR = "gpt-4"

## Cold start

We may not always have a prepared dataset of questions and the best source to answer that question readily available. To address this cold start problem, we could use an LLM to look at our text chunks and generate questions that the specific chunk would answer. This provides us with quality questions and the exact source the answer is in. However, this dataset generation method could be a bit noisy. The generate questions may not always be resembling of what your users may ask and the specific chunk we say is the best source may also have that exact information in other chunks. Nonetheless, this is a great way to start our development process while we collect + manually label a high quality dataset.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Show the synthetic data generation process.

In [None]:
# Prompt
num_questions = 3
system_content = f"""
Create {num_questions} questions using only the context provided.
End each question with a '?' character and then in a newline write the answer to that question using only the context provided.
Separate each question/answer pair by a newline.
"""

In [63]:
# Generate questions
synthetic_data = []
for chunk in chunks[:1]:  # small samples
    response = generate_response(
        llm="gpt-4",
        temperature=0.0,
        system_content=system_content,
        user_content=f"context: {chunk.page_content}")
    entries = response.split("\n\n")
    for entry in entries:
        question, answer = entry.split("\n")
        synthetic_data.append({"question": question, "source": chunk.metadata["source"], "answer": answer})

In [64]:
synthetic_data[:3]

[{'question': 'What is the context discussing about?',
  'source': 'https://docs.ray.io/en/master/tune/api/integration.html#external-library-integrations-for-ray-tune',
  'answer': 'The context is discussing about external library integrations for Ray Tune.'},
 {'question': 'What is Ray Tune?',
  'source': 'https://docs.ray.io/en/master/tune/api/integration.html#external-library-integrations-for-ray-tune',
  'answer': 'The context does not provide information on what Ray Tune is.'},
 {'question': 'What are external library integrations?',
  'source': 'https://docs.ray.io/en/master/tune/api/integration.html#external-library-integrations-for-ray-tune',
  'answer': 'The context does not provide information on what external library integrations are.'}]

## Experiments

With our evaluator set, we're ready to start experimenting with the various components in our LLM application. While we could perform this as a large [tuning experiment](https://docs.ray.io/en/latest/tune/index.html), where we can search across promising combintion of values/decisions, we're going to evaluation one decision at a time and fix the best value for the next experiment.

**Note**: this approach is slightly biased because many of our decisions are not indepedent (ex. `chunk_size` and `num_chunks` should ideally be evaluated across many combinations of values).

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Illustrate all the components that we'll be tuning.

### Utilities

Before we get started with our experiments, we're going to define some utility functions that we'll use to easily generate and evaluate responses using the different experiment configurations. We'll also define some functions to help determine our response quality score, retrieval recall score, etc.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Represent the main experiment function doing the `generate` and `evaluate` responses.

We'll set where our labeled data and reference reports are located. We'll be using the former to generate responses and the latter dataset to evaluate those responses.

In [45]:
from rag.generate import generate_responses
from rag.evaluate import evaluate_responses

Let's define a function to determine our retrieval score, which registers a success if the best source is anywhere in our retrieval `num_chunks` sources. We don't account for order, exact page section, etc. but we could add those constraints to have a more conservative retreival score.

In [46]:
def get_retrieval_score(references, generated):
    matches = np.zeros(len(references))
    for i in range(len(references)):
        reference_source = references[i]["source"].split("#")[0]
        if not reference_source:
            matches[i] = 1
            continue
        for source in generated[i]["sources"]:
            # sections don't have to perfectly match
            if reference_source == source.split("#")[0]:
                matches[i] = 1
                continue
    retrieval_score = np.mean(matches)
    return retrieval_score

We'll define one encompassing function that will generate and evaluate the responses so that we can run these experiments with one function call.

In [47]:
def run_experiment(
    experiment_name,
    chunk_size, chunk_overlap, num_chunks,
    embedding_model_name, llm, evaluator,
    docs_dir, experiments_dir, references_fp,
    num_samples=None):
    """Generate responses and evaluate them."""
    
    # Generate responses
    generation_system_content = "Answer the query using the context provided. Be succinct."
    generate_responses(
        experiment_name=experiment_name, 
        chunk_size=chunk_size, 
        chunk_overlap=chunk_overlap, 
        num_chunks=num_chunks,
        embedding_model_name=embedding_model_name, 
        llm=llm, 
        temperature=0.0, 
        max_context_length=MAX_CONTEXT_LENGTHS[llm], 
        system_content=generation_system_content,
        assistant_content="",
        docs_dir=docs_dir,
        experiments_dir=experiments_dir,
        references_fp=references_fp,
        num_samples=num_samples)

    # Evaluate responses
    evaluation_system_content = """
        Your job is to rate the quality of our generated answer {generated_answer}
        given a query {query} and a reference answer {reference_answer}.
        Your score has to be between 1 and 5.
        You must return your response in a line with only the score.
        Do not return answers in any other format.
        On a separate line provide your reasoning for the score as well.
        """
    evaluate_responses(
        experiment_name=experiment_name,
        evaluator=evaluator, 
        temperature=0.0, 
        max_context_length=MAX_CONTEXT_LENGTHS[evaluator],
        system_content=evaluation_system_content,
        assistant_content="",
        experiments_dir=experiments_dir,
        references_fp=references_fp,
        responses_fp=str(Path(experiments_dir, "responses", f"{experiment_name}.json")),
        num_samples=num_samples)

In [48]:
def print_experiment(experiment_name, experiments_dir, evaluator=EVALUATOR):
    eval_fp = Path(experiments_dir, "evaluations", f"{experiment_name}_{evaluator}.json")
    with open(eval_fp, "r") as fp:
        d = json.load(fp)
    print (experiment_name)
    print ("  retrieval score:", d["retrieval_score"])
    print ("  quality score:", d["quality_score"])
    print ()

In [49]:
llm = "gpt-3.5-turbo"

### Context

We're first going to test if the additonal context we provide is helpful at all. This is to validate that the RAG system is indeed worth the effort. We can do this by settings `num_chunks=0` (no context) and comparing that to `num_chunks=5`.

In [None]:
# Without context
num_chunks = 0
experiment_name = f"without-context"
run_experiment(
    experiment_name=experiment_name, 
    chunk_size=300, 
    chunk_overlap=50,
    num_chunks=num_chunks,
    embedding_model_name="thenlper/gte-base",
    llm=llm,
    evaluator=EVALUATOR,
    docs_dir=DOCS_DIR, 
    experiments_dir=EXPERIMENTS_DIR, 
    references_fp=REFERENCES_FILE_PATH,
    num_samples=NUM_SAMPLES)

As a sanity check, our retrieval score should be zero since we're not using any context :)

In [55]:
print_experiment(experiment_name, EXPERIMENTS_DIR)

without-context
  retrieval score: 0.0
  quality score: 3.110169491525424



In [56]:
# With context
num_chunks = 5
experiment_name = "with-context"
run_experiment(
    experiment_name=experiment_name, 
    chunk_size=300, 
    chunk_overlap=50, 
    num_chunks=num_chunks,
    embedding_model_name="thenlper/gte-base",
    llm=llm,
    evaluator=EVALUATOR,
    docs_dir=DOCS_DIR, 
    experiments_dir=EXPERIMENTS_DIR, 
    references_fp=REFERENCES_FILE_PATH,
    num_samples=NUM_SAMPLES)

<IPython.core.display.JSON object>

100%|██████████| 1/1 [00:04<00:00,  4.54s/it]


In [61]:
print_experiment(experiment_name, EXPERIMENTS_DIR)

with-context
  retrieval score: 0.5254237288135594
  quality score: 3.4491525423728815



As we can see, using context (RAG) does indeed help in the quality of our answers (and by a meaningful margin).

### Chunk size

Next, we'll access various chunk sizes. Smaller chunks (but not too small!) are able to encapsulate atomic concepts which yields more precise retrieval. While larger chunks may be more noisy. Popular strategies include using small chunks but retrieving a bit of the [surrounding chunks](https://gpt-index.readthedocs.io/en/latest/end_to_end_tutorials/dev_practices/production_rag.html#decoupling-chunks-used-for-retrieval-vs-chunks-used-for-synthesis) around it (since it may have relevnat info) or store [mulitple embeddings](https://python.langchain.com/docs/modules/data_connection/retrievers/multi_vector) per document (ex. summary embedding per document). 

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> illustrate small vs large and popular strategies.

In [65]:
chunk_sizes = [100, 300, 500, 700]

In [66]:
for chunk_size in chunk_sizes:
    experiment_name = f"chunk-size-{chunk_size}"
    run_experiment(
        experiment_name=experiment_name, 
        chunk_size=chunk_size, 
        chunk_overlap=50, 
        num_chunks=5,
        embedding_model_name="thenlper/gte-base",
        llm=llm,
        evaluator=EVALUATOR,
        docs_dir=DOCS_DIR, 
        experiments_dir=EXPERIMENTS_DIR, 
        references_fp=REFERENCES_FILE_PATH,
        num_samples=NUM_SAMPLES)

<IPython.core.display.JSON object>

100%|██████████| 177/177 [21:14<00:00,  7.20s/it]


In [67]:
for chunk_size in chunk_sizes:
    experiment_name = f"chunk-size-{chunk_size}"
    print_experiment(experiment_name, EXPERIMENTS_DIR)

chunk-size-100
  retrieval score: 0.4180790960451977
  quality score: 3.073446327683616

chunk-size-300
  retrieval score: 0.5254237288135594
  quality score: 3.3983050847457625

chunk-size-500
  retrieval score: 0.5480225988700564
  quality score: 3.5338983050847457

chunk-size-700
  retrieval score: 0.519774011299435
  quality score: 3.573446327683616



Seem that a larger chunk size does help but it tapers off around the 600 characters mark (too much context might be too noisy).

**Note**: If we were to use larger chunk sizes (ours is based on characters), keep in mind that [most](https://huggingface.co/spaces/mteb/leaderboard) open source embedding models have a maximum sequence length of 512 sub-word tokens. This means that if our chunk contains more than 512 sub-word tokens, the embedding wouldn't account for it anyway (unless we finetune our embedding model to have longer sequence lengths).

In [50]:
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50

### Number of chunks

Next we'll experiment with the number of chunks to use. More chunks will allow us to add more context but too many could potentially introduce a lot of noise.

**Note**: The `chunk_size` we chose multiplied by the `num_chunks` below fits inside the LLM's context length. We're experimenting with the chunk size and number of chunks as if they were indepdent variables but they area heavily related. Especially since all of our LLMs have a finite maximum context length. So ideally, we would tune for a combination if `chunk_size` * `num_chunks`.

In [69]:
num_chunks_list = [1, 3, 5, 7]

In [70]:
for num_chunks in num_chunks_list:
    experiment_name = f"num-chunks-{num_chunks}"
    run_experiment(
        experiment_name=experiment_name, 
        chunk_size=CHUNK_SIZE, 
        chunk_overlap=CHUNK_OVERLAP, 
        num_chunks=num_chunks,
        embedding_model_name="thenlper/gte-base",
        llm=llm,
        evaluator=EVALUATOR,
        docs_dir=DOCS_DIR, 
        experiments_dir=EXPERIMENTS_DIR, 
        references_fp=REFERENCES_FILE_PATH,
        num_samples=NUM_SAMPLES)

<IPython.core.display.JSON object>

100%|██████████| 177/177 [24:52<00:00,  8.43s/it]


In [71]:
for num_chunks in num_chunks_list:
    experiment_name=f"num-chunks-{num_chunks}"
    print_experiment(experiment_name, EXPERIMENTS_DIR)

num-chunks-1
  retrieval score: 0.20903954802259886
  quality score: 3.1045197740112993

num-chunks-3
  retrieval score: 0.4406779661016949
  quality score: 3.477401129943503

num-chunks-5
  retrieval score: 0.5480225988700564
  quality score: 3.5706214689265536

num-chunks-7
  retrieval score: 0.6214689265536724
  quality score: 3.6016949152542375



Increasing our number of chunks improves our retrieval and quality scores. We had to stop testing at 6 chunks since our `chunk_size` is 600 tokens and `Llama-2-70b`'s maximum context length is 4096 tokens (we also have to account for the system, assistant and user content to our LLM). This is a major reason to invest in extending context size via RoPE scaling (rotary position embeddings), etc. But it also seems that the benefit of increasing the number of chunks is starting to taper off.

In [51]:
NUM_CHUNKS = 7

### Embedding models

So far, we've used [`thenlper/gte-base`](https://huggingface.co/thenlper/gte-base) as our embedding model because it's a relatively small (0.22 GB) and performant option. But now, let's explore other popular options such the current leader on the [MTEB leaderboard](https://huggingface.co/spaces/mteb/leaderboard), [`BAAI/bge-large-en`](https://huggingface.co/BAAI/bge-large-en) (1.34 GB), and OpenAI's [`text-embedding-ada-002`](https://openai.com/blog/new-and-improved-embedding-model).

In [73]:
embedding_model_names = ["thenlper/gte-base", "BAAI/bge-large-en", "text-embedding-ada-002"]

In [74]:
for embedding_model_name in embedding_model_names:
    experiment_name = f"{embedding_model_name.split('/')[-1]}"
    run_experiment(
        experiment_name=experiment_name, 
        chunk_size=CHUNK_SIZE, 
        chunk_overlap=CHUNK_OVERLAP, 
        num_chunks=NUM_CHUNKS,
        embedding_model_name=embedding_model_name,
        llm=llm,
        evaluator=EVALUATOR,
        docs_dir=DOCS_DIR, 
        experiments_dir=EXPERIMENTS_DIR, 
        references_fp=REFERENCES_FILE_PATH,
        num_samples=NUM_SAMPLES)

<IPython.core.display.JSON object>

100%|██████████| 177/177 [54:21<00:00, 18.42s/it]


In [75]:
for embedding_model_name in embedding_model_names:
    experiment_name = f"{embedding_model_name.split('/')[-1]}"
    print_experiment(experiment_name, EXPERIMENTS_DIR)

gte-base
  retrieval score: 0.6214689265536724
  quality score: 3.57909604519774

bge-large-en
  retrieval score: 0.4406779661016949
  quality score: 3.3446327683615817

text-embedding-ada-002
  retrieval score: 0.5988700564971752
  quality score: 3.5112994350282487



This is an interesting outcome because the #1 (`BAAI/bge-large-en`) on the current leaderboard isn't necessarily the best for our specific task. Using the smaller `thenlper/gte-base` produced the best retrieval and quality scores in our experiments.

In [52]:
EMBEDDING_MODEL_NAME = "thenlper/gte-base"

### OSS vs. closed LLMs

We're now going to use the best configurations from above to evaluate different choices for the main LLM.

**Note**:
- We've been using a specific LLM so far to decide on the configuration so that specific LLM's performance here will be a bit biased.
- This list is not exhaustive and even for the LLMs we use, there are versions with longer context windows available.

In [63]:
llms = ["gpt-3.5-turbo",
        "gpt-4",
        "meta-llama/Llama-2-7b-chat-hf", 
        "meta-llama/Llama-2-13b-chat-hf", 
        "meta-llama/Llama-2-70b-chat-hf",
        "tiiuae/falcon-180b"]

In [64]:
for llm in llms:
    experiment_name = f"{llm.split('/')[-1].lower()}"
    run_experiment(
        experiment_name=experiment_name, 
        chunk_size=CHUNK_SIZE, 
        chunk_overlap=CHUNK_OVERLAP, 
        num_chunks=NUM_CHUNKS,
        embedding_model_name=EMBEDDING_MODEL_NAME,
        llm=llm,
        evaluator=EVALUATOR,
        docs_dir=DOCS_DIR, 
        experiments_dir=EXPERIMENTS_DIR, 
        references_fp=REFERENCES_FILE_PATH,
        num_samples=NUM_SAMPLES)

<IPython.core.display.JSON object>

100%|██████████| 177/177 [20:22<00:00,  6.91s/it]


In [183]:
for llm in llms:
    experiment_name = f"{llm.split('/')[-1].lower()}"
    print_experiment(experiment_name, EXPERIMENTS_DIR)

gpt-3.5-turbo
  retrieval score: 0.6214689265536724
  quality score: 3.57909604519774

gpt-4
  retrieval score: 0.6214689265536724
  quality score: 3.824858757062147

llama-2-7b-chat-hf
  retrieval score: 0.6214689265536724
  quality score: 2.864406779661017

llama-2-13b-chat-hf
  retrieval score: 0.6214689265536724
  quality score: 3.138418079096045

llama-2-70b-chat-hf
  retrieval score: 0.6214689265536724
  quality score: 3.4887005649717513

falcon-180b
  retrieval score: 0.6214689265536724
  quality score: 3.4804469273743015



<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> Represent all evaluation reports with nice tables.

**Note**: Some of our LLMs have much larger context lengths, ex. `gpt-4` is 8192 and `gpt-3.5-turbo-16k` is 16384. We could increase the number of chunks that we use for these since we saw that increasing `num_chunks` continued to improve the retrieval and quality scores. However, we will keep this value fixed for now since the performance started to taper off anyway and so we can compare these performances under the exact same configurations.

In [53]:
LLM = "meta-llama/Llama-2-70b-chat-hf"

## Cost analysis

**Note**: Our `Llama-2` models are priced at $1/M tokens with [Anyscale Endpoints](https://endpoints.anyscale.com/).

In [64]:
# Pricing details
pricing = {
    "gpt-3.5-turbo": {
        "prompt": 2e-6,
        "sampled": 2e-6
    },
    "gpt-4": {
        "prompt": 3e-5,
        "sampled": 6e-5
    },
    "llama-2-7b-chat-hf": {
        "prompt": 1e-6,
        "sampled": 1e-6
    },
    "llama-2-13b-chat-hf": {
        "prompt": 1e-6,
        "sampled": 1e-6
    },
    "llama-2-70b-chat-hf": {
        "prompt": 1e-6,
        "sampled": 1e-6
    }
}

In [None]:
def cost_analysis(llm):
    experiment_name = f"{llm.split('/')[-1].lower()}"
    eval_fp = Path(ROOT_DIR, EXPERIMENTS_DIR, "evaluations", f"{experiment_name}_{EVALUATOR}.json")
    with open(eval_fp, "r") as fp:
        d = json.load(fp)
    num_samples = len(d["results"])
    prompt_size, sampled_size = 0, 0
    for result in d["results"]:
        prompt_size += len(result["question"]) + (CHUNK_SIZE * NUM_CHUNKS)
        sampled_size += len(result["generated_answer"])
    total_cost = pricing[experiment_name]["prompt"] * prompt_size + pricing[experiment_name]["sampled"] * sampled_size
    avg_cost = total_cost / num_samples
    
    print (llm)
    print (f"  avg prompt size: {int(prompt_size/num_samples)}")
    print (f"  avg sampled size: {int(sampled_size/num_samples)}")
    print (f"  total cost: ${total_cost:.2f}")
    print (f"  avg cost: ${avg_cost:.2f}")
    print ()

In [67]:
for llm in llms[:-1]:
    cost_analysis(llm=llm)

gpt-3.5-turbo
  avg prompt size: 3567
  avg sampled size: 852
  total cost: $1.56
  avg cost: $0.01

gpt-4
  avg prompt size: 3567
  avg sampled size: 677
  total cost: $26.14
  avg cost: $0.15

meta-llama/Llama-2-7b-chat-hf
  avg prompt size: 3567
  avg sampled size: 2375
  total cost: $1.05
  avg cost: $0.01

meta-llama/Llama-2-13b-chat-hf
  avg prompt size: 3567
  avg sampled size: 1619
  total cost: $0.92
  avg cost: $0.01

meta-llama/Llama-2-70b-chat-hf
  avg prompt size: 3567
  avg sampled size: 1476
  total cost: $0.89
  avg cost: $0.01



## Routing

We can close the gap in performance between open source and proprietary models by routing queries to the right model according to the hardness or topic of the query. E.g. for our RAG application, open source models are doing really well on simple queries where most of the answer is already in the sources. Open source models are doing less well on problems that involve reasoning, numbers or code examples. We can train a classifier that takes the query and routes it to the best model. In order to implement this, we hand-annotated a dataset of 1.8k queries according to which model (`Llama-2-70b` or `gpt-4`) does better -- by default we route to `Llama-2-70b` and only if the query needs more advanced capabilities do we send the query to `gpt-4`. We then evaluate the performance of the model on a test set that has been scored with a judge.

In [54]:
import numpy as np
import pickle
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.metrics import precision_recall_fscore_support
from sklearn.pipeline import Pipeline

Let's first train the model on the training dataset `routing-dataset-training.jsonl`.

In [55]:
with open(Path(ROOT_DIR, "datasets", "routing-dataset-train.jsonl")) as f:
    records = [json.loads(l) for l in f]
    texts = [record["question"] for record in records]
    labels = [record["target"] for record in records]

In [56]:
# Sample records (1 = can be handled by OSS LLM)
print ("Question for GPT-4:\n", [record for record in records if record["target"] == 0][0]) 
print ("\nQuestion for Llama-2-70b:\n", [record for record in records if record["target"] == 1][0])

Question for GPT-4:
 {'question': 'if I am inside of a anyscale cluster how do I get my cluster-env-build-id', 'target': 0}

Question for Llama-2-70b:
 {'question': 'what is num_samples in tune?', 'target': 1}


In [57]:
# Train classifier
vectorizer = CountVectorizer()
classifier = LogisticRegression(multi_class="multinomial", solver="lbfgs")
router = Pipeline([("vectorizer", vectorizer), ("classifier", classifier)])
router.fit(texts, labels)

Now let's evaluate the performance on the test dataset:

In [58]:
with open(Path(ROOT_DIR, "datasets", "routing-dataset-test.jsonl")) as f:
    records = [json.loads(line) for line in f]
    texts = [record["question"] for record in records]
    y_test = [record["target"] for record in records]
    score_test = [record["score"] for record in records]

In [59]:
# Predictions
y_pred = router.predict(texts)

In [60]:
metrics = {}
performance = precision_recall_fscore_support(y_test, y_pred, average="weighted")
metrics["precision"] = performance[0]
metrics["recall"] = performance[1]
metrics["f1"] = performance[2]
metrics["num_samples"] = np.float64(len(y_test))
print (json.dumps(metrics, indent=4))

{
    "precision": 0.917778649921507,
    "recall": 0.9285714285714286,
    "f1": 0.9215235703917096,
    "num_samples": 574.0
}


In [61]:
print ("# total samples", len(y_pred))
print(f"# samples for OSS models: {sum(y_pred)} ({sum(y_pred)*100/len(y_pred):.1f}%)")
print("Performance on OSS samples:", np.mean([score_test[i] for i, p in enumerate(y_pred) if p]))
print("Performance on dropped samples:", np.mean([score_test[i] for i, p in enumerate(y_pred) if not p]))

# total samples 574
# samples for OSS models: 546 (95.1%)
Performance on OSS samples: 3.8653846153846154
Performance on dropped samples: 3.625


In [62]:
# Inference
query = "Give me the code for getting results from a training run"
router.predict([query])[0]

0

In [63]:
# Save
router_fp = Path(ROOT_DIR, "datasets", "router.pkl")
with open(router_fp, "wb") as file:
    pickle.dump(router, file)

In [64]:
# Load
with open(router_fp, "rb") as file:
    router = pickle.load(file)

## Serving

Now we're ready to start serving our Ray Assistant using our best configuration. We're going to use [Ray Serve](https://docs.ray.io/en/latest/serve/index.html) with [FastAPI](https://fastapi.tiangolo.com/) to develop and scale our service. First, we'll define some data structures like `Query` and `Answer` to represent the inputs and outputs to our service. We will also define a small function to load our index (assumes that the respective SQL dump file already exists). Finally, we can define our `QueryAgent` and use it to serve `POST` requests with the query. And we can serve our agent at any deployment scale we wish using the [@serve.deployment](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.Deployment.html) decorator where we can specify the number of replicas, compute resources, etc.

<span style="background: yellow; color: red; font-size: 1rem;"><b>DIAGRAM:</b></span> high level diagram of serving + scaling our LLM agent.

In [65]:
import requests
from typing import List

In [66]:
from fastapi import FastAPI
from pydantic import BaseModel
from ray import serve
from rag.index import load_index
from starlette.responses import StreamingResponse

In [67]:
# Initialize application
app = FastAPI()

In [68]:
class Query(BaseModel):
    query: str

In [69]:
class Answer(BaseModel):
    question: str
    sources: List[str]
    answer: str
    llm: str

In [70]:
router_fp = Path(ROOT_DIR, "datasets", "router.pkl")
with open(router_fp, "rb") as file:
    router = pickle.load(file)

In [71]:
@serve.deployment(route_prefix="/", num_replicas=1, ray_actor_options={"num_cpus": 28, "num_gpus": 2})
@serve.ingress(app)
class RayAssistantDeployment:
    def __init__(self, chunk_size, chunk_overlap, num_chunks, embedding_model_name, llm):
        # Set up
        load_index(
            embedding_model_name=embedding_model_name,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap)

        # Query agent
        self.num_chunks = num_chunks
        system_content = "Answer the query using the context provided. Be succint."
        self.oss_agent = QueryAgent(llm=llm, max_context_length=MAX_CONTEXT_LENGTHS[llm], system_content=system_content)
        self.gpt_agent = QueryAgent(llm="gpt-4", max_context_length=MAX_CONTEXT_LENGTHS["gpt-4"], system_content=system_content)
            
        # Router
        router_fp = Path(ROOT_DIR, "datasets", "router.pkl")
        with open(router_fp, "rb") as file:
            self.router = pickle.load(file)

    @app.post("/query")
    def query(self, query: Query) -> Answer:
        use_oss_agent = self.router.predict([query.query])[0]
        agent = self.oss_agent if use_oss_agent else self.gpt_agent
        result = agent(query=query.query, num_chunks=self.num_chunks, stream=False)
        return Answer.parse_obj(result)

    @application.post("/stream")
    def stream(self, query: Query) -> StreamingResponse:
        use_oss_agent = self.router.predict([query.query])[0]
        agent = self.oss_agent if use_oss_agent else self.gpt_agent
        result = agent(query=query.query, num_chunks=self.num_chunks, stream=True)
        return StreamingResponse(result["answer"], media_type="text/plain")

In [72]:
# Deploy the Ray Serve application.
deployment = RayAssistantDeployment.bind(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    num_chunks=NUM_CHUNKS,
    embedding_model_name=EMBEDDING_MODEL_NAME,
    llm=LLM)
serve.run(deployment)

[2m[36m(ServeController pid=883400)[0m INFO 2023-09-10 23:35:47,404 controller 883400 deployment_state.py:1308 - Deploying new version of deployment default_RayAssistantDeployment.
[2m[36m(HTTPProxyActor pid=883470)[0m INFO:     Started server process [883470]
[2m[36m(ServeController pid=883400)[0m INFO 2023-09-10 23:35:47,507 controller 883400 deployment_state.py:1571 - Adding 1 replica to deployment default_RayAssistantDeployment.
2023-09-10 23:36:10,433	INFO router.py:853 -- Using PowerOfTwoChoicesReplicaScheduler.
2023-09-10 23:36:10,440	INFO router.py:329 -- Got updated replicas for deployment default_RayAssistantDeployment: {'default_RayAssistantDeployment#bZVeMN'}.


RayServeSyncHandle(deployment='default_RayAssistantDeployment')

In [73]:
# Inference
data = {"query": "What is the default batch size for map_batches?"}
response = requests.post("http://127.0.0.1:8000/query", json=data)
print(response.json())

[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m The default batch size
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m  for
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m  map_batch
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m es
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m  is 4
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m 0
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m 9
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m 6
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m .
{'question': 'What is the default batch size for map_batches?', 'sources': ['https://docs.ray.io/en/master/data/api/doc/ray.data.Dataset.map_batches.html#ray-data-dataset-map-batches', 'https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-size', 'https://docs.ray.io/en/master/data/data-inte

In [74]:
# Shutdown
serve.shutdown()

[2m[36m(ServeController pid=883400)[0m INFO 2023-09-10 23:36:14,879 controller 883400 deployment_state.py:1595 - Removing 1 replica from deployment 'default_RayAssistantDeployment'.
[2m[36m(ServeReplica:default_RayAssistantDeployment pid=883545)[0m INFO 2023-09-10 23:36:14,845 default_RayAssistantDeployment default_RayAssistantDeployment#bZVeMN AAQsmDrBNj /query default replica.py:723 - __CALL__ OK 4382.8ms
2023-09-10 23:36:14,884	INFO router.py:329 -- Got updated replicas for deployment default_RayAssistantDeployment: set().


## Next steps

Coming in Part II:

- add additional context with retrieved chunks
- keyword search with semantic (embedding) search
- reranking with LLM on retrieved chunks (from embeddings)
- fine-tune embedding model
- fine-tune base LLM (gpt-3.5 and OSS)
- longer context lengths (RoPE)
- structuring and updating the index in the vector DB
- offline/online indexing of new data