# Comprehensive Agentic RAG Workflow

Building a complete agentic RAG system that combines query routing, document grading, and query rewriting using LangGraph.

This notebook implements:
- Query Routing
- Document Relevance Grading
- Self Correction through Query Rewriting

## Setup and Environment

In [387]:
import os
from typing import TypedDict, List, Literal, Annotated, Sequence
from typing_extensions import TypedDict

from langchain_chroma.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.retrievers import TavilySearchAPIRetriever
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, AIMessage
from langchain_text_splitters import RecursiveCharacterTextSplitter

from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages

from pydantic import BaseModel, Field

In [None]:
# Set your API keys
os.environ["OPENAI_API_KEY"] = ""
os.environ["TAVILY_API_KEY"] = ""

## Building the Vector Database

In [389]:
# Load articles from sajalsharma.com
urls = [
    "https://sajalsharma.com/posts/introduction-to-agentic-rag/",
    "https://sajalsharma.com/posts/agentic-rag-query-router-langgraph/",
    "https://sajalsharma.com/posts/corrective-rag-langgraph/",
]

# Load documents
print("Loading blog posts from sajalsharma.com...")
docs = []
for url in urls:
    try:
        loader = WebBaseLoader(url)
        docs.extend(loader.load())
        print(f"✓ Loaded: {url}")
    except Exception as e:
        print(f"✗ Failed to load {url}: {e}")

Loading blog posts from sajalsharma.com...
✓ Loaded: https://sajalsharma.com/posts/introduction-to-agentic-rag/
✓ Loaded: https://sajalsharma.com/posts/agentic-rag-query-router-langgraph/
✓ Loaded: https://sajalsharma.com/posts/corrective-rag-langgraph/


In [390]:
# Split documents into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500, 
    chunk_overlap=100
)
doc_splits = text_splitter.split_documents(docs)
print(f"\nCreated {len(doc_splits)} document chunks")


Created 177 document chunks


In [412]:
vector_store = Chroma.from_documents(
    documents=doc_splits,
    embedding=OpenAIEmbeddings(),
    collection_name="blog-posts",
    persist_directory="chroma"
)
    

In [413]:
retriever = vector_store.as_retriever()

## Defining the Graph State

In [414]:
class GraphState(TypedDict):
    """
    Represents the state of our comprehensive agentic RAG graph.
    
    Attributes:
        messages: Conversation history
        query: Original user query
        chosen_datasource: Selected retrieval source
        retrieved_docs: Documents retrieved from any source
        relevance_check: Whether documents are relevant
        query_rewrite_count: Number of query rewrites attempted
        final_answer: Generated response
    """
    messages: Annotated[Sequence[BaseMessage], add_messages]
    query: str
    chosen_datasource: str
    retrieved_docs: List[Document]
    relevance_check: str
    query_rewrite_count: int
    final_answer: str

## Creating the Router Agent

In [415]:
# Initialize LLM
llm = ChatOpenAI(model="gpt-4.1", temperature=0)

# Define routing schema
class RouteQuery(BaseModel):
    """Route query to appropriate datasource."""
    datasource: Literal["vectorstore", "web_search", "direct_response"] = Field(
        description="Choose between vectorstore for Sajal's blog content about RAG and agents, web_search for current events, or direct_response for general knowledge"
    )
    reasoning: str = Field(
        description="Brief explanation for the routing decision"
    )

In [416]:
# Router prompt
router_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an expert at routing user queries to the appropriate data source.
    
    Based on the query, choose where to route it:
    - vectorstore: For questions about Sajal's blog posts on agentic RAG, corrective RAG, query routing, or related RAG patterns
    - web_search: For current events, recent developments, or information requiring real-time data
    - direct_response: For general knowledge, definitions, or questions that don't require external data
    
    Analyze the query carefully and make the best routing decision."""),
    ("human", "{query}")
])

# Create router chain
router_chain = router_prompt | llm.with_structured_output(RouteQuery)

In [417]:
def route_query(state: GraphState) -> GraphState:
    """Route query to the appropriate datasource."""
    print("*** ROUTING QUERY ***")
    
    query = state["query"]
    router_result = router_chain.invoke({"query": query})
    
    print(f"Routing to: {router_result.datasource}")
    print(f"Reasoning: {router_result.reasoning}")
    
    return {
        "chosen_datasource": router_result.datasource,
        "messages": [AIMessage(content=f"Routing to {router_result.datasource}: {router_result.reasoning}")]
    }

## Implementing Retrieval Nodes

In [418]:
# Vector store retrieval
def retrieve_from_vectorstore(state: GraphState) -> GraphState:
    """Retrieve documents from vector store."""
    print("*** RETRIEVING FROM VECTOR STORE ***")
    
    query = state["query"]
    documents = retriever.invoke(query)
    return {
        "retrieved_docs": documents,
        "messages": [AIMessage(content=f"Retrieved {len(documents)} documents from vector store")]
    }

In [419]:
# Web search retrieval
web_search_retriever = TavilySearchAPIRetriever(k=3)

def retrieve_from_web(state: GraphState) -> GraphState:
    """Retrieve documents from web search."""
    print("*** RETRIEVING FROM WEB SEARCH ***")
    
    query = state["query"]
    documents = web_search_retriever.invoke(query)
    return {
        "retrieved_docs": documents,
        "messages": [AIMessage(content=f"Retrieved {len(documents)} documents from web search")]
    }

In [420]:
# Note: prepare_direct_response node removed as it was redundant

## Document Grading with Self-Reflection

In [438]:
# Document grading schema
class GradeDocuments(BaseModel):
    """Binary score for document relevance."""
    binary_score: Literal["yes", "no"] = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )

# Grading prompt
grade_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a grader assessing relevance of retrieved documents to a user question.
    
    Retrieved document:
    {document}
    
    User question: {question}
    
    If the document is relevant to the user's original question, grade it as relevant.
    Give a binary score 'yes' or 'no' to indicate relevance."""),
    ("human", "Grade the document.")
])

grade_chain = grade_prompt | llm.with_structured_output(GradeDocuments)

In [439]:
def grade_documents(state: GraphState) -> GraphState:
    """Grade the relevance of retrieved documents."""
    print("*** GRADING DOCUMENTS ***")
    
    query = state["query"]
    documents = state["retrieved_docs"]
    
    if not documents:
        return {"relevance_check": "no_documents"}
    
    # Grade each document
    relevant_docs = []
    for i, doc in enumerate(documents):
        # Get a snippet of the document content for display
        snippet = doc.page_content[:50].replace('\n', ' ').strip()
        if len(doc.page_content) > 50:
            snippet += "..."
        
        grade = grade_chain.invoke({
            "document": doc.page_content,
            "question": query
        })
        
        if grade.binary_score == "yes":
            print(f"✓ Document {i+1} graded as RELEVANT")
            print(f"  Snippet: {doc.page_content}")
            relevant_docs.append(doc)
        else:
            print(f"✗ Document {i+1} graded as NOT RELEVANT")
            print(f"  Snippet: {doc.page_content}")
    
    # Update state based on grading results
    if relevant_docs:
        return {
            "retrieved_docs": relevant_docs,
            "relevance_check": "relevant",
            "messages": [AIMessage(content=f"Found {len(relevant_docs)} relevant documents")]
        }
    else:
        return {
            "relevance_check": "not_relevant",
            "messages": [AIMessage(content="No relevant documents found")]
        }

## Query Rewriting for Better Retrieval

In [440]:
# Query rewriting prompt
rewrite_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a query rewriting expert. The user's original query didn't retrieve relevant documents.
    
    Analyze the query and rewrite it to improve retrieval chances:
    - Make it more specific or more general as appropriate
    - Add synonyms or related terms
    - Rephrase to target likely document content
    - Consider the retrieval failure and adjust accordingly
    
    Original query: {query}
    Previous datasource: {datasource}"""),
    ("human", "Provide a rewritten query that will retrieve better results.")
])

rewrite_chain = rewrite_prompt | llm | StrOutputParser()

In [441]:
def rewrite_query(state: GraphState) -> GraphState:
    """Rewrite the query for better retrieval."""
    print("*** REWRITING QUERY ***")
    
    original_query = state["query"]
    datasource = state.get("chosen_datasource", "unknown")
    count = state.get("query_rewrite_count", 0)
    
    # Rewrite the query
    rewritten_query = rewrite_chain.invoke({
        "query": original_query,
        "datasource": datasource
    })
    
    print(f"Original: {original_query}")
    print(f"Rewritten: {rewritten_query}")
    
    return {
        "query": rewritten_query,
        "query_rewrite_count": count + 1,
        "messages": [AIMessage(content=f"Query rewritten: {rewritten_query}")]
    }

## Response Generation

In [442]:
# RAG generation prompt
rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an AI assistant. Answer the question based on the retrieved context.
    
    Use the following pieces of retrieved context to answer the question. 
    If you don't know the answer, say that you don't know.
    Keep the answer concise but comprehensive.
    
    Context:
    {context}"""),
    ("human", "{question}")
])

rag_chain = rag_prompt | llm | StrOutputParser()

In [443]:
def generate_with_context(state: GraphState) -> GraphState:
    """Generate answer using retrieved documents."""
    print("*** GENERATING WITH CONTEXT ***")
    
    query = state["query"]
    documents = state["retrieved_docs"]
    
    # Format documents for context
    context = "\n\n".join([doc.page_content for doc in documents])
    
    # Generate response
    answer = rag_chain.invoke({
        "context": context,
        "question": query
    })
    
    return {
        "final_answer": answer,
        "messages": [AIMessage(content="Generated response with context")]
    }

In [444]:
def generate_direct_response(state: GraphState) -> GraphState:
    """Generate response without retrieval context."""
    print("*** GENERATING DIRECT RESPONSE ***")
    
    query = state["query"]
    
    direct_prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful AI assistant. Answer the question based on your knowledge."),
        ("human", "{question}")
    ])
    
    direct_chain = direct_prompt | llm | StrOutputParser()
    answer = direct_chain.invoke({"question": query})
    
    return {
        "final_answer": answer,
        "messages": [AIMessage(content="Generated direct response")]
    }

In [445]:
def generate_no_info_response(state: GraphState) -> GraphState:
    """Generate response when no relevant information is found."""
    print("*** GENERATING NO INFO RESPONSE ***")
    
    query = state["query"]
    attempts = state.get("query_rewrite_count", 0)
    
    answer = f"""I couldn't find relevant information to answer your question: "{query}"
    
I attempted to search {attempts + 1} time(s) across different sources and reformulated the query, 
but no relevant documents were found. This might be because:
- The information isn't available in my current knowledge sources
- The topic is too specific or recent
- The query needs to be approached differently

Please try rephrasing your question or providing more context."""
    
    return {
        "final_answer": answer,
        "messages": [AIMessage(content="No relevant information found")]
    }

## Conditional Edge Decision Functions

In [446]:
def should_retry(state: GraphState) -> Literal["rewrite_query", "no_info"]:
    """Determine if we should retry with rewritten query."""
    rewrite_count = state.get("query_rewrite_count", 0)
    max_retries = 2
    
    if rewrite_count < max_retries:
        return "rewrite_query"
    else:
        return "no_info"

def route_after_grading(state: GraphState) -> Literal["generate", "retry_decision"]:
    """Route based on document grading results."""
    relevance = state.get("relevance_check", "")
    
    if relevance == "relevant":
        return "generate"
    else:
        return "retry_decision"

def route_to_retrieval(state: GraphState) -> Literal["vectorstore", "web_search", "direct_response"]:
    """Route to appropriate retrieval method."""
    return state["chosen_datasource"]

## Compiling the Complete Workflow

In [447]:
def compile_workflow():
    """Compile the complete agentic RAG workflow."""
    workflow = StateGraph(GraphState)
    
    # Add all nodes
    workflow.add_node("route_query", route_query)
    workflow.add_node("retrieve_vectorstore", retrieve_from_vectorstore)
    workflow.add_node("retrieve_web", retrieve_from_web)
    workflow.add_node("grade_documents", grade_documents)
    workflow.add_node("rewrite_query", rewrite_query)
    workflow.add_node("generate_with_context", generate_with_context)
    workflow.add_node("generate_direct", generate_direct_response)
    workflow.add_node("generate_no_info", generate_no_info_response)
    
    # Build the graph flow
    workflow.set_entry_point("route_query")
    
    # Routing from query router - direct_response goes straight to generate_direct
    workflow.add_conditional_edges(
        "route_query",
        route_to_retrieval,
        {
            "vectorstore": "retrieve_vectorstore",
            "web_search": "retrieve_web",
            "direct_response": "generate_direct"
        }
    )
    
    # After retrieval, grade documents
    workflow.add_edge("retrieve_vectorstore", "grade_documents")
    workflow.add_edge("retrieve_web", "grade_documents")
    
    # After grading, decide next step
    workflow.add_conditional_edges(
        "grade_documents",
        route_after_grading,
        {
            "generate": "generate_with_context",
            "retry_decision": "rewrite_query"
        }
    )
    
    # After rewriting, check retry limit
    workflow.add_conditional_edges(
        "rewrite_query",
        should_retry,
        {
            "rewrite_query": "route_query",
            "no_info": "generate_no_info"
        }
    )
    
    # All generation nodes lead to END
    workflow.add_edge("generate_with_context", END)
    workflow.add_edge("generate_direct", END)
    workflow.add_edge("generate_no_info", END)
    
    return workflow.compile()

# Compile the workflow
app = compile_workflow()
print("✓ Comprehensive agentic RAG workflow compiled successfully")

✓ Comprehensive agentic RAG workflow compiled successfully


## Testing the Workflow

In [448]:
def run_workflow(query: str):
    """Run the agentic RAG workflow and return the response."""
    print(f"\n{'='*60}")
    print(f"QUERY: {query}")
    print(f"{'='*60}\n")
    
    initial_state = {
        "messages": [],
        "query": query,
        "query_rewrite_count": 0
    }
    
    result = app.invoke(initial_state)
    return result["final_answer"]

In [449]:
# Test 1: Query about corrective RAG from Sajal's blog (should route to vectorstore)
response1 = run_workflow("How does corrective RAG handle irrelevant documents?")
print(f"\nRESPONSE:\n{response1}")


QUERY: How does corrective RAG handle irrelevant documents?

*** ROUTING QUERY ***
Routing to: vectorstore
Reasoning: The query specifically asks about 'corrective RAG,' which is a topic covered in Sajal's blog posts. The user is seeking an explanation related to a RAG pattern, making the vectorstore (containing Sajal's blog content) the most appropriate data source.
*** RETRIEVING FROM VECTOR STORE ***
*** GRADING DOCUMENTS ***
✓ Document 1 graded as RELEVANT
  Snippet: This is where Corrective RAG (CRAG) comes into play. It enhances the traditional RAG framework by introducing a lightweight retrieval evaluator that assesses the quality of retrieved documents and assigns a confidence score. This score then informs whether to proceed with the generated answer or seek further information, potentially through approaches such as web-search, or in the case of this document, passing in more context to the LLM.
✓ Document 2 graded as RELEVANT
  Snippet: The initial retrieval process is perf

In [450]:
# Test 2: Query about agentic patterns from Sajal's blog
response2 = run_workflow("What are the main agentic patterns for RAG according to Sajal?")
print(f"\nRESPONSE:\n{response2}")


QUERY: What are the main agentic patterns for RAG according to Sajal?

*** ROUTING QUERY ***
Routing to: vectorstore
Reasoning: The query specifically asks about Sajal's perspective on agentic patterns for RAG, which would be covered in Sajal's blog posts. Therefore, the vectorstore containing Sajal's blog content is the appropriate data source.
*** RETRIEVING FROM VECTOR STORE ***
*** GRADING DOCUMENTS ***
✗ Document 1 graded as NOT RELEVANT
  Snippet: These behaviors align with the agentic patterns discussed earlier, transforming RAG into a more intelligent and self-correcting process. Agentic RAG actively guides its own retrieval strategy, ensuring higher-quality and more contextually relevant answers.
Agentic Patterns for RAG
✓ Document 2 graded as RELEVANT
  Snippet: An Introduction to Agentic RAG


    Skip to content    Sajal Sharma              
Posts
   
Tags
   
About Me
      Search                        Go back    An Introduction to Agentic RAG Published:Mar 4, 2025  Tabl

In [451]:
# Test 3: Current events query (should route to web search)
response3 = run_workflow("What are the latest LangGraph features released in 2024?")
print(f"\nRESPONSE:\n{response3}")


QUERY: What are the latest LangGraph features released in 2024?

*** ROUTING QUERY ***
Routing to: web_search
Reasoning: The query asks for the latest features of LangGraph released in 2024, which requires up-to-date information about recent developments. This information is best obtained through a web search.
*** RETRIEVING FROM WEB SEARCH ***
*** GRADING DOCUMENTS ***
✗ Document 1 graded as NOT RELEVANT
  Snippet: Zach Anderson Jul 13, 2024 16:26 LangChain, a leading platform in the AI development space, has released its latest updates, showcasing new use cases and enhancements across its ecosystem. According to the LangChain Blog , the updates cover advancements in LangGraph Cloud, LangSmith's self-improving evaluators, and revamped documentation for
✓ Document 2 graded as RELEVANT
  Snippet: We also have a new stable release of LangGraph. By LangChain 6 min read Jun 27, 2024 (Oct '24) Edit: Since the launch of LangGraph Cloud, we now have multiple deployment options alongside Lang

In [452]:
# Test 4: General knowledge query (should route to direct response)
response4 = run_workflow("What is the difference between a list and a tuple in Python?")
print(f"\nRESPONSE:\n{response4}")


QUERY: What is the difference between a list and a tuple in Python?

*** ROUTING QUERY ***
Routing to: direct_response
Reasoning: This is a general programming knowledge question about Python data structures and does not require external data or specific blog content.
*** GENERATING DIRECT RESPONSE ***

RESPONSE:
In Python, **lists** and **tuples** are both used to store collections of items, but they have some important differences:

### 1. Mutability
- **List:** Mutable (can be changed after creation; you can add, remove, or modify elements).
- **Tuple:** Immutable (cannot be changed after creation; elements cannot be added, removed, or modified).

### 2. Syntax
- **List:** Defined using square brackets `[ ]`
  ```python
  my_list = [1, 2, 3]
  ```
- **Tuple:** Defined using parentheses `( )`
  ```python
  my_tuple = (1, 2, 3)
  ```

### 3. Methods
- **List:** Has many built-in methods (like `append()`, `remove()`, `pop()`, etc.).
- **Tuple:** Has very few built-in methods (mainly `

In [453]:
# Test 5: Vague query that might require rewriting
response5 = run_workflow("is agentic rag bad?")
print(f"\nRESPONSE:\n{response5}")


QUERY: is agentic rag bad?

*** ROUTING QUERY ***
Routing to: vectorstore
Reasoning: The query is specifically about 'agentic RAG,' which is a topic covered in Sajal's blog posts. The user is likely seeking an informed perspective or analysis from those blog posts, making the vectorstore the appropriate data source.
*** RETRIEVING FROM VECTOR STORE ***
*** GRADING DOCUMENTS ***
✗ Document 1 graded as NOT RELEVANT
  Snippet: What makes RAG Agentic?
Agentic RAG introduces autonomy and adaptability into the standard RAG pipeline by allowing the system to actively control the retrieval process rather than relying on a fixed retrieval-then-generate flow.
This means that instead of a linear pipeline, where retrieval is a single-step precursor to generation, an Agentic RAG system introduces decision points throughout the process. The system might:
✗ Document 2 graded as NOT RELEVANT
  Snippet: Agentic RAG represents a significant evolution in retrieval-augmented generation, introducing auton

In [437]:
# Test 6: Query with no relevant information in Sajal's blog (should trigger no_info response)
response6 = run_workflow("What is Sajal's favorite food?")
print(f"\nRESPONSE:\n{response6}")


QUERY: What is Sajal's favorite food?

*** ROUTING QUERY ***
Routing to: direct_response
Reasoning: The query is asking for a personal preference (Sajal's favorite food), which is general knowledge unless specifically mentioned in Sajal's blog posts. Since there is no indication that this is covered in the blog content, and it is not a current event, direct_response is the best choice.
*** GENERATING DIRECT RESPONSE ***

RESPONSE:
I don't have enough information to determine who Sajal is or what their favorite food might be. If you provide more context about Sajal, I may be able to help!


## Conclusion

This comprehensive agentic RAG workflow demonstrates:

1. **Intelligent Query Routing**: Automatically selects the most appropriate data source
2. **Document Quality Control**: Validates retrieved documents before generation
3. **Self-Correction**: Rewrites queries when initial retrieval fails
4. **Graceful Failure Handling**: Provides meaningful responses when no relevant information exists

The modular design allows for easy extension and customization based on specific requirements.

### Key Benefits:

- **Adaptive Behavior**: System adjusts strategy based on query type and retrieval results
- **Quality Assurance**: Document grading ensures only relevant information is used
- **Robustness**: Multiple fallback mechanisms prevent system failures
- **Transparency**: Clear workflow execution makes the system interpretable

### Potential Extensions:

- Multi-source parallel retrieval
- Confidence scoring for responses
- Citation management
- Query decomposition for complex questions
- Hybrid search strategies