# Introduction to LangGraph
## Building your first graph
Let's build a simple graph to see how these pieces fit together. This example creates a basic question-answering system with optional retrieval:

In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

class SimpleState(TypedDict):
    """State dictionary passed between the nodes of the simple Q&A graph."""
    question: str       # User question; the only key supplied to app.invoke()
    needs_search: bool  # Written by classify_question, read by route_after_classify
    context: str        # Written by search_info; generate_answer tolerates it being absent
    answer: str         # Written by generate_answer

# Load environment variables (load_dotenv is expected to pick up a local .env file)
load_dotenv()

# Verify the API key is loaded: fail fast with a clear message when the
# variable is missing or empty, rather than on the first model call
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("OPENAI_API_KEY not found in environment variables")

# Module-level chat model shared by every node and helper function below
llm = ChatOpenAI(model="gpt-4o-mini")

def classify_question(state: SimpleState) -> SimpleState:
    """Decide if we need to search for information.

    Asks the LLM for a yes/no verdict about the question and records it.

    Args:
        state: Current graph state; only ``question`` is read.

    Returns:
        A partial state update containing the boolean ``needs_search``.
    """
    response = llm.invoke(
        f"Does this question need external information? Answer yes/no: {state['question']}"
    )
    # Use the first standalone "yes"/"no" word as the verdict. A plain
    # substring test would also fire on words such as "eyes", or on replies
    # like "No, ... yes ..." where the leading word is the real answer.
    words = (w.strip(".,!?:;\"'()*") for w in response.content.lower().split())
    verdict = next((w for w in words if w in ("yes", "no")), "no")
    return {"needs_search": verdict == "yes"}

def search_info(state: SimpleState) -> SimpleState:
    """Stand in for a real retrieval step by fabricating a context string.

    Returns a partial state update holding ``context``.
    """
    # Placeholder only: real code would query a vector database here.
    question = state["question"]
    return {"context": f"Retrieved context for: {question}"}

def generate_answer(state: SimpleState) -> SimpleState:
    """Produce the final answer from the question plus any retrieved context.

    Returns a partial state update holding ``answer``.
    """
    # ``context`` may be missing from the state, so fall back to a fixed
    # placeholder sentence instead of indexing directly.
    background = state.get("context", "No additional context.")
    question = state["question"]
    prompt = f"Question: {question}\nContext: {background}\nAnswer:"
    reply = llm.invoke(prompt)
    return {"answer": reply.content}

def route_after_classify(state: SimpleState) -> Literal["search", "generate"]:
    """Return the name of the node to run after classification.

    ``"search"`` when the classifier flagged the question as needing external
    information, otherwise ``"generate"``.
    """
    return "search" if state["needs_search"] else "generate"

# Build the graph
graph = StateGraph(SimpleState)

# Register each node under the name the router function returns
graph.add_node("classify", classify_question)
graph.add_node("search", search_info)
graph.add_node("generate", generate_answer)

# Flow: classify -> (search -> generate | generate) -> END.
# route_after_classify returns either "search" or "generate", which are the
# node names registered above.
graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classify)
graph.add_edge("search", "generate")
graph.add_edge("generate", END)

# Compile and run. Only "question" is supplied; the remaining state keys are
# filled in by the nodes as they execute.
app = graph.compile()
result = app.invoke({"question": "What is the capital of France?"})
print(result["answer"])


## Visualizing graphs

In [None]:
import grandalf  # Install first with `uv pip install grandalf`

# ASCII visualization of the compiled graph
# (presumably the reason grandalf is imported above — TODO confirm)
print(app.get_graph().draw_ascii())

# Or get a Mermaid diagram, printed as text
print(app.get_graph().draw_mermaid())


# The Agentic RAG Architecture

In [None]:
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END, add_messages
from langchain_qdrant import QdrantVectorStore
from langchain_openai import OpenAIEmbeddings
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
import json
import re

# Initialize embeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Create in-memory Qdrant client (nothing is written to disk, so the index
# is presumably rebuilt each time this cell runs — confirm if persistence is needed)
qdrant_client = QdrantClient(":memory:")

# Collection name
COLLECTION_NAME = "study_materials"

# Create collection with proper dimensions (1536 for text-embedding-3-small).
# The size must be updated together with the embedding model above.
qdrant_client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)

# Create LangChain vector store wrapper; retrieve_node and
# multi_query_retrieve search through this module-level object
vector_store = QdrantVectorStore(
    client=qdrant_client,
    collection_name=COLLECTION_NAME,
    embedding=embeddings
)

print(f"Created Qdrant collection: {COLLECTION_NAME}")

class AgenticRAGState(TypedDict):
    """State dictionary shared by all nodes of the agentic RAG graph."""
    messages: Annotated[list, add_messages]
    query: str
    query_type: str              # factual, conceptual, procedural, comparison
    complexity: str              # simple, moderate, complex
    needs_retrieval: bool
    search_queries: list[str]    # Generated search queries
    retrieved_docs: list         # Documents from vector store
    retrieval_sufficient: bool   # Did we get enough? (set by evaluate_node)
    response: str
    confidence: float            # 0-1 confidence score
    iteration: int               # Track retrieval iterations (incremented by evaluate_node)


def parse_json_response(text: str, default: dict = None) -> dict:
    """Extract and parse a JSON object from an LLM response.

    Handles markdown code fences and prose surrounding the JSON. The result
    is always a dict, so callers can safely use ``.get`` on it.

    Args:
        text: Raw model output.
        default: Value to fall back on when no JSON object can be extracted.
            ``None`` (or an empty dict) means "fall back to ``{}``".

    Returns:
        The first JSON object found in ``text``; otherwise a shallow copy of
        ``default`` (or ``{}``). Non-object JSON such as a bare number or an
        array is treated as "no object found".
    """
    # Copy so that callers mutating the result never alter their default.
    fallback = dict(default) if default else {}
    if not text or not text.strip():
        return fallback

    # Try to find JSON in code blocks first
    code_block_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
    if code_block_match:
        text = code_block_match.group(1).strip()

    # Fast path: the whole payload is JSON.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Otherwise decode starting at each "{" in turn. raw_decode stops at the
    # end of the first complete value, so trailing prose (even prose that
    # contains braces) no longer breaks parsing.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)

    return fallback


def analyze_query_node(state: AgenticRAGState) -> AgenticRAGState:
    """Analyze the query to determine handling strategy.

    Asks the LLM to classify the query and propose search queries, then
    validates each field, because model output is not guaranteed to follow
    the requested JSON shape.

    Args:
        state: Current graph state; only ``query`` is read.

    Returns:
        A partial state update with ``query_type``, ``complexity``,
        ``needs_retrieval``, ``search_queries`` and ``iteration`` reset to 0.
    """
    query = state["query"]
    analysis_prompt = f"""Analyze this query and respond with JSON:

Query: {query}

Determine:
1. query_type: "factual" (specific facts), "conceptual" (understanding),
   "procedural" (how-to), or "comparison" (comparing things)
2. complexity: "simple" (direct answer), "moderate" (some context needed),
   "complex" (multiple aspects, deep context)
3. needs_retrieval: true if this needs information from documents,
   false if general knowledge suffices
4. search_queries: if retrieval needed, list 1-3 effective search queries

Respond ONLY with JSON: {{"query_type": "...", "complexity": "...",
"needs_retrieval": true/false, "search_queries": [...]}}"""

    response = llm.invoke(analysis_prompt)
    default = {"query_type": "conceptual", "complexity": "moderate",
               "needs_retrieval": True, "search_queries": [query]}
    analysis = parse_json_response(response.content, default)
    if not isinstance(analysis, dict):
        analysis = default

    def _label(key: str) -> str:
        # Lower-case labels so they match the lowercase keys used by
        # retrieve_node's k table; fall back when missing or not a string.
        value = analysis.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip().lower()
        return default[key]

    # Models sometimes emit "true"/"false" as strings; a non-empty string
    # would otherwise always be truthy.
    needs_retrieval = analysis.get("needs_retrieval", True)
    if isinstance(needs_retrieval, str):
        needs_retrieval = needs_retrieval.strip().lower() not in {"false", "no", "0", ""}

    # An empty or malformed query list would make retrieval a silent no-op,
    # so fall back to searching with the original query.
    raw_queries = analysis.get("search_queries")
    if isinstance(raw_queries, str):
        raw_queries = [raw_queries]
    if not isinstance(raw_queries, list):
        raw_queries = []
    search_queries = [q.strip() for q in raw_queries if isinstance(q, str) and q.strip()]
    if not search_queries:
        search_queries = [query]

    return {
        "query_type": _label("query_type"),
        "complexity": _label("complexity"),
        "needs_retrieval": bool(needs_retrieval),
        "search_queries": search_queries,
        "iteration": 0
    }


def retrieve_node(state: AgenticRAGState) -> AgenticRAGState:
    """Retrieve documents with strategy based on query complexity.

    Runs every search query against the vector store and merges the hits,
    dropping documents whose text has already been seen.

    Args:
        state: Current graph state; reads ``complexity``, ``search_queries``
            and (optionally) ``iteration``.

    Returns:
        A partial state update with ``retrieved_docs`` in first-seen order.
    """
    complexity = state["complexity"]
    search_queries = state["search_queries"]
    iteration = state.get("iteration", 0)
    k_values = {"simple": 2, "moderate": 4, "complex": 6}
    # When evaluation sends the graph back here, the queries are unchanged, so
    # repeating the same search would return the same documents. Widen the
    # search by two documents per completed iteration instead.
    k = k_values.get(complexity, 3) + 2 * iteration

    # Key on the text itself rather than hash(text): identical semantics
    # without any possibility of hash collisions. Dicts keep insertion order.
    unique_docs = {}
    for query in search_queries:
        for doc in vector_store.similarity_search(query, k=k):
            unique_docs.setdefault(doc.page_content, doc)
    return {"retrieved_docs": list(unique_docs.values())}


def evaluate_node(state: AgenticRAGState) -> AgenticRAGState:
    """Evaluate if retrieved documents can answer the query.

    Retrieval counts as sufficient when the LLM-reported confidence is at
    least 0.7, or when two evaluations have already happened (which bounds
    the retrieve/evaluate loop to three passes).

    Args:
        state: Current graph state; reads ``query``, ``retrieved_docs`` and
            (optionally) ``iteration``.

    Returns:
        A partial state update with ``retrieval_sufficient``, ``confidence``
        (a float clamped to [0, 1]) and the incremented ``iteration``.
    """
    query = state["query"]
    docs = state["retrieved_docs"]
    iteration = state.get("iteration", 0)
    give_up = iteration >= 2

    # If no docs retrieved, return low confidence
    if not docs:
        return {
            "retrieval_sufficient": give_up,
            "confidence": 0.3,
            "iteration": iteration + 1
        }

    context = "\n\n".join([doc.page_content for doc in docs])

    # Only the first 3000 characters of context are shown to the evaluator.
    eval_prompt = f"""Evaluate if this context can answer the query:

Query: {query}

Retrieved Context:
{context[:3000]}

Assess:
1. relevance: Are these documents about the right topic? (0-1)
2. completeness: Do they contain enough to fully answer? (0-1)
3. confidence: How confident can the answer be? (0-1)

Respond with JSON: {{"relevance": 0.8, "completeness": 0.7, "confidence": 0.75}}"""

    response = llm.invoke(eval_prompt)
    default = {"relevance": 0.7, "completeness": 0.6, "confidence": 0.65}
    scores = parse_json_response(response.content, default)

    # The model may return the score as a string ("0.8"), null, or a value
    # outside 0-1; coerce it so the comparison below cannot raise TypeError.
    raw_confidence = scores.get("confidence", 0.65) if isinstance(scores, dict) else 0.65
    try:
        confidence = float(raw_confidence)
    except (TypeError, ValueError):
        confidence = 0.65
    confidence = min(1.0, max(0.0, confidence))

    return {
        "retrieval_sufficient": confidence >= 0.7 or give_up,
        "confidence": confidence,
        "iteration": iteration + 1
    }


def generate_node(state: AgenticRAGState) -> AgenticRAGState:
    """Write the final response from the retrieved documents.

    Each document is rendered under its ``source`` metadata value (or
    ``unknown``) so the model can cite it. The confidence already in the
    state is passed through unchanged, defaulting to 0.5 when absent.
    """
    query = state["query"]
    docs = state["retrieved_docs"]
    confidence = state.get("confidence", 0.5)

    sections = []
    for doc in docs:
        label = doc.metadata.get('source', 'unknown')
        sections.append(f"[{label}]:\n{doc.page_content}")
    context = "\n\n".join(sections)

    generate_prompt = f"""Answer this query using the provided context:

Query: {query}

Context:
{context}

Instructions:
- Answer based on the context provided
- Cite sources when referencing specific information
- If context is insufficient, acknowledge limitations
- Be clear and educational in your explanation"""

    reply = llm.invoke(generate_prompt)
    return {"response": reply.content, "confidence": confidence}


def direct_answer_node(state: AgenticRAGState) -> AgenticRAGState:
    """Answer from the model alone, skipping retrieval entirely.

    Returns a partial state update with ``response`` and a fixed
    ``confidence`` of 0.9.
    """
    prompt = f"Answer this question concisely: {state['query']}"
    reply = llm.invoke(prompt)
    return {"response": reply.content, "confidence": 0.9}


def route_after_analysis(state: AgenticRAGState) -> Literal["retrieve", "direct"]:
    """Choose the branch after query analysis.

    ``"retrieve"`` when the analysis asked for retrieval, else ``"direct"``.
    """
    return "retrieve" if state["needs_retrieval"] else "direct"


def route_after_evaluation(state: AgenticRAGState) -> Literal["generate", "retrieve"]:
    """Choose the branch after evaluation.

    ``"generate"`` once retrieval is judged sufficient, else back to
    ``"retrieve"`` for another pass.
    """
    return "generate" if state["retrieval_sufficient"] else "retrieve"


# Build the graph
# NOTE: this rebinds the module-level name `graph` used for the simple graph above.
graph = StateGraph(AgenticRAGState)
graph.add_node("analyze", analyze_query_node)
graph.add_node("retrieve", retrieve_node)
graph.add_node("evaluate", evaluate_node)
graph.add_node("generate", generate_node)
graph.add_node("direct", direct_answer_node)

# analyze decides between the retrieval branch and a direct answer
graph.set_entry_point("analyze")
graph.add_conditional_edges("analyze", route_after_analysis, {
    "retrieve": "retrieve",
    "direct": "direct"
})
# retrieve -> evaluate, then either generate or loop back to retrieve.
# The loop is bounded because evaluate_node reports retrieval as sufficient
# once its iteration counter reaches 2.
graph.add_edge("retrieve", "evaluate")
graph.add_conditional_edges("evaluate", route_after_evaluation, {
    "generate": "generate",
    "retrieve": "retrieve"
})
# Both answer paths end the run
graph.add_edge("generate", END)
graph.add_edge("direct", END)

agentic_rag = graph.compile()

In [None]:
# Try it out!
# NOTE(review): in the code shown, nothing has been added to vector_store at
# this point (documents are indexed in a later cell), so if the retrieval
# branch is taken it presumably finds no documents — confirm this is intended.
result = agentic_rag.invoke({
    "query": "What is the difference between RAG and fine-tuning?",
    "messages": []
})
# .get() is used because not every key is written on every path
# (e.g. the direct-answer branch never sets retrieved_docs)
print(f"Query Type: {result.get('query_type')}")
print(f"Complexity: {result.get('complexity')}")
print(f"Used Retrieval: {result.get('needs_retrieval')}")
print(f"Confidence: {result.get('confidence')}")
print(f"\nResponse:\n{result.get('response')}")

# Query Planning and Analysis

In [None]:
def extract_search_intent(query: str) -> dict:
    """Ask the LLM what the user is really searching for.

    Returns whatever JSON object ``parse_json_response`` extracts from the
    reply; the prompt requests core_topic, specific_aspect, related_concepts
    and search_terms, but their presence is not checked here.
    """
    prompt = f"""Analyze this user query for search intent:

Query: "{query}"

Identify:
1. core_topic: The main subject (use technical terms)
2. specific_aspect: What specifically they want to know
3. related_concepts: Other topics that might be relevant
4. search_terms: 3-5 terms likely to appear in relevant documents

Example:
Query: "How do I make my chatbot remember stuff?"
- core_topic: "memory systems in LLM applications"
- specific_aspect: "implementing conversation persistence"
- related_concepts: ["context window", "vector stores", "session state"]
- search_terms: ["memory", "persistence", "conversation history", "state management"]

Respond with JSON."""

    reply = llm.invoke(prompt)
    intent = parse_json_response(reply.content)
    return intent


def decompose_complex_query(query: str) -> list[str]:
    """Break complex queries into searchable sub-questions.

    Args:
        query: The user's original question.

    Returns:
        Up to four non-empty sub-questions. Falls back to ``[query]`` when
        the model's reply cannot be parsed or contains no usable
        sub-questions, so callers always get at least one searchable string.
    """

    decompose_prompt = f"""Break this complex query into simpler sub-questions:

Query: "{query}"

Rules:
- Each sub-question should be answerable with a focused search
- Cover all aspects of the original query
- Keep sub-questions independent
- Maximum 4 sub-questions

If the query is already simple, return it unchanged.

Respond with JSON: {{"sub_questions": ["q1", "q2", ...]}}"""

    response = llm.invoke(decompose_prompt)
    result = parse_json_response(response.content)

    # parse_json_response returns {} when parsing fails, so index defensively
    # rather than raising KeyError on a malformed reply.
    sub_questions = result.get("sub_questions") if isinstance(result, dict) else None
    if not isinstance(sub_questions, list):
        return [query]

    cleaned = [q.strip() for q in sub_questions if isinstance(q, str) and q.strip()]
    # Enforce the "Maximum 4 sub-questions" rule stated in the prompt.
    return cleaned[:4] or [query]


def assess_complexity(query: str, query_type: str) -> str:
    """Determine query complexity for resource allocation.

    Args:
        query: The user's question.
        query_type: Query category, included in the prompt as context.

    Returns:
        Exactly one of ``"SIMPLE"``, ``"MODERATE"`` or ``"COMPLEX"``. When
        the model's reply names none of them, ``"MODERATE"`` is returned.
    """

    complexity_prompt = f"""Assess the complexity of answering this query:

Query: "{query}"
Query Type: {query_type}

Complexity levels:
- SIMPLE: Direct fact or definition, single concept, one source sufficient
- MODERATE: Requires some explanation, few concepts, 2-3 sources helpful
- COMPLEX: Multiple aspects, comparison/evaluation, needs synthesis from many sources

Consider:
- How many distinct concepts are involved?
- Does it require comparison or evaluation?
- Is domain expertise needed?
- Would the answer need multiple sources?

Respond with just: SIMPLE, MODERATE, or COMPLEX"""

    response = llm.invoke(complexity_prompt)
    answer = response.content.strip().upper()

    levels = ("SIMPLE", "MODERATE", "COMPLEX")
    if answer in levels:
        return answer

    # The model may wrap the label in punctuation or a sentence ("COMPLEX.",
    # "I'd say MODERATE"). Take the label mentioned first so downstream
    # lookup tables keyed on these three values always match.
    mentioned = [(answer.find(level), level) for level in levels if level in answer]
    return min(mentioned)[1] if mentioned else "MODERATE"

# Dynamic Retrieval Strategies

In [None]:
def should_retrieve(query: str, query_type: str) -> bool:
    """Determine if retrieval would help answer this query.

    Args:
        query: The user's question.
        query_type: Query category. Currently unused; kept so existing
            callers keep working.

    Returns:
        ``False`` for a handful of trivial or social messages (decided
        locally, without calling the LLM); otherwise whether the LLM's reply
        contains "YES".
    """

    # Quick heuristics first. Compare the whole normalized query rather than
    # searching for substrings: a substring test would skip retrieval for
    # real questions such as "What is Othello?" (contains "hello") or
    # "How does RAG work? thanks".
    general_patterns = {"what is 2+2", "hello", "how are you", "thanks"}
    normalized = query.lower().strip(" \t\n?!.,")
    if normalized in general_patterns:
        return False

    # LLM-based decision for ambiguous cases
    decision_prompt = f"""Should I search a knowledge base to answer this?

Query: "{query}"

The knowledge base contains technical documentation about AI engineering,
including RAG, agents, LangChain, embeddings, and related topics.

Answer YES if:
- The query is about these specific topics
- The answer requires factual information I might not have
- The user is asking about implementation details

Answer NO if:
- It's a general knowledge question
- It's conversational or social
- It's about basic programming not specific to AI engineering
- I can answer confidently from general knowledge

Respond with just: YES or NO"""

    response = llm.invoke(decision_prompt)
    return "YES" in response.content.upper()


def generate_search_queries(query: str, complexity: str) -> list[str]:
    """Generate effective search queries based on user query.

    Args:
        query: The user's question.
        complexity: Complexity label; matched case-insensitively against
            SIMPLE / MODERATE / COMPLEX. Unknown labels request 2 queries.

    Returns:
        The non-empty query strings proposed by the model, or ``[query]``
        when the reply cannot be parsed or contains no usable queries.
    """

    num_queries = {"SIMPLE": 1, "MODERATE": 2, "COMPLEX": 3}
    # Graph state stores lowercase labels ("simple"), so normalize the case
    # before the lookup instead of silently falling back to the default.
    n = num_queries.get(str(complexity).strip().upper(), 2)

    gen_prompt = f"""Generate {n} search queries to find information for:

User question: "{query}"

Guidelines:
- Use technical terms that would appear in documentation
- Make queries specific and focused
- Cover different angles of the question
- Each query should be 3-7 words

Respond with JSON: {{"queries": ["query1", "query2", ...]}}"""

    response = llm.invoke(gen_prompt)
    result = parse_json_response(response.content)

    # parse_json_response returns {} on failure; avoid a KeyError and fall
    # back to searching with the original question.
    queries = result.get("queries") if isinstance(result, dict) else None
    if not isinstance(queries, list):
        return [query]
    cleaned = [q.strip() for q in queries if isinstance(q, str) and q.strip()]
    return cleaned or [query]


def calculate_retrieval_k(complexity: str, query_type: str) -> int:
    """Calculate how many documents to retrieve.

    Args:
        complexity: Complexity label, matched case-insensitively against
            SIMPLE / MODERATE / COMPLEX. Unknown labels use a base of 3.
        query_type: Query category, matched case-insensitively. Unknown
            categories use a multiplier of 1.0.

    Returns:
        The base count scaled by the type multiplier, truncated to an int
        and clamped to the range 1-10.
    """

    base_k = {
        "SIMPLE": 2,
        "MODERATE": 4,
        "COMPLEX": 6,
    }

    # Adjust for query type
    type_multiplier = {
        "comparison": 1.5,   # Need multiple perspectives
        "procedural": 1.0,   # Usually one good doc suffices
        "conceptual": 1.25,  # Benefits from multiple explanations
        "factual": 0.75,     # Usually in one place
    }

    # Normalize case and whitespace: graph state carries lowercase
    # complexity labels ("complex"), which previously missed the table and
    # fell back to the default.
    level = str(complexity).strip().upper()
    kind = str(query_type).strip().lower()

    k = int(base_k.get(level, 3) * type_multiplier.get(kind, 1.0))

    return max(1, min(k, 10))  # Clamp between 1-10


def multi_query_retrieve(queries: list[str], k_per_query: int) -> list:
    """Retrieve documents using multiple queries and deduplicate.

    Args:
        queries: Search strings to run against the vector store.
        k_per_query: Number of results requested for each query.

    Returns:
        Documents in first-seen order, with exact duplicates (same
        ``page_content``) removed.
    """
    # Deduplicate on the full text, as retrieve_node does. Keying on only the
    # first 500 characters would discard distinct documents that happen to
    # share a long common prefix (for example a repeated header).
    unique_docs = {}
    for query in queries:
        for doc in vector_store.similarity_search(query, k=k_per_query):
            unique_docs.setdefault(doc.page_content, doc)

    return list(unique_docs.values())

# Result Synthesis and Response Generation

In [None]:
def synthesize_response(query: str, docs: list, confidence: float) -> str:
    """Write an answer that combines several retrieved documents.

    Every document is labelled with its ``source`` metadata (or a numbered
    placeholder), and a caution is added to the prompt when ``confidence``
    is below 0.7. Returns the model's reply text.
    """
    # Label each document so the model can attribute what it uses.
    labelled = [
        f"[Source: {doc.metadata.get('source', f'Document {position}')}]\n{doc.page_content}"
        for position, doc in enumerate(docs, start=1)
    ]
    context = "\n\n---\n\n".join(labelled)

    low_confidence_note = (
        "\nNote: Retrieved information may be incomplete. Acknowledge any gaps"
        "\nand avoid overstating certainty."
    )
    confidence_instruction = low_confidence_note if confidence < 0.7 else ""

    synthesis_prompt = f"""Answer this question by synthesizing the provided sources:

Question: {query}

Sources:
{context}
{confidence_instruction}

Instructions:
- Synthesize information from multiple sources when relevant
- Cite sources naturally (e.g., "According to the RAG fundamentals guide...")
- If sources provide different perspectives, acknowledge them
- Be educational and clear
- If information is incomplete, say so rather than guessing"""

    reply = llm.invoke(synthesis_prompt)
    return reply.content


def check_for_conflicts(docs: list) -> list[str]:
    """Identify potential conflicts in retrieved documents.

    Args:
        docs: Retrieved documents; each must expose ``page_content``.

    Returns:
        The conflicts reported by the model, or an empty list when there are
        no documents, the reply cannot be parsed, or no conflicts are
        reported.
    """
    # Nothing to compare: skip the LLM call entirely.
    if not docs:
        return []

    context = "\n\n".join([doc.page_content for doc in docs])

    conflict_prompt = f"""Review these documents for conflicting information:

{context}

Identify any contradictory claims or recommendations.
If conflicts exist, explain what differs and why both might be valid.

Respond with JSON: {{"has_conflicts": true/false, "conflicts": [...]}}"""

    response = llm.invoke(conflict_prompt)
    result = parse_json_response(response.content)

    conflicts = result.get("conflicts", []) if isinstance(result, dict) else []
    # The model may answer with a single string or null instead of a list;
    # keep the declared return type stable for callers that iterate it.
    if isinstance(conflicts, str):
        return [conflicts] if conflicts.strip() else []
    return conflicts if isinstance(conflicts, list) else []


def calculate_confidence(query: str, docs: list, relevance_score: float) -> float:
    """Calculate confidence score for the response.

    Combines three factors: retrieval relevance (weight 0.4), LLM-rated
    coverage (weight 0.4) and a fixed consistency estimate (weight 0.2).

    Args:
        query: The user's question.
        docs: Retrieved documents; each must expose ``page_content``.
        relevance_score: Retrieval relevance, used as given.

    Returns:
        The weighted score rounded to two decimals.
    """

    # Factor 1: Retrieval relevance
    retrieval_confidence = relevance_score

    # Factor 2: Coverage - do docs address the query's main aspects?
    coverage_prompt = f"""Rate how well these documents cover this query (0-1):
Query: {query}
Docs cover: {[doc.page_content[:200] for doc in docs]}
Respond with just a number."""

    reply = llm.invoke(coverage_prompt).content
    # The model does not always reply with a bare number ("0.8.", "Score:
    # 0.8"), and float() on such text raises ValueError. Take the first
    # number in the reply, use a neutral 0.5 when there is none, and clamp
    # to the 0-1 range the prompt asked for.
    number = re.search(r"\d+(?:\.\d+)?|\.\d+", reply or "")
    coverage = float(number.group()) if number else 0.5
    coverage = min(1.0, max(0.0, coverage))

    # Factor 3: Consistency - do docs agree?
    if len(docs) > 1:
        consistency = 0.9  # Assume consistent unless conflicts detected
    else:
        consistency = 0.8  # Single source is less reliable

    # Weighted combination
    confidence = (
        0.4 * retrieval_confidence +
        0.4 * coverage +
        0.2 * consistency
    )

    return round(confidence, 2)

# Handling Ambiguity and Uncertainty

In [None]:
def detect_ambiguity(query: str, conversation_history: list) -> dict:
    """Ask the LLM whether the query is too ambiguous to answer as-is.

    Only the last three history entries are shown to the model (or the text
    ``None`` when there is no history). Returns whatever JSON object
    ``parse_json_response`` extracts from the reply.
    """
    recent_turns = conversation_history[-3:] if conversation_history else 'None'

    prompt = f"""Analyze this query for ambiguity:

Query: "{query}"
Conversation context: {recent_turns}

Check for:
- Unclear pronouns ("it", "this", "that") without referent
- Missing context that makes the query interpretable multiple ways
- Vague terms that could mean different things
- References to prior conversation that isn't available

Respond with JSON:
{{
    "is_ambiguous": true/false,
    "ambiguity_type": "pronoun/context/vague/reference" or null,
    "clarifying_question": "What would help?" or null,
    "best_interpretation": "Most likely meaning if we proceed"
}}"""

    reply = llm.invoke(prompt)
    analysis = parse_json_response(reply.content)
    return analysis


def should_clarify(ambiguity_analysis: dict) -> bool:
    """Decide whether to ask for clarification.

    Args:
        ambiguity_analysis: Result of ``detect_ambiguity``. May be empty or
            missing keys, because that function returns ``{}`` when the
            model's reply cannot be parsed.

    Returns:
        ``True`` only for ambiguous queries whose type is "pronoun" or
        "reference"; ``False`` otherwise, including for incomplete analyses.
    """
    # Treat a missing or empty analysis as "not ambiguous" instead of
    # raising KeyError.
    if not ambiguity_analysis or not ambiguity_analysis.get("is_ambiguous"):
        return False

    # High-risk ambiguities warrant clarification
    high_risk_types = {"pronoun", "reference"}
    ambiguity_type = ambiguity_analysis.get("ambiguity_type")
    if not isinstance(ambiguity_type, str):
        return False

    # For vague queries, proceed with best interpretation
    # but note the assumption in the response
    return ambiguity_type.strip().lower() in high_risk_types


def generate_uncertain_response(query: str, confidence: float, reason: str) -> str:
    """Have the LLM write an answer that is upfront about its limitations.

    The confidence value and the reason for uncertainty are passed to the
    model verbatim. Returns the model's reply text.
    """
    # NOTE(review): the example sentence in the prompt opens with a double
    # quote that is never closed, because the literal's terminating triple
    # quote follows "to..." directly. Left as-is to keep the prompt unchanged.
    prompt = f"""Generate a helpful response that acknowledges limitations:

Query: {query}
Confidence level: {confidence}
Uncertainty reason: {reason}

Guidelines:
- Be honest about what you don't know
- Share whatever relevant information you do have
- Suggest how the user might find better information
- Don't apologize excessively, just be straightforward

Example: "I found some information about X, but the documents don't directly
address Y. Here's what I can tell you... For more specific information about Y,
you might want to..."""

    reply = llm.invoke(prompt)
    return reply.content

# Putting It All Together

In [None]:
# Example: Complete Agentic RAG Pipeline
# First, load and index the documents from the documents directory

from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Load all markdown files from the documents directory
# ("documents" is a relative path; "**/*.md" also matches files in subfolders)
loader = DirectoryLoader(
    "documents",
    glob="**/*.md",
    loader_cls=TextLoader
)
documents = loader.load()
print(f"Loaded {len(documents)} documents")

# Split into chunks of up to 1000 characters; neighbouring chunks share up to
# 200 characters so text cut at a boundary still appears whole in one chunk
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = text_splitter.split_documents(documents)
print(f"Split into {len(chunks)} chunks")

# Add to vector store
# NOTE(review): re-running this cell presumably adds the same chunks again — confirm
vector_store.add_documents(chunks)
print(f"Indexed {len(chunks)} chunks in vector store\n")

# Now run the agentic RAG pipeline
test_query = "How does RAG compare to fine-tuning for customizing LLM behavior?"

print("=" * 60)
print("Running Agentic RAG Pipeline")
print("=" * 60)

result = agentic_rag.invoke({
    "query": test_query,
    "messages": []
})

# .get() is used because not every key is written on every path
# (the direct-answer branch never sets retrieved_docs, hence the [] default)
print(f"Query: {test_query}\n")
print(f"Query type: {result.get('query_type')}")
print(f"Complexity: {result.get('complexity')}")
print(f"Used retrieval: {result.get('needs_retrieval')}")
print(f"Documents retrieved: {len(result.get('retrieved_docs', []))}")
print(f"Confidence: {result.get('confidence')}")
print(f"Iterations: {result.get('iteration')}")

print("\n" + "=" * 60)
print("RESPONSE")
print("=" * 60)
print(result.get('response'))

# Using Open-Source Models

In [None]:
# Verify Ollama is running and models are available
from langchain_ollama import ChatOllama, OllamaEmbeddings

# Test connection to Ollama
# The broad `except Exception` is deliberate here: this is a top-level
# diagnostic cell, and any failure should print setup hints rather than a
# traceback.
try:
    # Try a smaller model first (3B instead of 20B)
    test_llm = ChatOllama(model="llama3.2:3b", temperature=0)
    test_response = test_llm.invoke("Say 'Ollama is working!' in exactly 3 words.")
    print(f"Chat Model Test: {test_response.content}")
    
    # Embed a single word and report the vector length as a smoke test
    test_embeddings = OllamaEmbeddings(model="nomic-embed-text")
    test_vector = test_embeddings.embed_query("test")
    print(f"Embedding Model Test: Vector dimension = {len(test_vector)}")
    print("\nOllama is ready!")
except Exception as e:
    print(f"Error connecting to Ollama: {e}")
    print("\nMake sure:")
    print("1. Ollama is installed: https://ollama.com/")
    print("2. Ollama is running: 'ollama serve'")
    print("3. Models are pulled: 'ollama pull llama3.2:3b' and 'ollama pull nomic-embed-text'")