# Retrieval Augmented Generation

LLMs excel at a wide range of tasks, but they struggle with queries specific to a particular business context. This is where Retrieval Augmented Generation (RAG) becomes invaluable. RAG lets an LLM draw on your internal knowledge bases or customer support documents, which significantly improves its ability to answer domain-specific questions. Enterprises are increasingly building RAG applications to improve workflows in customer support, Q&A over internal company documents, financial & legal analysis, and much more.

This short notebook demonstrates how to build a simple RAG solution using the Anthropic documentation as the knowledge base.

It sets up a basic RAG system using an in-memory vector database and embeddings from [Voyage AI](https://www.voyageai.com/).

## Setup

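Install the required libraries, including:

1) `anthropic` - to interact with Claude

2) `voyageai` - to generate high quality embeddings

3) `pandas`, `numpy` - for data processing

4) `boto3` - to call Amazon Bedrock when no Anthropic API key is provided

5) `python-dotenv` - to load API keys from a `.env` file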


You'll also need an API key for [Voyage AI](https://www.voyageai.com/), which provides the embeddings.

Optionally, provide an API key for the Anthropic Cloud Service, available from [Anthropic](https://www.anthropic.com/). If no key is provided, the code uses Amazon Bedrock instead.

Note: This code runs with the Claude Haiku model unless you change it.

In [None]:
# Install the required Python packages
!pip install anthropic
!pip install voyageai
!pip install pandas
!pip install numpy
!pip install python-dotenv
!pip install boto3

In [None]:
from dotenv import load_dotenv
from botocore.exceptions import ClientError  # raised by Bedrock client calls
import boto3
import json
import os
import anthropic

In [None]:
# This notebook requires that at a minimum you have created an account 
# with VoyageAI (embeddings service) and have set the VOYAGE_API_KEY value in the
# file that dotenv is going to read.
# See the following for details: https://pypi.org/project/python-dotenv/

# load_dotenv() loads in the key value information for the secret keys being used: 

# 1/ "VOYAGE_API_KEY"
# 2/ "ANTHROPIC_API_KEY" - only if you are using the service directly from Anthropic (rather than through Amazon Bedrock)

load_dotenv()
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY", None)
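
The notebook depends on `VOYAGE_API_KEY` being set, so it can help to check for it right after `load_dotenv()`. The following is a minimal sketch; `missing_env_keys` is a hypothetical helper, not part of `python-dotenv`, and treating an empty value as missing is an assumption.

```python
import os


def missing_env_keys(required_keys, env=None):
    """Return the keys that are unset or empty in the environment (hypothetical helper)."""
    env = os.environ if env is None else env
    return [key for key in required_keys if not env.get(key)]


# VOYAGE_API_KEY is the only key the notebook always needs
missing = missing_env_keys(["VOYAGE_API_KEY"])
if missing:
    print(f"Missing required environment variables: {', '.join(missing)}")
```

Passing a plain dictionary as `env` makes the helper easy to try out without touching the real environment.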

### Define a minimal in-memory vector DB class

In this example, we're using an in-memory vector DB, but for a production application, you may want to use a hosted solution. 

In [None]:
import os
import pickle
import json
import numpy as np
import voyageai

class VectorDB:
    def __init__(self, name, api_key=None):
        if api_key is None:
            api_key = os.getenv("VOYAGE_API_KEY")
        self.client = voyageai.Client(api_key=api_key)
        self.name = name
        self.embeddings = []
        self.metadata = []
        self.query_cache = {}
        self.db_path = f"./data/{name}/vector_db.pkl"

    def load_data(self, data):
        if self.embeddings and self.metadata:
            print("Vector database is already loaded. Skipping data loading.")
            return
        if os.path.exists(self.db_path):
            print("Loading vector database from disk.")
            self.load_db()
            return

        texts = [f"Heading: {item['chunk_heading']}\n\n Chunk Text:{item['text']}" for item in data]
        self._embed_and_store(texts, data)
        self.save_db()
        print("Vector database loaded and saved.")

    # TODO: Change this function to limit the text sent for embedding to a maximum of 256 words
    def _embed_and_store(self, texts, data):
        batch_size = 128
        result = [
            self.client.embed(
                texts[i : i + batch_size],
                model="voyage-2"
            ).embeddings
            for i in range(0, len(texts), batch_size)
        ]
        self.embeddings = [embedding for batch in result for embedding in batch]
        self.metadata = data

    def search(self, query, k=5, similarity_threshold=0.75):
        if query in self.query_cache:
            query_embedding = self.query_cache[query]
        else:
            query_embedding = self.client.embed([query], model="voyage-2").embeddings[0]
            self.query_cache[query] = query_embedding

        if not self.embeddings:
            raise ValueError("No data loaded in the vector database.")

        similarities = np.dot(self.embeddings, query_embedding)
        top_indices = np.argsort(similarities)[::-1]
        top_examples = []
        
        for idx in top_indices:
            if similarities[idx] >= similarity_threshold:
                example = {
                    "metadata": self.metadata[idx],
                    "similarity": similarities[idx],
                }
                top_examples.append(example)
                
                if len(top_examples) >= k:
                    break
        return top_examples

    def save_db(self):
        data = {
            "embeddings": self.embeddings,
            "metadata": self.metadata,
            "query_cache": json.dumps(self.query_cache),
        }
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        with open(self.db_path, "wb") as file:
            pickle.dump(data, file)

    def load_db(self):
        if not os.path.exists(self.db_path):
            raise ValueError("Vector database file not found. Use load_data to create a new database.")
        with open(self.db_path, "rb") as file:
            data = pickle.load(file)
        self.embeddings = data["embeddings"]
        self.metadata = data["metadata"]
        self.query_cache = json.loads(data["query_cache"])
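
The TODO comment on `_embed_and_store` mentions limiting the text sent for embedding to 256 words. One possible way to do that is sketched below. `truncate_words` is a hypothetical helper, and splitting on whitespace is an assumption about what counts as a word; embedding services typically count tokens rather than words, so this is only an approximation.

```python
def truncate_words(text, max_words=256):
    """Cut text to at most max_words whitespace-separated words (hypothetical helper)."""
    words = text.split()
    if len(words) <= max_words:
        # Short texts are returned unchanged, including their original spacing
        return text
    # Longer texts are rebuilt from the first max_words words, joined by single spaces
    return " ".join(words[:max_words])


sample = " ".join(["word"] * 300)
print(len(truncate_words(sample).split()))
```

Inside `_embed_and_store`, this could be applied before batching, for example with `texts = [truncate_words(t) for t in texts]`.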

## Level 1 - Basic RAG

To get started, we'll set up a basic RAG pipeline using a bare-bones approach, sometimes called 'Naive RAG'. A basic RAG pipeline includes the following 3 steps:

1) Chunk documents by heading, so that each chunk contains only the content under one subheading

2) Embed each chunk

3) Use cosine similarity to retrieve the chunks most relevant to the query (the `search` method computes a dot product, which equals cosine similarity when the embeddings have unit length)
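
The three steps above can be sketched without any external service. This is an illustration only, not the notebook's implementation: `chunk_by_heading`, `toy_embed`, `cosine_similarity`, and `top_k` are hypothetical helpers, and the bag-of-words `toy_embed` merely stands in for a real embedding model such as the one from Voyage AI.

```python
import math
import re


def chunk_by_heading(markdown_text):
    """Step 1: split a markdown document into one chunk per heading."""
    chunks, heading, lines = [], None, []
    for line in markdown_text.splitlines():
        match = re.match(r"^#+\s+(.*)", line)
        if match:
            if heading is not None:
                chunks.append({"chunk_heading": heading, "text": "\n".join(lines).strip()})
            heading, lines = match.group(1), []
        elif heading is not None:
            lines.append(line)
    if heading is not None:
        chunks.append({"chunk_heading": heading, "text": "\n".join(lines).strip()})
    return chunks


def toy_embed(text, vocabulary):
    """Step 2 (stand-in): count how often each vocabulary term appears in the text."""
    words = re.findall(r"[a-z]+", text.lower())
    return [float(words.count(term)) for term in vocabulary]


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_k(query_vector, doc_vectors, k=3, similarity_threshold=0.0):
    """Step 3: return up to k (index, similarity) pairs at or above the threshold."""
    scored = [(i, cosine_similarity(query_vector, v)) for i, v in enumerate(doc_vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [(i, s) for i, s in scored[:k] if s >= similarity_threshold]


doc = "# Billing\nInvoices are issued monthly.\n# Models\nClaude models differ in speed and cost."
chunks = chunk_by_heading(doc)
vocabulary = ["invoices", "monthly", "billing", "models", "speed", "cost"]
doc_vectors = [toy_embed(f"{c['chunk_heading']} {c['text']}", vocabulary) for c in chunks]
query_vector = toy_embed("question about billing invoices", vocabulary)
hits = top_k(query_vector, doc_vectors, k=1)
print(chunks[hits[0][0]]["chunk_heading"])
```

The chunk dictionaries use the same `chunk_heading` and `text` keys that `VectorDB.load_data` reads, so the output of a chunker like this could in principle be passed to it.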

In [None]:
# Load the Anthropic documentation segments into a list of dictionaries
with open('data/anthropic_docs.json', 'r') as f:
    anthropic_docs = json.load(f)

In [None]:
len(anthropic_docs)

In [None]:
abbreviated_docs = anthropic_docs[:10]

In [None]:
abbreviated_docs

In [None]:
# Initialize the VectorDB
db = VectorDB("anthropic_docs")
# Import the document segments into the vector database
db.load_data(abbreviated_docs)

In [None]:
len(db.embeddings)

### Define a minimal LLM Facade class

This facade makes it easy to invoke the LLM through either AWS Bedrock or Anthropic Cloud.
If a value for `anthropic_api_key` is set, Anthropic Cloud is used; otherwise, AWS Bedrock is used.

In [None]:
LLM_MAX_TOKENS = 2500
LLM_TEMPERATURE = 0.01
BEDROCK_MODEL_ID = 'anthropic.claude-3-haiku-20240307-v1:0'


class LlmFacade:
    def __init__(self, anthropic_api_key=None):
        self.max_tokens = LLM_MAX_TOKENS
        self.temperature = LLM_TEMPERATURE
        # Use Anthropic Claude via Anthropic Cloud if the key is set
        # if not, set up to use Anthropic Claude via Bedrock
        self.aws_bedrock = True

        if anthropic_api_key:
            self.anthropic_client = anthropic.Anthropic(api_key=anthropic_api_key)
            self.aws_bedrock = False
            print("Configured to use: Anthropic Cloud Service")
        else:
            session = boto3.Session()
            region = session.region_name

            # Create the Bedrock runtime client; the model (Claude Haiku) is chosen per request via BEDROCK_MODEL_ID
            self.bedrock_client = boto3.client(service_name='bedrock-runtime', region_name=region)
            print("Configured to use: AWS Bedrock Service")

    def invoke(self, prompt: str) -> str:
        if self.aws_bedrock:
            return self.invoke_aws_bedrock_llm(prompt)
        else:
            return self.invoke_anthropic_cloud_llm(prompt)

    def invoke_anthropic_cloud_llm(self, prompt: str) -> str:
        # The Anthropic SDK accepts a plain string as the message content
        response = self.anthropic_client.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=self.max_tokens,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature
        )
        # Return the text of the first content block in the response
        return response.content[0].text

    def invoke_aws_bedrock_llm(self, prompt: str) -> str:
        messages = [{"role": "user", "content": [{"text": prompt}]}]

        inference_config = {
            "temperature": self.temperature,
            "maxTokens": self.max_tokens
        }
        converse_api_params = {
            "modelId": BEDROCK_MODEL_ID,
            "messages": messages,
            "inferenceConfig": inference_config
        }
        # Send the request to the Bedrock service to generate a response
        try:
            response = self.bedrock_client.converse(**converse_api_params)

            # Extract the generated text content from the response
            text_content = response['output']['message']['content'][0]['text']

            # Return the generated text content
            return text_content

        except ClientError as err:
            message = err.response['Error']['Message']
            print(f"A client error occurred: {message}")
        # Only reached when the request failed, since the try block returns on success
        return "500: Request failed"

In [None]:
llm = LlmFacade(anthropic_api_key=anthropic_api_key)

In [None]:
llm.invoke("how fast does a swallow fly")

In [None]:
def retrieve_base(query, db, similarity_threshold=0.7):
    results = db.search(query, k=3, similarity_threshold=similarity_threshold)
    context = ""
    for result in results:
        chunk = result['metadata']
        context += f"\n{chunk['text']}\n"
    return results, context

def answer_query_base(query, db, llm):
    documents, context = retrieve_base(query, db)
    prompt = f"""
    You have been tasked with helping us to answer the following query: 
    <query>
    {query}
    </query>
    You have access to the following documents which are meant to provide context as you answer the query:
    <documents>
    {context}
    </documents>
    Please remain faithful to the underlying context, and only deviate from it if you are 100% sure that you know the answer already. 
    Answer the question now, and avoid providing preamble such as 'Here is the answer', etc
    """
    return llm.invoke(prompt)

In [None]:
example_question = ["i have a billing question", "what capabilities are there", "who's cat is that"]

In [None]:
i = 0
results, context = retrieve_base(example_question[i], db)
print("Question:", example_question[i])
results

In [None]:
i = 1
results, context = retrieve_base(example_question[i], db, 0.7)
print("Question:", example_question[i])
results

In [None]:
i = 2
results, context = retrieve_base(example_question[i], db, 0.7)
print("Question:", example_question[i])
results

In [None]:
i = 0
result = answer_query_base(example_question[i], db, llm)
print("Question:", example_question[i])
result

In [None]:
i = 1
result = answer_query_base(example_question[i], db, llm)
print("Question:", example_question[i])
result

In [None]:
i = 2
result = answer_query_base(example_question[i], db, llm)
print("Question:", example_question[i])
result