# Human-in-the-Loop (HITL) Agent Loop Workflow

In this notebook, we'll extend the basic agent loop workflow with human-in-the-loop (HITL) review. This lets us specify which tools require human approval before they are executed.

In [None]:
%pip install llama-index

In [1]:
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."

## Agent Configuration

In order to capture which tools require human review and approval, we'll need to extend our agent configuration to include a list of tools that require HITL.

In [2]:
from pydantic import BaseModel, ConfigDict, Field

from llama_index.core.tools import BaseTool

class AgentConfig(BaseModel):
    """Used to configure an agent."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    name: str
    description: str
    system_prompt: str | None = None
    tools: list[BaseTool] | None = None
    tools_requiring_human_confirmation: list[str] = Field(default_factory=list)

We can copy our previous agent configuration and mark the tools that require HITL. Tools are identified by name, so each entry must match the tool's name (here, the function name `add_two_numbers`).

In [3]:
from llama_index.core.tools import FunctionTool

def add_two_numbers(a: int, b: int) -> int:
    """Used to add two numbers together."""
    return a + b

add_two_numbers_tool = FunctionTool.from_defaults(fn=add_two_numbers)

agent_config = AgentConfig(
    name="Addition Agent",
    description="Used to add two numbers together.",
    system_prompt="You are an agent that adds two numbers together. Do not help the user with anything else.",
    tools=[add_two_numbers_tool],
    tools_requiring_human_confirmation=["add_two_numbers"],
)
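
Because `tools_requiring_human_confirmation` holds plain strings, a typo in a tool name would silently skip the approval step. A minimal sketch of a sanity check follows. `find_unknown_tool_names` is a hypothetical helper, not part of LlamaIndex, and it works on plain name lists so it can run on its own.

```python
def find_unknown_tool_names(
    registered_names: list[str], confirmation_names: list[str]
) -> list[str]:
    """Return names listed for confirmation that match no registered tool."""
    registered = set(registered_names)
    return [name for name in confirmation_names if name not in registered]


# Names as they would appear in the configuration above.
print(find_unknown_tool_names(["add_two_numbers"], ["add_two_numbers"]))
# A misspelled entry is reported back to the caller.
print(find_unknown_tool_names(["add_two_numbers"], ["add_two_number"]))
```

With the configuration above, the registered names could be gathered with `[t.metadata.get_name() for t in agent_config.tools]`, the same accessor the workflow uses to look up tools.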

## Workflow Definition

With our agent configuration, we can define a workflow that uses the configuration to implement a basic agent loop with HITL.

This workflow will:
- initialize the global context with the passed-in parameters
- call an LLM with the system prompt and chat history
- parse the tool calls from the LLM response
  - if there are no tool calls, the workflow will stop
  - if there are tool calls, the workflow will execute the tool calls
    - if the tool call requires HITL, the workflow will emit an event to transfer control to a human
      - the human can approve or reject the tool call
    - collect the results of the tool calls
    - update the chat history with the tool call results
    - loop back and call the LLM again with the updated chat history
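
The steps above can be sketched as a plain synchronous loop before bringing in the workflow machinery. This is only an illustration of the control flow, under the assumption of a scripted stand-in for the LLM. `run_agent_loop`, `fake_llm`, and the string-based chat history are hypothetical and are not LlamaIndex APIs.

```python
from typing import Callable

# A "tool call" is just (tool_name, kwargs). An LLM reply is either a final
# answer (str) or a list of tool calls. All of this is a stand-in.
ToolCall = tuple[str, dict]


def run_agent_loop(
    llm: Callable[[list[str]], "str | list[ToolCall]"],
    tools: dict[str, Callable[..., object]],
    needs_approval: set[str],
    approve: Callable[[str, dict], bool],
    user_msg: str,
    max_turns: int = 5,
) -> str:
    chat_history = [f"user: {user_msg}"]
    for _ in range(max_turns):
        reply = llm(chat_history)
        if isinstance(reply, str):  # no tool calls: stop
            chat_history.append(f"assistant: {reply}")
            return reply
        chat_history.append(f"assistant: requested {len(reply)} tool call(s)")
        results = []
        for name, kwargs in reply:
            # Tools on the approval list run only if the human says yes.
            if name in needs_approval and not approve(name, kwargs):
                results.append(f"tool {name}: call was not approved")
            else:
                results.append(f"tool {name}: {tools[name](**kwargs)}")
        chat_history.extend(results)  # feed the results back to the LLM
    return "Stopped: too many turns."


def fake_llm(chat_history: list[str]):
    """Scripted stand-in: asks for the tool once, then answers."""
    last = chat_history[-1]
    if last.startswith("user:"):
        return [("add_two_numbers", {"a": 10, "b": 10})]
    return f"Based on '{last}', I am done."


answer = run_agent_loop(
    llm=fake_llm,
    tools={"add_two_numbers": lambda a, b: a + b},
    needs_approval={"add_two_numbers"},
    approve=lambda name, kwargs: True,
    user_msg="What is 10 + 10?",
)
print(answer)
```

The workflow version differs mainly in that approval is asynchronous: instead of calling `approve` inline, it emits an event and waits for a response event from outside the workflow.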

In [4]:
from llama_index.core.workflow import Event, Workflow, Context, StartEvent, StopEvent, step
from llama_index.core.workflow.events import InputRequiredEvent, HumanResponseEvent
from llama_index.core.llms import ChatMessage, LLM
from llama_index.core.tools import ToolSelection
from llama_index.llms.openai import OpenAI


class LLMCallEvent(Event):
    pass

class ToolCallEvent(Event):
    pass

class ToolCallResultEvent(Event):
    chat_message: ChatMessage

class ProgressEvent(Event):
    msg: str

# Our two new events!
class ToolRequestEvent(InputRequiredEvent):
    tool_name: str
    tool_id: str
    tool_kwargs: dict

class ToolApprovedEvent(HumanResponseEvent):
    tool_name: str
    tool_id: str
    tool_kwargs: dict
    approved: bool
    response: str | None = None
    

class HITLAgent(Workflow):

    @step
    async def setup(
        self, ctx: Context, ev: StartEvent
    ) -> LLMCallEvent:
        """Sets up the workflow, validates inputs, and stores them in the context."""
        agent_config = ev.get("agent_config")
        user_msg = ev.get("user_msg")
        llm: LLM = ev.get("llm", default=OpenAI(model="gpt-4o", temperature=0.3))
        chat_history = ev.get("chat_history", default=[])
        initial_state = ev.get("initial_state", default={})

        if (
            agent_config is None
            or user_msg is None
            or llm is None
            or chat_history is None
        ):
            raise ValueError(
                "agent_config, user_msg, llm, and chat_history are required!"
            )

        if not llm.metadata.is_function_calling_model:
            raise ValueError("LLM must be a function calling model!")

        await ctx.set("agent_config", agent_config)
        await ctx.set("llm", llm)

        chat_history.append(ChatMessage(role="user", content=user_msg))
        await ctx.set("chat_history", chat_history)

        await ctx.set("user_state", initial_state)

        return LLMCallEvent()

    @step
    async def speak_with_agent(
        self, ctx: Context, ev: LLMCallEvent
    ) -> ToolCallEvent | StopEvent | None:
        """Calls the LLM and dispatches any tool calls it requests."""
        # Load the agent configuration and conversation state
        agent_config: AgentConfig = await ctx.get("agent_config")
        chat_history = await ctx.get("chat_history")
        llm = await ctx.get("llm")

        user_state = await ctx.get("user_state")
        user_state_str = "\n".join([f"{k}: {v}" for k, v in user_state.items()])
        system_prompt = (
            # system_prompt is optional in AgentConfig, so guard against None
            (agent_config.system_prompt or "").strip()
            + f"\n\nHere is the current user state:\n{user_state_str}"
        )

        llm_input = [ChatMessage(role="system", content=system_prompt)] + chat_history
        tools = agent_config.tools

        response = await llm.achat_with_tools(tools, chat_history=llm_input)

        tool_calls: list[ToolSelection] = llm.get_tool_calls_from_response(
            response, error_on_no_tool_call=False
        )
        if len(tool_calls) == 0:
            chat_history.append(response.message)
            await ctx.set("chat_history", chat_history)
            return StopEvent(
                result={
                    "response": response.message.content,
                    "chat_history": chat_history,
                }
            )

        await ctx.set("num_tool_calls", len(tool_calls))

        # New logic for HITL
        for tool_call in tool_calls:
            if tool_call.tool_name in agent_config.tools_requiring_human_confirmation:
                ctx.write_event_to_stream(
                    ToolRequestEvent(
                        prefix=f"Tool {tool_call.tool_name} requires human approval.",
                        tool_name=tool_call.tool_name,
                        tool_kwargs=tool_call.tool_kwargs,
                        tool_id=tool_call.tool_id,
                    )
                )
            else:
                ctx.send_event(
                    ToolCallEvent(tool_call=tool_call, tools=agent_config.tools)
                )

        chat_history.append(response.message)
        await ctx.set("chat_history", chat_history)
    
    @step
    async def handle_tool_approval(
        self, ctx: Context, ev: ToolApprovedEvent
    ) -> ToolCallEvent | ToolCallResultEvent:
        """Handles the approval or rejection of a tool call."""
        if ev.approved:
            agent_config = await ctx.get("agent_config")
            return ToolCallEvent(
                tools=agent_config.tools,
                tool_call=ToolSelection(
                    tool_id=ev.tool_id,
                    tool_name=ev.tool_name,
                    tool_kwargs=ev.tool_kwargs,
                ),
            )
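        else:
            # Echo the tool call ID and name, as `handle_tool_call` does, so the
            # rejection can be matched to the LLM's original tool request.
            return ToolCallResultEvent(
                chat_message=ChatMessage(
                    role="tool",
                    content=ev.response or "Tool call was not approved.",
                    additional_kwargs={
                        "tool_call_id": ev.tool_id,
                        "name": ev.tool_name,
                    },
                )
            )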

    @step(num_workers=4)
    async def handle_tool_call(
        self, ctx: Context, ev: ToolCallEvent
    ) -> ToolCallResultEvent:
        """Handles the execution of a tool call."""
        tool_call = ev.tool_call
        tools_by_name = {tool.metadata.get_name(): tool for tool in ev.tools}

        tool = tools_by_name.get(tool_call.tool_name)
        # Use the requested name here: `tool` is None when the lookup fails.
        additional_kwargs = {
            "tool_call_id": tool_call.tool_id,
            "name": tool_call.tool_name,
        }

        if tool is None:
            tool_msg = ChatMessage(
                role="tool",
                content=f"Tool {tool_call.tool_name} does not exist",
                additional_kwargs=additional_kwargs,
            )
        else:
            try:
                tool_output = await tool.acall(**tool_call.tool_kwargs)
                tool_msg = ChatMessage(
                    role="tool",
                    content=tool_output.content,
                    additional_kwargs=additional_kwargs,
                )
            except Exception as e:
                tool_msg = ChatMessage(
                    role="tool",
                    content=f"Encountered error in tool call: {e}",
                    additional_kwargs=additional_kwargs,
                )

        ctx.write_event_to_stream(
            ProgressEvent(
                msg=f"Tool {tool_call.tool_name} called with {tool_call.tool_kwargs} returned {tool_msg.content}"
            )
        )

        return ToolCallResultEvent(chat_message=tool_msg)

    @step
    async def aggregate_tool_results(
        self, ctx: Context, ev: ToolCallResultEvent
    ) -> LLMCallEvent | None:
        """Collects the results of all tool calls and updates the chat history."""
        num_tool_calls = await ctx.get("num_tool_calls")
        # collect_events buffers events and returns None until all have arrived.
        results = ctx.collect_events(ev, [ToolCallResultEvent] * num_tool_calls)
        if not results:
            return None

        chat_history = await ctx.get("chat_history")
        for result in results:
            chat_history.append(result.chat_message)
        await ctx.set("chat_history", chat_history)

        return LLMCallEvent()

## Try it out!

With our workflow defined, we can now try it out!

In [5]:
from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-4o", temperature=0.3)
workflow = HITLAgent()

handler = workflow.run(
    agent_config=agent_config,
    user_msg="What is 10 + 10?",
    chat_history=[],
    initial_state={"user_name": "Logan"},
    llm=llm,
)

async for event in handler.stream_events():
    if isinstance(event, ProgressEvent):
        print(event.msg)
    elif isinstance(event, ToolRequestEvent):
        print(f"Tool {event.tool_name} requires human approval. Approving!")
        # TODO: Implement your own logic to approve or reject the tool call
        # TODO: Try to reject the tool call and see what happens!
        handler.ctx.send_event(ToolApprovedEvent(
            approved=True,
            tool_name=event.tool_name,
            tool_id=event.tool_id,
            tool_kwargs=event.tool_kwargs,
        ))

print("-----------")

final_result = await handler
print(final_result["response"])

Tool add_two_numbers requires human approval. Approving!
Tool add_two_numbers called with {'a': 10, 'b': 10} returned 20
-----------
10 + 10 is 20.
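
The `TODO` in the event loop above is where an approval policy would go. As a minimal sketch, assuming a hypothetical rule that only small integer operands are auto-approved, the decision can be a plain function returning the `approved` flag and an optional `response` message, the two fields `ToolApprovedEvent` carries. `decide_tool_call` and `MAX_OPERAND` are illustrative names, not part of LlamaIndex.

```python
MAX_OPERAND = 1_000  # hypothetical threshold for auto-approval


def decide_tool_call(
    tool_name: str, tool_kwargs: dict
) -> tuple[bool, "str | None"]:
    """Return (approved, response) for a requested tool call."""
    if tool_name != "add_two_numbers":
        return False, f"No approval rule is defined for {tool_name}."
    operands = [tool_kwargs.get("a"), tool_kwargs.get("b")]
    if not all(isinstance(x, int) for x in operands):
        return False, "Both operands must be integers."
    if any(abs(x) > MAX_OPERAND for x in operands):
        return False, f"Operands larger than {MAX_OPERAND} need manual review."
    return True, None


print(decide_tool_call("add_two_numbers", {"a": 10, "b": 10}))
print(decide_tool_call("add_two_numbers", {"a": 10, "b": 5_000}))
```

Inside the loop, the result would be unpacked into `approved=` and `response=` on the `ToolApprovedEvent`. On rejection, `handle_tool_approval` passes the `response` text back to the LLM as the tool result, so a descriptive message helps the model explain the refusal to the user.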


Since the chat history is passed in on every run, we need to carry it over to the next run ourselves. A convenient way to do this is with a memory buffer.

In [6]:
from llama_index.core.memory import ChatMemoryBuffer

memory = ChatMemoryBuffer.from_defaults(
    llm=llm,
)

memory.set(final_result["chat_history"])
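
To illustrate why a buffer helps, here is a minimal sketch of the general idea: keep the most recent messages that fit a size budget and drop the oldest. This is not how `ChatMemoryBuffer` is implemented. `trim_history` is a hypothetical helper, and counting whitespace-separated words stands in for real token counting.

```python
def trim_history(messages: list[str], max_words: int) -> list[str]:
    """Keep the newest messages whose combined word count fits max_words."""
    kept: list[str] = []
    used = 0
    for message in reversed(messages):  # walk from newest to oldest
        cost = len(message.split())
        if used + cost > max_words:
            break
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept


history = [
    "What is 10 + 10?",
    "10 + 10 is 20.",
    "What is the capital of Canada?",
]
# With a budget of 11 words, the oldest message no longer fits.
print(trim_history(history, max_words=11))
```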

Now, let's run again with the chat history managed by the memory buffer!

In [7]:
handler = workflow.run(
    agent_config=agent_config,
    user_msg="What is the capital of Canada?",
    chat_history=memory.get(),
    initial_state={"user_name": "Logan"},
    llm=llm,
)

async for event in handler.stream_events():
    if isinstance(event, ProgressEvent):
        print(event.msg)

print("-----------")

final_result = await handler
print(final_result["response"])

-----------
I'm here to help with adding two numbers together.


In [8]:
memory.set(final_result["chat_history"])