# Advanced RAG

We’ll use [Canopy](https://github.com/pinecone-io/canopy) to handle embedding and querying, and build the following steps around it:

1. Query expansion: We’ll generate additional queries from the query the user submits, to increase the chances of retrieving as many relevant documents as possible.
2. Critique: We’ll create a function that uses the LLM to produce a critique of the content we retrieve.
3. External search: We’ll create a “search tool” that our system can fall back on when the retrieved content scores below a critique threshold (0.5 in this notebook).
4. Reranking: We’ll use the Cohere reranker to reorder the retrieved documents by relevance to the query.
5. Comparison: We’ll compare “naive” RAG with the “advanced” RAG methods.

## Install dependencies

In [None]:
!pip install -qU canopy-sdk

In [None]:
!pip install -qU langchain langchain_openai cohere==4.27 markdown google-search-results

  Preparing metadata (setup.py) ... done
  Building wheel for google-search-results (setup.py) ... done


Fix for Canopy on Colab: pin NumPy to 1.24.4.

In [None]:
!pip uninstall -y numpy
!pip install numpy==1.24.4

Found existing installation: numpy 1.26.4
Uninstalling numpy-1.26.4:
  Successfully uninstalled numpy-1.26.4
Collecting numpy==1.24.4
  Downloading numpy-1.24.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
Installing collected packages: numpy
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pandas-stubs 2.2.1.240316 requires numpy>=1.26.0; python_version < "3.13", but you have numpy 1.24.4 which is incompatible.
Successfully installed numpy-1.24.4


## Credentials

In [None]:
import getpass
import os
print("Provide your Pinecone key")
os.environ["PINECONE_API_KEY"] =  getpass.getpass()
print("Provide your OpenAI API key")
os.environ["OPENAI_API_KEY"] = getpass.getpass()
print("Provide your Serp API key")
os.environ["SERPAPI_API_KEY"] = getpass.getpass()
print("Provide your Cohere API key")
os.environ["COHERE_API_KEY"] = getpass.getpass()

## Read data

In [None]:
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

data = pd.read_parquet("https://storage.googleapis.com/pinecone-datasets-dev/pinecone_docs_ada-002/raw/file1.parquet")
data.head()

|   | id | text | source | metadata |
|---|---|---|---|---|
| 0 | 728aeea1-1dcf-5d0a-91f2-ecccd4dd4272 | # Scale indexes\n\n[Suggest Edits](/edit/scali... | https://docs.pinecone.io/docs/scaling-indexes | {'created_at': '2023_10_25', 'title': 'scaling... |
| 1 | 2f19f269-171f-5556-93f3-a2d7eabbe50f | # Understanding organizations\n\n[Suggest Edit... | https://docs.pinecone.io/docs/organizations | {'created_at': '2023_10_25', 'title': 'organiz... |
| 2 | b2a71cb3-5148-5090-86d5-7f4156edd7cf | # Manage datasets\n\n[Suggest Edits](/edit/dat... | https://docs.pinecone.io/docs/datasets | {'created_at': '2023_10_25', 'title': 'datasets'} |
| 3 | 1dafe68a-2e78-57f7-a97a-93e043462196 | # Architecture\n\n[Suggest Edits](/edit/archit... | https://docs.pinecone.io/docs/architecture | {'created_at': '2023_10_25', 'title': 'archite... |
| 4 | 8b07b24d-4ec2-58a1-ac91-c8e6267b9ffd | # Moving to production\n\n[Suggest Edits](/edi... | https://docs.pinecone.io/docs/moving-to-produc... | {'created_at': '2023_10_25', 'title': 'moving-... |


## Initialize Canopy

In [None]:
from canopy.knowledge_base import KnowledgeBase
from canopy.tokenizer import Tokenizer
Tokenizer.initialize()

INDEX_NAME = "advanced-rag"
kb = KnowledgeBase(index_name=INDEX_NAME)


Create a Canopy index, if it doesn't exist.

In [None]:
from canopy.knowledge_base import list_canopy_indexes
if not any(name.endswith(INDEX_NAME) for name in list_canopy_indexes()):
    kb.create_canopy_index()

Convert the rows in `data` to a list of Canopy `Document` objects.

In [None]:
from canopy.models.data_models import Document
documents = [Document(**row) for _, row in data.iterrows()]

Upsert the documents to the Canopy knowledge base.

In [None]:
from tqdm.auto import tqdm
kb = KnowledgeBase(index_name=INDEX_NAME)
kb.connect()
batch_size = 10

for i in tqdm(range(0, len(documents), batch_size)):
    kb.upsert(documents[i: i+batch_size])


Test the Canopy knowledge base.

In [None]:
from canopy.models.data_models import Query

results = kb.query([Query(text="p1 pod capacity")])

In [None]:
results

## Initialize the Context and Chat engines

In [None]:
from canopy.context_engine import ContextEngine
context_engine = ContextEngine(kb)

In [None]:
from canopy.chat_engine import ChatEngine
chat_engine = ChatEngine(context_engine)

In [None]:
from typing import Tuple
from canopy.models.data_models import Messages, UserMessage, AssistantMessage

def chat(new_message: str, history: Messages) -> Tuple[str, Messages]:
    messages = history + [UserMessage(content=new_message)]
    response = chat_engine.chat(messages)
    assistant_response = response.choices[0].message.content
    return assistant_response, messages + [AssistantMessage(content=assistant_response)]

In [None]:
import json
import ast

def str_to_json(s):
    try:
        # First, attempt to parse the string as JSON
        return json.loads(s)
    except json.JSONDecodeError:
        # If it fails, assume the string might be a Python literal
        try:
            return ast.literal_eval(s)
        except (ValueError, SyntaxError):
            # Handle the case where parsing fails for both methods
            print("Error: Input string is neither valid JSON nor a valid Python literal.")
            return None
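
The two-stage fallback matters because LLMs often reply with Python-style single-quoted lists, which are not valid JSON. The sketch below illustrates the behavior on three kinds of reply; `parse_model_reply` is a hypothetical, self-contained mirror of `str_to_json` (without the error message), not a separate utility from any library.

```python
import ast
import json

def parse_model_reply(s):
    """Hypothetical mirror of str_to_json: try JSON first, then a Python literal."""
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        try:
            return ast.literal_eval(s)
        except (ValueError, SyntaxError):
            return None

# Double-quoted list: valid JSON, parsed on the first attempt
json_style = parse_model_reply('["question 1", "question 2"]')
# Single-quoted list: JSON parsing fails, ast.literal_eval succeeds
python_style = parse_model_reply("['question 1', 'question 2']")
# Free text: neither parser accepts it, so the result is None
unparseable = parse_model_reply("Here are your questions: ...")
```

Callers should therefore be prepared for a `None` result whenever the model ignores the requested output format.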



## Evaluate with LLM
Here we ask the LLM itself to score the generated text against the prompt on five criteria: relevance, clarity, coherence, detail, and suitability. Each score is a number between 0 and 1.

In [None]:
from langchain_core.prompts import ChatPromptTemplate

def evaluate_with_llm(model, prompt, generated_text):
    """
    Uses a Large Language Model (LLM) to evaluate generated text.

    :param model: An instance of the LLM, ready to generate responses.
    :param prompt: The original prompt given to the system.
    :param generated_text: The text generated by the SELF-RAG system.
    :return: A dictionary containing critique scores or assessments.
    """
    evaluations = {}

    # Template for creating evaluation queries
    def create_evaluation_query(template, **kwargs):
        query = ChatPromptTemplate.from_template(template)
        chain = query | model
        return float(chain.invoke(kwargs).content)

    # Evaluate Relevance
    relevance_template = "Given the context provided by the following prompt: '{prompt}', please evaluate on a scale from 0 to 1, where 1 is highly relevant and 0 is not relevant at all, how relevant is this generated response: '{generated_text}'? Provide a numerical score only."
    evaluations['relevance'] = create_evaluation_query(relevance_template, prompt=prompt, generated_text=generated_text)

    # Evaluate Clarity
    clarity_template = "How clear and easily understandable is this text: '{generated_text}'? Rate its clarity on a scale from 0 to 1, where 1 indicates that the text is very clear and 0 indicates that the text is very unclear. Provide a numerical score only."
    evaluations['clarity'] = create_evaluation_query(clarity_template, prompt=prompt, generated_text=generated_text)

    # Evaluate Coherence
    coherence_template = "On a scale from 0 to 1, with 1 being highly coherent and 0 being not coherent at all, how well do the ideas in this generated text: '{generated_text}' flow together? Consider if the text makes logical sense as a whole. Provide a numerical score only."
    evaluations['coherence'] = create_evaluation_query(coherence_template, prompt=prompt, generated_text=generated_text)

    # Evaluate Detail and Exhaustiveness
    detail_template = "Assessing the detail and exhaustiveness relative to the prompt '{prompt}', how thoroughly does this generated text: '{generated_text}' cover the topic? Rate on a scale from 0 to 1, where 1 is very detailed and exhaustive, and 0 is not detailed at all. Provide a numerical score only."
    evaluations['details'] = create_evaluation_query(detail_template, prompt=prompt, generated_text=generated_text)

    # Evaluate Suitability as an Answer
    suitability_template = "Evaluate the suitability of this generated text: '{generated_text}' as an answer to the original prompt '{prompt}'. On a scale from 0 to 1, where 1 is a perfect answer and 0 is completely unsuitable, provide a numerical score only."
    evaluations['suitability'] = create_evaluation_query(suitability_template, prompt=prompt, generated_text=generated_text)

    return evaluations
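
Calling `float()` directly on the model's reply raises a `ValueError` if the model adds any extra words, even though the prompts ask for a numerical score only. As a hedged sketch of a more forgiving approach, the hypothetical helper `parse_score` below (not part of the notebook or of LangChain) extracts the first number in the reply, clamps it to the 0 to 1 range, and falls back to a default when no number is found.

```python
import re

def parse_score(reply: str, default: float = 0.0) -> float:
    """Extract the first number from an LLM reply and clamp it to [0, 1]."""
    # Matches forms such as "0.8", "1", or ".5"
    match = re.search(r"-?(?:\d+(?:\.\d+)?|\.\d+)", reply)
    if match is None:
        return default
    return min(1.0, max(0.0, float(match.group())))

replies = ["0.8", "Score: 0.75.", "I cannot rate this", "7"]
scores = [parse_score(r) for r in replies]
print(scores)  # [0.8, 0.75, 0.0, 1.0]
```

Taking the first number is a simplification: a reply that restates the scale before giving the score would be misread, so this works best together with the "numerical score only" instruction in the prompts.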


## Critique
The critique function creates the evaluations and produces a weighted average based on configurable weights. In this way, we can change the impact each evaluation score has on the final critique score.

In [None]:
def critique(model, prompt, generated_text):
    evaluation_weights = {
        'relevance': 3,
        'clarity': 1,
        'coherence': 0.5,
        'details': 1.5,
        'suitability': 2
    }

    evaluations = evaluate_with_llm(model, prompt, generated_text)
    print("Evaluations:", evaluations)

    # Calculate the weighted sum of the evaluations
    weighted_sum = sum(evaluations[aspect] * evaluation_weights.get(aspect, 1) for aspect in evaluations)

    # Calculate the sum of weights for the aspects evaluated
    total_weight = sum(evaluation_weights.get(aspect, 1) for aspect in evaluations)

    # Calculate the weighted average of the evaluations
    weighted_average = weighted_sum / total_weight if total_weight > 0 else 0

    return [weighted_average, evaluations]
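
As a worked example of the weighted average, the sketch below applies the same weights to one set of evaluation scores reported later in this notebook for the advanced RAG answer. It only reproduces the arithmetic and does not call the LLM.

```python
weights = {"relevance": 3, "clarity": 1, "coherence": 0.5, "details": 1.5, "suitability": 2}
evaluations = {"relevance": 1.0, "clarity": 0.85, "coherence": 1.0, "details": 0.8, "suitability": 1.0}

# Each score is multiplied by its weight, then the sum is divided by the total weight (8)
weighted_sum = sum(evaluations[aspect] * weights[aspect] for aspect in evaluations)
total_weight = sum(weights[aspect] for aspect in evaluations)
weighted_average = weighted_sum / total_weight

print(round(weighted_average, 5))  # 0.94375
```

Because relevance carries a weight of 3 out of 8, a drop in relevance lowers the critique score three times as much as the same drop in clarity.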


In [None]:
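def is_retrieval_needed(model, prompt):
  is_retrieval_needed_prompt = ChatPromptTemplate.from_template("Given the prompt: '{prompt}', is retrieval from an external source necessary to answer the question? Reply with only True or False")
  is_retrieval_needed_chain = is_retrieval_needed_prompt | model

  # The model replies with the text "True" or "False". Convert it to a real boolean,
  # because any non-empty string (including "False") is truthy in Python.
  reply = is_retrieval_needed_chain.invoke({"prompt": prompt}).content
  return reply.strip().lower().startswith("true")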

In [None]:
def consolidate(model, text):
  consolidate_prompt = ChatPromptTemplate.from_template("Given the following set of texts, please consolidate them: '{text}'")
  consolidate_chain = consolidate_prompt | model

  return consolidate_chain.invoke({"text": text}).content

In [None]:
def compare(model, query, text1, text2):
  compare_prompt = ChatPromptTemplate.from_template("Given the following query: '{query}', score text1 and text2 between 0 and 1, to indicate which provides a better answer overall to the query. Reply with two numbers in an array, for example: [0.1, 0.9]. The sum total of the values should be 1. text1: '{text1}' \n text2: '{text2}'")
  compare_chain = compare_prompt | model

  return str_to_json(compare_chain.invoke({"query": query, "text1": text1, "text2": text2}).content)
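
The prompt asks for two scores that sum to 1, but nothing guarantees the model complies, and `str_to_json` returns `None` when the reply cannot be parsed. A minimal sketch of a defensive post-processing step follows; `normalize_pair` is a hypothetical helper, and treating an unusable reply as a tie is an assumption rather than the notebook's behavior.

```python
from typing import Any, List

def normalize_pair(scores: Any) -> List[float]:
    """Rescale two non-negative scores so they sum to 1; fall back to a tie."""
    valid = (
        isinstance(scores, (list, tuple))
        and len(scores) == 2
        and all(isinstance(s, (int, float)) and s >= 0 for s in scores)
        and sum(scores) > 0
    )
    if not valid:
        return [0.5, 0.5]
    total = sum(scores)
    return [s / total for s in scores]

rescaled = normalize_pair([0.9, 0.3])   # scores that do not sum to 1
tie = normalize_pair(None)              # unparseable reply
```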

In [None]:
def generate_queries(model, prompt, num_queries):
  query_generation_prompt = ChatPromptTemplate.from_template("Given the prompt: '{prompt}', generate {num_queries} questions that are better articulated. Return in the form of a list. For example: ['question 1', 'question 2', 'question 3']")
  query_generation_chain = query_generation_prompt | model
  return str_to_json(query_generation_chain.invoke({"prompt": prompt, "num_queries": num_queries}).content)
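
Query expansion depends on the model returning a parseable list. If parsing fails, `str_to_json` returns `None`, and slicing that result later would raise a `TypeError`. The sketch below shows one possible guard: the hypothetical helper `normalize_queries` removes duplicates, caps the list at `num_queries`, and falls back to the original query. The fallback policy is an assumption for illustration.

```python
from typing import Any, List

def normalize_queries(parsed: Any, original_query: str, num_queries: int) -> List[str]:
    """Keep up to num_queries unique, non-empty strings; fall back to the original query."""
    if not isinstance(parsed, list):
        return [original_query]
    seen, queries = set(), []
    for item in parsed:
        text = str(item).strip()
        # Compare case-insensitively so near-identical rewrites are dropped
        if text and text.lower() not in seen:
            seen.add(text.lower())
            queries.append(text)
    return queries[:num_queries] or [original_query]

original = "How can I make a new Pinecone index?"
expanded = normalize_queries(
    ["How do I create an index?", "how do I create an index?", "What parameters does index creation take?"],
    original,
    3,
)
fallback = normalize_queries(None, original, 3)
```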

A helper function that collects the document texts from a list of Canopy `QueryResult` objects into one flat list.

In [None]:
from canopy.models.data_models import Query

def extract_documents_texts(results):
    # Initialize an empty list to store the extracted texts
    all_texts = []
    # Loop through each QueryResult in the results list
    for result in results:
        # Assuming result.documents is the correct way to access documents in a QueryResult
        for document in result.documents:
            # Assuming document.text is the correct way to access the text of a DocumentWithScore
            all_texts.append(document.text)
    # Return the flat list of all texts
    return all_texts



A reranking function that queries the knowledge base, then reranks the results using Cohere's `rerank-english-v2.0` model.

In [None]:
import cohere
from langchain_community.utilities import SerpAPIWrapper
co = cohere.Client(os.environ["COHERE_API_KEY"])


def get_reranked_result(query, top_n=1):
  matches = kb.query([Query(text=query)])
  docs = extract_documents_texts(matches)
  rerank_results = co.rerank(model="rerank-english-v2.0", query=query, documents=docs, top_n=top_n)
  texts = []
  for rerank_result in rerank_results:
      # Accessing the 'text' field in the document attribute of each RerankResult
      text = rerank_result.document['text']
      texts.append(text)
  return texts
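
To make the reranking step concrete without calling the Cohere API, the sketch below swaps in a deliberately simple scoring rule: the fraction of query terms that appear in each document. This is a toy stand-in for illustration only, not how `rerank-english-v2.0` scores documents. The function name `overlap_rerank` and the sample documents are hypothetical.

```python
from typing import List

def overlap_rerank(query: str, documents: List[str], top_n: int = 1) -> List[str]:
    """Toy reranker: score each document by the fraction of query terms it contains."""
    query_terms = set(query.lower().split())

    def score(doc: str) -> float:
        doc_terms = set(doc.lower().split())
        return len(query_terms & doc_terms) / len(query_terms) if query_terms else 0.0

    # sorted() is stable, so documents with equal scores keep their retrieval order
    return sorted(documents, key=score, reverse=True)[:top_n]

# Made-up documents in the order a first-stage retriever might return them
docs = [
    "Organizations group projects and users.",
    "Each p1 pod has a fixed capacity for vectors.",
    "Use collections to back up an index.",
]
top = overlap_rerank("p1 pod capacity", docs, top_n=1)
print(top)  # ['Each p1 pod has a fixed capacity for vectors.']
```

The shape is the same as in `get_reranked_result`: retrieve a candidate set first, score every candidate against the query, then keep only the `top_n` best.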

The code below defines the `QueryProcessor` and `QueryDetail` classes, which are used in the `advanced_rag_query` function. In this segment, the code:

- Defines the **`QueryDetail`** class to encapsulate details about a query, including the query itself, a list to hold content responses, a score and details for critique, and flags for whether retrieval and search operations are necessary.
    - The **`__init__`** method initializes these attributes. **`query`** is required, while others like **`content`** and **`critique_details`** are set to their default values.
    - **`add_response`** method decides whether retrieval is needed based on the model's response to the query, then adjusts attributes based on critique results, including whether a search is needed.
    - **`search_and_add_results`** method is called if the critique score is below 0.5, indicating poor response quality. It performs a search and adds the results to the content list.
- Defines a **`QueryProcessor`** class to handle a list of queries using a specified model and search mechanism.
    - The **`__init__`** method sets up the processor with a model, search tool, and initializes **`QueryDetail`** objects for each query.
    - **`process_queries`** method processes each query through **`add_response`**, consolidates responses if a search was needed, and recalculates critique scores and details.
- The **`advanced_rag_query`** function sets up query processing with a caller-supplied model and a **`SerpAPIWrapper`** for searches. It generates an initial list of queries, processes them with **`QueryProcessor`**, and returns the processed queries.

In [None]:
from typing import List, Dict, Any, Tuple
from collections import defaultdict

class QueryDetail:
    def __init__(self, query: str):
        self.query = query
        self.content: List[str] = []
        self.critique_score: float = 0.0
        self.critique_details: Dict[str, Any] = {}
        self.retrieval_needed: bool = False
        self.search_needed: bool = False

    def add_response(self, model, search) -> None:
        """Process the query to add response, handle retrieval and critique."""
        if is_retrieval_needed(model, self.query):
            response = " ".join(get_reranked_result(self.query, top_n=3))
            self.retrieval_needed = True
        else:
            # No retrieval needed: let the model answer the query directly
            response = model.invoke(self.query).content
            self.retrieval_needed = False

        self.content.append(response)

        critique_score, critique_details = critique(model, self.query, response)
        self.critique_score = critique_score
        self.critique_details = critique_details
        self.search_needed = critique_score < 0.5

        if self.search_needed:
            self.search_and_add_results(search)

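    def search_and_add_results(self, search) -> None:
        """Perform a search and process the results if critique score is low."""
        search_result_raw = search.run(self.query)
        search_result = str_to_json(search_result_raw)
        if isinstance(search_result, list):
            # A parsed list contributes one content entry per item
            self.content.extend(str(item) for item in search_result)
        else:
            # Plain-text (or otherwise non-list) results are kept as a single entry
            self.content.append(str(search_result_raw))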

class QueryProcessor:
    def __init__(self, model, search, queries: List[str]):
        self.model = model
        self.search = search
        self.queries = [QueryDetail(query) for query in queries]

    def process_queries(self) -> List[QueryDetail]:
        """Process each query in the list."""
        for query_detail in self.queries:
            query_detail.add_response(self.model, self.search)
            if query_detail.search_needed:
                consolidated_response = consolidate(self.model, query_detail.content)
                query_detail.content = [consolidated_response]
                critique_score, critique_details = critique(self.model, query_detail.query, consolidated_response)
                query_detail.critique_score = critique_score
                query_detail.critique_details = critique_details
        return self.queries

def advanced_rag_query(model, query: str, num_queries: int) -> List[QueryDetail]:
    search = SerpAPIWrapper()
    initial_queries = generate_queries(model, query, num_queries)[:num_queries]
    query_processor = QueryProcessor(model, search, initial_queries)
    processed_queries = query_processor.process_queries()
    return processed_queries


In [None]:
query = "How can I make a new Pinecone index?"

In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
model = ChatOpenAI(model="gpt-4")

In [None]:
results = advanced_rag_query(model, query, 3)
combined_content = " ".join(content for result in results for content in result.content)
advanced_rag_results = consolidate(model, combined_content)

Evaluations: {'relevance': 0.875, 'clarity': 0.8, 'coherence': 0.9, 'details': 0.8, 'suitability': 0.85}
Evaluations: {'relevance': 1.0, 'clarity': 0.8, 'coherence': 0.8, 'details': 0.9, 'suitability': 0.8}
Evaluations: {'relevance': 1.0, 'clarity': 0.8, 'coherence': 0.9, 'details': 0.9, 'suitability': 0.9}


In [None]:
advanced_rag_critique = critique(model, query, advanced_rag_results)

Evaluations: {'relevance': 1.0, 'clarity': 0.85, 'coherence': 1.0, 'details': 0.8, 'suitability': 1.0}


In [None]:
history = []
rag_result, history = chat(query, history)
rag_critique = critique(model, query, rag_result)


Evaluations: {'relevance': 1.0, 'clarity': 0.8, 'coherence': 1.0, 'details': 0.9, 'suitability': 1.0}


In [None]:
from IPython.display import display, HTML
import markdown

# Convert your Markdown content and critiques to strings, assuming they might not be strings already
final_result_html = markdown.markdown(str(advanced_rag_results))
rag_result_html = markdown.markdown(str(rag_result))
advanced_rag_critique_html = markdown.markdown(str(advanced_rag_critique))
rag_critique_html = markdown.markdown(str(rag_critique))

# Construct HTML table with the pre-rendered HTML content for each cell
html = f"""
<table>
<tr>
    <th>Advanced RAG</th>
    <th>RAG</th>
</tr>
<tr>
    <td>{final_result_html}</td>
    <td>{rag_result_html}</td>
</tr>
<tr>
    <td>{advanced_rag_critique_html}</td>
    <td>{rag_critique_html}</td>
</tr>
</table>
"""

# Display the HTML table
display(HTML(html))


Advanced RAG,RAG
"To create an index in Pinecone, first download a pre-embedded dataset from the pinecone-datasets library. This allows you to skip the embedding and preprocessing steps. ```python import pinecone_datasets dataset = pinecone_datasets.load_dataset('wikipedia-simple-text-embedding-ada-002-100K') dataset.head() ``` After downloading the data, initialize your Pinecone environment and create your first index. You have the option to select a distance metric for your index. Note - By default, all fields are indexed. To avoid redundant and costly indexing, pass an additional empty metadata_config parameter. python pinecone.create_index(  name=index_name_v1,  metric='cosine',  dimension=1536,  metadata_config={“indexed”:[]} ) Before creating an index, ensure your Pinecone API key is set up. If the 'openai' index already exists, you can connect to it directly. ```python import pinecone pinecone.init(  api_key=""YOUR_API_KEY"",  environment=""YOUR_ENV"" ) if 'openai' not in pinecone.list_indexes():  pinecone.create_index('openai', dimension=len(embeds[0])) index = pinecone.Index('openai') ``` When preparing your project structure, consider creating separate projects for your development and production indexes. This allows you to test changes before deploying them to production. Ensure that you have properly configured user access to your production environment. Before moving your index to production, test that your index is returning accurate results in the context of your application. Consider identifying the appropriate metrics for evaluating your results.","To create a new Pinecone index, you can follow these general steps: Initialize a connection to Pinecone using your API key and environment. Check if the index already exists, and if not, create a new index with a specified dimension. Connect to the newly created index for further operations. 
You can also consider specifying additional configurations and choices like the distance metric and metadata fields during the index creation process."
"[0.94375, {'relevance': 1.0, 'clarity': 0.85, 'coherence': 1.0, 'details': 0.8, 'suitability': 1.0}]","[0.95625, {'relevance': 1.0, 'clarity': 0.8, 'coherence': 1.0, 'details': 0.9, 'suitability': 1.0}]"


In [None]:
compare(model, query, advanced_rag_results, rag_result)

[0.9, 0.1]