# Multi-Agent Collaboration: Worker-Evaluator Pattern

This notebook implements a sophisticated two-agent system:
1. **Worker Agent**: Executes tasks using browser automation tools
2. **Evaluator Agent**: Validates output against success criteria, providing feedback loops

**Pattern:** This "worker + evaluator" architecture ensures quality control through iterative refinement—the evaluator can reject unsatisfactory work and request revisions until success criteria are met.

**Use Cases:**
- Content generation with quality gates
- Autonomous task execution with validation
- Self-healing workflows

In [None]:
# Import dependencies
from typing import Annotated, TypedDict, List, Dict, Any, Optional
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_community.agent_toolkits import PlayWrightBrowserToolkit
from langchain_community.tools.playwright.utils import create_async_playwright_browser
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from pydantic import BaseModel, Field
from IPython.display import Image, display
import gradio as gr
import uuid
import nest_asyncio
from dotenv import load_dotenv

In [None]:
# Initialize Environment
# Load settings from a .env file; override=True is passed so that, presumably,
# values from the file win over variables already set in the process
# (confirm against the python-dotenv documentation).
load_dotenv(override=True)
# NOTE(review): nest_asyncio is applied so async calls can be made from inside
# the notebook's already-running event loop - confirm this is still required.
nest_asyncio.apply()

## Define State & Output Schemas

In [None]:
# Evaluator Structured Output
# Schema the evaluator LLM is asked to fill in (bound via with_structured_output
# in the LLM setup cell). The three fields are copied into the graph state by the
# evaluator node and drive the routing decision after evaluation.
# NOTE(review): documented with comments rather than a class docstring, since a
# docstring may be included in the schema description sent to the model - confirm.
class EvaluatorOutput(BaseModel):
    # Free-text critique; also shown to the user and fed back to the worker on retry.
    feedback: str = Field(description="Feedback on the worker's response")
    # True ends the run successfully.
    success_criteria_met: bool = Field(description="Whether success criteria have been met")
    # True ends the run so the user can answer a question or unblock the worker.
    user_input_needed: bool = Field(description="True if more user input is required")

# State Schema
class State(TypedDict):
    """Shared graph state passed between the worker, tools and evaluator nodes."""

    # Conversation so far; the add_messages reducer merges node outputs into it.
    messages: Annotated[List[Any], add_messages]
    # User-supplied description of what a successful result looks like.
    success_criteria: str
    # Latest evaluator critique, or None before the first evaluation.
    feedback_on_work: Optional[str]
    # Set by the evaluator; True stops the retry loop.
    success_criteria_met: bool
    # Set by the evaluator; True stops the loop to hand control back to the user.
    user_input_needed: bool

## Configure Tools & LLMs

In [None]:
# Playwright Browser Tools
# headless=False is passed, so the browser is presumably launched with a visible
# window (confirm against the Playwright utility's documentation).
async_browser = create_async_playwright_browser(headless=False)
toolkit = PlayWrightBrowserToolkit.from_browser(async_browser=async_browser)
# The same tool list is bound to the worker LLM here and handed to the ToolNode
# when the graph is built.
tools = toolkit.get_tools()

# LLM Clients
# Worker: chat model with the browser tools bound so it can emit tool calls.
worker_llm = ChatOpenAI(model="gpt-4o-mini")
worker_llm_with_tools = worker_llm.bind_tools(tools)

# Evaluator: separate client whose replies are parsed into EvaluatorOutput.
evaluator_llm = ChatOpenAI(model="gpt-4o-mini")
evaluator_llm_with_output = evaluator_llm.with_structured_output(EvaluatorOutput)

## Define Agent Nodes

In [None]:
# Worker Agent Node
def worker(state: State) -> Dict[str, Any]:
    """Run one worker turn: prompt the tool-enabled LLM with the conversation.

    A system prompt is built from the success criteria and, when the evaluator
    rejected a previous attempt, its feedback. The prompt is placed in front of
    the conversation, or substituted for any system message already present.

    Args:
        state: Current graph state. Reads ``messages``, ``success_criteria``
            and (optionally) ``feedback_on_work``.

    Returns:
        A partial state update ``{"messages": [response]}`` containing only the
        LLM's new reply.
    """
    system_message = f"""You are a helpful assistant with browser automation tools.
Work until you complete the task, have a question for the user, or the success criteria is met.

Success Criteria:
{state['success_criteria']}

If you have a question, clearly state it. Example: "Question: Do you want a summary or detailed answer?"
If finished, provide the final answer without asking a question.
"""

    # Incorporate rejection feedback if present
    feedback = state.get("feedback_on_work")
    if feedback:
        system_message += f"\n\nPrevious Rejection Feedback:\n{feedback}\nPlease revise accordingly."

    history = state["messages"]
    if any(isinstance(msg, SystemMessage) for msg in history):
        # Swap in fresh SystemMessage objects instead of assigning to
        # msg.content: the list copy is shallow, so in-place assignment would
        # modify message objects that belong to the graph state.
        messages = [
            SystemMessage(content=system_message) if isinstance(msg, SystemMessage) else msg
            for msg in history
        ]
    else:
        messages = [SystemMessage(content=system_message), *history]

    response = worker_llm_with_tools.invoke(messages)
    return {"messages": [response]}

In [None]:
# Evaluator Agent Node
def evaluator(state: State) -> State:
    """Grade the worker's most recent reply against the success criteria.

    Builds a two-message prompt (grader instructions plus a request containing
    the formatted conversation, the criteria and the final reply), asks the
    structured-output evaluator LLM for a verdict, and returns that verdict as
    a state update together with an "Evaluator: ..." assistant message.
    """
    history = state["messages"]
    final_reply = history[-1].content
    transcript = format_conversation(history)
    criteria = state["success_criteria"]

    grader_instructions = (
        "You evaluate whether an assistant's response meets success criteria. \n"
        "Provide feedback and determine if: (1) criteria met, (2) user input needed."
    )
    grading_request = (
        "Conversation:\n"
        f"{transcript}\n"
        "\n"
        "Success Criteria:\n"
        f"{criteria}\n"
        "\n"
        "Assistant's Final Response:\n"
        f"{final_reply}\n"
        "\n"
        "Provide feedback and decision.\n"
    )

    verdict = evaluator_llm_with_output.invoke(
        [
            SystemMessage(content=grader_instructions),
            HumanMessage(content=grading_request),
        ]
    )

    # Surface the critique in the chat and record the decision flags for routing.
    evaluator_note = {"role": "assistant", "content": f"Evaluator: {verdict.feedback}"}
    return {
        "messages": [evaluator_note],
        "feedback_on_work": verdict.feedback,
        "success_criteria_met": verdict.success_criteria_met,
        "user_input_needed": verdict.user_input_needed,
    }

def format_conversation(messages: List[Any]) -> str:
    """Render the message history as plain text for the evaluator prompt.

    Human messages become ``User: ...`` lines and AI messages become
    ``Assistant: ...`` lines (with a ``[Tool Execution]`` placeholder when the
    AI message has no content). Any other message type is left out.
    """
    parts = ["Conversation History:\n"]
    for entry in messages:
        if isinstance(entry, HumanMessage):
            parts.append(f"User: {entry.content}\n")
            continue
        if isinstance(entry, AIMessage):
            spoken = entry.content or "[Tool Execution]"
            parts.append(f"Assistant: {spoken}\n")
    return "".join(parts)

## Define Routing Logic

In [None]:
# Route from Worker
def worker_router(state: State) -> str:
    """Pick the node that follows the worker.

    Returns "tools" when the latest message carries a non-empty ``tool_calls``
    attribute, otherwise "evaluator".
    """
    latest = state["messages"][-1]
    pending_calls = getattr(latest, "tool_calls", None)
    return "tools" if pending_calls else "evaluator"

# Route from Evaluator
def route_based_on_evaluation(state: State) -> str:
    """Pick the step that follows the evaluator.

    Returns "worker" to retry while neither stop flag is set; returns "END"
    once the criteria are met or user input is needed.
    """
    if not (state["success_criteria_met"] or state["user_input_needed"]):
        return "worker"  # Retry loop
    return "END"

## Build & Compile Graph

In [None]:
# Build Graph
graph_builder = StateGraph(State)

# Add Nodes
graph_builder.add_node("worker", worker)
# ToolNode is given the same tool list that was bound to the worker LLM.
graph_builder.add_node("tools", ToolNode(tools=tools))
graph_builder.add_node("evaluator", evaluator)

# Add Edges
# worker -> tools when the reply contains tool calls, otherwise -> evaluator.
graph_builder.add_conditional_edges("worker", worker_router, {"tools": "tools", "evaluator": "evaluator"})
# Tool results always go back to the worker for the next step.
graph_builder.add_edge("tools", "worker")
# evaluator -> worker to retry, or finish; the router's "END" string maps to the END sentinel.
graph_builder.add_conditional_edges("evaluator", route_based_on_evaluation, {"worker": "worker", "END": END})
# Every run starts at the worker.
graph_builder.add_edge(START, "worker")

# Compile
# The MemorySaver checkpointer is keyed by the thread_id supplied in the
# invoke config (see process_message).
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# NOTE(review): rendering the Mermaid PNG may depend on an external service
# by default - confirm this works offline.
display(Image(graph.get_graph().draw_mermaid_png()))

## Launch Interactive UI

In [None]:
# Workflow Execution
def make_thread_id() -> str:
    """Return a new random UUID4 string to use as a conversation thread id."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)

async def process_message(message, success_criteria, history, thread):
    """Run the worker/evaluator graph for one user request and extend the chat.

    Args:
        message: The user's request text.
        success_criteria: Text describing what a successful result looks like.
        history: Existing chat transcript as a list of role/content dicts.
            ``None`` is treated as an empty transcript.
        thread: Thread id used as the checkpointer key for this conversation.

    Returns:
        A new list: the prior history followed by the user message, the
        worker's final reply and the evaluator's feedback.
    """
    config = {"configurable": {"thread_id": thread}}
    # The evaluation fields are reset on every request so a verdict left over
    # from a previous turn on this thread cannot affect the new run.
    state = {
        "messages": message,
        "success_criteria": success_criteria,
        "feedback_on_work": None,
        "success_criteria_met": False,
        "user_input_needed": False,
    }
    result = await graph.ainvoke(state, config=config)

    # The run ends after the evaluator, so the last message is its feedback
    # and the one before it is the worker's final reply.
    final_messages = result["messages"]
    user_msg = {"role": "user", "content": message}
    reply = {"role": "assistant", "content": final_messages[-2].content}
    feedback = {"role": "assistant", "content": final_messages[-1].content}

    # reset() writes None to the chatbot; normalize so concatenation cannot fail.
    prior_turns = list(history or [])
    return prior_turns + [user_msg, reply, feedback]

async def reset():
    """Return cleared UI values and a new thread id.

    The tuple is ordered as (message, success criteria, chatbot history,
    thread id), matching the outputs wired to the reset button.
    """
    new_thread = make_thread_id()
    cleared_text = ""
    return cleared_text, cleared_text, None, new_thread

# Gradio Interface
with gr.Blocks(theme=gr.themes.Default(primary_hue="emerald")) as demo:
    gr.Markdown("## Sidekick: Autonomous Co-Worker")
    # Pass the factory, not its result: calling make_thread_id() here would
    # bake a single id into the app at build time, so every session would share
    # the same checkpointer thread. A callable is evaluated on each app load.
    thread = gr.State(make_thread_id)

    chatbot = gr.Chatbot(label="Sidekick", height=300, type="messages")
    message = gr.Textbox(placeholder="Your request")
    success_criteria = gr.Textbox(placeholder="Success criteria")

    with gr.Row():
        reset_button = gr.Button("Reset", variant="stop")
        go_button = gr.Button("Go!", variant="primary")

    # "Go!" runs the graph and replaces the chatbot transcript with the result.
    go_button.click(process_message, [message, success_criteria, chatbot, thread], [chatbot])
    # "Reset" clears both textboxes and the transcript and issues a new thread id.
    reset_button.click(reset, [], [message, success_criteria, chatbot, thread])

demo.launch()