# Corrective RAG

In many real-world applications, users expect question-answering systems to provide relevant, accurate, and up-to-date answers — even if the system’s local knowledge base (e.g., PDFs or internal documents) doesn’t have a direct answer.

Standard RAG systems typically retrieve information based on similarity search and pass it to a language model. However, this approach can fail when retrieved content is irrelevant, ambiguous, or incomplete.

This notebook implements a corrective RAG (CRAG) system: an adaptive RAG pipeline that addresses these limitations by checking its own retrieval. It scores the quality of what local retrieval returns, decides whether to trust it, fall back to web search, or combine both, and refines the chosen sources before synthesizing a final answer. The goal is to improve response quality and reliability, especially when internal document knowledge is insufficient or outdated.

In [1]:
import os
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.document_loaders import PyPDFLoader
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import DuckDuckGoSearchResults
from typing import List, Tuple
import json

# Load environment variables from a .env file
load_dotenv()

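# Fail early with a clear message if the OpenAI API key is missing
# (assigning a missing value back into os.environ would raise a TypeError)
if not os.getenv("OPENAI_API_KEY"):
    raise EnvironmentError("OPENAI_API_KEY is not set; add it to your .env file.")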

We use `DuckDuckGoSearchResults` as a fallback search utility for up-to-date online knowledge.

#### Load and index the document into a vector store
To answer questions based on a document, we first need to turn it into a format our language model can search. Language models on their own don’t "know" the content of a PDF unless we explicitly make it available in a structured way. That is where embedding and vector search come in.

This process builds a memory-like structure from our document that can be semantically searched.

In [2]:
# Define the path to the PDF document we want to index
path = "Understanding_Climate_Change.pdf"

def encode_pdf(path, chunk_size=1000, chunk_overlap=200):
    """
    Encodes a PDF book into a vector store using OpenAI embeddings.

    Args:
        path: The path to the PDF file.
        chunk_size: The desired size of each text chunk.
        chunk_overlap: The amount of overlap between consecutive chunks.

    Returns:
        A FAISS vector store containing the encoded book content.
    """

    # Load and parse the PDF document into a list of text objects (usually one per page)
    loader = PyPDFLoader(path)
    documents = loader.load()  # Extracts text from each page

    # Split the document into smaller chunks, with overlap to maintain semantic continuity
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,  # Maximum chunk length in characters (default 1000)
        chunk_overlap=chunk_overlap,  # Characters shared with the previous chunk (default 200)
        length_function=len  # Use raw character count to determine length
    )
    texts = text_splitter.split_documents(documents)
    # Replace tab characters with spaces
    for doc in texts:
        doc.page_content = doc.page_content.replace('\t', ' ')

    # Convert text chunks into dense vectors using OpenAI's embedding model
    embeddings = OpenAIEmbeddings()

    # Store the resulting embeddings into a FAISS index for fast similarity search
    vectorstore = FAISS.from_documents(texts, embeddings)

    return vectorstore

# Build the vector store from the PDF
vectorstore = encode_pdf(path)

In this step, we:
- Load the PDF into memory as raw text
- Chunk the text into overlapping segments so that each chunk preserves context from its neighbors (this helps avoid cutting off important ideas mid-sentence)
- Embed each chunk using OpenAI's embeddings model, converting text into numerical vectors
- Store these vectors in a FAISS vector store, which allows for fast similarity searches based on user queries later

So later, when a user asks a question, the system can search semantically — even if the question uses different wording than the document — and fetch the most relevant chunks of text from the original PDF.
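To make the overlap idea concrete, here is a minimal pure-Python sketch of fixed-size chunking with overlap. It is a simplification, not what `RecursiveCharacterTextSplitter` does internally (that splitter also tries to break on separators such as paragraphs and sentences); the function name `chunk_text` is hypothetical.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split text into fixed-size character windows that overlap their neighbors."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the current window already reaches the end of the text
    return chunks


# Small values make the overlap easy to see
sample = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_text(sample, chunk_size=10, chunk_overlap=4)
for chunk in chunks:
    print(chunk)
```

Each chunk starts with the last four characters of the previous one, which is the property that keeps an idea from being cut off at a chunk boundary.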

#### Initialize the language model
We will use a lightweight OpenAI model to handle reasoning, classification, and generation throughout the pipeline.

In [3]:
llm = ChatOpenAI(model="gpt-4o-mini-2024-07-18", max_tokens=1000, temperature=0)

This model will be used for multiple sub-tasks: relevance scoring, query rewriting, knowledge refinement, and final response generation. It is configured to minimize variability (`temperature = 0`) and to cap each response at 1,000 tokens (via `max_tokens`), which bounds cost and verbosity but can also truncate long answers.

#### Initialize the web search tool
We will use DuckDuckGo, a privacy-preserving search engine, for external search when internal sources are weak.

In [4]:
search = DuckDuckGoSearchResults(backend="html")

This initializes a `DuckDuckGoSearchResults` tool from LangChain, which can be used inside chains to execute real-time web searches. It performs a backend HTTP request to DuckDuckGo’s search engine with our query and returns a list of the top results — including snippets, titles, and URLs.

DuckDuckGo is chosen because it doesn't track users, doesn't require API keys, and provides fast, general-purpose search results from publicly available information. It is particularly useful in corrective RAG when we need to “correct” for poor document retrieval by pulling in additional knowledge from the web.

Now, we have a reasoning engine (`llm`) and an external knowledge source (`search`) to rely on when document-based info is insufficient. The search object acts as a callable tool later in the pipeline.

### Define core logic chains
The strength of Corrective RAG lies in its dynamic decision-making — the ability to assess whether retrieved content is good enough, and to intelligently refine or seek external knowledge when it isn’t. This step defines three LLM-powered utility chains that enable that reasoning:
- Relevance evaluator — Decides how useful a document is for a given question.
- Knowledge refiner — Extracts structured, bullet-pointed insights from dense text.
- Query rewriter — Optimizes user questions into web-search-friendly format.

Let’s implement each of these.

#### Relevance evaluator – Semantic scoring chain
This function determines how closely a retrieved chunk of text matches a user query. Instead of relying on raw vector similarity (which can be noisy), we ask the language model to judge the match using its own understanding of meaning.

In [5]:
# Retrieval Evaluator

# Define the output schema for the structured relevance score
class RetrievalEvaluatorInput(BaseModel):
    relevance_score: float = Field(..., description="The relevance score of the document to the query. The score should be between 0 and 1.")

# Use the language model to assign a relevance score between 0 and 1 for a document-query pair
def retrieval_evaluator(query: str, document: str) -> float:
    # Define a prompt to ask the model for a relevance judgment
    prompt = PromptTemplate(
        input_variables=["query", "document"],
        template="On a scale from 0 to 1, how relevant is the following document to the query? Query: {query}\nDocument: {document}\nRelevance score:"
    )
    # Combine the prompt with the model and enforce the output schema
    chain = prompt | llm.with_structured_output(RetrievalEvaluatorInput)
    # Provide inputs and retrieve the model's structured output
    input_variables = {"query": query, "document": document}
    result = chain.invoke(input_variables).relevance_score
    return result

This function wraps a prompt inside a structured output chain, where the language model is asked to produce a numerical score (between 0 and 1) that quantifies how relevant the document is to the query. Unlike raw cosine similarity between embeddings, the model reads the query and the document together, so it can judge whether the passage actually addresses the question. That score is what lets us decide whether to trust the content or seek corrections.
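The schema above describes the 0 to 1 range in its field description but does not enforce it, so a defensive caller may want to clamp whatever the model returns before comparing it to thresholds. The helper below is a hypothetical addition (the name `normalize_score` is not part of the notebook), shown only as a sketch.

```python
import math


def normalize_score(raw_score: float) -> float:
    """Clamp a model-reported relevance score into [0, 1]; treat NaN as 0."""
    if math.isnan(raw_score):
        return 0.0
    return min(1.0, max(0.0, float(raw_score)))


# Out-of-range values are pulled back to the nearest bound
print(normalize_score(1.3), normalize_score(-0.2), normalize_score(0.45))
```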


#### Knowledge refiner – Bullet-point summarization chain
When we retrieve a long paragraph, it is often noisy or verbose. This module extracts just the essentials in bullet point form — clean, digestible, and ready for reasoning or inclusion in a final answer.

In [6]:
# Knowledge Refinement

# Define output schema for refined key points
class KnowledgeRefinementInput(BaseModel):
    key_points: str = Field(..., description="The key points extracted from the document, one bullet point per line.")

# Extract key bullet points from a given document
def knowledge_refinement(document: str) -> List[str]:
    # Prompt the LLM to summarize key insights as bullet points
    prompt = PromptTemplate(
        input_variables=["document"],
        template="Extract the key information from the following document in bullet points:\n{document}\nKey points:"
    )
    # Create a chain that outputs the structured key points
    chain = prompt | llm.with_structured_output(KnowledgeRefinementInput)
    # Get structured key points from model output
    input_variables = {"document": document}
    result = chain.invoke(input_variables).key_points
    # Return list of cleaned, non-empty bullet lines
    return [point.strip() for point in result.split('\n') if point.strip()]

Here, we build a prompt chain that forces the model to return a summarized set of key takeaways. The output is post-processed into a clean list by stripping empty lines or whitespace. This ensures retrieved knowledge — whether from internal docs or web results — is concise and actionable for downstream use.
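The post-processing step can be tried in isolation on a hand-written string. The sketch below goes one step further than the notebook's list comprehension by also removing leading bullet markers; that extra stripping, the regular expression, and the name `split_key_points` are assumptions for illustration, not part of the original function.

```python
import re
from typing import List

# Matches a leading "-", "*", "•", "1." or "1)" followed by whitespace
BULLET_PREFIX = re.compile(r"^\s*(?:[-*•]|\d+[.)])\s+")


def split_key_points(raw: str) -> List[str]:
    """Turn a newline-separated bullet string into a list of clean points."""
    points = []
    for line in raw.split("\n"):
        cleaned = BULLET_PREFIX.sub("", line).strip()
        if cleaned:  # skip blank lines
            points.append(cleaned)
    return points


raw_output = (
    "- CO2 traps heat\n"
    "\n"
    "  * Fossil fuels release CO2  \n"
    "2. Deforestation reduces carbon sinks\n"
)
key_points = split_key_points(raw_output)
print(key_points)
```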


#### Query rewriter – Web search optimizer chain
Often, natural user questions don’t translate well to search engine queries (they are too specific, vague, or conversational). This chain rewrites them into search-optimized phrases — improving the odds of getting good hits when we fall back to web search.

In [7]:
# Web Search Query Rewriter

# Define output schema for rewritten query
class QueryRewriterInput(BaseModel):
    query: str = Field(..., description="The rewritten query, optimized for web search.")

# Rewrite a user query to make it better suited for search engines
def rewrite_query(query: str) -> str:
    # Prompt to rephrase query for improved search performance
    prompt = PromptTemplate(
        input_variables=["query"],
        template="Rewrite the following query to make it more suitable for a web search:\n{query}\nRewritten query:"
    )
    # Chain model output to structured query result
    chain = prompt | llm.with_structured_output(QueryRewriterInput)
    # Call model and extract the rewritten query
    input_variables = {"query": query}
    return chain.invoke(input_variables).query.strip()

This function helps the model take a natural language question and reshape it into something more aligned with how people write queries for search engines — usually shorter, noun-based, and keyword-rich. By using structured output again, we ensure that the LLM returns exactly what we need: a single rewritten string, reliably.


### Helper function to parse search results – Clean titles & links from search output
Once we run a DuckDuckGo web search (especially after rewriting a vague or incomplete query), we get back a single string describing the results. This helper assumes that string is JSON. We don’t need the whole thing, just clean titles and links that we can display, reference, or use in follow-up LLM steps.

Let’s write a helper that extracts those essentials.

In [8]:
# Parse a JSON-formatted string of search results into a list of (title, link) tuples
def parse_search_results(results_string: str) -> List[Tuple[str, str]]:
    """
    Parse a JSON string of search results into a list of title-link tuples.

    Args:
        results_string (str): A JSON-formatted string containing search results.

    Returns:
        List[Tuple[str, str]]: A list of tuples, where each tuple contains the title and link of a search result.
                               If parsing fails, an empty list is returned.
    """
    try:
        # Load the JSON string into a Python list of result dicts
        results = json.loads(results_string)
        # For each result dict, extract a title and a link; fallback if missing
        return [(result.get('title', 'Untitled'), result.get('link', '')) for result in results]
    except json.JSONDecodeError:
        # Handle JSON decoding errors by returning an empty list
        print("Error parsing search results. Returning empty list.")
        return []

After a DuckDuckGo search is triggered, the helper expects a JSON string in which each item is a small dictionary with fields like "title", "snippet", and "link". It distills those into minimal (title, link) pairs. If the string is empty or is not valid JSON, it returns an empty list and logs the issue, so the pipeline keeps running. Depending on the installed LangChain version and how the tool is configured, `DuckDuckGoSearchResults` may return a plain formatted string rather than JSON; in that case parsing fails and no sources are collected.
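A more tolerant variant could try JSON first and then fall back to pattern matching on a plain-text result string. The sketch below assumes the plain-text form looks like `snippet: ..., title: ..., link: ...` repeated per result; that format, the regular expression, and the name `parse_search_results_tolerant` are assumptions to verify against the tool's actual output.

```python
import json
import re
from typing import List, Optional, Tuple

# Assumed plain-text layout: "... title: <text>, link: <url> ..."
TITLE_LINK = re.compile(r"title:\s*(.*?),\s*link:\s*(https?://[^\s,\]]+)")


def parse_search_results_tolerant(results_string: Optional[str]) -> List[Tuple[str, str]]:
    """Extract (title, link) pairs from JSON or from a plain-text result string."""
    try:
        results = json.loads(results_string)
    except (json.JSONDecodeError, TypeError):
        results = None  # not JSON (or not a string at all)

    if isinstance(results, list):
        return [
            (item.get("title", "Untitled"), item.get("link", ""))
            for item in results
            if isinstance(item, dict)
        ]

    # Fallback: scan the raw text for title/link pairs
    return [(title.strip(), link) for title, link in TITLE_LINK.findall(results_string or "")]


json_input = '[{"title": "A", "link": "https://example.org/a"}, {"snippet": "no title"}]'
text_input = (
    "snippet: First snippet, title: First result, link: https://example.org/1, "
    "snippet: More text, title: Second, link: https://example.org/2"
)
print(parse_search_results_tolerant(json_input))
print(parse_search_results_tolerant(text_input))
```

The regex fallback is deliberately simple: a URL containing a comma would be truncated, and a snippet containing the literal text `title:` could confuse it.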

### Sub-functions for the CRAG process
The following utility functions represent the core logic that glues together our corrective RAG system — from document retrieval to response generation. These modular subroutines keep the pipeline clean, interpretable, and adaptable to different retrieval or generation strategies.

#### Document retrieval from FAISS
We start with pulling relevant documents from the vector store (FAISS) using semantic similarity.



In [9]:
# Retrieve top-k documents from a FAISS vector index based on the input query
def retrieve_documents(query: str, faiss_index: FAISS, k: int = 3) -> List[str]:
    """
    Retrieve documents based on a query using a FAISS index.

    Args:
        query (str): The query string to search for.
        faiss_index (FAISS): The FAISS index used for similarity search.
        k (int): The number of top documents to retrieve. Defaults to 3.

    Returns:
        List[str]: A list of the retrieved document contents.
    """
    # Perform similarity search using vector distance
    docs = faiss_index.similarity_search(query, k=k)
    # Return only the content (not metadata or embeddings)
    return [doc.page_content for doc in docs]

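This function queries the FAISS index for the chunks most similar to a user query. The query is embedded and compared against the stored chunk vectors, and the `k` nearest chunks are returned. With the default index that LangChain's `FAISS.from_documents` builds (a flat L2 index, as far as we are aware), this is an exact nearest-neighbor search rather than an approximate one; approximate (ANN) indexes only come into play if you construct one explicitly. This is our internal memory source.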
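Conceptually, the similarity search ranks stored vectors by their distance to the query vector and keeps the closest `k`. The toy sketch below does this by brute force in pure Python with made-up two-dimensional vectors; real embeddings have hundreds or thousands of dimensions, and FAISS does the same job far more efficiently. All names and values here are illustrative.

```python
import math
from typing import List, Sequence, Tuple


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k_exact(
    query_vec: Sequence[float],
    corpus: List[Tuple[str, Sequence[float]]],
    k: int = 3,
) -> List[str]:
    """Return the texts of the k vectors closest to query_vec (exact search)."""
    ranked = sorted(corpus, key=lambda item: l2_distance(query_vec, item[1]))
    return [text for text, _ in ranked[:k]]


# Made-up embeddings: the first axis loosely means "climate", the second "fiction"
toy_corpus = [
    ("greenhouse gases", [0.9, 0.1]),
    ("fossil fuels", [0.8, 0.3]),
    ("wizard duels", [0.0, 1.0]),
]
nearest = top_k_exact([1.0, 0.0], toy_corpus, k=2)
print(nearest)
```

Note that the search always returns `k` results, even when nothing in the corpus is close to the query, which is why a separate relevance check is needed.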

#### Relevance scoring
Once we have documents, we still need to evaluate how well they match the query.

In [10]:
# Evaluate each document's relevance to the given query using LLM-based semantic scoring
def evaluate_documents(query: str, documents: List[str]) -> List[float]:
    """
    Evaluate the relevance of documents based on a query.

    Args:
        query (str): The query string.
        documents (List[str]): A list of document contents to evaluate.

    Returns:
        List[float]: A list of relevance scores for each document.
    """
    # Run the LLM-based scoring function for each document
    return [retrieval_evaluator(query, doc) for doc in documents]

Rather than relying solely on vector distance (which can be noisy), this function leverages the LLM to assign a 0–1 relevance score for each document. It is essentially a second-pass semantic filter — helpful in surfacing more contextually aligned sources.

#### Web search with rewriting and knowledge extraction
When local documents are not sufficient, this function performs an augmented web search — rewrites the query, runs the search, refines the output, and collects sources.

In [11]:
# Run a full web search pipeline — rewrite, search, extract knowledge, and parse sources
def perform_web_search(query: str) -> Tuple[List[str], List[Tuple[str, str]]]:
    """
    Perform a web search based on a query.

    Args:
        query (str): The query string to search for.

    Returns:
        Tuple[List[str], List[Tuple[str, str]]]:
            - A list of refined knowledge obtained from the web search.
            - A list of tuples containing titles and links of the sources.
    """
    # Rewrite user query to better suit web search engine expectations
    rewritten_query = rewrite_query(query)
    # Run the rewritten query through DuckDuckGo
    web_results = search.run(rewritten_query)
    # Summarize web results into bullet-pointed insights
    web_knowledge = knowledge_refinement(web_results)
    # Parse result metadata (titles and URLs) for attribution
    sources = parse_search_results(web_results)
    return web_knowledge, sources

This full pipeline upgrades vague or domain-specific user queries into web-searchable prompts, scrapes live DuckDuckGo results, summarizes them, and extracts source links. This is our “external brain” when the vector store lacks sufficient context.
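Because this function calls both the LLM and a live search engine, it is awkward to test directly. One possible approach, sketched here, is to pass each step in as a callable so the same data flow can be exercised offline with stubs. The function `web_search_pipeline` and the stub functions are hypothetical and not part of the notebook.

```python
from typing import Callable, List, Tuple

Source = Tuple[str, str]


def web_search_pipeline(
    query: str,
    rewrite: Callable[[str], str],
    run_search: Callable[[str], str],
    refine: Callable[[str], List[str]],
    parse: Callable[[str], List[Source]],
) -> Tuple[List[str], List[Source]]:
    """Rewrite the query, search once, then refine and parse the same raw output."""
    rewritten = rewrite(query)
    raw_results = run_search(rewritten)
    return refine(raw_results), parse(raw_results)


# Hypothetical stand-ins for the LLM chains and the search tool
search_calls = []


def fake_rewrite(q: str) -> str:
    return q.rstrip("?") + " explained"


def fake_search(q: str) -> str:
    search_calls.append(q)  # record what the search engine would have received
    return "Quirrell could not touch Harry."


knowledge, sources = web_search_pipeline(
    "how did harry beat quirrell?",
    rewrite=fake_rewrite,
    run_search=fake_search,
    refine=lambda text: [text],
    parse=lambda text: [("Stub result", "https://example.org")],
)
print(search_calls, knowledge, sources)
```

In the notebook itself, the corresponding arguments would be `rewrite_query`, `search.run`, `knowledge_refinement`, and `parse_search_results`.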

#### Final response generation with source attribution
Once we have usable knowledge (from local or web), we generate a natural-language response, optionally including sources.

In [12]:
# Use the LLM to generate a complete answer using knowledge and source metadata
def generate_response(query: str, knowledge: str, sources: List[Tuple[str, str]]) -> str:
    """
    Generate a response to a query using knowledge and sources.

    Args:
        query (str): The query string.
        knowledge (str): The refined knowledge to use in the response.
        sources (List[Tuple[str, str]]): A list of tuples containing titles and links of the sources.

    Returns:
        str: The generated response.
    """
    response_prompt = PromptTemplate(
        input_variables=["query", "knowledge", "sources"],
        template="Based on the following knowledge, answer the query. Include the sources with their links (if available) at the end of your answer:\nQuery: {query}\nKnowledge: {knowledge}\nSources: {sources}\nAnswer:"
    )
    # Format source strings (title: link) for display
    input_variables = {
        "query": query,
        "knowledge": knowledge,
        "sources": "\n".join([f"{title}: {link}" if link else title for title, link in sources])
    }
    # Use LLM to synthesize final answer
    response_chain = response_prompt | llm
    return response_chain.invoke(input_variables).content

This function gives the LLM everything it needs: the query, the refined knowledge, and attribution info (titles and links). The prompt asks the model to list its sources at the end of the answer, which makes the output easier to verify.
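Two small formatting details are worth isolating. The `knowledge` parameter is annotated as `str`, but the web-search helper returns a list of bullet strings, so a caller may want to normalize both shapes into text before building the prompt. The source list is likewise flattened into one line per source. The helpers below are a sketch with hypothetical names (`knowledge_to_text`, `format_sources`); only the source-formatting expression is taken from the function above.

```python
from typing import List, Sequence, Tuple, Union


def knowledge_to_text(knowledge: Union[str, Sequence[str]]) -> str:
    """Accept either a single string or a list of bullet strings."""
    if isinstance(knowledge, str):
        return knowledge
    return "\n".join(knowledge)


def format_sources(sources: List[Tuple[str, str]]) -> str:
    """One source per line: 'title: link', or just the title when no link exists."""
    return "\n".join(f"{title}: {link}" if link else title for title, link in sources)


print(knowledge_to_text(["CO2 traps heat", "Fossil fuels release CO2"]))
print(format_sources([("Retrieved document", ""), ("Wiki", "https://example.org")]))
```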

### CRAG process
Finally, we define the full orchestration pipeline that performs dynamic correction based on document relevance. This is the decision-making brain of the Corrective RAG system. It routes user queries through the right path — deciding whether to trust retrieved content, fall back to the web, or intelligently merge both. This approach adds resilience and robustness to our RAG workflow.


In [13]:
# Full corrective RAG pipeline
def crag_process(query: str, faiss_index: FAISS) -> str:
    """
    Process a query by retrieving, evaluating, and using documents or performing a web search to generate a response.

    Args:
        query (str): The query string to process.
        faiss_index (FAISS): The FAISS index used for document retrieval.

    Returns:
        str: The generated response based on the query.
    """
    print(f"\nProcessing query: {query}")

    # Step 1: Try to retrieve relevant documents from local FAISS index
    retrieved_docs = retrieve_documents(query, faiss_index)

    # Step 2: Evaluate how semantically aligned each document is
    eval_scores = evaluate_documents(query, retrieved_docs)

    print(f"\nRetrieved {len(retrieved_docs)} documents")
    print(f"Evaluation scores: {eval_scores}")

    # Step 3: Decide which action to use (threshold-based logic) based on evaluation scores
    max_score = max(eval_scores)
    sources = []

    if max_score > 0.7:
        # Confident in internal content — no correction needed
        print("\nAction: Correct - Using retrieved document")
        best_doc = retrieved_docs[eval_scores.index(max_score)]
        final_knowledge = best_doc
        sources.append(("Retrieved document", ""))
    elif max_score < 0.3:
        # Retrieved data is weak — fallback to live web search
        print("\nAction: Incorrect - Performing web search")
        final_knowledge, sources = perform_web_search(query)
    else:
        # Ambiguous case — fuse the best retrieved chunk with external knowledge
        print("\nAction: Ambiguous - Combining retrieved document and web search")
        best_doc = retrieved_docs[eval_scores.index(max_score)]
        # Summarize the best local document
        retrieved_knowledge = knowledge_refinement(best_doc)
        # Get web knowledge and sources
        web_knowledge, web_sources = perform_web_search(query)
        # Combine both sets of knowledge
        final_knowledge = "\n".join(retrieved_knowledge + web_knowledge)
        sources = [("Retrieved document", "")] + web_sources

    print("\nFinal knowledge:")
    print(final_knowledge)

    print("\nSources:")
    for title, link in sources:
        print(f"{title}: {link}" if link else title)

    # Step 4: Use LLM to generate final answer based on knowledge and sources
    print("\nGenerating response...")
    response = generate_response(query, final_knowledge, sources)

    print("\nResponse generated")
    return response

This function orchestrates the full decision tree for corrective generation. First, it checks whether the documents retrieved from FAISS are good enough. If the best LLM-assigned score is above 0.7, it uses the top-scoring chunk directly. If the best score is below 0.3, it skips the internal data entirely and runs a full web search. For scores in between (0.3 to 0.7 inclusive), it merges insights from both, a hybrid mode intended to reduce hallucination and improve relevance.

Each path produces a body of knowledge and a source list, which are then passed to the generation model. Only the web and hybrid paths run the knowledge refiner; the “correct” path passes the best chunk through unchanged. The aim is for every response to be grounded either in a trusted internal corpus or in fresh, attributed information from the web.
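The routing rule can be separated from the I/O and expressed as a small pure function, which makes the boundary cases easy to check. This is a sketch: the name `choose_action` and the choice to treat an empty score list as "incorrect" are assumptions (the notebook's `max(eval_scores)` would raise an error on an empty list).

```python
from typing import List


def choose_action(scores: List[float], upper: float = 0.7, lower: float = 0.3) -> str:
    """Map relevance scores to one of the three CRAG actions."""
    if not scores:
        return "incorrect"  # assumption: nothing retrieved means fall back to web search
    best = max(scores)
    if best > upper:
        return "correct"    # trust the best retrieved chunk
    if best < lower:
        return "incorrect"  # ignore local data, search the web
    return "ambiguous"      # combine local and web knowledge


for example in ([0.9, 0.9, 0.7], [0.0, 0.0, 0.0], [0.7], [0.5, 0.2]):
    print(example, "->", choose_action(example))
```

Because both comparisons are strict, a best score of exactly 0.7 or exactly 0.3 lands in the ambiguous branch.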


### Example: Query with high relevance to internal documents
This test case demonstrates how the system handles a well-covered topic that is already included in the internal knowledge base.

In [14]:
# A factual question well-aligned with internal corpus content
query = "What are the main causes of climate change?"

# Process the query using the CRAG pipeline
result = crag_process(query, vectorstore)
print(f"Query: {query}")
print(f"Answer: {result}")


Processing query: What are the main causes of climate change?

Retrieved 3 documents
Evaluation scores: [0.9, 0.9, 0.7]

Action: Correct - Using retrieved document

Final knowledge:
Chapter 2: Causes of Climate Change 
Greenhouse Gases 
The primary cause of recent climate change is the increase in greenhouse gases in the 
atmosphere. Greenhouse gases, such as carbon dioxide (CO2), methane (CH4), and nitrous 
oxide (N2O), trap heat from the sun, creating a "greenhouse effect." This effect is essential 
for life on Earth, as it keeps the planet warm enough to support life. However, human 
activities have intensified this natural process, leading to a warmer climate. 
Fossil Fuels 
Burning fossil fuels for energy releases large amounts of CO2. This includes coal, oil, and 
natural gas used for electricity, heating, and transportation. The industrial revolution marked 
the beginning of a significant increase in fossil fuel consumption, which continues to rise 
today. 
Coal

Sources:
Retri

Here, the internal documents, which cover climate change in detail, are highly relevant to the query. The retrieval step pulls three passages, and the evaluator scores the best of them at 0.9, above the 0.7 threshold. Because of that, CRAG skips external search and uses the top-scoring retrieved chunk directly for generation. This is the “correct” path: no intervention needed, only internal data is used.

### Example: Query with low relevance to internal documents
This example shows what happens when the system encounters a completely out-of-domain or fictional topic that is not present in our indexed documents.

In [15]:
# A fictional question unlikely to be covered in the local index
query = "how did harry beat quirrell?"

# Process the query through CRAG to trigger fallback behavior
result = crag_process(query, vectorstore)
print(f"Query: {query}")
print(f"Answer: {result}")


Processing query: how did harry beat quirrell?

Retrieved 3 documents
Evaluation scores: [0.0, 0.0, 0.0]

Action: Incorrect - Performing web search
Error parsing search results. Returning empty list.

Final knowledge:
[]

Sources:

Generating response...

Response generated
Query: how did harry beat quirrell?
Answer: In "Harry Potter and the Sorcerer's Stone," Harry Potter defeats Professor Quirrell during the climax of the story. Quirrell, who is possessed by Lord Voldemort, attempts to steal the Sorcerer's Stone to gain immortality. When Harry confronts Quirrell in the underground chamber, he is unable to touch Harry without suffering severe pain. This is because Harry is protected by the sacrificial love of his mother, Lily Potter, which creates a magical barrier against Voldemort.

As Quirrell tries to seize the Stone from Harry, he experiences intense agony when he makes contact with him. This ultimately leads to Quirrell's defeat, as he cannot withstand the protective magic that

Here, the vector store still returns three chunks, but none of them relates to the query, and the evaluator scores all of them 0.0. The system interprets this as a signal that internal data is not useful and invokes a full web search instead. This demonstrates the “incorrect” path, where corrective behavior is fully triggered: the query is rewritten for search engine compatibility, and the search output is refined and parsed for sources. In the run shown above, however, the search output could not be parsed and the refined knowledge came back empty, so the answer printed here rests on the model's own knowledge rather than on retrieved web content. When the search step does return usable results, this fallback is what gives CRAG its resilience and adaptability.