# Week 5: Advanced LangGraph - Multi-Agent Systems

## 📚 Session Overview

**Duration:** 2 hours  
**Week:** 5  
**Instructor-Led Session**

---

## 🎯 Learning Objectives

By the end of this session, you will be able to:
1. Build multi-agent systems with LangGraph
2. Implement human-in-the-loop workflows
3. Create sub-graphs and compose complex workflows
4. Use tool calling within graphs
5. Implement persistent state with checkpoints
6. Handle errors and implement retry logic

---

## 📋 Prerequisites

- ✅ Completed Weeks 1-4
- ✅ Strong understanding of LangGraph basics
- ✅ Comfortable with state management
- ✅ Understanding of conditional routing

---

## ⏱️ Estimated Time

- Setup & Review: 10 minutes
- Section 1 (Multi-Agent Systems): 30 minutes
- Section 2 (Human-in-the-Loop): 25 minutes
- Section 3 (Tool Calling): 25 minutes
- Section 4 (Advanced Patterns): 25 minutes
- Wrap-up & Q&A: 5 minutes

---

## 🔧 Setup

In [None]:
# Import required libraries
import os
from dotenv import load_dotenv
from typing import TypedDict, Annotated, Sequence, Literal
import operator
from datetime import datetime

# LangChain imports
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_core.output_parsers import StrOutputParser

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver

# Load environment variables
load_dotenv()

# Initialize LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

print("✅ Setup complete!")

---

# Section 1: Multi-Agent Systems (30 minutes)

## What are Multi-Agent Systems?

**Multi-agent systems** use multiple specialized agents working together to solve complex problems.

### Why Multi-Agent?

**Single Agent Limitations:**
- ❌ Jack of all trades, master of none
- ❌ Complex system prompts
- ❌ Hard to debug and maintain
- ❌ Limited by a single model's capabilities

**Multi-Agent Benefits:**
- ✅ Specialized agents for specific tasks
- ✅ Modular and maintainable
- ✅ Easier to test individual agents
- ✅ Can use different models per agent
- ✅ Parallel execution where possible

---

## Common Multi-Agent Patterns

### 1. **Sequential Handoff**
```
Agent A → Agent B → Agent C → Result
```
Each agent passes work to the next specialist.

### 2. **Supervisor Pattern**
```
          Supervisor
         /     |     \
    Agent A  Agent B  Agent C
```
Supervisor routes tasks to appropriate workers.

### 3. **Collaborative**
```
Agent A ←→ Agent B ←→ Agent C
```
Agents communicate back and forth.

### 4. **Hierarchical**
```
     Manager Agent
         |
    Team Lead Agent
      /        \
  Worker A   Worker B
```
Multiple levels of coordination.
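
The first two patterns differ mainly in who decides the next step. The sketch below is a library-free illustration in plain Python, not LangGraph code; the agent functions and the `pick_next` rule are hypothetical stand-ins chosen for this example.

```python
from typing import Callable, Dict, List, Optional

State = Dict[str, str]
Agent = Callable[[State], State]

# Hypothetical stand-in agents: each adds one field to the shared state.
def research(state: State) -> State:
    return {**state, "findings": f"notes on {state['task']}"}

def analyze(state: State) -> State:
    return {**state, "analysis": f"insights from {state['findings']}"}

def write(state: State) -> State:
    return {**state, "report": f"report: {state['analysis']}"}

def run_sequential(state: State, agents: List[Agent]) -> State:
    """Sequential handoff: the order is fixed up front."""
    for agent in agents:
        state = agent(state)
    return state

def pick_next(state: State) -> Optional[str]:
    """Supervisor rule: choose the first agent whose output is missing."""
    for name, key in [("research", "findings"), ("analyze", "analysis"), ("write", "report")]:
        if key not in state:
            return name
    return None  # nothing left to do

def run_supervised(state: State, workers: Dict[str, Agent]) -> State:
    """Supervisor pattern: control returns to the router after every step."""
    while (name := pick_next(state)) is not None:
        state = workers[name](state)
    return state

seq_result = run_sequential({"task": "AI in healthcare"}, [research, analyze, write])
sup_result = run_supervised(
    {"task": "AI in healthcare"},
    {"research": research, "analyze": analyze, "write": write},
)
```

Both runs produce the same state here; the difference is that the supervised version re-evaluates the routing rule after every step, which is what makes skipping or repeating a worker possible.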

---

## 1.1: Building a Research Team (Supervisor Pattern)

Let's build a research team with:
- **Supervisor:** Routes tasks to specialists
- **Researcher:** Gathers information
- **Analyst:** Analyzes findings
- **Writer:** Creates final report

In [None]:
# Define state for research team
class ResearchState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    task: str
    research_findings: str
    analysis: str
    final_report: str
    next_agent: str

print("✅ Research state defined")
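
The `Annotated[..., operator.add]` annotation on `messages` attaches a reducer: updates to that key are combined with the existing value, while keys without a reducer are overwritten. The sketch below is a simplified, library-free model of that merge rule, not LangGraph's actual implementation; `DemoState` and `apply_update` are hypothetical names used only for illustration.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints

class DemoState(TypedDict):
    messages: Annotated[List[str], operator.add]  # reducer: concatenate lists
    next_agent: str                               # no reducer: overwrite

def apply_update(schema: type, state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a node's partial update into the state (simplified model)."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated types carry their extra arguments in __metadata__
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and callable(metadata[0]) and key in merged:
            merged[key] = metadata[0](merged[key], value)  # combine old and new
        else:
            merged[key] = value  # plain overwrite
    return merged

state = {"messages": ["task received"], "next_agent": ""}
state = apply_update(
    DemoState, state, {"messages": ["Routing to researcher"], "next_agent": "researcher"}
)
state = apply_update(DemoState, state, {"messages": ["Research complete"]})
```

This is why each agent in this section returns only the keys it changed and wraps its message in a one-element list.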

In [None]:
# Create specialized agents

def supervisor(state: ResearchState) -> ResearchState:
    """
    Supervisor decides which agent should act next.
    """
    messages = state["messages"]
    
    # Determine next step based on current state
    if not state.get("research_findings"):
        next_agent = "researcher"
    elif not state.get("analysis"):
        next_agent = "analyst"
    elif not state.get("final_report"):
        next_agent = "writer"
    else:
        next_agent = "FINISH"
    
    print(f"üëî Supervisor: Routing to {next_agent}")
    
    return {
        "next_agent": next_agent,
        "messages": [SystemMessage(content=f"Routing to {next_agent}")]
    }

def researcher(state: ResearchState) -> ResearchState:
    """
    Researcher gathers information on the task.
    """
    task = state["task"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a thorough researcher. Gather key information about the topic."),
        ("human", "Research this topic: {task}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    findings = chain.invoke({"task": task})
    
    print(f"üî¨ Researcher: Completed research")
    
    return {
        "research_findings": findings,
        "messages": [AIMessage(content=f"Research complete: {findings[:100]}...")]
    }

def analyst(state: ResearchState) -> ResearchState:
    """
    Analyst analyzes the research findings.
    """
    findings = state["research_findings"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an analytical expert. Analyze the research and draw insights."),
        ("human", "Analyze these findings: {findings}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    analysis = chain.invoke({"findings": findings})
    
    print(f"üìä Analyst: Completed analysis")
    
    return {
        "analysis": analysis,
        "messages": [AIMessage(content=f"Analysis complete: {analysis[:100]}...")]
    }

def writer(state: ResearchState) -> ResearchState:
    """
    Writer creates the final report.
    """
    findings = state["research_findings"]
    analysis = state["analysis"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a skilled technical writer. Create a clear, concise report."),
        ("human", "Create a report based on:\n\nFindings: {findings}\n\nAnalysis: {analysis}")
    ])
    
    chain = prompt | llm | StrOutputParser()
    report = chain.invoke({"findings": findings, "analysis": analysis})
    
    print("✍️ Writer: Completed report")
    
    return {
        "final_report": report,
        "messages": [AIMessage(content=f"Report complete: {report[:100]}...")]
    }

print("✅ Agent functions created")

In [None]:
# Build the research team graph
research_workflow = StateGraph(ResearchState)

# Add agent nodes
research_workflow.add_node("supervisor", supervisor)
research_workflow.add_node("researcher", researcher)
research_workflow.add_node("analyst", analyst)
research_workflow.add_node("writer", writer)

# Set entry point
research_workflow.set_entry_point("supervisor")

# Define routing logic
def route_to_agent(state: ResearchState) -> str:
    """Route to the next agent based on supervisor decision."""
    return state["next_agent"]

# Add conditional edges from supervisor
research_workflow.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {
        "researcher": "researcher",
        "analyst": "analyst",
        "writer": "writer",
        "FINISH": END
    }
)

# All agents return to supervisor
research_workflow.add_edge("researcher", "supervisor")
research_workflow.add_edge("analyst", "supervisor")
research_workflow.add_edge("writer", "supervisor")

# Compile
research_app = research_workflow.compile()

print("✅ Research team graph created")

In [None]:
# Test the research team
initial_state = {
    "task": "The impact of artificial intelligence on healthcare",
    "messages": [],
    "research_findings": "",
    "analysis": "",
    "final_report": "",
    "next_agent": ""
}

print("üöÄ Starting research team workflow...\n")
print("="*70)

result = research_app.invoke(initial_state)

print("\n" + "="*70)
print("\nüìÑ Final Report:")
print(result["final_report"])

---

# Section 2: Human-in-the-Loop (25 minutes)

## What is Human-in-the-Loop?

**Human-in-the-loop (HITL)** allows humans to interact with the graph during execution.

### Use Cases:
- ✅ Approval workflows (content moderation, financial decisions)
- ✅ Quality control (review AI outputs before proceeding)
- ✅ Collaborative tasks (AI suggests, human decides)
- ✅ Sensitive operations (legal, medical, financial)
- ✅ Learning systems (human feedback improves AI)

---

## How HITL Works in LangGraph

1. Graph pauses at specific nodes
2. Waits for human input/approval
3. Resumes with human feedback
4. Continues to completion

**Key Component:** Checkpointing (state persistence)
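
The four steps above can be sketched without any library. The class below is a deliberately tiny, hypothetical model (`TinyRunner` is not part of LangGraph); it only borrows the names `interrupt_before`, `update_state`, and the thread ID idea to show why a saved checkpoint is what makes pausing and resuming possible.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

State = Dict[str, Any]
Step = Tuple[str, Callable[[State], State]]

class TinyRunner:
    """Runs steps in order, pausing before any step named in interrupt_before."""

    def __init__(self, steps: List[Step], interrupt_before: List[str]):
        self.steps = steps
        self.interrupt_before = set(interrupt_before)
        # thread_id -> (index of the next step to run, saved state)
        self.checkpoints: Dict[str, Tuple[int, State]] = {}

    def update_state(self, thread_id: str, values: State) -> None:
        """Human feedback: patch the saved state without running anything."""
        index, state = self.checkpoints[thread_id]
        self.checkpoints[thread_id] = (index, {**state, **values})

    def run(self, thread_id: str, initial: Optional[State] = None) -> State:
        """Start a new run (initial given) or resume a paused one (initial=None)."""
        if initial is not None:
            index, state, resuming = 0, dict(initial), False
        else:
            index, state = self.checkpoints[thread_id]
            resuming = True
        while index < len(self.steps):
            name, fn = self.steps[index]
            if name in self.interrupt_before and not resuming:
                break  # pause: the checkpoint below remembers where we stopped
            resuming = False  # skip the interrupt only once per resume
            state = {**state, **fn(state)}
            index += 1
        self.checkpoints[thread_id] = (index, state)
        return state

runner = TinyRunner(
    steps=[
        ("generate", lambda s: {"draft": f"Draft about {s['topic']}"}),
        ("review", lambda s: {}),
        ("publish", lambda s: {"published": s["approved"]}),
    ],
    interrupt_before=["review"],
)

paused = runner.run("thread-1", {"topic": "Python", "approved": False})  # stops before review
runner.update_state("thread-1", {"approved": True})                      # human decision
final = runner.run("thread-1")                                           # resume to completion
```

Without the checkpoint keyed by thread ID, the second `run` call would have no way to know that `generate` had already finished.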

---

## 2.1: Content Approval Workflow

In [None]:
# Define state for approval workflow
class ApprovalState(TypedDict):
    content: str
    draft: str
    feedback: str
    approved: bool
    revision_count: int

# Generate content draft
def generate_draft(state: ApprovalState) -> ApprovalState:
    """Generate initial content draft."""
    content_request = state["content"]
    feedback = state.get("feedback", "")
    revision = state.get("revision_count", 0)
    
    if revision == 0:
        prompt = f"Create content about: {content_request}"
    else:
        prompt = f"Revise this content based on feedback.\n\nOriginal: {state.get('draft')}\n\nFeedback: {feedback}"
    
    draft = llm.invoke(prompt).content
    
    print(f"üìù Generated draft (revision {revision + 1})")
    print(f"Draft preview: {draft[:100]}...\n")
    
    return {
        "draft": draft,
        "revision_count": revision + 1
    }

# Human review node
def human_review(state: ApprovalState) -> ApprovalState:
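    """
    Placeholder for the human review step.
    The pause itself comes from interrupt_before=["review"] at compile time,
    so this node runs only after the workflow has been resumed.
    """
    print("⏸️ Human review step: routing on the reviewer's decision...")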
    # In production, this would integrate with a review UI
    # For demo, we'll simulate approval
    return state

# Check approval decision
def check_approval(state: ApprovalState) -> str:
    """Route based on approval decision."""
    if state.get("approved", False):
        return "approved"
    else:
        return "revise"

# Finalize
def finalize(state: ApprovalState) -> ApprovalState:
    """Finalize approved content."""
    print("✅ Content approved and finalized!")
    return state

print("✅ Approval workflow functions created")

In [None]:
# Build approval workflow with checkpointing
memory = MemorySaver()

approval_workflow = StateGraph(ApprovalState)

# Add nodes
approval_workflow.add_node("generate", generate_draft)
approval_workflow.add_node("review", human_review)
approval_workflow.add_node("finalize", finalize)

# Set entry
approval_workflow.set_entry_point("generate")

# Add edges
approval_workflow.add_edge("generate", "review")

# Conditional routing from review
approval_workflow.add_conditional_edges(
    "review",
    check_approval,
    {
        "approved": "finalize",
        "revise": "generate"  # Loop back
    }
)

approval_workflow.add_edge("finalize", END)

# Compile with checkpointer (enables pausing/resuming)
approval_app = approval_workflow.compile(
    checkpointer=memory,
    interrupt_before=["review"]  # Pause before review node
)

print("✅ Approval workflow created with HITL")

In [None]:
# Demonstrate HITL workflow
config = {"configurable": {"thread_id": "approval_1"}}

initial_state = {
    "content": "Write a blog post about Python programming best practices",
    "draft": "",
    "feedback": "",
    "approved": False,
    "revision_count": 0
}

print("üöÄ Starting approval workflow...\n")
print("="*70)

# Step 1: Generate draft (execution pauses before the review node)
result = approval_app.invoke(initial_state, config=config)

print("\n📋 Current draft:")
print(result["draft"][:200] + "...")
print("\n" + "="*70)

# Simulate human decision: Request revision
print("\n👤 Human: Requesting revision - make it more beginner-friendly")

# Write the feedback into the checkpointed state for this thread
approval_app.update_state(
    config,
    {
        "feedback": "Make this more beginner-friendly with simpler examples",
        "approved": False,
    },
)

# Step 2: Resume from the checkpoint. Passing None continues the paused run;
# passing a state dict would be treated as new input and restart at the entry point.
print("\n🔄 Resuming workflow with feedback...\n")
result = approval_app.invoke(None, config=config)

print("\n📋 Revised draft:")
print(result["draft"][:200] + "...")
print("\n" + "="*70)

# Simulate human approval
print("\n👤 Human: Approving content")
approval_app.update_state(config, {"approved": True})

# Step 3: Resume again; the review node now routes to finalize
final_result = approval_app.invoke(None, config=config)

print(f"\n✅ Final revision count: {final_result['revision_count']}")

---

# Section 3: Tool Calling in Graphs (25 minutes)

## What is Tool Calling?

**Tools** allow LLMs to interact with external systems:
- Call APIs
- Query databases
- Perform calculations
- Search the web
- Execute code
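
Mechanically, a tool call is just structured data: the model names a tool and supplies arguments, the application runs the matching function, and the result goes back to the model as another message. The sketch below shows that loop in plain Python. `fake_model` is a hypothetical stand-in for an LLM, and the message dictionaries are a simplified format chosen for this example, not the LangChain message classes.

```python
from typing import Any, Callable, Dict, List

# Registry: tool name -> plain Python function
TOOLS: Dict[str, Callable[..., str]] = {
    "add": lambda a, b: str(a + b),
    "shout": lambda text: text.upper(),
}

def fake_model(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical stand-in for an LLM: request one tool call, then answer."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return {
            "role": "assistant",
            "content": "",
            "tool_calls": [{"name": "add", "args": {"a": 25, "b": 17}}],
        }
    return {
        "role": "assistant",
        "content": f"The answer is {tool_results[-1]['content']}.",
        "tool_calls": [],
    }

def run_agent(question: str) -> List[Dict[str, Any]]:
    """Alternate between the model and the tools until no tool is requested."""
    messages: List[Dict[str, Any]] = [{"role": "user", "content": question}]
    while True:
        reply = fake_model(messages)
        messages.append(reply)
        if not reply["tool_calls"]:
            return messages  # plain answer: the loop is done
        for call in reply["tool_calls"]:
            output = TOOLS[call["name"]](**call["args"])
            messages.append({"role": "tool", "name": call["name"], "content": output})

history = run_agent("What is 25 + 17?")
print(history[-1]["content"])  # The answer is 42.
```

The agent graph built later in this section has the same shape: one node plays the role of the model, another executes the requested tools, and a routing function decides when to stop.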

---

## LangChain Tool Decorator

Use `@tool` to create tools from Python functions:

```python
@tool
def my_tool(param: str) -> str:
    """Tool description for the LLM."""
    result = param.upper()  # placeholder logic; replace with real work
    return result
```

---

## 3.1: Create Custom Tools

In [None]:
# Define custom tools
@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        # Demo only: eval() with empty builtins is not a real sandbox.
        # In production, parse the expression instead (e.g. with the ast module).
        result = eval(expression, {"__builtins__": {}}, {})
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {str(e)}"

@tool
def get_current_time() -> str:
    """Get the current date and time."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

@tool
def search_database(query: str) -> str:
    """Search a mock database for information."""
    # Mock database
    database = {
        "python": "Python is a high-level programming language.",
        "javascript": "JavaScript is a scripting language for web development.",
        "ai": "Artificial Intelligence enables machines to simulate human intelligence."
    }
    
    query_lower = query.lower()
    for key, value in database.items():
        if key in query_lower:
            return value
    
    return "No information found in database."

# List of tools
tools = [calculate, get_current_time, search_database]

print("✅ Tools created:")
# Use a loop variable other than `tool` so the imported @tool decorator is not shadowed
for t in tools:
    print(f"  - {t.name}: {t.description}")

## 3.2: Build Agent with Tools

In [None]:
from langchain_core.messages import ToolMessage

# Define agent state
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

# Bind tools to LLM
llm_with_tools = llm.bind_tools(tools)

# Agent node
def agent(state: AgentState) -> AgentState:
    """
    Agent decides whether to use tools or respond.
    """
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

# Create tool node using LangGraph's prebuilt ToolNode
tool_node = ToolNode(tools)

# Routing function
def should_continue(state: AgentState) -> Literal["tools", "end"]:
    """
    Determine if tools should be called or if we're done.
    """
    messages = state["messages"]
    last_message = messages[-1]
    
    # If there are tool calls, route to tools
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    # Otherwise, end
    return "end"

# Build graph
tool_workflow = StateGraph(AgentState)

# Add nodes
tool_workflow.add_node("agent", agent)
tool_workflow.add_node("tools", tool_node)

# Set entry
tool_workflow.set_entry_point("agent")

# Add conditional edges
tool_workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",
        "end": END
    }
)

# Tools return to agent
tool_workflow.add_edge("tools", "agent")

# Compile
tool_app = tool_workflow.compile()

print("✅ Tool-calling agent created")

In [None]:
# Test tool-calling agent
test_queries = [
    "What is 25 * 4 + 10?",
    "What time is it right now?",
    "Tell me about Python"
]

for query in test_queries:
    print(f"\n{'='*70}")
    print(f"❓ Query: {query}")
    print()
    
    result = tool_app.invoke({
        "messages": [HumanMessage(content=query)]
    })
    
    # Get final response
    final_message = result["messages"][-1]
    print(f"üí° Response: {final_message.content}")

---

# Section 4: Advanced Patterns (25 minutes)

## 4.1: Sub-Graphs (Graph Composition)

**Sub-graphs** allow you to compose complex workflows from simpler graphs.
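
The core idea is that a finished workflow can itself be used as a single step in a larger one. The sketch below shows that idea with plain functions instead of graphs; `pipeline` is a hypothetical helper written for this example, and the three lambdas stand in for nodes.

```python
from typing import Callable, Dict

State = Dict[str, str]
Node = Callable[[State], State]

def pipeline(*nodes: Node) -> Node:
    """Build one node that runs the given nodes in order, merging partial updates."""
    def run(state: State) -> State:
        for node in nodes:
            state = {**state, **node(state)}
        return state
    return run

# Two small pipelines, each usable on its own
cleaning = pipeline(
    lambda s: {"cleaned_data": s["raw_data"].strip().lower()},
)
processing = pipeline(
    lambda s: {"processed_data": s["cleaned_data"].upper()},
    lambda s: {"result": f"Analysis: {len(s['processed_data'])} characters"},
)

# The composed pipeline treats each smaller pipeline as a single node
main = pipeline(cleaning, processing)
out = main({"raw_data": "  Hello World  "})
print(out["result"])  # Analysis: 11 characters
```

Because every piece shares the same state shape, each pipeline can be tested in isolation and then composed without changes.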

In [None]:
# Define state for data processing pipeline
class DataState(TypedDict):
    raw_data: str
    cleaned_data: str
    processed_data: str
    result: str

# Sub-graph 1: Data Cleaning
def clean_data(state: DataState) -> DataState:
    """Clean the raw data."""
    raw = state["raw_data"]
    cleaned = raw.strip().lower()
    print(f"üßπ Cleaned data: {cleaned}")
    return {"cleaned_data": cleaned}

def validate_data(state: DataState) -> DataState:
    """Validate cleaned data."""
    cleaned = state["cleaned_data"]
    # Simple validation
    if len(cleaned) > 0:
        print("✅ Data validated")
    return state

# Create cleaning sub-graph
cleaning_workflow = StateGraph(DataState)
cleaning_workflow.add_node("clean", clean_data)
cleaning_workflow.add_node("validate", validate_data)
cleaning_workflow.set_entry_point("clean")
cleaning_workflow.add_edge("clean", "validate")
cleaning_workflow.add_edge("validate", END)

# Sub-graph 2: Data Processing
def transform_data(state: DataState) -> DataState:
    """Transform the cleaned data."""
    cleaned = state["cleaned_data"]
    transformed = cleaned.upper()
    print(f"üîÑ Transformed data: {transformed}")
    return {"processed_data": transformed}

def analyze_data(state: DataState) -> DataState:
    """Analyze processed data."""
    processed = state["processed_data"]
    result = f"Analysis: {len(processed)} characters"
    print(f"üìä {result}")
    return {"result": result}

# Create processing sub-graph
processing_workflow = StateGraph(DataState)
processing_workflow.add_node("transform", transform_data)
processing_workflow.add_node("analyze", analyze_data)
processing_workflow.set_entry_point("transform")
processing_workflow.add_edge("transform", "analyze")
processing_workflow.add_edge("analyze", END)

# Main graph that uses sub-graphs
main_workflow = StateGraph(DataState)

# Add sub-graphs as nodes
main_workflow.add_node("cleaning_pipeline", cleaning_workflow.compile())
main_workflow.add_node("processing_pipeline", processing_workflow.compile())

# Connect sub-graphs
main_workflow.set_entry_point("cleaning_pipeline")
main_workflow.add_edge("cleaning_pipeline", "processing_pipeline")
main_workflow.add_edge("processing_pipeline", END)

# Compile main graph
pipeline_app = main_workflow.compile()

print("✅ Composed pipeline with sub-graphs created")

In [None]:
# Test composed pipeline
test_data = {
    "raw_data": "  Hello World  ",
    "cleaned_data": "",
    "processed_data": "",
    "result": ""
}

print("üöÄ Running composed pipeline...\n")
print("="*70)

result = pipeline_app.invoke(test_data)

print("\n" + "="*70)
print(f"\nüìä Final Result: {result['result']}")

## 4.2: Error Handling and Retry Logic

In [None]:
# Define state with error tracking
class RobustState(TypedDict):
    task: str
    result: str
    error_count: int
    max_retries: int
    error_message: str

# Simulated unreliable operation
def unreliable_operation(state: RobustState) -> RobustState:
    """
    Operation that might fail.
    """
    import random
    
    error_count = state.get("error_count", 0)
    
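    # Fail about half the time, but at most twice: once error_count reaches 2
    # the operation always succeeds, so this demo recovers within 3 attempts
    if random.random() < 0.5 and error_count < 2:
        error_count += 1
        print(f"❌ Operation failed (attempt {error_count})")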
        return {
            "error_count": error_count,
            "error_message": "Random failure occurred"
        }
    else:
        print("✅ Operation succeeded")
        return {
            "result": f"Completed: {state['task']}",
            "error_message": ""
        }

# Check if should retry
def should_retry(state: RobustState) -> str:
    """Decide whether to retry or finish."""
    if state.get("result"):
        return "success"
    elif state.get("error_count", 0) >= state.get("max_retries", 3):
        return "max_retries"
    else:
        return "retry"

# Handle max retries
def handle_failure(state: RobustState) -> RobustState:
    """Handle permanent failure."""
    print("⚠️ Max retries exceeded")
    return {
        "result": f"Failed after {state['error_count']} attempts"
    }

# Build retry workflow
retry_workflow = StateGraph(RobustState)

# Add nodes
retry_workflow.add_node("operation", unreliable_operation)
retry_workflow.add_node("handle_failure", handle_failure)

# Set entry
retry_workflow.set_entry_point("operation")

# Add conditional edges
retry_workflow.add_conditional_edges(
    "operation",
    should_retry,
    {
        "success": END,
        "retry": "operation",  # Loop back
        "max_retries": "handle_failure"
    }
)

retry_workflow.add_edge("handle_failure", END)

# Compile
retry_app = retry_workflow.compile()

print("✅ Retry workflow created")

In [None]:
# Test retry logic
test_state = {
    "task": "Process important data",
    "result": "",
    "error_count": 0,
    "max_retries": 3,
    "error_message": ""
}

print("üöÄ Testing retry logic...\n")
print("="*70)

result = retry_app.invoke(test_state)

print("\n" + "="*70)
print(f"\nüìä Result: {result['result']}")
print(f"Retry count: {result.get('error_count', 0)}")
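
Because the workflow above fails at random, a single run may not show both the recovery path and the give-up path. The sketch below reproduces the same retry rule in plain Python with a deterministic failure count so that both outcomes can be checked. `run_with_retries` and `make_flaky` are hypothetical helpers for illustration, not LangGraph features.

```python
from typing import Any, Callable, Dict

def run_with_retries(operation: Callable[[], str], max_retries: int) -> Dict[str, Any]:
    """Call operation until it succeeds or has failed max_retries times."""
    error_count = 0
    error_message = ""
    while error_count < max_retries:
        try:
            return {"result": operation(), "error_count": error_count, "error_message": ""}
        except Exception as exc:  # demo: treat every exception as retryable
            error_count += 1
            error_message = str(exc)
    return {
        "result": f"Failed after {error_count} attempts",
        "error_count": error_count,
        "error_message": error_message,
    }

def make_flaky(failures_before_success: int) -> Callable[[], str]:
    """Build an operation that fails a fixed number of times, then succeeds."""
    remaining = failures_before_success

    def operation() -> str:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise RuntimeError("Simulated failure")
        return "Completed: Process important data"

    return operation

recovered = run_with_retries(make_flaky(2), max_retries=3)  # succeeds on the 3rd call
exhausted = run_with_retries(make_flaky(5), max_retries=3)  # gives up after 3 failures
```

In real code it is usually better to catch only the exception types that are worth retrying, so that programming errors surface immediately instead of being retried.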

---

# 🎯 Summary & Key Takeaways

## What We Learned:

### 1. **Multi-Agent Systems**
- Supervisor pattern for coordinating specialists
- Sequential handoff between agents
- Benefits of specialized agents

### 2. **Human-in-the-Loop**
- Pausing execution with `interrupt_before`
- Using checkpointers for state persistence
- Resuming workflows with human feedback
- Approval and review workflows

### 3. **Tool Calling**
- Creating tools with `@tool` decorator
- Binding tools to LLMs
- Using ToolNode for execution
- Conditional routing with tool calls

### 4. **Advanced Patterns**
- Sub-graphs for modular workflows
- Error handling and retry logic
- Complex state management
- Graph composition

---

## 📝 Next Steps:

### Exercises for This Week:

**Exercise 1 (Due Monday):** `02_exercise_customer_service_team.ipynb`
- Build multi-agent customer service system
- Implement supervisor pattern
- Add human escalation

**Exercise 2 (Due Friday):** `03_exercise_research_agent.ipynb`
- Create research agent with tools
- Implement web search and analysis
- Add quality checks and iterations

---

## 🤔 Reflection Questions:

1. When should you use multi-agent vs single agent?
2. What are the trade-offs of human-in-the-loop?
3. How do tools extend LLM capabilities?
4. When is sub-graph composition useful?

---

## 📚 Additional Resources:

- [LangGraph Multi-Agent Tutorial](https://langchain-ai.github.io/langgraph/tutorials/multi_agent/)
- [Human-in-the-Loop Guide](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/)
- [Tool Calling Documentation](https://python.langchain.com/docs/modules/agents/tools/)

---

**Next Week:** Final Project - Build Your Own AI Agent! 🚀