## Enhanced Sidekick with Adaptive Clarification

### Personal Co-worker with Planner Node

In [None]:
import os
import uuid
import asyncio
import requests
import nest_asyncio
from typing import Annotated

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from dotenv import load_dotenv
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from typing import List, Any, Optional, Dict
from pydantic import BaseModel, Field
from datetime import datetime
from playwright.async_api import async_playwright
from langchain_community.agent_toolkits import PlayWrightBrowserToolkit
from langchain.agents import Tool
from langchain_community.agent_toolkits import FileManagementToolkit
from langchain_community.tools.wikipedia.tool import WikipediaQueryRun
from langchain_experimental.tools import PythonREPLTool
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
import gradio as gr

In [None]:
# Read settings (API keys etc.) from a .env file; override=True lets values
# from .env replace variables that are already set in the process environment.
load_dotenv(override=True)
# Patch asyncio so an already-running event loop can be re-entered
# (this file is a sequence of notebook cells — presumably run under Jupyter's loop).
nest_asyncio.apply()

### Define State / Structured Outputs

In [None]:
# Graph state shared by the planner, worker, tools and evaluator nodes.
class State(TypedDict):
    # Conversation so far; add_messages is the reducer applied to node updates for this key.
    messages: Annotated[List[Any], add_messages]
    # User-supplied definition of "done"; interpolated into the planner, worker and evaluator prompts.
    success_criteria: str
    # Latest evaluator feedback; the worker appends it to its prompt when set.
    feedback_on_work: Optional[str]
    # Set by the evaluator; either flag being true ends the run (see route_based_on_evaluation).
    success_criteria_met: bool
    user_input_needed: bool
    # Number of clarifying questions the planner has asked so far.
    clarifying_questions_asked: int
    # True once the planner hands over to the worker.
    planning_complete: bool


# Structured result requested from the evaluator LLM (bound via with_structured_output in Sidekick.setup).
# NOTE: kept as comments rather than a class docstring so the model's schema is unchanged.
class EvaluatorOutput(BaseModel):
    # Free-text critique; stored in state as feedback_on_work and echoed into the chat.
    feedback: str = Field(description="Feedback on the assistant's response")
    # Either of the two flags below being true stops the worker/evaluator loop.
    success_criteria_met: bool = Field(description="Whether the success criteria have been met")
    user_input_needed: bool = Field(
        description="True if more input is needed from the user, or clarifications, or the assistant is stuck"
    )


# Structured result requested from the planner LLM (bound via with_structured_output in Sidekick.setup).
# NOTE: kept as comments rather than a class docstring so the model's schema is unchanged.
class PlannerOutput(BaseModel):
    # Optional but without a default: the field must be present, and may be None.
    clarification_question: Optional[str] = Field(
        description="A single clarifying question to ask the user, or None if everything is clear"
    )
    # When true the planner marks planning as complete and the graph moves on to the worker.
    ready_to_proceed: bool = Field(
        description="True if the request and success criteria are clear enough to proceed to the worker"
    )
    # Not read anywhere in the visible code; it is only part of the requested output.
    reasoning: str = Field(
        description="Brief explanation of why clarification is needed or why ready to proceed"
    )

### Sidekick Class with Planner, Worker, and Evaluator

In [None]:
class Sidekick:
    def __init__(self):
        """Create an un-initialised sidekick.

        Only cheap, local objects are built here; the browser, tools, LLM
        bindings and graph are filled in by ``await setup()``.
        """
        # Identity and checkpoint storage for this instance.
        self.sidekick_id = str(uuid.uuid4())
        self.memory = MemorySaver()

        # Playwright handles, created in get_all_tools() and released in cleanup().
        self.playwright = None
        self.browser = None

        # Populated by setup().
        self.tools = None
        self.worker_llm_with_tools = None
        self.evaluator_llm_with_output = None
        self.planner_llm_with_output = None
        self.graph = None

    async def setup(self):
        """Build the tools, bind the three LLM roles and compile the graph.

        Must be awaited once before ``run_superstep`` is used.
        """
        model_name = "gpt-4o-mini"

        self.tools = await self.get_all_tools()

        # Worker: free-form chat model that is allowed to call the tools.
        self.worker_llm_with_tools = ChatOpenAI(model=model_name).bind_tools(self.tools)
        # Evaluator and planner: constrained to their structured output schemas.
        self.evaluator_llm_with_output = ChatOpenAI(model=model_name).with_structured_output(EvaluatorOutput)
        self.planner_llm_with_output = ChatOpenAI(model=model_name).with_structured_output(PlannerOutput)

        await self.build_graph()

    async def get_all_tools(self):
        """Start the browser and assemble every tool the worker may call.

        Side effects: starts Playwright and launches a visible (non-headless)
        Chromium instance, storing both on ``self`` so ``cleanup`` can release
        them.

        Returns:
            A list of tools: Playwright browser tools, file-management tools
            rooted at ``sandbox``, plus push-notification, web-search, Python
            REPL, Wikipedia and e-mail tools.
        """
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=False)
        toolkit = PlayWrightBrowserToolkit.from_browser(async_browser=self.browser)
        browser_tools = toolkit.get_tools()

        # Seconds; keeps a stalled HTTP call from blocking the agent indefinitely.
        request_timeout = 30

        pushover_token = os.getenv("PUSHOVER_TOKEN")
        pushover_user = os.getenv("PUSHOVER_USER")
        pushover_url = "https://api.pushover.net/1/messages.json"

        def push(text: str):
            """Send ``text`` as a Pushover notification; return a status string for the LLM."""
            print(f"[PUSH] Attempting to send notification: {text[:50]}...")
            if not pushover_token or not pushover_user:
                error_msg = "Notification not configured - missing PUSHOVER_TOKEN or PUSHOVER_USER in environment"
                print(f"[PUSH ERROR] {error_msg}")
                return error_msg
            print("[PUSH] Sending to Pushover API...")
            try:
                response = requests.post(
                    pushover_url,
                    data={"token": pushover_token, "user": pushover_user, "message": text},
                    timeout=request_timeout,
                )
                print(f"[PUSH] Response status: {response.status_code}")
                if response.status_code == 200:
                    success_msg = "Notification sent successfully"
                    print(f"[PUSH SUCCESS] {success_msg}")
                    return success_msg
                error_msg = f"Notification failed: {response.status_code} - {response.text}"
                print(f"[PUSH ERROR] {error_msg}")
                return error_msg
            except Exception as e:
                # Deliberately broad: the tool reports failures to the LLM as text instead of raising.
                error_msg = f"Notification exception: {str(e)}"
                print(f"[PUSH EXCEPTION] {error_msg}")
                return error_msg

        push_tool = Tool(
            name="send_push_notification",
            func=push,
            description="Use this tool when you want to send a push notification"
        )

        file_toolkit = FileManagementToolkit(root_dir="sandbox")
        file_tools = file_toolkit.get_tools()

        serper = GoogleSerperAPIWrapper()
        search_tool = Tool(
            name="search",
            func=serper.run,
            description="Use this tool when you want to get the results of an online web search"
        )

        wikipedia = WikipediaAPIWrapper()
        wiki_tool = WikipediaQueryRun(api_wrapper=wikipedia)

        python_repl = PythonREPLTool()

        mailgun_api_key = os.getenv("MAILGUN_API_KEY")
        mailgun_domain = os.getenv("MAILGUN_DOMAIN")
        mailgun_from = os.getenv("MAILGUN_FROM_EMAIL")

        def send_email(message: str):
            """Send an e-mail via Mailgun; return a status string for the LLM.

            ``message`` is the single string a ``Tool`` passes to its function,
            formatted as ``to_email|||subject|||body`` (the format advertised
            in the tool description). Only the first two separators are
            significant, so the body itself may contain ``|||``.
            """
            parts = [part.strip() for part in str(message).split("|||", 2)]
            if len(parts) != 3 or not parts[0] or not parts[1]:
                error_msg = "Email input must be: to_email|||subject|||body"
                print(f"[EMAIL ERROR] {error_msg}")
                return error_msg
            to_email, subject, body = parts

            print(f"[EMAIL] Attempting to send email to: {to_email}")
            print(f"[EMAIL] Subject: {subject}")
            if not mailgun_api_key or not mailgun_domain or not mailgun_from:
                error_msg = (
                    "Email not configured - missing MAILGUN_API_KEY, MAILGUN_DOMAIN "
                    "or MAILGUN_FROM_EMAIL in environment"
                )
                print(f"[EMAIL ERROR] {error_msg}")
                return error_msg
            print("[EMAIL] Sending via Mailgun API...")
            try:
                url = f"https://api.mailgun.net/v3/{mailgun_domain}/messages"
                response = requests.post(
                    url,
                    auth=("api", mailgun_api_key),
                    data={"from": mailgun_from, "to": to_email, "subject": subject, "text": body},
                    timeout=request_timeout,
                )
                print(f"[EMAIL] Response status: {response.status_code}")
                if response.status_code == 200:
                    success_msg = "Email sent successfully"
                    print(f"[EMAIL SUCCESS] {success_msg}")
                    return success_msg
                error_msg = f"Email failed: {response.status_code} - {response.text}"
                print(f"[EMAIL ERROR] {error_msg}")
                return error_msg
            except Exception as e:
                # Deliberately broad: the tool reports failures to the LLM as text instead of raising.
                error_msg = f"Email exception: {str(e)}"
                print(f"[EMAIL EXCEPTION] {error_msg}")
                return error_msg

        email_tool = Tool(
            name="send_email",
            func=send_email,
            description="Use this tool to send an email. Input should be: to_email, subject, body separated by ||| like: user@example.com|||Subject|||Body text"
        )

        return browser_tools + file_tools + [push_tool, search_tool, python_repl, wiki_tool, email_tool]

    def planner(self, state: State) -> Dict[str, Any]:
        if state.get("planning_complete"):
            return {"planning_complete": True}

        questions_asked = state.get("clarifying_questions_asked", 0)
        max_questions = 3

        system_message = f"""You are a planning agent that clarifies user requests before work begins.
Your job is to ensure the user's intention and success criteria are crystal clear.

If anything is ambiguous or unclear about the request or success criteria, ask ONE clarifying question in a conversational, friendly tone.
Once you're satisfied everything is clear, set ready_to_proceed to True.

You have asked {questions_asked} out of a maximum of {max_questions} clarifying questions.
If you've reached the maximum, you must proceed even if things aren't perfectly clear.

Guidelines:
- Ask questions conversationally, not in an itemized list
- Focus on understanding the user's true intent
- Consider what information would help the worker succeed
- If the request is already clear, proceed immediately

Success criteria provided by user: {state["success_criteria"]}
"""

        messages = state["messages"]
        found_system_message = False
        for message in messages:
            if isinstance(message, SystemMessage):
                message.content = system_message
                found_system_message = True

        if not found_system_message:
            messages = [SystemMessage(content=system_message)] + messages

        result = self.planner_llm_with_output.invoke(messages)

        if result.ready_to_proceed or questions_asked >= max_questions:
            return {"planning_complete": True}

        if result.clarification_question:
            return {
                "messages": [AIMessage(content=result.clarification_question)],
                "clarifying_questions_asked": questions_asked + 1,
            }

        return {"planning_complete": True}

    def planner_router(self, state: State) -> str:
        if state.get("planning_complete"):
            return "worker"
        return "END"

    def worker(self, state: State) -> Dict[str, Any]:
        system_message = f"""You are a helpful assistant that can use tools to complete tasks.
You keep working on a task until either you have a question or clarification for the user, or the success criteria is met.
You have many tools to help you, including tools to browse the internet, navigating and retrieving web pages.
You have a tool to run python code, but note that you would need to include a print() statement if you wanted to receive output.
You can send emails using the send_email tool.
The current date and time is {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}

This is the success criteria:
{state["success_criteria"]}
You should reply either with a question for the user about this assignment, or with your final response.
If you have a question for the user, you need to reply by clearly stating your question. An example might be:

Question: please clarify whether you want a summary or a detailed answer

If you've finished, reply with the final answer, and don't ask a question; simply reply with the answer.
"""

        if state.get("feedback_on_work"):
            system_message += f"""
Previously you thought you completed the assignment, but your reply was rejected because the success criteria was not met.
Here is the feedback on why this was rejected:
{state["feedback_on_work"]}
With this feedback, please continue the assignment, ensuring that you meet the success criteria or have a question for the user."""

        found_system_message = False
        messages = state["messages"]
        for message in messages:
            if isinstance(message, SystemMessage):
                message.content = system_message
                found_system_message = True

        if not found_system_message:
            messages = [SystemMessage(content=system_message)] + messages

        response = self.worker_llm_with_tools.invoke(messages)

        return {"messages": [response]}

    def worker_router(self, state: State) -> str:
        last_message = state["messages"][-1]

        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return "evaluator"

    def format_conversation(self, messages: List[Any]) -> str:
        conversation = "Conversation history:\n\n"
        for message in messages:
            if isinstance(message, HumanMessage):
                conversation += f"User: {message.content}\n"
            elif isinstance(message, AIMessage):
                text = message.content or "[Tools use]"
                conversation += f"Assistant: {text}\n"
        return conversation

    def evaluator(self, state: State) -> State:
        last_response = state["messages"][-1].content

        system_message = """You are an evaluator that determines if a task has been completed successfully by an Assistant.
Be practical and generous in your evaluation. If the Assistant has made a genuine attempt and completed the main objective, accept it.
Only reject if critical parts are clearly missing or the Assistant is asking questions."""

        user_message = f"""You are evaluating a conversation between the User and Assistant.

The entire conversation history:
{self.format_conversation(state["messages"])}

Success criteria: {state["success_criteria"]}

Final response from Assistant: {last_response}

Evaluation guidelines:
- If files were created using tools, assume they were successful
- If notifications were sent using tools, assume they were successful  
- If the Assistant completed the main task, accept it even if minor details are imperfect
- Only mark user_input_needed=True if the Assistant explicitly asks a question
- Give the Assistant the benefit of the doubt on tool usage
- Accept reasonable efforts that meet the core success criteria
"""
        if state["feedback_on_work"]:
            user_message += f"\nPrevious feedback: {state['feedback_on_work']}\nIf the Assistant is stuck in a loop or repeating, mark user_input_needed=True."

        evaluator_messages = [
            SystemMessage(content=system_message),
            HumanMessage(content=user_message),
        ]

        eval_result = self.evaluator_llm_with_output.invoke(evaluator_messages)
        new_state = {
            "messages": [{"role": "assistant", "content": f"Evaluator Feedback: {eval_result.feedback}"}],
            "feedback_on_work": eval_result.feedback,
            "success_criteria_met": eval_result.success_criteria_met,
            "user_input_needed": eval_result.user_input_needed,
        }
        return new_state

    def route_based_on_evaluation(self, state: State) -> str:
        if state["success_criteria_met"] or state["user_input_needed"]:
            return "END"
        return "worker"

    async def build_graph(self):
        """Wire planner -> worker <-> tools -> evaluator and compile the graph with checkpointing.

        The compiled graph is stored on ``self.graph``; ``self.memory`` is
        used as its checkpointer.
        """
        builder = StateGraph(State)

        # Nodes first, so every edge below refers to a registered node.
        node_table = {
            "planner": self.planner,
            "worker": self.worker,
            "tools": ToolNode(tools=self.tools),
            "evaluator": self.evaluator,
        }
        for node_name, node in node_table.items():
            builder.add_node(node_name, node)

        # Entry point and the fixed tools -> worker hop.
        builder.add_edge(START, "planner")
        builder.add_edge("tools", "worker")

        # Conditional hops; each router returns one of the keys of its mapping.
        builder.add_conditional_edges("planner", self.planner_router, {"worker": "worker", "END": END})
        builder.add_conditional_edges("worker", self.worker_router, {"tools": "tools", "evaluator": "evaluator"})
        builder.add_conditional_edges("evaluator", self.route_based_on_evaluation, {"worker": "worker", "END": END})

        self.graph = builder.compile(checkpointer=self.memory)

    async def run_superstep(self, message, success_criteria, history):
        config = {
            "configurable": {"thread_id": self.sidekick_id},
            "recursion_limit": 50
        }

        state = {
            "messages": message,
            "success_criteria": success_criteria or "The answer should be clear and accurate",
            "feedback_on_work": None,
            "success_criteria_met": False,
            "user_input_needed": False,
            "clarifying_questions_asked": 0,
            "planning_complete": False,
        }
        result = await self.graph.ainvoke(state, config=config)
        user = {"role": "user", "content": message}
        reply = {"role": "assistant", "content": result["messages"][-2].content}
        feedback = {"role": "assistant", "content": result["messages"][-1].content}
        return history + [user, reply, feedback]

    def cleanup(self):
        if self.browser:
            try:
                loop = asyncio.get_running_loop()
                loop.create_task(self.browser.close())
                if self.playwright:
                    loop.create_task(self.playwright.stop())
            except RuntimeError:
                asyncio.run(self.browser.close())
                if self.playwright:
                    asyncio.run(self.playwright.stop())

### Initialize Sidekick and Gradio Interface

In [None]:
# Module-wide Sidekick instance, created lazily by init_sidekick().
sidekick = None


async def init_sidekick():
    """Create and set up the module-wide Sidekick.

    The global is assigned only after ``setup()`` succeeds, so a failed
    setup never leaves a half-initialised instance behind for later calls
    to pick up. Any browser resources opened before the failure are
    released, and the original exception is re-raised.
    """
    global sidekick
    candidate = Sidekick()
    try:
        await candidate.setup()
    except Exception:
        candidate.cleanup()
        raise
    sidekick = candidate


def make_thread_id():
    """Return a fresh random (UUID4) identifier as a string."""
    thread_uuid = uuid.uuid4()
    return str(thread_uuid)


async def process_message(message, success_criteria, history, thread):
    """Gradio handler: run one user turn and return the updated chat history.

    Args:
        message: Text from the request box.
        success_criteria: Text from the success-criteria box.
        history: Current chatbot history.
        thread: Conversation id held in the UI's ``gr.State``.

    The sidekick is created on first use. Its conversation thread is bound to
    ``thread`` so that the new id issued by ``reset`` really starts a fresh
    conversation instead of continuing the old checkpointed one.
    """
    global sidekick
    if not sidekick:
        await init_sidekick()
    if thread:
        # run_superstep reads sidekick_id as the checkpoint thread_id.
        sidekick.sidekick_id = thread
    return await sidekick.run_superstep(message, success_criteria, history)


async def reset():
    """Gradio handler: clear both text boxes and the chat, and issue a new thread id.

    Returns values in the order of the Reset button's outputs:
    message, success criteria, chatbot, thread.
    """
    cleared_message = ""
    cleared_criteria = ""
    cleared_chat = None
    new_thread = make_thread_id()
    return cleared_message, cleared_criteria, cleared_chat, new_thread

### Launch Sidekick UI

In [None]:
# Build the Gradio front-end: a chat pane, two input boxes and Reset / Go buttons.
with gr.Blocks(theme=gr.themes.Default(primary_hue="emerald")) as demo:
    gr.Markdown("## Sidekick Personal Co-worker")
    # Per-session conversation id, replaced by reset().
    thread = gr.State(make_thread_id())

    with gr.Row():
        chatbot = gr.Chatbot(label="Sidekick", height=300, type="messages")
    with gr.Group():
        with gr.Row():
            message = gr.Textbox(show_label=False, placeholder="Your request to your sidekick")
        with gr.Row():
            success_criteria = gr.Textbox(show_label=False, placeholder="What are your success criteria?")
    with gr.Row():
        reset_button = gr.Button("Reset", variant="stop")
        go_button = gr.Button("Go!", variant="primary")

    # Pressing Enter in either box or clicking Go! all run the same handler.
    run_inputs = (message, success_criteria, chatbot, thread)
    for register in (message.submit, success_criteria.submit, go_button.click):
        register(process_message, list(run_inputs), [chatbot])

    reset_button.click(reset, [], [message, success_criteria, chatbot, thread])


# debug=True as in the original launch call.
demo.launch(debug=True)