This project aims to build an advanced RAG pipeline that includes quality control mechanisms through relevance checking, which helps mitigate hallucination and improve answer quality.


**Indexing**


In [None]:
import math
import re
import warnings
from typing import List, Dict, Any

import chromadb
from dotenv import load_dotenv
from langchain.retrievers import EnsembleRetriever, BM25Retriever
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents.base import Document
from langchain_core.vectorstores.base import VectorStoreRetriever
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# Silence every UserWarning for the rest of the session to keep cell output readable.
# NOTE(review): this is notebook-wide, so it also hides UserWarnings you may want to see.
warnings.filterwarnings("ignore", category=UserWarning)

In [2]:
# Load the PDF file (path is relative to the notebook's working directory).
# NOTE(review): mode="single" presumably returns the whole PDF as one Document
# rather than one per page — confirm against the PyPDFLoader docs.
loader = PyPDFLoader("google-2023-environmental-report.pdf", mode="single")
documents: List[Document] = loader.load()

In [3]:
# Create an embedding object
# load_dotenv() reads variables from a local .env file into the environment
# (presumably the OpenAI API key used by OpenAIEmbeddings — TODO confirm).
load_dotenv()
embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")

# Split the documents into chunks
# The chunker is given the embedding function, so splitting itself calls the
# embedding model (presumably to place boundaries by semantic similarity — confirm
# in the SemanticChunker docs).
chunker = SemanticChunker(embeddings=embedding_function)
chunks: List[Document] = chunker.split_documents(documents)
print(f"Split PDF into {len(chunks)} chunks.\n")

Split PDF into 102 chunks.



In [4]:
# Create a Chroma vector store
# chromadb.Client() is an in-memory client, and Chroma ignores `persist_directory`
# once an explicit `client` is supplied, so the original index was never written to
# disk. A PersistentClient rooted at "chroma_db" makes the persistence real.
chroma_client = chromadb.PersistentClient(path="chroma_db")
collection_name = "google_environmental_report"

# Stable, position-based ids make this cell idempotent: re-running it upserts the
# same records instead of appending a second copy of every chunk.
# NOTE: if a later run yields fewer chunks, higher-numbered ids from the earlier run
# stay in the collection; delete the "chroma_db" directory to rebuild from scratch.
chunk_ids: List[str] = [f"chunk-{i}" for i in range(len(chunks))]

chroma_vector_store: Chroma = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_function,
    ids=chunk_ids,
    collection_name=collection_name,
    client=chroma_client,
)

# Create a BM25 retriever (sparse, keyword-based) returning its top 5 chunks
sparse_retriever: BM25Retriever = BM25Retriever.from_documents(
    documents=chunks,
    k=5
)
# Create a Chroma retriever (dense, embedding-based) returning its top 5 chunks
dense_retriever: VectorStoreRetriever = chroma_vector_store.as_retriever(
    search_kwargs={"k": 5}
)
# Create an ensemble retriever
# The ensemble merges both result lists with weighted Reciprocal Rank Fusion and
# returns the de-duplicated union (at most 5 + 5 chunks). It has no `k` option of
# its own, so the result size is controlled by the `k` of each child retriever.
ensemble_retriever = EnsembleRetriever(
    retrievers=[dense_retriever, sparse_retriever],
    weights=[0.5, 0.5],
    # `c` is the constant added to each rank in the fusion score weight / (rank + c).
    # c=0 does not disable re-ranking; it makes the score fall off fastest with rank.
    c=0,
)

**Retrieval and Generation**


In [5]:
from langchain import hub
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field
from langchain.output_parsers import PydanticOutputParser
from langchain_core.outputs import Generation
import json
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

In [6]:
# Load a prompt from the hub
# This is the question-answering prompt used for answer generation; the printed
# repr shows that it expects the input variables 'context' and 'question'.
prompt = hub.pull("jclemens24/rag-prompt")
print(f"Loaded prompt from hub\n: {prompt}")

# Create a prompt template for the relevance check
# Inputs: {question} and {retrieved_context}. The model is told to reply with a bare
# number from 1 to 5 so the reply can be converted to a float downstream.
relevance_prompt_template: PromptTemplate = PromptTemplate.from_template(
    """
    Given the following question and retrieved context, determine if the context is relevant to the question.
    Provide a score from 1 to 5, where 1 is not at all relevant and 5 is highly relevant.
    Return ONLY the numeric score, without any additional text or explanation.

    Question: {question}
    Retrieved Context: {retrieved_context}

    Relevance Score:"""
)

Loaded prompt from hub
: input_variables=['context', 'question'] input_types={} partial_variables={} metadata={'lc_hub_owner': 'jclemens24', 'lc_hub_repo': 'rag-prompt', 'lc_hub_commit_hash': '1a1f3ccb9a5a92363310e3b130843dfb2540239366ebe712ddd94982acc06734'} messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], input_types={}, partial_variables={}, template="You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.\nQuestion: {question} \nContext: {context} \nAnswer:"), additional_kwargs={})]


In [7]:
# Create a Pydantic model that represents the structure of the JSON output:
# a numeric relevance score paired with the answer text.
class FinalOutputModel(BaseModel):
    # How relevant the retrieved context is to the question (the relevance prompt asks for 1-5).
    relevance_score: float = Field(description="The relevance score of the retrieved context to the question")
    # The answer text produced by the answer-generation branch of the chain.
    answer: str = Field(description="The final answer to the question")


# Create an instance of PydanticOutputParser using the FinalOutputModel
# (format_json_output uses it to turn JSON text into a FinalOutputModel instance).
pydantic_parser = PydanticOutputParser(pydantic_object=FinalOutputModel)

In [8]:
# Post-processing
def format_docs(docs: list[Document]) -> str:
    """
    Build the string passed to the prompt's context variable.

    Takes the ``page_content`` of every document, in order, and joins the pieces
    with a blank line between consecutive documents. An empty list yields "".
    """
    separator = "\n\n"
    page_texts = (document.page_content for document in docs)
    return separator.join(page_texts)


# Define a function to extract the relevance score
def extract_relevance_score(llm_output: str) -> float:
    """
    Extract the relevance score from the LLM output.

    The relevance prompt asks for a bare number, but chat models occasionally wrap
    it in text ("Relevance Score: 4", "4/5"). A bare number is parsed directly;
    otherwise the first numeric token in the text is used.

    Args:
        llm_output: Raw text returned by the relevance-check LLM call.

    Returns:
        The parsed score, or 0.0 when the text contains no finite number.
    """
    text = str(llm_output).strip()
    try:
        score = float(text)
    except ValueError:
        # Not a bare number: fall back to the first integer/decimal found in the text.
        match = re.search(r"\d+(?:\.\d+)?", text)
        if match is None:
            return 0.0
        score = float(match.group())
    # float() also accepts "nan" and "inf". NaN compares False against any threshold,
    # so it would slip through a `score < threshold` gate; treat it as "no score".
    return score if math.isfinite(score) else 0.0
    

# Define a function to get json output using Pydantic parser
def format_json_output(x: dict) -> FinalOutputModel | None:
    """
    Turn the chain's raw score and answer into the parser's output object.

    Reads x['relevance_score'] (raw LLM text, converted with
    extract_relevance_score) and x['answer'], and returns whatever
    pydantic_parser.parse_result produces for them.
    """
    payload: dict = dict(
        relevance_score=extract_relevance_score(x['relevance_score']),
        answer=x['answer'],
    )
    # The parser works on Generation objects carrying JSON text, so the payload is
    # serialized with json.dumps and wrapped in a single Generation before parsing.
    serialized_payload = json.dumps(payload)
    generations = [Generation(text=serialized_payload)]
    return pydantic_parser.parse_result(generations)

    
# Define a function to get conditional answer with relevance check
def conditional_answer(x: dict, *, threshold: float = 4.0) -> str | FinalOutputModel | None:
    """
    Return the generated answer only when the retrieved context is relevant enough.

    Args:
        x: Dictionary with the keys 'relevance_score' (raw text from the
            relevance-check LLM call) and 'answer' (text from the answer-generation
            LLM call).
        threshold: Minimum relevance score required to return the answer.
            Defaults to 4.0, i.e. only scores of 4 or 5 pass.

    Returns:
        A fixed "I don't know" message (str) when the score is below the threshold
        or cannot be compared with it; otherwise the result of format_json_output(x).
    """
    relevance_score: float = extract_relevance_score(x['relevance_score'])
    # Written as `not (score >= threshold)` instead of `score < threshold` so that a
    # NaN score (every comparison is False) is rejected rather than accepted.
    if not relevance_score >= threshold:
        return "I don't know due to no relevant content found."
    return format_json_output(x)

In [None]:
# Define the LLM and output parser
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)
str_output_parser = StrOutputParser()


def _render_relevance_prompt(inputs: dict) -> str:
    """Fill the relevance-check template from the chain's {'context', 'question'} dict."""
    return relevance_prompt_template.format(
        question=inputs["question"],
        retrieved_context=inputs["context"],
    )


# Stage 1: fetch documents for the query while passing the query through unchanged.
_retrieve_stage = RunnableParallel(
    {"context": ensemble_retriever, "question": RunnablePassthrough()}
)

# Stage 2: replace the list of documents under "context" with one formatted string.
_stringify_context_stage = RunnablePassthrough.assign(
    context=(lambda x: format_docs(x["context"]))
)

# Stage 3: run the relevance check and the answer generation side by side on the
# same {"context", "question"} input.
_relevance_branch = (
    RunnablePassthrough()
    | _render_relevance_prompt
    | llm
    | str_output_parser
)
_answer_branch = (
    RunnablePassthrough()
    | prompt
    | llm
    | str_output_parser
)
_score_and_answer_stage = RunnableParallel(
    {
        "relevance_score": _relevance_branch,
        "answer": _answer_branch,
    }
)

# Stage 4: overwrite "answer" with the gated result from conditional_answer
# (the custom pydantic object, or the fallback message).
_gate_answer_stage = RunnablePassthrough.assign(answer=conditional_answer)

# Build the RAG chain that includes relevance check and answer generation
rag_chain = (
    _retrieve_stage
    | _stringify_context_stage
    | _score_and_answer_stage
    | _gate_answer_stage
)

In [10]:
# Invoke the RAG chain with a user query
user_query = "What are Google's environmental initiatives?"
result: dict[str, Any] = rag_chain.invoke(user_query)

# result["answer"] is a FinalOutputModel when the context was judged relevant and a
# plain fallback string otherwise. The relevance judgement comes from an LLM, so
# handle both shapes instead of assuming the model object (which would raise
# AttributeError on the fallback string).
answer_payload = result["answer"]
if isinstance(answer_payload, FinalOutputModel):
    shown_score = answer_payload.relevance_score
    shown_answer = answer_payload.answer
else:
    shown_score = extract_relevance_score(result["relevance_score"])
    shown_answer = answer_payload

print(f"Original Question: {user_query}\n")
print(f"Relevance Score: {shown_score}")
print(f"Final Answer:\n{shown_answer}\n")
print(f"Final JSON Output:\n{result}\n")

Original Question: What are Google's environmental initiatives?

Relevance Score: 5.0
Final Answer:
Google's environmental initiatives include a comprehensive approach to sustainability that focuses on three key pillars: empowering individuals to take action, collaborating with partners and customers, and operating the business sustainably. Some specific initiatives and actions include:

1. **Employee Engagement**: Google promotes sustainability within its culture by providing employees with opportunities to engage in environmental issues, participate in sustainability courses, and work on projects like Project Sunroof.

2. **Supplier Engagement**: Google works to build a low-carbon, circular supply chain by helping suppliers improve their environmental performance and ensuring compliance with environmental standards through audits and corrective action plans.

3. **Public Policy and Advocacy**: Google supports strong public policies that enhance global climate action, aligning with th

In [11]:
# Ask a question the report does not cover, to exercise the relevance gate.
# In the recorded run the score was '1' and the chain returned the fallback message.
user_query = "How is Google developing its LLM?"
result: dict[str, Any] = rag_chain.invoke(user_query)

print(f"Original Question: {user_query}\n")
print(f"Final JSON Output:\n{result}")

Original Question: How is Google developing its LLM?

Final JSON Output:
{'relevance_score': '1', 'answer': "I don't know due to no relevant content found."}


**UI for Testing**


In [12]:
import asyncio    
import nest_asyncio  
import gradio as gr
 
# Event-loop setup for running the Gradio UI from inside the notebook kernel.
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())  # set the event loop policy to the default policy
nest_asyncio.apply()  # apply the necessary patches to enable nested event loops

In [13]:
# Define a function to handle the user input and generate the response.
def process_user_input(question: str) -> tuple[float, str]:
    """Process the user input and generate the response using the RAG chain.

    Args:
        question: The question typed into the UI.

    Returns:
        A ``(relevance_score, final_answer)`` tuple, in the order of the two output
        textboxes of the Gradio interface.
    """
    # Run the RAG chain with the user question.
    result: dict[str, Any] = rag_chain.invoke(question)

    answer = result["answer"]
    if isinstance(answer, FinalOutputModel):
        # Relevant context: the chain returned the structured object.
        return answer.relevance_score, answer.answer

    # Fallback: `answer` is the plain "I don't know" string and the score is still the
    # raw LLM text. That text may be non-numeric (which is one way to end up here), so
    # parse it with the same helper the chain uses instead of a bare float(), which
    # would raise ValueError and surface as an error in the UI.
    return extract_relevance_score(result["relevance_score"]), answer


# Next, set up an instance of the Gradio interface.
# One text input (the question) is mapped to two text outputs, matching the
# (relevance_score, final_answer) tuple returned by process_user_input.
demo = gr.Interface(
    fn=process_user_input,  # The function to be called when the user submits input.
    inputs=gr.Textbox(
        label="Enter your question",  
        value="What are Google's environmental initiatives?" # The default value for the input component.
    ),

    # Output order must match the order of the values returned by `fn`.
    outputs=[
        gr.Textbox(label="Relevance Score"),  
        gr.Textbox(label="Final Answer"),  
    ],

    title="RAG Question Answering",  # The title of the Gradio interface.
    description="Enter a question about Google's 2023 environmental report and get an answer and associated relevance score.",  
    theme="default",  # The theme for the Gradio interface.
)

In [14]:
# Start the UI. In the recorded run, share=True produced a public *.gradio.live URL
# in addition to the local one.
# NOTE(review): anyone holding that link can presumably submit questions (and thus
# spend OpenAI credits) while the server is up — use share=False for local testing.
# NOTE(review): debug=True appears to keep the cell running until it is interrupted
# (the recorded run ended with a keyboard interrupt) — confirm against the Gradio docs.
demo.launch(share=True, debug=True)

* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://4bf96ef078fc55d27f.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://4bf96ef078fc55d27f.gradio.live


