<a href="https://colab.research.google.com/github/intimanjunath/AI-Agents-Design-Patterns-using-LangGraph/blob/main/LangGraph_Agent_Design_Patterns_ipynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import os

# LangSmith API key: never hard-code credentials in a notebook or source file.
# Provide LANGCHAIN_API_KEY through the environment (shell export, Colab
# secrets, a .env loader, ...) before running this cell. The key that used to
# be committed on this line must be treated as leaked and revoked.
if not os.environ.get("LANGCHAIN_API_KEY"):
    import warnings

    warnings.warn(
        "LANGCHAIN_API_KEY is not set; LangSmith tracing will likely fail "
        "to authenticate until it is provided via the environment.",
        stacklevel=2,
    )

# Enable the new tracing system
os.environ["LANGCHAIN_TRACING_V2"]   = "true"

# (Optional) Project and session naming — helps you organize traces in the UI
os.environ["LANGCHAIN_PROJECT"]      = "AgentPatternsDemo"
os.environ["LANGCHAIN_SESSION_NAME"] = "PromptChaining_LangGraph"

# Pattern 1️⃣: Prompt Chaining
A classic pipeline where each LLM output feeds directly into the next step.

In [8]:
# Imports
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
from typing import TypedDict

In [9]:
# Define the shared state schema
class ChainState(TypedDict):
    """State dictionary handed from node to node in the prompt-chaining graph."""

    text: str          # input text; summarize() reads its first 60 characters
    summary: str       # written by summarize(), read by translate_to_french()
    translation: str   # written by translate_to_french(), read by add_greeting()
    final_output: str  # written by add_greeting()

# Step 1: Summarize
def summarize(state: ChainState) -> ChainState:
    """Mock summarizer: wrap the first 60 characters of ``text`` in a fixed phrase.

    Returns a copy of ``state`` with ``summary`` set; the input is not mutated.
    The trailing ``...`` is appended regardless of the text length.
    """
    excerpt = state["text"][:60]
    updated = dict(state)
    updated["summary"] = f"This is a brief summary of: {excerpt}..."
    print("üìù Summarized")
    return updated

# Step 2: Translate to French
def translate_to_french(state: ChainState) -> ChainState:
    """Mock translator: prefix ``summary`` with a fixed French lead-in.

    Returns a copy of ``state`` with ``translation`` set; no real translation
    is performed.
    """
    source_summary = state["summary"]
    updated = dict(state)
    updated["translation"] = f"(French) Ceci est un r√©sum√© de: {source_summary}"
    print("üåê Translated")
    return updated

# Step 3: Add Greeting
def add_greeting(state: ChainState) -> ChainState:
    """Prepend ``Bonjour! `` to ``translation`` and store it as ``final_output``.

    Returns a copy of ``state``; the input mapping is left untouched.
    """
    greeting = "Bonjour! "
    greeted = greeting + state["translation"]
    print("üëã Added Greeting")
    updated = dict(state)
    updated["final_output"] = greeted
    return updated


In [10]:
# Register the three steps as nodes, in the order they will run.
builder = StateGraph(ChainState)
for _node_name, _step in (
    ("summarizer", summarize),
    ("translator", translate_to_french),
    ("greeting", add_greeting),
):
    builder.add_node(_node_name, RunnableLambda(_step))

# Wire a straight line: summarizer -> translator -> greeting -> END.
builder.set_entry_point("summarizer")
for _source, _target in (
    ("summarizer", "translator"),
    ("translator", "greeting"),
    ("greeting", END),
):
    builder.add_edge(_source, _target)

graph = builder.compile()

In [11]:
# Seed every state field up front; only "text" carries real input.
input_state = dict(
    text="Artificial Intelligence is transforming industries through automation, personalization, and data-driven decision-making.",
    summary="",
    translation="",
    final_output="",
)

# Run the chain once and show what the last node produced.
result = graph.invoke(input_state)
print("\nüßæ Final Output:\n", result["final_output"])

üìù Summarized
üåê Translated
üëã Added Greeting

üßæ Final Output:
 Bonjour! (French) Ceci est un r√©sum√© de: This is a brief summary of: Artificial Intelligence is transforming industries through a...


# Section 2: Parallelization
Let's implement the Parallelization Pattern next. We'll use a ThreadPoolExecutor to simulate parallel LLM calls inside a single node.

In [16]:
# Update session for LangSmith tracing
# NOTE(review): LANGCHAIN_SESSION_NAME may not be a variable LangSmith reads
# (LANGCHAIN_PROJECT is the one set earlier in this file) — confirm it has an effect.
import os
os.environ["LANGCHAIN_SESSION_NAME"] = "Parallelization_LangGraph"

# Imports (if not already present)
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
from typing import TypedDict
from typing_extensions import Annotated
from concurrent.futures import ThreadPoolExecutor

In [17]:
# Shared state schema
class ParallelState(TypedDict):
    """State dictionary for the parallelization graph."""

    # NOTE(review): "shared" is plain string metadata on the annotation; nothing
    # in this file reads it — confirm whether LangGraph gives it any meaning.
    query: Annotated[str, "shared"]  # topic both experts interpolate into their answer
    science_answer: str  # written by explain_scientifically() / parallel_runner()
    analogy_answer: str  # written by explain_with_analogy() / parallel_runner()
    merged_output: str   # written by merge_responses()

# Science Expert
def explain_scientifically(state: ParallelState) -> ParallelState:
    """Mock science expert: build a canned sentence around ``query``.

    Returns a copy of ``state`` with ``science_answer`` set.
    """
    print("üî¨ Science Expert responding...")
    answered = dict(state)
    topic = state["query"]
    answered["science_answer"] = f"{topic} causes global warming due to greenhouse gases."
    return answered

# Analogy Expert
def explain_with_analogy(state: ParallelState) -> ParallelState:
    """Mock analogy expert: build a canned blanket analogy around ``query``.

    Returns a copy of ``state`` with ``analogy_answer`` set.
    """
    print("üé® Analogy Expert responding...")
    answered = dict(state)
    topic = state["query"]
    answered["analogy_answer"] = f"{topic} is like wrapping the Earth in a blanket that traps heat."
    return answered

# Parallel runner node
def parallel_runner(state: ParallelState) -> ParallelState:
    """Run both expert functions on a thread pool and collect their answers.

    Each expert receives the same ``state`` and returns its own copy; only the
    field that expert is responsible for is taken from its result. Returns a
    copy of ``state`` with ``science_answer`` and ``analogy_answer`` filled in.
    """
    with ThreadPoolExecutor() as pool:
        # Submit both jobs before waiting on either so they can overlap.
        pending = {
            "science_answer": pool.submit(explain_scientifically, state),
            "analogy_answer": pool.submit(explain_with_analogy, state),
        }
        answers = {field: job.result()[field] for field, job in pending.items()}
    return {**state, **answers}

# Merge responses
def merge_responses(state: ParallelState) -> ParallelState:
    """Combine the two expert answers into one labelled text block.

    Returns a copy of ``state`` with ``merged_output`` set to the scientific
    explanation followed, after a blank line, by the analogy.
    """
    print("üîó Merging both responses...")
    sections = (
        f"Scientific Explanation:\n{state['science_answer']}",
        f"Analogy:\n{state['analogy_answer']}",
    )
    merged = dict(state)
    merged["merged_output"] = "\n\n".join(sections)
    return merged

In [18]:
#Build & Compile Graph

# Two nodes: one fans out to both experts internally, one merges their answers.
builder = StateGraph(ParallelState)
for _node_name, _step in (
    ("parallel", parallel_runner),
    ("merge", merge_responses),
):
    builder.add_node(_node_name, RunnableLambda(_step))

# parallel -> merge -> END
builder.set_entry_point("parallel")
for _source, _target in (("parallel", "merge"), ("merge", END)):
    builder.add_edge(_source, _target)

graph = builder.compile()

In [19]:
#Invoke & Inspect

# Only "query" carries input; the answer fields start empty and are filled by the graph.
input_state = dict(
    query="Climate change",
    science_answer="",
    analogy_answer="",
    merged_output="",
)

# Run the graph once and show the merged explanation.
result = graph.invoke(input_state)
print("\nüßæ Final Combined Output:\n", result["merged_output"])

üî¨ Science Expert responding...
üé® Analogy Expert responding...
üîó Merging both responses...

üßæ Final Combined Output:
 Scientific Explanation:
Climate change causes global warming due to greenhouse gases.

Analogy:
Climate change is like wrapping the Earth in a blanket that traps heat.


# Section 3: Orchestrator–Worker Pattern
An orchestrator receives a project idea, a backend worker and a frontend worker each draft a task list for their part, and a synthesizer merges both lists into the final plan.

In [20]:
# Switch to the Orchestrator-Worker session
# NOTE(review): LANGCHAIN_SESSION_NAME may not be a variable LangSmith reads
# (LANGCHAIN_PROJECT is the one set earlier in this file) — confirm it has an effect.
import os
os.environ["LANGCHAIN_SESSION_NAME"] = "OrchestratorWorker_LangGraph"

# Imports
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
from typing import TypedDict

In [27]:
from typing import TypedDict

# State schema
class TaskPlanState(TypedDict):
    """State dictionary for the orchestrator-worker graph."""

    project_idea: str   # input description; none of the nodes in this file read it
    backend_plan: str   # written by backend_worker()
    frontend_plan: str  # written by frontend_worker()
    full_plan: str      # written by synthesize_task_plan()

# Orchestrator: dispatch project into parts
def orchestrator_task_split(state: TaskPlanState) -> TaskPlanState:
    """Placeholder orchestrator: log a message and pass the state through.

    No splitting is performed here; the return value is a shallow copy of
    ``state`` with the same keys and values.
    """
    print("üß† Orchestrator split idea into backend + frontend.")
    passthrough = dict(state)
    return passthrough

# Backend worker
def backend_worker(state: TaskPlanState) -> TaskPlanState:
    """Mock backend worker: store a fixed four-item backend task list.

    Returns a copy of ``state`` with ``backend_plan`` set to the bullet list,
    one task per line.
    """
    print("üßë‚Äçüíª Backend worker generating tasks...")
    tasks = (
        "- Build intent classification model",
        "- Set up database (MongoDB)",
        "- Create API with FastAPI",
        "- Integrate logging and analytics",
    )
    planned = dict(state)
    planned["backend_plan"] = "\n".join(tasks)
    return planned

# Frontend worker
def frontend_worker(state: TaskPlanState) -> TaskPlanState:
    """Mock frontend worker: store a fixed four-item frontend task list.

    Returns a copy of ``state`` with ``frontend_plan`` set to the bullet list,
    one task per line.
    """
    print("üé® Frontend worker generating tasks...")
    tasks = (
        "- Design chatbot UI (React)",
        "- Add voice input support",
        "- Integrate with backend API",
        "- Create responsive layout",
    )
    planned = dict(state)
    planned["frontend_plan"] = "\n".join(tasks)
    return planned

# Synthesizer: merge both plans
def synthesize_task_plan(state: TaskPlanState) -> TaskPlanState:
    """Join the backend and frontend plans into one labelled document.

    Returns a copy of ``state`` with ``full_plan`` set to the backend section
    followed, after a blank line, by the frontend section.
    """
    print("üßæ Synthesizing full plan...")
    backend_section = f"üßë‚Äçüíª Backend:\n{state['backend_plan']}"
    frontend_section = f"üé® Frontend:\n{state['frontend_plan']}"
    combined = dict(state)
    combined["full_plan"] = backend_section + "\n\n" + frontend_section
    return combined

In [28]:
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda

# Register the orchestrator, both workers and the synthesizer.
builder = StateGraph(TaskPlanState)
for _node_name, _step in (
    ("orchestrator", orchestrator_task_split),
    ("backend_worker", backend_worker),
    ("frontend_worker", frontend_worker),
    ("synthesizer", synthesize_task_plan),
):
    builder.add_node(_node_name, RunnableLambda(_step))

# The workers are chained one after the other (backend first, then frontend)
# before the synthesizer runs.
builder.set_entry_point("orchestrator")
for _source, _target in (
    ("orchestrator", "backend_worker"),
    ("backend_worker", "frontend_worker"),
    ("frontend_worker", "synthesizer"),
    ("synthesizer", END),
):
    builder.add_edge(_source, _target)

graph = builder.compile()

In [29]:
# Only "project_idea" carries input; the plan fields start empty.
input_state = dict(
    project_idea="Build an AI-powered chatbot for customer service",
    backend_plan="",
    frontend_plan="",
    full_plan="",
)

# Run the graph once and print the synthesized plan under a heading.
result = graph.invoke(input_state)
print("\nüßæ Final Project Plan:\n")
print(result["full_plan"])

üß† Orchestrator split idea into backend + frontend.
üßë‚Äçüíª Backend worker generating tasks...
üé® Frontend worker generating tasks...
üßæ Synthesizing full plan...

üßæ Final Project Plan:

üßë‚Äçüíª Backend:
- Build intent classification model
- Set up database (MongoDB)
- Create API with FastAPI
- Integrate logging and analytics

üé® Frontend:
- Design chatbot UI (React)
- Add voice input support
- Integrate with backend API
- Create responsive layout


# Section 4: Evaluator–Optimizer Pattern

We'll generate 3 taglines for an AI product and score them for creativity (mocked).

The agent will:

🧠 Generator: Creates 3 taglines

🧪 Evaluator: Assigns a creativity score to each

🏆 Aggregator: Picks the highest-scoring tagline and formats the output



In [30]:
from typing import TypedDict, List
from langgraph.graph import StateGraph, END, START
from langchain_core.runnables import RunnableLambda

# LangSmith session for this pattern
# NOTE(review): LANGCHAIN_SESSION_NAME may not be a variable LangSmith reads
# (LANGCHAIN_PROJECT is the one set earlier in this file) — confirm it has an effect.
import os
os.environ["LANGCHAIN_SESSION_NAME"] = "EvaluatorOptimizer_LangGraph"

# Graph state
class EvalState(TypedDict):
    """State dictionary for the evaluator-optimizer graph."""

    product: str         # product name interpolated into each tagline
    taglines: List[str]  # written by generate_taglines()
    scores: List[float]  # written by evaluate_taglines(); one score per tagline
    best_tagline: str    # written by select_best_tagline()
    final_output: str    # formatted report written by select_best_tagline()

In [31]:
# Generator
def generate_taglines(state: EvalState) -> EvalState:
    """Mock generator: fill three fixed tagline templates with ``product``.

    Returns a copy of ``state`` with ``taglines`` set to the three strings.
    """
    print("üß† Generating taglines...")
    name = state["product"]
    templates = (
        "Talk to {}, not just about it.",
        "{}: Intelligence at your service.",
        "{} that listens, learns, and responds.",
    )
    generated = dict(state)
    generated["taglines"] = [template.format(name) for template in templates]
    return generated

# Evaluator (mocked creativity score)
def evaluate_taglines(state: EvalState) -> EvalState:
    """Mock evaluator: derive a 1-10 score from each tagline's length.

    The score is ``len(tagline) % 10 + 1``; it is a stand-in, not a real
    creativity measure. Returns a copy of ``state`` with ``scores`` set.
    """
    print("üß™ Evaluating taglines...")
    fake_scores = []
    for tagline in state["taglines"]:
        fake_scores.append(len(tagline) % 10 + 1)  # Fake scoring logic
    scored = dict(state)
    scored["scores"] = fake_scores
    return scored

# Aggregator: pick best tagline + format output
def select_best_tagline(state: EvalState) -> EvalState:
    """Pick the highest-scoring tagline and format a report of all candidates.

    Taglines and scores are paired positionally; if the two lists differ in
    length, only the pairs that exist are listed and considered. When several
    taglines share the top score, the first one wins. With no scored taglines
    the report says so and ``best_tagline`` is the empty string instead of
    raising.

    Returns a copy of ``state`` with ``best_tagline`` and ``final_output`` set.
    """
    print("üèÜ Selecting best tagline...")
    # zip stops at the shorter list, so an unmatched tagline or score is ignored
    # rather than causing an index error further down.
    ranked = list(zip(state["taglines"], state["scores"]))

    # Collect report fragments and join once at the end.
    parts = [f"üéØ Generated Taglines for '{state['product']}':\n"]
    parts.extend(
        f"{i}. \"{tag}\" ‚Äî Creativity Score: {score}/10\n"
        for i, (tag, score) in enumerate(ranked, 1)
    )

    if ranked:
        # max() returns the first pair among equals, i.e. the earliest top scorer.
        best_tagline = max(ranked, key=lambda pair: pair[1])[0]
        parts.append(f"\nüèÜ Top Choice: \"{best_tagline}\"")
    else:
        best_tagline = ""
        parts.append("\nNo taglines were available to rank.")

    return {**state, "best_tagline": best_tagline, "final_output": "".join(parts)}

In [32]:
#build graph
# Register generator, evaluator and aggregator nodes.
builder = StateGraph(EvalState)
for _node_name, _step in (
    ("generator", generate_taglines),
    ("evaluator", evaluate_taglines),
    ("aggregator", select_best_tagline),
):
    builder.add_node(_node_name, RunnableLambda(_step))

# generator -> evaluator -> aggregator -> END
builder.set_entry_point("generator")
for _source, _target in (
    ("generator", "evaluator"),
    ("evaluator", "aggregator"),
    ("aggregator", END),
):
    builder.add_edge(_source, _target)

graph = builder.compile()

In [33]:
# Only "product" carries input; lists and strings start empty.
input_state = dict(
    product="ChatMind AI",
    taglines=[],
    scores=[],
    best_tagline="",
    final_output="",
)

# Run the graph once and print the formatted report under a heading.
result = graph.invoke(input_state)
print("\nüßæ Final Output:\n")
print(result["final_output"])

üß† Generating taglines...
üß™ Evaluating taglines...
üèÜ Selecting best tagline...

üßæ Final Output:

üéØ Generated Taglines for 'ChatMind AI':
1. "Talk to ChatMind AI, not just about it." ‚Äî Creativity Score: 10/10
2. "ChatMind AI: Intelligence at your service." ‚Äî Creativity Score: 3/10
3. "ChatMind AI that listens, learns, and responds." ‚Äî Creativity Score: 8/10

üèÜ Top Choice: "Talk to ChatMind AI, not just about it."


# Section 5 – Routing Pattern
In this section we:

Accept a user query like “Tell me the news”

Route it dynamically to one of:

🗞️ NewsAgent

☀️ WeatherAgent

🤖 FallbackAgent


In [34]:
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda

# LangSmith session
# NOTE(review): LANGCHAIN_SESSION_NAME may not be a variable LangSmith reads
# (LANGCHAIN_PROJECT is the one set earlier in this file) — confirm it has an effect.
import os
os.environ["LANGCHAIN_SESSION_NAME"] = "Routing_LangGraph"

# State definition
class RouteState(TypedDict):
    """State dictionary for the routing graph."""

    query: str         # user query; router_node() lower-cases it and scans for keywords
    route: str         # "news", "weather" or "fallback", written by router_node()
    response: str      # written by whichever agent node handled the query
    final_output: str  # formatted summary written by route_aggregator()

In [35]:
# Router: detect intent
def router_node(state: RouteState) -> RouteState:
    """Choose a route by keyword match on the lower-cased query.

    ``news`` is checked before ``weather``; a query containing neither goes to
    ``fallback``. Returns a copy of ``state`` with ``route`` set.
    """
    print("üß≠ Routing based on query...")
    lowered = state["query"].lower()
    chosen = "fallback"
    # Keywords double as route names; the first one found in the query wins.
    for keyword in ("news", "weather"):
        if keyword in lowered:
            chosen = keyword
            break
    return {**state, "route": chosen}

# News Agent
def news_agent(state: RouteState) -> RouteState:
    """Mock news agent: return a copy of ``state`` with a canned headline as ``response``."""
    print("üóû News agent responding...")
    headline = "Breaking News: AI is now writing your code!"
    return dict(state, response=headline)

# Weather Agent
def weather_agent(state: RouteState) -> RouteState:
    """Mock weather agent: return a copy of ``state`` with a canned forecast as ``response``."""
    print("‚òÄÔ∏è Weather agent responding...")
    forecast = "It's sunny and 24¬∞C in your area."
    return dict(state, response=forecast)

# Fallback Agent
def fallback_agent(state: RouteState) -> RouteState:
    """Mock fallback agent: return a copy of ``state`` with a canned apology as ``response``."""
    print("ü§ñ Fallback agent responding...")
    apology = "I'm not sure how to answer that, but I'm learning!"
    return dict(state, response=apology)

# Aggregator
def route_aggregator(state: RouteState) -> RouteState:
    """Format the query and the agent's response into ``final_output``.

    The icon in front of the response depends on ``route``; an unrecognised
    route gets a placeholder icon. Returns a copy of ``state``.
    """
    icons_by_route = {"news": "üóû", "weather": "‚òÄÔ∏è", "fallback": "ü§ñ"}
    route = state["route"]
    icon = icons_by_route[route] if route in icons_by_route else "‚ùì"
    query_line = f"üì¨ Query: \"{state['query']}\""
    reply_line = f"{icon} Agent says: {state['response']}"
    formatted = dict(state)
    formatted["final_output"] = query_line + "\n" + reply_line
    return formatted

In [36]:
builder = StateGraph(RouteState)

# Register the router, the three agents and the aggregator.
for _node_name, _handler in (
    ("router", router_node),
    ("news", news_agent),
    ("weather", weather_agent),
    ("fallback", fallback_agent),
    ("aggregator", route_aggregator),
):
    builder.add_node(_node_name, RunnableLambda(_handler))

# The router runs first; the "route" value it wrote names the agent node to visit.
builder.set_entry_point("router")
_agent_nodes = ("news", "weather", "fallback")
builder.add_conditional_edges(
    "router",
    lambda state: state["route"],
    {_agent: _agent for _agent in _agent_nodes},
)

# Every agent hands its response to the aggregator, which ends the run.
for _agent in _agent_nodes:
    builder.add_edge(_agent, "aggregator")
builder.add_edge("aggregator", END)

graph = builder.compile()

In [37]:
# Try different queries here:
# Change this query to exercise the other routes.
test_query = "What's the weather like today?"

# Only "query" carries input; the remaining fields start empty.
input_state = dict(
    query=test_query,
    route="",
    response="",
    final_output="",
)

# Run the graph once and print the aggregated answer under a heading.
result = graph.invoke(input_state)

print("\nüßæ Final Output:\n")
print(result["final_output"])

üß≠ Routing based on query...
‚òÄÔ∏è Weather agent responding...

üßæ Final Output:

üì¨ Query: "What's the weather like today?"
‚òÄÔ∏è Agent says: It's sunny and 24¬∞C in your area.


# Section 6: Autonomous Agent Loop Pattern
🧠 Goal:
Simulate an agent that:

Picks a tool based on the task

Uses the tool

Gets feedback

Repeats until the goal is complete


In [43]:
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda

# NOTE(review): LANGCHAIN_SESSION_NAME may not be a variable LangSmith reads
# (LANGCHAIN_PROJECT is the one set earlier in this file) — confirm it has an effect.
import os
os.environ["LANGCHAIN_SESSION_NAME"] = "AutonomousAgent_Rewriter_Loop"

# Shared state
class RewriteState(TypedDict):
    """State dictionary for the rewrite/evaluate loop."""

    text: str          # original text; used by rewrite_tool() while "rewritten" is empty
    rewritten: str     # latest rewrite, written by rewrite_tool()
    word_count: int    # word count of "rewritten", written by evaluator_tool()
    done: bool         # loop exit flag written by evaluator_tool(), read by control_next()
    final_output: str  # result text written by evaluator_tool()

In [51]:
# Rewriter tool (mock LLM improving conciseness)
def rewrite_tool(state: RewriteState) -> RewriteState:
    """Mock rewriter: shorten the text with a fixed phrase-replacement table.

    Works on ``rewritten`` when it is non-empty, otherwise on ``text``. The
    substitutions are applied in the listed order. Returns a copy of ``state``
    with ``rewritten`` set.
    """
    print("‚úçÔ∏è Rewriting text...")

    # Continue from the previous rewrite if there is one.
    draft = state["rewritten"] if state["rewritten"] else state["text"]

    # Fake smarter rewriter: (long phrase, shorter phrase) pairs.
    substitutions = (
        ("LangGraph allows for", "LangGraph enables"),
        ("structured LLM workflows", "LLM pipelines"),
        (
            "using graphs to coordinate decisions, tools, and memory",
            "via graphs coordinating decisions",
        ),
    )
    for verbose, concise in substitutions:
        draft = draft.replace(verbose, concise)

    updated = dict(state)
    updated["rewritten"] = draft
    return updated

# Evaluator tool: checks if rewritten version is short enough
def evaluator_tool(state: RewriteState) -> RewriteState:
    """Measure the rewritten text and decide whether the rewrite loop can stop.

    The loop is finished when the text has at most 20 words. It is also ended
    when this pass did not reduce the word count recorded by the previous pass:
    the rewriter in this file applies a fixed replacement table, so a rewrite
    that did not shrink the text will not shrink it on the next pass either,
    and the evaluate -> rewrite cycle would otherwise never finish.

    Returns a copy of ``state`` with ``word_count``, ``done`` and
    ``final_output`` updated. ``final_output`` stays empty while the loop is
    still running.
    """
    rewritten = state["rewritten"]
    count = len(rewritten.split())
    print(f"üìè Evaluating: {count} words.")

    short_enough = count <= 20
    # "word_count" still holds the previous pass's measurement at this point
    # (0 or absent before the first evaluation).
    previous = state.get("word_count") or 0
    stalled = not short_enough and previous > 0 and count >= previous

    if short_enough:
        output = f"‚úÖ Final rewritten version:\n{rewritten}"
    elif stalled:
        # No progress since the last pass: report the best effort and stop.
        output = f"Could not shorten below {count} words. Best effort:\n{rewritten}"
    else:
        output = ""

    return {
        **state,
        "word_count": count,
        "done": short_enough or stalled,
        "final_output": output,
    }

# Decision controller: loop until goal met
def control_next(state: RewriteState):
    """Return the routing key for the step after evaluation.

    ``"__end__"`` when ``done`` is truthy, otherwise ``"rewrite"`` to go
    around the loop again.
    """
    print("üß† Agent controller deciding next step...")
    if state["done"]:
        return "__end__"
    return "rewrite"

In [52]:
# Two nodes: the rewriter and the evaluator that judges its output.
builder = StateGraph(RewriteState)
for _node_name, _tool in (
    ("rewrite", rewrite_tool),
    ("evaluate", evaluator_tool),
):
    builder.add_node(_node_name, RunnableLambda(_tool))

# Each rewrite is evaluated; control_next() then either loops back or ends.
builder.set_entry_point("rewrite")
builder.add_edge("rewrite", "evaluate")
builder.add_conditional_edges(
    "evaluate",
    control_next,
    {"rewrite": "rewrite", "__end__": END},
)

graph = builder.compile()

In [53]:
# Only "text" carries input; the loop bookkeeping fields start at their zero values.
input_state = dict(
    text="LangGraph allows for structured LLM workflows using graphs to coordinate decisions, tools, and memory.",
    rewritten="",
    word_count=0,
    done=False,
    final_output="",
)

# Run the loop to completion and print the result under a heading.
result = graph.invoke(input_state)

print("\nüßæ Final Output:\n")
print(result["final_output"])

‚úçÔ∏è Rewriting text...
üìè Evaluating: 8 words.
üß† Agent controller deciding next step...

üßæ Final Output:

‚úÖ Final rewritten version:
LangGraph enables LLM pipelines via graphs coordinating decisions.
