Here a vector store solution will be integrated into a RAG pipeline and it will be evaluated using numerical RAG evaluation techniques incorporating LLM-as-a-Judge metrics.
 
- RAG pipeline will be numerically evaluated.

# Questions:
- Should the pipeline pass these objectives? Is the judge LLM sufficient for evaluating the pipeline? Does a particular metric even matter for the use case?
- If the vectorstore-as-a-memory component is left in our chain, will it still pass the evaluation? Additionally, is the evaluation useful for assessing vectorstore-as-a-memory performance?

### **Environment Setup:**

In [2]:
# %pip install -q langchain requests sentence-transformers gradio rich
# %pip install -q faiss ragas

## If a typing-extensions issue is encountered, restart your runtime and try again

from functools import partial
from huggingface_hub import list_models
import json
from langchain_core.embeddings.embeddings import Embeddings
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompt_values import ChatPromptValue
from langchain_core.runnables.config import RunnableConfig
import os
from pydantic import BaseModel, ConfigDict, PrivateAttr
from rich.console import Console
from rich.style import Style
import requests
from sentence_transformers import SentenceTransformer
from typing import List, Optional

console = Console()
base_style = Style(color="#76B900", bold=True)
norm_style = Style(bold=True)
pprint = partial(console.print, style=base_style)
pprint2 = partial(console.print, style=norm_style)

# Gemini API Configuration
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent"
# Secret GEMINI_API_KEY environment variable to be set beforehand
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set.")
    exit()

class SentenceTransformersEmbeddings(BaseModel, Embeddings):
    """Wrapper for Gemini Embeddings."""

    model_config = ConfigDict(arbitrary_types_allowed=True)
    model: str = "Alibaba-NLP/gte-base-en-v1.5"
    embedder: SentenceTransformer = None
    
    def __init__(self, model: str = "Alibaba-NLP/gte-base-en-v1.5", trust_remote_code: Optional[bool] = False):
        """
        Initialize the SentenceTransformersEmbeddings class using a SentenceTransformer model.
        :param model: Name of the pre-trained model to load from SentenceTransformers.
        """
        super().__init__()
        self.embedder = SentenceTransformer(model, trust_remote_code=trust_remote_code)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of texts.
        :param texts: List of input texts.
        :return: List of embeddings (one for each text).
        """
        if not texts:
            pprint2("")
            raise ValueError("Input texts list cannot be empty.")
        return self.embedder.encode(texts, convert_to_tensor=False).tolist()

    def embed_query(self, text: str) -> List[float]:
        """
        Generate embedding for a single query.
        :param text: Input query text.
        :return: Embedding for the query as a list of floats.
        """
        if not text:
            raise ValueError("Input text cannot be empty.")
        return self.embedder.encode(text, convert_to_tensor=False).tolist()

    @classmethod
    def get_available_models(cls, search_term: str = "sentence-transformers"):
        """
        Dynamically fetch available SentenceTransformer models from Hugging Face Model Hub.
        :param search_term: Search term to filter relevant models (default: 'sentence-transformers').
        :return: List of model names.
        """
        models = list_models(author=search_term)
        return [model.modelId for model in models]

class GeminiChat(BaseChatModel):
    """Wrapper for Gemini Chat."""

    model: str = "gemini-1.5-flash"
    temperature: float = 1.0
    max_tokens: Optional[int] = None

    # Declare private attributes
    _api_url: str = PrivateAttr()
    _headers: dict = PrivateAttr()
    
    def __init__(self, model: str = "gemini-1.5-flash", temperature: float = 1.0, max_tokens: Optional[int] = None):
        super().__init__(model=model, temperature=temperature, max_tokens=max_tokens)
        
        # Initialize private attributes
        self._api_url = f"{GEMINI_API_URL}"
        self._headers = {
            "Content-Type": "application/json"
        }

    def _llm_type(self):
        return "gemini-1.5-flash"
    
    def _generate(self, prompt: ChatPromptValue, stop: Optional[List[str]] = None) -> str:
        payload = {
            "contents": [
                {
                    "parts": [{"text": prompt.messages[0].content}]
                }
            ]
        }
        if self.max_tokens is not None:  # Include max_tokens only if it's valid
            payload["max_tokens"] = self.max_tokens
        response = requests.post(
            f"{self._api_url}?key={GEMINI_API_KEY}",
            headers=self._headers,
            json=payload
        )
        if response.status_code == 200:
            try:
                return response.json()["candidates"][0]["content"]["parts"][0]["text"]
            except Exception as e:
                pprint2(f"Error parsing response JSON: {repr(e)}")
                return ""
        else:
            try:
                error_data = response.json()
                pprint2(f"Error {response.status_code}: {error_data}")
            except Exception as e:
                pprint2(f"Error {response.status_code}: {repr(e)}")
            return ""
            
    def invoke(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        return self._generate(prompt, stop)

    def stream(self, input_text: ChatPromptValue, config: Optional[RunnableConfig]):
        # Simulate streaming by yielding tokens from the response
        response = self._generate(input_text)
        for token in response:
            yield token
    
    @property
    def _identifying_params(self):
        return {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens
        }

    @classmethod
    def get_available_models(cls):
        pprint2("GeminiChat does not support listing available models via API.")
        return []

# Instantiate SentenceTransformersEmbeddings and GeminiChat
embedder = SentenceTransformersEmbeddings(model="Alibaba-NLP/gte-base-en-v1.5", trust_remote_code=True)
instruct_llm = GeminiChat(model="gemini-1.5-flash", temperature=1.0)

## 1. Pre-Release Evaluation

- **Typical Use Inspection:**

- **Edge Case Inspection:**

- **Progressive Rollout:**

## 2. LLM-as-a-Judge Formulation

- LLM simulates a range of interaction scenarios and generate synthetic data, allowing an evaluation developer to generate targeted inputs to eliciting a range of behaviors from your chatbot.

- The chatbot's correspondence/retrieval on synthetic data can be evaluated or parsed by an LLM and a consistent output format such as "Pass"/"Fail", similarity, or extraction can be enforced.

- Many such results can be aggregated and a metric can be derived which explains something like "% of passing evaluations", "average number of relevant details from the sources", "average cosine similarity", etc.

This idea of using LLMs to test out and quantify chatbot quality, known as [**"LLM-as-a-Judge,"**](https://arxiv.org/abs/2306.05685) allows for easy test specifications that align closely with human judgment and can be fine-tuned and replicated at scale.

**Popular frameworks for off-the-shelf judge formulations:**
- [**RAGAs (RAG Assessment)**](https://docs.ragas.io/en/stable/)
- [**LangChain Evaluators**](https://python.langchain.com/docs/guides/evaluation/)

### 3. RAG For Conversation History

### 3.1 Constructing Our Vector Store Retriever
**Vector Stores**, or vector storage systems, abstract away most of the low-level details of the embedding/comparison strategies and provide a simple interface to load and compare vectors.

Now we start with the [**FAISS vector store**](https://python.langchain.com/docs/integrations/vectorstores/faiss), which integrates a LangChain-compatable Embedding model with the [**FAISS (Facebook AI Similarity Search)**](https://github.com/facebookresearch/faiss) library to make the process fast and scalable on our local machine.

In [4]:
%%time
## ^^ This cell will be timed to see how long the conversation embedding takes
from langchain.vectorstores import FAISS

conversation = [  ## This conversation was generated partially by an AI system, and modified to exhibit desirable properties
    "[User]  Hello! My name is Beras, and I'm a big blue bear! Can you please tell me about the rocky mountains?",
    "[Agent] The Rocky Mountains are a beautiful and majestic range of mountains that stretch across North America",
    "[Beras] Wow, that sounds amazing! Ive never been to the Rocky Mountains before, but Ive heard many great things about them.",
    "[Agent] I hope you get to visit them someday, Beras! It would be a great adventure for you!"
    "[Beras] Thank you for the suggestion! Ill definitely keep it in mind for the future.",
    "[Agent] In the meantime, you can learn more about the Rocky Mountains by doing some research online or watching documentaries about them."
    "[Beras] I live in the arctic, so I'm not used to the warm climate there. I was just curious, ya know!",
    "[Agent] Absolutely! Lets continue the conversation and explore more about the Rocky Mountains and their significance!"
]

# Streamlined from_texts FAISS vectorstore construction from text list
convstore = FAISS.from_texts(conversation, embedding=embedder)
retriever = convstore.as_retriever()

CPU times: user 136 ms, sys: 123 ms, total: 259 ms
Wall time: 387 ms


The retriever can now be used like any other LangChain runnable to query the vector store for some relevant documents:

In [6]:
from langchain_core.runnables import RunnableLambda
from langchain.document_transformers import LongContextReorder

## Utility Runnable/Method
def RPrint(preface=""):
    """Simple passthrough "prints, then returns" chain"""
    def print_and_return(x, preface):
        if preface: print(preface, end="")
        pprint(x)
        return x
    return RunnableLambda(partial(print_and_return, preface=preface))

def docs2str(docs, title="Document"):
    """Useful utility for making chunks into context string. Optional, but useful"""
    out_str = ""
    for doc in docs:
        doc_name = getattr(doc, 'metadata', {}).get('Title', title)
        if doc_name: out_str += f"[Quote from {doc_name}] "
        out_str += getattr(doc, 'page_content', str(doc)) + "\n"
    return out_str

## Optional; Reorders longer documents to center of output text
long_reorder = RunnableLambda(LongContextReorder().transform_documents)

In [8]:
context_prompt = ChatPromptTemplate.from_template(
    "Answer the question using only the context"
    "\n\nRetrieved Context: {context}"
    "\n\nUser Question: {question}"
    "\nAnswer the user conversationally. User is not aware of context."
)

chain = (
    {
        'context': convstore.as_retriever() | long_reorder | docs2str,
        'question': (lambda x:x)
    }
    | context_prompt
    # | RPrint()
    | instruct_llm
    | StrOutputParser()
)

pprint(chain.invoke("Where does Beras live?"))

In [10]:
pprint(chain.invoke("Where are the Rocky Mountains?"))

In [12]:
pprint(chain.invoke("Where are the Rocky Mountains? Are they close to California?"))

In [13]:
pprint(chain.invoke("How far away is Beras from the Rocky Mountains?"))

### 3.2 Automatic Conversation Storage

Now we see how our vector store memory unit should function, we can perform one last integration to allow our conversation to add new entries to our conversation: a runnable that calls the `add_texts` method for us to update the store state.

In [16]:
from langchain_core.runnables import RunnableAssign
from operator import itemgetter

## Reset knowledge base and define what it means to add more messages.
convstore = FAISS.from_texts(conversation, embedding=embedder)

def save_memory_and_get_output(d, vstore):
    """Accepts 'input'/'output' dictionary and saves to convstore"""
    vstore.add_texts([f"User said {d.get('input')}", f"Agent said {d.get('output')}"])
    return d.get('output')

chat_prompt = ChatPromptTemplate.from_template(
    "Answer the question using only the context"
    "\n\nRetrieved Context: {context}"
    "\n\nUser Question: {input}"
    "\nAnswer the user conversationally. Make sure the conversation flows naturally.\n"
    "[Agent]"
)

conv_chain = (
    {
        'context': convstore.as_retriever() | long_reorder | docs2str,
        'input': (lambda x:x)
    }
    | RunnableAssign({'output' : chat_prompt | instruct_llm | StrOutputParser()})
    | partial(save_memory_and_get_output, vstore=convstore)
)
pprint(conv_chain.invoke("I'm glad you agree! I can't wait to get some ice cream there! It's such a good food!"))
print()
pprint(conv_chain.invoke("Can you guess what my favorite food is?"))
print()
pprint(conv_chain.invoke("Actually, my favorite is honey! Not sure where you got that idea?"))
print()
pprint(conv_chain.invoke("I see! Fair enough! Do you know my favorite food now?"))










## 4. RAG For Document Chunk Retrieval

The idea that data chunks can be embedded and searched through is flabbergasting. Applying RAG with documents is a double-edged sword; it may **seem** to work well out of the box but requires some extra care when optimizing it for truly reliable performance.


### 4.1 Loading And Chunking Documents

In [18]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import ArxivLoader

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000, chunk_overlap=100,
    separators=["\n\n", "\n", ".", ";", ",", " "],
)

print("Loading Documents")
docs = [
    ArxivLoader(query="1706.03762").load(),  ## Attention Is All You Need Paper
    ArxivLoader(query="1810.04805").load(),  ## BERT Paper
    ArxivLoader(query="2005.11401").load(),  ## RAG Paper
    ArxivLoader(query="2205.00445").load(),  ## MRKL Paper
    ArxivLoader(query="2310.06825").load(),  ## Mistral Paper
    ArxivLoader(query="2306.05685").load(),  ## LLM-as-a-Judge
    ## Some longer papers
    ArxivLoader(query="2210.03629").load(),  ## ReAct Paper
    ArxivLoader(query="2112.10752").load(),  ## Latent Stable Diffusion Paper
    ArxivLoader(query="2103.00020").load(),  ## CLIP Paper
]

## Cut the paper short if references is included.
## This is a standard string in papers.
for doc in docs:
    content = json.dumps(doc[0].page_content)
    if "References" in content:
        doc[0].page_content = content[:content.index("References")]

## Split the documents and also filter out stubs (overly short chunks)
print("Chunking Documents")
docs_chunks = [text_splitter.split_documents(doc) for doc in docs]
docs_chunks = [[c for c in dchunks if len(c.page_content) > 200] for dchunks in docs_chunks]

## Make some custom Chunks to give big-picture details
doc_string = "Available Documents:"
doc_metadata = []
for chunks in docs_chunks:
    metadata = getattr(chunks[0], 'metadata', {})
    doc_string += "\n - " + metadata.get('Title')
    doc_metadata += [str(metadata)]

extra_chunks = [doc_string] + doc_metadata

## Printing out some summary information for reference
pprint(doc_string, '\n')
for i, chunks in enumerate(docs_chunks):
    print(f"Document {i}")
    print(f" - # Chunks: {len(chunks)}")
    print(f" - Metadata: ")
    pprint(chunks[0].metadata)
    print()

Loading Documents
Chunking Documents


Document 0
 - # Chunks: 35
 - Metadata: 



Document 1
 - # Chunks: 45
 - Metadata: 



Document 2
 - # Chunks: 46
 - Metadata: 



Document 3
 - # Chunks: 40
 - Metadata: 



Document 4
 - # Chunks: 21
 - Metadata: 



Document 5
 - # Chunks: 44
 - Metadata: 



Document 6
 - # Chunks: 123
 - Metadata: 



Document 7
 - # Chunks: 52
 - Metadata: 



Document 8
 - # Chunks: 155
 - Metadata: 





### 4.2 Constructing Document Vector Stores

In [34]:
%%time
print("Constructing Vector Stores")
vecstores = [FAISS.from_texts(extra_chunks, embedder)]
vecstores += [FAISS.from_documents(doc_chunk, embedder) for doc_chunk in docs_chunks]

Constructing Vector Stores
CPU times: user 3.74 s, sys: 7min 18s, total: 7min 21s
Wall time: 35min 46s


Constructing Vector Stores
CPU times: user 3.74 s, sys: 7min 18s, total: 7min 21s
Wall time: 35min 46s

In [None]:
from faiss import IndexFlatL2
from langchain_community.docstore.in_memory import InMemoryDocstore

embed_dims = len(embedder.embed_query("test"))
def default_FAISS():
    '''Useful utility for making an empty FAISS vectorstore'''
    return FAISS(
        embedding_function=embedder,
        index=IndexFlatL2(embed_dims),
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
        normalize_L2=False
    )

def aggregate_vstores(vectorstores):
    ## Initialize an empty FAISS Index and merge others into it
    ## We'll use default_faiss for simplicity, though it's tied to our embedder by reference
    agg_vstore = default_FAISS()
    for vstore in vectorstores:
        agg_vstore.merge_from(vstore)
    return agg_vstore

## Unintuitive optimization; merge_from seems to optimize constituent vector stores away
docstore = aggregate_vstores(vecstores)

print(f"Constructed aggregate docstore with {len(docstore.docstore._dict)} chunks")

If you want to load pre-constructed docstore instead:

In [34]:
docstore = FAISS.load_local("docstore_index", embedder, allow_dangerous_deserialization=True)
print(f"Loaded aggregate docstore with {len(docstore.docstore._dict)} chunks")

Loaded aggregate docstore with 571 chunks


### 4.3 Implementing RAG Chain

In [36]:
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.passthrough import RunnableAssign
import gradio as gr
from operator import itemgetter

convstore = default_FAISS()

def save_memory_and_get_output(d, vstore):
    """Accepts 'input'/'output' dictionary and saves to convstore"""
    vstore.add_texts([
        f"User previously responded with {d.get('input')}",
        f"Agent previously responded with {d.get('output')}"
    ])
    return d.get('output')

initial_msg = (
    "Hello! I am a document chat agent here to help the user!"
    f" I have access to the following documents: {doc_string}\n\nHow can I help you?"
)

chat_prompt = ChatPromptTemplate.from_template("You are a document chatbot. Help the user as they ask questions about documents."
    " User messaged just asked: {input}\n\n"
    " From this, we have retrieved the following potentially-useful info: "
    " Conversation History Retrieval:\n{history}\n\n"
    " Document Retrieval:\n{context}\n\n"
    " (Answer only from retrieval. Only cite sources that are used. Make your response conversational.)"
)

stream_chain = chat_prompt | instruct_llm | StrOutputParser()

## Implementing the retrieval chain to make the system work

retrieval_chain = (
    {'input' : (lambda x: x)}
    ## Retrieve history & context from convstore & docstore, respectively.
    ## Our solution uses RunnableAssign, itemgetter, long_reorder, and docs2str
    | RunnableAssign({'history': itemgetter('input') | convstore.as_retriever() | long_reorder | docs2str})
    | RunnableAssign({'context': itemgetter('input') | docstore.as_retriever()  | long_reorder | docs2str})
)

def chat_gen(message, history=[], return_buffer=True):
    buffer = ""
    ## First perform the retrieval based on the input message
    retrieval = retrieval_chain.invoke(message)
    line_buffer = ""

    ## Then, stream the results of the stream_chain
    for token in stream_chain.stream(retrieval):
        buffer += token
        ## If you're using standard print, keep line from getting too long
        yield buffer if return_buffer else token

    ## Lastly, save the chat exchange to the conversation memory buffer
    save_memory_and_get_output({'input':  message, 'output': buffer}, convstore)


## Start of Agent Event Loop
test_question = "Tell me about RAG!"

## Before you launch your gradio interface, make sure your thing works
for response in chat_gen(test_question, return_buffer=False):
    print(response, end='')

RAG, or Retrieval-Augmented Generation, is a method for answering questions using both a retriever and a generator.  Unlike some other approaches, it achieves state-of-the-art performance without needing expensive pre-training or extra components like a re-ranker or extractive reader [Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks].  It's advantageous because it can generate answers even if the exact answer isn't verbatim in the retrieved documents; it can synthesize information from multiple documents to create a correct response, something extractive methods can't do [Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks]. In fact, RAG can even generate correct answers even when the answer isn't in *any* of the retrieved documents [Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks].  There are two main variations: RAG-Token and RAG-Sequence, with the latter requiring a different decoding approach due to its likelihood calculation [Retrieval

### 4.4 Gradio Chatbot

In [42]:
mssg = {"role": "user", "content": initial_msg}
chatbot = gr.Chatbot(value=[mssg], type='messages')
demo = gr.ChatInterface(chat_gen, chatbot=chatbot, type='messages').queue()

try:
    demo.launch(debug=True, share=True, show_api=False)
    demo.close()
except Exception as e:
    demo.close()
    print(e)
    raise e

* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://b8adb8c80dc10192d6.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://b8adb8c80dc10192d6.gradio.live
Closing server running on port: 7860


### 4.5 Saving The Index

In [40]:
## Save and compress the index
docstore.save_local("docstore_index")
!tar czvf docstore_index.tgz docstore_index

# !rm -rf docstore_index

a docstore_index
a docstore_index/index.faiss
a docstore_index/index.pkl


### 4.6 Testing The Index

In [133]:
os.environ["TOKENIZERS_PARALLELISM"] = "False"
!tar xzvf docstore_index.tgz
new_db = FAISS.load_local("docstore_index", embedder, allow_dangerous_deserialization=True)
testdocs = new_db.similarity_search("Testing")
print(testdocs[0].page_content[:1000])

x docstore_index/
x docstore_index/index.faiss
x docstore_index/index.pkl
.\n\u2022 Better characterizing biases in models, alerting other\nresearchers to areas of concern and areas for interven-\ntions.\n\u2022 Creating suites of tests to evaluate systems like CLIP\non, so we can better characterize model capabilities\nearlier in the development cycle.\n\u2022 Identifying potential failure modes and areas for further\nwork.\nWe plan to contribute to this work, and hope this analysis\nprovides some motivating examples for subsequent research.\n8. Related Work\nAny model that leverages written, spoken, signed or any\nother form of human language as part of its training signal\nis arguably using natural language as a source of supervi-\nsion. This is an admittedly extremely broad area and covers\nmost work in the \ufb01eld of distributional semantics including\ntopic models (Blei et al., 2003), word, sentence, and para-\ngraph vectors (Mikolov et al., 2013; Kiros et al., 2015; Le &\nMiko

## 5. Pairwise Evaluator

The following is a custom implementation of a simplified [LangChain Pairwise String Evaluator](https://python.langchain.com/docs/guides/evaluation/examples/comparisons). 

**Preparing for RAG chain evaluation, we need to:**

- Pull in the document index.
- Recreate the RAG pipeline of choice.

**Implementing a judge formulation as follows:**

- Sample RAG agent document pool to find two document chunks.
- Use those two document chunks to generate a synthetic "baseline" question-answer pair.
- Use RAG agent to generate its own answer.
- Use a judge LLM to compare the two responses while grounding the synthetic generation as "ground-truth correct."

The chain should be a simple but powerful process that that tests if the RAG chain outperforms a narrow chatbot with limited document access.

### 5.1. Pulling In Document Retrieval Index

In [139]:
## Make sure docstore_index.tgz is in working directory
from langchain_community.vectorstores import FAISS

# embedder = SentenceTransformersEmbeddings(model="Alibaba-NLP/gte-base-en-v1.5")

!tar xzvf docstore_index.tgz
docstore = FAISS.load_local("docstore_index", embedder, allow_dangerous_deserialization=True)
docs = list(docstore.docstore._dict.values())

def format_chunk(doc):
    return (
        f"Paper: {doc.metadata.get('Title', 'unknown')}"
        f"\n\nSummary: {doc.metadata.get('Summary', 'unknown')}"
        f"\n\nPage Body: {doc.page_content.replace("\n", " ")}"
    )

## This printout just confirms that the store has been retrieved
pprint(f"Constructed aggregate docstore with {len(docstore.docstore._dict)} chunks")
pprint(f"Sample Chunk:")
print(format_chunk(docs[len(docs)//2]))

x docstore_index/
x docstore_index/index.faiss
x docstore_index/index.pkl


Paper: ReAct: Synergizing Reasoning and Acting in Language Models

Summary: While large language models (LLMs) have demonstrated impressive capabilities
across tasks in language understanding and interactive decision making, their
abilities for reasoning (e.g. chain-of-thought prompting) and acting (e.g.
action plan generation) have primarily been studied as separate topics. In this
paper, we explore the use of LLMs to generate both reasoning traces and
task-specific actions in an interleaved manner, allowing for greater synergy
between the two: reasoning traces help the model induce, track, and update
action plans as well as handle exceptions, while actions allow it to interface
with external sources, such as knowledge bases or environments, to gather
additional information. We apply our approach, named ReAct, to a diverse set of
language and decision making tasks and demonstrate its effectiveness over
state-of-the-art baselines, as well as improved human interpretability and
trustwor

### 5.2. Pulling In RAG Chain 

In [217]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from operator import itemgetter

embedder = SentenceTransformersEmbeddings(model="Alibaba-NLP/gte-base-en-v1.5", trust_remote_code=True)

instruct_llm = GeminiChat(model="gemini-1.5-flash")

llm = instruct_llm | StrOutputParser()

chat_prompt = ChatPromptTemplate.from_template(
    "{input}"
    " Use the format:\n\nAnswer: (answer)\n\n"
    "Document Retrieval:\n{context}\n\n"
    "Question Again: {input}"
)

def output_puller(inputs):
    """"Output generator. Useful if your chain returns a dictionary with key 'output'"""
    if isinstance(inputs, dict):
        inputs = [inputs]
    for token in inputs:
        if token.get('output'):
            yield token.get('output')

## Chain 1 Specs: "Hello World" -> retrieval_chain 
long_reorder = RunnableLambda(LongContextReorder().transform_documents)
context_getter = itemgetter('input') | docstore.as_retriever() | long_reorder | docs2str
retrieval_chain = {'input': (lambda x: x)} | RunnableAssign({'context': context_getter})

## Chain 2 Specs: retrieval_chain -> generator_chain 
generator_chain = RunnableLambda(lambda x: RunnableAssign({'formatted_input': chat_prompt.invoke})) | RunnableLambda(lambda x: RunnableAssign({'output': RunnableLambda(lambda y: llm.invoke(y['formatted_input']))}))

rag_chain = retrieval_chain | generator_chain

## pprint(rag_chain.invoke("Tell me something interesting!"))
for token in [rag_chain.stream("Tell me something interesting!")]:
    for t in RunnableLambda(output_puller).invoke(token):
        print(t, end="")

Answer:  Researchers are developing a new type of question-answering task using Jeopardy! clues as a benchmark.  Because Jeopardy! questions require concise, factual statements that cleverly conceal an answer, generating them is a challenging test of a large language model's knowledge and generation capabilities.


### 5.3. Generating Synthetic Question-Answer Pairs

Now we implement the first few part of our evaluation routine:

- **Sample RAG agent document pool to find two document chunks.**
- **Use those two document chunks to generate a synthetic "baseline" question-answer pair.**
- Use RAG agent to generate its own answer.
- Use a judge LLM to compare the two responses while grounding the synthetic generation as "ground-truth correct."

The chain should be a simple but powerful process that tests if the RAG chain outperforms a narrow chatbot with limited document access.

In [231]:
import random

num_questions = 3
synth_questions = []
synth_answers = []

simple_prompt1 = ChatPromptTemplate.from_template('{system}\n\n{input}')

for i in range(num_questions):
    doc1 = random.sample(docs, 1)[0]
    sys_msg = (
        "Use the documents provided to generate an interesting question."
        " Rely more on the document bodies than the summary."
        " Use the format:\nQuestion: (good question, 1 sentence, detailed)"
    )
    usr_msg = (
        f"Document: {format_chunk(doc1)}"
    )

    q = (simple_prompt1 | llm).invoke({'system': sys_msg, 'input': usr_msg})
    synth_questions += [q]
    pprint2(f"Q {i+1}")
    pprint2(synth_questions[-1])
    print()










In [233]:
synth_answers = []

simple_prompt2 = ChatPromptTemplate.from_template('{input} {system}')

for i in range(num_questions):
    sys_msg = (
        " Use the format: \"Answer: (answer)\""
    )
    a = (simple_prompt2 | llm).invoke({'system': sys_msg, 'input': synth_questions[i]})
    synth_answers += [a]
    pprint2(f"A {i+1}")
    pprint(synth_answers[-1])
    print()










### 5.4. Answer The Synthetic Questions

Now we implement the third part of our evaluation routine:

- Sample RAG agent document pool to find two document chunks.
- Use those two document chunks to generate a synthetic "baseline" question-answer pair.
- **Use RAG agent to generate its own answer.**
- Use a judge LLM to compare the two responses while grounding the synthetic generation as "ground-truth correct."

The chain should be a simple but powerful process that tests if the RAG chain outperforms a narrow chatbot with limited document access.

In [235]:
rag_answers = []
contexts = []
for i, q in enumerate(synth_questions):
    outp = rag_chain.invoke(q)
    rag_answer = outp.get('output', '')
    rag_answers += [rag_answer]
    context = outp.get('context', '')
    contexts += [context] 
    pprint2(f"Q {i+1}\n", q, "", sep="\n")
    pprint(f"RAG Answer: \n\n{rag_answer}", "", sep='\n')

### 5.5. Implementing Human Preference Metric

Now we implement the fourth part of our evaluation routine:

- Sample RAG agent document pool to find two document chunks.
- Use those two document chunks to generate synthetic "baseline" question-answer pair.
- Use RAG agent to generate its own answer.
- **Use a judge LLM to compare the two responses while grounding the synthetic generation as "ground-truth correct."**

The chain should be a simple but powerful process that tests if the RAG chain outperforms a narrow chatbot with limited document access.

In [252]:
from ragas import evaluate, EvaluationDataset, SingleTurnSample
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.metrics import LLMContextRecall, Faithfulness, FactualCorrectness, SemanticSimilarity
from ragas.llms import LangchainLLMWrapper

eval_prompt = ChatPromptTemplate.from_template("""INSTRUCTION: 
Evaluate the following Question-Answer pair for accuracy & precision of detail.
[1] First answer is more detailed, accurate and precise.
[2] Second answer is more detailed, accurate and precise.

Output Format:
[Score] Justification

{qa_trio}

EVALUATION: 
""")

pref_score = []
trio_gen = zip(synth_questions, synth_answers, rag_answers)

# For RAGAs scores
samples = [
    SingleTurnSample(user_input=q, retrieved_contexts=[c], response=a_rag, reference=a_synth)
    for q, c, a_rag, a_synth in zip(synth_questions, contexts, rag_answers, synth_answers)
]

dataset = EvaluationDataset(samples)

# evaluator_llm = LangchainLLMWrapper(instruct_llm)
# evaluator_embeddings = LangchainEmbeddingsWrapper(embedder)

# metrics = [LLMContextRecall(llm=evaluator_llm), FactualCorrectness(llm=evaluator_llm), Faithfulness(llm=evaluator_llm), SemanticSimilarity(embeddings=evaluator_embeddings)]
# results = evaluate(dataset=dataset, metrics=metrics)

for i, (q, a_synth, a_rag) in enumerate(trio_gen):
    pprint2(f"Set {i+1}\n\nQuestion: {q}\n\n")
    qa_trio = f"Question: {q}\n\nAnswer 1: {a_rag}\n\n Answer 2: {a_synth}"
    pref_score += [(eval_prompt | llm).invoke({'qa_trio': qa_trio})]
    pprint(f"RAG Answer: {a_rag}\n\n")
    pprint(f"Synth Answer: {a_synth}\n\n")
    pprint2(f"Synth Evaluation: {pref_score[-1]}\n\n")
# pprint2(f"RAGAs Scores: {result.scores.samples.todict()}\n\n")

We now have an LLM system that reasons about our pipeline and tries to evaluate it. Now we have some judge results, we aggregate the results and see how often our formulation was correct according to an LLM:

In [254]:
pref_score = sum(("[1]" in score) for score in pref_score) / len(pref_score)
print(f"Preference Score: {pref_score}")

Preference Score: 1.0
