# agent.py Interactive Tutorial

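This notebook walks through the `Agent` class in `agent.py`. The content is adapted from `walkthrou.md` and checked against the actual code so the explanations stay accurate. The cells that run the agent (`prompt`, `continue_run`) call a real model through POP's `stream` function. They therefore need POP installed and, presumably, provider credentials, which the setup cell loads from a `.env` file via `load_dotenv()`. To run without POP, pass your own `stream_fn` when constructing `Agent`.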


## High-level purpose

`agent.py` defines `Agent`, a stateful conversation orchestrator that:
- Owns conversation state (system prompt, model, messages, tools, streaming flags).
- Exposes high-level APIs to `prompt`, `continue_run`, `steer`, and `follow_up`.
- Bridges your app or UI to the low-level event loop in `agent_loop.py`.
- Lets you subscribe to events like message streaming and tool execution.

Think of `Agent` as the facade and state container, and `agent_loop.py` as the engine.


In [1]:
import asyncio
import time

from agent import Agent
from agent.agent_types import (
    AgentEvent,
    AgentMessage,
    AgentTool,
    AgentToolResult,
    ImageContent,
    TextContent,
    ThinkingLevel,
)

from dotenv import load_dotenv
load_dotenv()



## Dependencies and POP defaults

`Agent` depends on `agent_loop.agent_loop` and `agent_loop.agent_loop_continue`, uses `EventStream`, and imports types from `agent_types`.

It tries to import POP to get defaults:
- If POP is available, it calls `POP.get_model("google", "gemini-2.5-flash-lite-preview-06-17")` for the default model.
- If POP is missing, the default model is `{"provider": "unknown", "id": "unknown", "api": None}`.
- If POP is missing and you do not supply a `stream_fn`, `Agent` sets `stream_fn = None` and raises later when you call `prompt` or `continue_run`.
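The fallback behavior described above can be sketched as follows. This is a minimal illustration, not the code in `agent.py`: the helpers `resolve_defaults` and `require_stream_fn`, and the use of `RuntimeError`, are assumptions. The point it shows is that a missing POP is tolerated at construction time and only becomes an error when a run actually needs a transport.

```python
import importlib

# Fallback model from the text above, used when POP cannot be imported.
FALLBACK_MODEL = {"provider": "unknown", "id": "unknown", "api": None}


def resolve_defaults(module_name="POP"):
    """Hypothetical helper: return (default_model, default_stream_fn)."""
    try:
        pop = importlib.import_module(module_name)
        pop_stream = importlib.import_module(f"{module_name}.stream")
    except ImportError:
        # POP is missing: unknown model, no transport function.
        return dict(FALLBACK_MODEL), None
    model = pop.get_model("google", "gemini-2.5-flash-lite-preview-06-17")
    return model, pop_stream.stream


def require_stream_fn(stream_fn):
    """Hypothetical guard run at prompt/continue time, not at construction."""
    if stream_fn is None:
        # Assumption: the real Agent may raise a different exception type or message.
        raise RuntimeError("No stream_fn configured and POP is not installed.")
    return stream_fn


# Simulate a missing POP by asking for a module that does not exist.
sketch_model, sketch_stream_fn = resolve_defaults("pop_not_installed_here")
print(sketch_model)  # {'provider': 'unknown', 'id': 'unknown', 'api': None}
print(sketch_stream_fn)  # None
```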


## Default message conversion

`_default_convert_to_llm` filters messages by role (`user`, `assistant`, `toolResult`) and converts them to dicts via `AgentMessage.to_dict()`.
If conversion fails, it falls back to a best-effort dict with `role`, `content` (via `vars`), and `timestamp`.

You can override this with `opts["convert_to_llm"]` when constructing `Agent`.
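The default conversion described above (role filter, `to_dict()`, best-effort fallback) can be sketched like this. `SketchText`, `SketchMessage`, and `sketch_convert_to_llm` are hypothetical stand-ins, prefixed so they do not shadow the real imports in this notebook. Reading "`content` (via `vars`)" as "apply `vars()` to each content block" is an interpretation. The sketch is a plain synchronous function for brevity, while the `convert_to_llm_minimal` example in this section is `async`.

```python
import time
from dataclasses import dataclass, field


# Hypothetical stand-ins for TextContent / AgentMessage.
@dataclass
class SketchText:
    type: str
    text: str


@dataclass
class SketchMessage:
    role: str
    content: list = field(default_factory=list)
    timestamp: float = field(default_factory=time.time)

    def to_dict(self):
        return {"role": self.role, "content": [vars(c) for c in self.content], "timestamp": self.timestamp}


LLM_ROLES = {"user", "assistant", "toolResult"}


def sketch_convert_to_llm(messages):
    """Keep LLM-facing roles and convert each message to a plain dict."""
    out = []
    for m in messages:
        if getattr(m, "role", None) not in LLM_ROLES:
            continue  # custom or UI-only roles never reach the model
        try:
            out.append(m.to_dict())
        except Exception:
            # Best-effort fallback: role, content via vars(), and timestamp.
            out.append({
                "role": m.role,
                "content": [vars(c) if hasattr(c, "__dict__") else c for c in getattr(m, "content", [])],
                "timestamp": getattr(m, "timestamp", None),
            })
    return out


sample_history = [
    SketchMessage("user", [SketchText("text", "Hi")], 1.0),
    SketchMessage("notice", [SketchText("text", "UI only")], 2.0),  # made-up role
]
print([d["role"] for d in sketch_convert_to_llm(sample_history)])  # ['user']
```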


In [None]:
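async def convert_to_llm_minimal(messages):
    # Minimal conversion that keeps only user, assistant, and toolResult roles.
    # Unlike the default converter, it has no fallback if to_dict() fails.
    llm_msgs = []
    for m in messages:
        if m.role not in {"user", "assistant", "toolResult"}:
            continue
        llm_msgs.append(m.to_dict())
    return llm_msgs

# Pass it in at construction time:
# agent = Agent(opts={"convert_to_llm": convert_to_llm_minimal})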


## Construction and options

`Agent(opts=...)` accepts optional keys:
- `initial_state`: override fields on `AgentState` (only known fields are applied).
- `convert_to_llm`: custom message conversion function.
- `transform_context`: optional pruning or augmentation before each LLM call.
- `steering_mode`: `"one-at-a-time"` (default) or `"all"`.
- `follow_up_mode`: `"one-at-a-time"` (default) or `"all"`.
- `stream_fn`: LLM transport function (defaults to `POP.stream.stream` if available).
- `session_id`: forwarded to the provider.
- `get_api_key`: async hook to fetch an API key per call.
- `thinking_budgets`: custom token budgets for reasoning.
- `max_retry_delay_ms`: maximum backoff delay.

The default `AgentState` includes:
- `system_prompt`, `model`, `thinking_level`, `tools`, `messages`
- `is_streaming`, `stream_message`
- `pending_tool_calls`
- `error`
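"Only known fields are applied" for `initial_state` can be sketched with `dataclasses.fields`. `SketchAgentState` mirrors the field list above, but its defaults are illustrative (only `thinking_level="off"` and the empty collections match the printed state later in this notebook). Silently ignoring unknown keys, rather than warning, is an assumption about `agent.py`.

```python
from dataclasses import dataclass, field, fields


@dataclass
class SketchAgentState:
    # Field names follow the AgentState list above; the defaults are illustrative.
    system_prompt: str = ""
    model: dict = field(default_factory=lambda: {"provider": "unknown", "id": "unknown", "api": None})
    thinking_level: str = "off"
    tools: list = field(default_factory=list)
    messages: list = field(default_factory=list)
    is_streaming: bool = False
    stream_message: object = None
    pending_tool_calls: set = field(default_factory=set)
    error: object = None


def apply_initial_state(state, overrides):
    """Copy only keys that name a real field; ignore everything else."""
    known = {f.name for f in fields(state)}
    for key, value in (overrides or {}).items():
        if key in known:
            setattr(state, key, value)
    return state


sketch_state = apply_initial_state(
    SketchAgentState(),
    {"system_prompt": "You are a helpful assistant.", "temperature": 0.2},  # "temperature" is not a field
)
print(sketch_state.system_prompt)  # You are a helpful assistant.
print(hasattr(sketch_state, "temperature"))  # False
```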

Besides the setters, the public API also exposes:
- `get_steering_mode()` and `get_follow_up_mode()`
- `continue_()` as an alias for `continue_run()`


In [None]:
from POP.stream import stream

agent = Agent(
    opts={
        "stream_fn": stream,
        "initial_state": {
            "system_prompt": "You are a helpful assistant.",
        },
        "session_id": "demo-session",
    }
)


for k, v in agent._state.__dict__.items():
    print(f"{k}: {v}")


## State mutators and getters

`agent.state` is read-only by convention; use setter methods instead:
- `set_system_prompt`, `set_model`, `set_thinking_level`
- `set_tools`, `replace_messages`, `append_message`
- `set_steering_mode`, `get_steering_mode`
- `set_follow_up_mode`, `get_follow_up_mode`
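One way to get "read-only by convention" semantics is a getter-only property in front of a private state object, with named methods for every mutation. The sketch below is hypothetical (`SketchAgent` and `_SketchState` are not part of `agent.py`) and models only a few setters; whether the real `replace_messages` copies its argument is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class _SketchState:
    system_prompt: str = ""
    messages: list = field(default_factory=list)


class SketchAgent:
    """Hypothetical facade: a getter-only `state` property plus setter methods."""

    def __init__(self):
        self._state = _SketchState()

    @property
    def state(self):
        # No property setter, so `agent.state = ...` raises AttributeError.
        # The returned object is still mutable, hence "read-only by convention".
        return self._state

    def set_system_prompt(self, prompt):
        self._state.system_prompt = prompt

    def replace_messages(self, messages):
        self._state.messages = list(messages)  # copy, so the caller's list isn't aliased

    def append_message(self, message):
        self._state.messages.append(message)


demo_agent = SketchAgent()
demo_agent.set_system_prompt("Be brief.")
try:
    demo_agent.state = None
except AttributeError:
    print("state has no setter")
```

Routing every change through a method gives the class one place to add validation or notifications later, which a bare attribute assignment would bypass.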


In [None]:
agent.set_system_prompt("System prompt for this run.")
# agent.set_model({"provider": "openai", "id": "gpt-5-nano", "api": None})
agent.set_model({"provider": "openai", "id": None, "api": None})
agent.set_tools([])

agent.get_steering_mode(), agent.get_follow_up_mode()


('one-at-a-time', 'one-at-a-time')

## Queues: steering and follow-up

- `steer(message)` queues a message that can be injected mid-turn by the loop.
- `follow_up(message)` queues a message used when the agent would otherwise finish.
- Modes control dequeue behavior: `"one-at-a-time"` or `"all"`.
- Clearing helpers: `clear_steering_queue`, `clear_follow_up_queue`, `clear_all_queues`.
- `clear_messages` removes conversation history but does not change tools or system prompt.
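The two dequeue modes can be sketched with a `deque`. In this hypothetical model, the steering queue and the follow-up queue would each be a `SketchQueue`, and `drain()` is what the loop would call at each injection point. FIFO ordering and the `ValueError` for unknown modes are assumptions, not confirmed behavior of `agent.py`.

```python
from collections import deque

VALID_MODES = ("one-at-a-time", "all")


class SketchQueue:
    """Hypothetical FIFO message queue with the two dequeue modes."""

    def __init__(self, mode="one-at-a-time"):
        self.set_mode(mode)
        self._items = deque()

    def set_mode(self, mode):
        if mode not in VALID_MODES:
            raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
        self.mode = mode

    def push(self, message):
        self._items.append(message)

    def drain(self):
        """Return the messages the loop would inject at one check point."""
        if self.mode == "all":
            batch = list(self._items)
            self._items.clear()
            return batch
        # "one-at-a-time": oldest first; the rest wait for later checks.
        return [self._items.popleft()] if self._items else []

    def clear(self):
        self._items.clear()


steering = SketchQueue()
steering.push("Please be concise.")
steering.push("Use bullet points.")
print(steering.drain())  # ['Please be concise.']
steering.set_mode("all")
print(steering.drain())  # ['Use bullet points.']
```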


In [None]:
msg = AgentMessage(
    role="user",
    content=[TextContent(type="text", text="Please be concise.")],
    timestamp=time.time(),
)

agent.steer(msg)
agent.follow_up(msg)

# Queued messages are held outside AgentState, so `messages` stays empty here.
print(agent._state)


agent.set_steering_mode("one-at-a-time")
agent.set_follow_up_mode("one-at-a-time")

for k, v in agent._state.__dict__.items():
    if k == "messages":
        print(f"{k}:")
        for m in v:
            print(f"  - {m.role}: {[c.text for c in m.content if isinstance(c, TextContent)]}")
        continue
    print(f"{k}: {v}")

agent.clear_all_queues()


AgentState(system_prompt='System prompt for this run.', model={'provider': 'openai', 'id': None, 'api': None}, thinking_level='off', tools=[], messages=[], is_streaming=False, stream_message=None, pending_tool_calls=set(), error=None)
system_prompt: System prompt for this run.
model: {'provider': 'openai', 'id': None, 'api': None}
thinking_level: off
tools: []
messages:
is_streaming: False
stream_message: None
pending_tool_calls: set()
error: None


## Event subscription and event flow

`subscribe(fn)` registers a listener that receives every event emitted by the loop. Listeners run synchronously; exceptions are swallowed to avoid breaking the agent.
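The listener contract described above can be sketched as follows: synchronous calls, swallowed exceptions, and an unsubscribe callable returned from `subscribe`, which is how the next code cell uses it. `SketchEmitter` is hypothetical, and the real `Agent` may log swallowed exceptions rather than ignore them silently.

```python
class SketchEmitter:
    """Hypothetical listener registry: synchronous calls, errors swallowed."""

    def __init__(self):
        self._listeners = []

    def subscribe(self, fn):
        self._listeners.append(fn)

        def unsubscribe():
            if fn in self._listeners:
                self._listeners.remove(fn)

        return unsubscribe

    def emit(self, event):
        # Iterate over a copy so a listener may unsubscribe during emit.
        for fn in list(self._listeners):
            try:
                fn(event)
            except Exception:
                pass  # one faulty listener must not break the agent run


def faulty_listener(event):
    raise ValueError("listener bug")


seen = []
emitter = SketchEmitter()
emitter.subscribe(faulty_listener)
stop = emitter.subscribe(lambda e: seen.append(e["type"]))
emitter.emit({"type": "agent_start"})
stop()
emitter.emit({"type": "agent_end"})
print(seen)  # ['agent_start']
```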

Event types emitted during a run include:
- `agent_start`, `agent_end`
- `turn_start`, `turn_end`
- `message_start`, `message_update`, `message_end`
- `tool_execution_start`, `tool_execution_end`

`Agent` tracks `pending_tool_calls` using `tool_execution_start/end` events, and updates `state.error` when a `turn_end` event contains an assistant error message.
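This bookkeeping amounts to a small reducer over events. The event keys used here (`tool_call_id`, `message`, `error_message`) and the dict-based state are assumptions for illustration; check `agent_types` for the real event shapes.

```python
def sketch_track(state, event):
    """Update pending tool calls and error from one event (hypothetical keys)."""
    kind = event.get("type")
    if kind == "tool_execution_start":
        state["pending_tool_calls"].add(event["tool_call_id"])
    elif kind == "tool_execution_end":
        state["pending_tool_calls"].discard(event["tool_call_id"])
    elif kind == "turn_end":
        message = event.get("message") or {}
        if message.get("role") == "assistant" and message.get("error_message"):
            state["error"] = message["error_message"]
    return state


tracked = {"pending_tool_calls": set(), "error": None}
for ev in [
    {"type": "tool_execution_start", "tool_call_id": "call_1"},
    {"type": "tool_execution_start", "tool_call_id": "call_2"},
    {"type": "tool_execution_end", "tool_call_id": "call_1"},
    {"type": "turn_end", "message": {"role": "assistant", "error_message": "rate limited"}},
]:
    sketch_track(tracked, ev)
print(tracked)  # {'pending_tool_calls': {'call_2'}, 'error': 'rate limited'}
```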


In [None]:
events = []

def on_event(e):
    events.append(e.get("type"))

unsubscribe = agent.subscribe(on_event)

# Jupyter supports top-level await.
await agent.prompt("Hello")

unsubscribe()

events


['agent_start',
 'turn_start',
 'message_start',
 'message_end',
 'message_start',
 'message_update',
 'message_update',
 'message_update',
 'message_end',
 'turn_end',
 'agent_end']

In [None]:
for k, v in agent._state.__dict__.items():
    if k == "messages":
        print(f"{k}:")
        for m in v:
            print(f"  - {m.role}: {[c.text for c in m.content if isinstance(c, TextContent)]}")
        continue
    print(f"{k}: {v}")

system_prompt: System prompt for this run.
model: {'provider': 'openai', 'id': None, 'api': None}
thinking_level: off
tools: []
messages:
  - user: ['Hello']
  - assistant: ['Hello! How can I help you today? I can answer questions, brainstorm ideas, draft or edit text, help with coding or math, plan projects, or just chat. What would you like to do?']
is_streaming: False
stream_message: None
pending_tool_calls: set()
error: None


## prompt and continue_run

`prompt(input, images=None)` accepts:
- `str` (wrapped into a user `AgentMessage` with `TextContent` and a timestamp)
- `AgentMessage`
- `Sequence[AgentMessage]`
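The input normalization above can be sketched as a single function that always returns a list of messages. `SketchText`, `SketchMessage`, and `normalize_prompt_input` are hypothetical stand-ins; the `images` argument is not modeled, and the `TypeError` for other inputs is an assumption. Note that `str` must be checked before `Sequence`, because a string is itself a sequence.

```python
import time
from collections.abc import Sequence
from dataclasses import dataclass, field


# Hypothetical stand-ins for TextContent / AgentMessage.
@dataclass
class SketchText:
    type: str
    text: str


@dataclass
class SketchMessage:
    role: str
    content: list = field(default_factory=list)
    timestamp: float = field(default_factory=time.time)


def normalize_prompt_input(value):
    """Turn str / message / sequence of messages into a list of messages."""
    if isinstance(value, str):  # check str first: a str is also a Sequence
        return [SketchMessage("user", [SketchText("text", value)])]
    if isinstance(value, SketchMessage):
        return [value]
    if isinstance(value, Sequence):
        return list(value)
    raise TypeError(f"unsupported prompt input: {type(value).__name__}")


batch = normalize_prompt_input("Hello")
print(batch[0].role, batch[0].content[0].text)  # user Hello
```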

Guardrails:
- If `is_streaming` is `True`, `prompt` raises a `RuntimeError` (use `steer` or `follow_up` instead).
- `continue_run` requires a non-empty message history whose last message is not an assistant message.
- `continue_()` is an alias for backward compatibility.
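The guardrails can be sketched as two checks that run before any work starts. The function names, the dict-shaped messages, and the `ValueError` raised by `check_can_continue` are hypothetical; only the `RuntimeError` for `prompt` comes from the description above. A trailing assistant message is presumably rejected because the model has already replied and there is nothing left for it to respond to.

```python
def check_can_prompt(is_streaming):
    if is_streaming:
        raise RuntimeError("Agent is already streaming; use steer() or follow_up() instead.")


def check_can_continue(messages):
    # Assumption: the exception type for these two cases is not specified above.
    if not messages:
        raise ValueError("No messages to continue from.")
    if messages[-1]["role"] == "assistant":
        raise ValueError("Last message is from the assistant; nothing to continue.")


check_can_prompt(is_streaming=False)  # passes silently
check_can_continue([{"role": "user"}])  # passes: the model still owes a reply
try:
    check_can_continue([{"role": "user"}, {"role": "assistant"}])
except ValueError as exc:
    print(exc)  # Last message is from the assistant; nothing to continue.
```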


In [None]:
agent2 = Agent()
agent2.set_model({"provider": "gemini", "id": None, "api": None})

# Create a state where the last message is a user message.
agent2.replace_messages([
    AgentMessage(
        role="user",
        content=[TextContent(type="text", text="Continue from here.")],
        timestamp=time.time(),
    )
])

await agent2.continue_run()
for k, v in agent2._state.__dict__.items():
    if k == "messages":
        print(f"{k}:")
        for m in v:
            print(f"  - {m.role}: {[c.text for c in m.content if isinstance(c, TextContent)]}")
        continue
    print(f"{k}: {v}")

[ initialize ] default_model: {'provider': 'gemini', 'id': 'gemini-2.5-flash-lite', 'api': None}.


## Abort, wait_for_idle, and reset

- `abort()` sets an internal event that the loop checks to stop work.
- `wait_for_idle()` awaits the internal idle flag to know when a run finishes.
- `reset()` clears messages, streaming flags, pending tool calls, and queues.
  It does not change system prompt, model, tools, session id, or other config.
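The abort and idle flags can be modeled with two `asyncio.Event` objects. `SketchRunner` is hypothetical: the step loop, the 10 ms step duration, and clearing both flags in `start()` are illustrative choices, and `reset()` is not modeled. Clearing the idle flag before scheduling the task means a `wait_for_idle()` issued right after starting cannot return early.

```python
import asyncio


class SketchRunner:
    """Hypothetical model of abort() / wait_for_idle() with asyncio.Event."""

    def __init__(self):
        self._abort = asyncio.Event()
        self._idle = asyncio.Event()
        self._idle.set()  # idle until a run starts

    def start(self, steps):
        # Clear both flags before scheduling, so an immediate
        # wait_for_idle() cannot return before the run has begun.
        self._abort.clear()
        self._idle.clear()
        return asyncio.create_task(self._run(steps))

    async def _run(self, steps):
        done = 0
        try:
            for _ in range(steps):
                if self._abort.is_set():  # the loop polls the abort flag
                    break
                await asyncio.sleep(0.01)  # stand-in for one LLM or tool step
                done += 1
        finally:
            self._idle.set()  # idle again, whether finished, aborted, or failed
        return done

    def abort(self):
        self._abort.set()

    async def wait_for_idle(self):
        await self._idle.wait()


async def demo():
    runner = SketchRunner()  # create the Events inside the running loop
    task = runner.start(steps=100)
    await asyncio.sleep(0.05)
    runner.abort()
    await runner.wait_for_idle()
    return await task


completed = asyncio.run(demo())  # in Jupyter, use `completed = await demo()`
print(completed < 100)  # True
```

`asyncio.run` raises when an event loop is already running, which is why the notebook version should use top-level `await` instead.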

On exceptions inside `_run_loop`, `Agent` creates an assistant error message with:
- `stop_reason = "aborted"` if the abort event was set, otherwise `"error"`
- `error_message = str(exc)`

This error message is appended to history and an `agent_end` event is emitted.
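The error path above can be sketched as a `try`/`except` around the run. The dict-shaped message, its empty `content`, and the bare `{"type": "agent_end"}` payload are assumptions; the real `_run_loop` builds an `AgentMessage` and may attach more data to the event.

```python
import asyncio
import time


async def sketch_run_loop(work, abort_event, messages, emit):
    """Run `work`; on failure record an assistant error message and end the run."""
    try:
        await work()
    except Exception as exc:
        error_message = {
            "role": "assistant",
            "content": [],
            "stop_reason": "aborted" if abort_event.is_set() else "error",
            "error_message": str(exc),
            "timestamp": time.time(),
        }
        messages.append(error_message)  # the failure becomes part of history
        emit({"type": "agent_end"})


async def demo_failure():
    abort_event = asyncio.Event()
    history, emitted = [], []

    async def failing_call():
        raise ConnectionError("provider unavailable")

    await sketch_run_loop(failing_call, abort_event, history, emitted.append)
    return history, emitted


history, emitted = asyncio.run(demo_failure())  # in Jupyter: await demo_failure()
print(history[-1]["stop_reason"], history[-1]["error_message"])  # error provider unavailable
print(emitted)  # [{'type': 'agent_end'}]
```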


In [None]:
# Example usage (safe to call even without a running stream)
agent.reset()

# If a long run is in progress:
# agent.abort()
# await agent.wait_for_idle()
