diff --git a/PlutoClient.sh b/PlutoClient.sh index 56ab0c8..72bf777 100755 --- a/PlutoClient.sh +++ b/PlutoClient.sh @@ -89,6 +89,7 @@ Commands: stats Show server statistics (locks, messages, deadlocks, per-agent counters). No registration required. guide Generate the Pluto agent guide file. + register Register an agent and maintain presence. Global options (place before the command): --host HOST Pluto server host (default: 127.0.0.1) @@ -98,6 +99,13 @@ Global options (place before the command): Guide-specific options: --output PATH Output path for the generated guide file. +Register-specific options: + --daemon Run as background daemon maintaining TCP heartbeat + --http Use HTTP registration (no persistent TCP needed) + --stateless Register as stateless agent with longer TTL + --ttl SECONDS TTL in seconds for HTTP/stateless mode (default: 300) + --http-port PORT HTTP port (default: 9001) + Examples: ${GREEN}# Check if the server is reachable${NC} $(basename "$0") ping @@ -111,6 +119,15 @@ Examples: ${GREEN}# Generate the agent coordination guide${NC} $(basename "$0") guide --output ./agent_guide.md + ${GREEN}# Register via HTTP (for CLI agents like Claude Code)${NC} + $(basename "$0") register --http --agent-id claude-workspace + + ${GREEN}# Register as stateless with 5-min TTL${NC} + $(basename "$0") register --stateless --ttl 300 --agent-id my-agent + + ${GREEN}# Spawn a TCP daemon that maintains heartbeat${NC} + $(basename "$0") register --daemon --agent-id my-agent + Requires: Python 3, a running Pluto server (see ${CYAN}./PlutoServer.sh --help${NC}). 
${YELLOW}Starting an Agent with Pluto:${NC} @@ -148,5 +165,155 @@ fi PYTHON_BIN=$(find_python) ensure_venv "${PYTHON_BIN}" +# ── Register subcommand special handling ────────────────────────────────────── + +handle_register() { + local host="127.0.0.1" + local port="9000" + local http_port="9001" + local agent_id="pluto-cli" + local mode="" + local daemon=false + local ttl=300 + local args=() + + # Parse global options and register-specific flags + while [[ $# -gt 0 ]]; do + case "$1" in + --host) host="$2"; shift 2 ;; + --port) port="$2"; shift 2 ;; + --http-port) http_port="$2"; shift 2 ;; + --agent-id) agent_id="$2"; shift 2 ;; + --daemon) daemon=true; shift ;; + --http) mode="http"; shift ;; + --stateless) mode="stateless"; shift ;; + --ttl) ttl="$2"; shift 2 ;; + register) shift ;; # skip the command itself + *) args+=("$1"); shift ;; + esac + done + + if [[ "$daemon" == true ]]; then + # Solution 3: Spawn a background daemon that maintains TCP connection + info "Starting TCP daemon for agent '${agent_id}'..." 
+ local pidfile="/tmp/pluto/daemon_${agent_id}.pid" + mkdir -p /tmp/pluto + + # Check if daemon is already running + if [[ -f "$pidfile" ]] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then + warn "Daemon for '${agent_id}' already running (PID $(cat "$pidfile"))" + exit 0 + fi + + nohup python -c " +import sys, os, time, socket, json, signal + +HOST = '${host}' +PORT = int('${port}') +AGENT_ID = '${agent_id}' +HB_INTERVAL = 10 # seconds + +def main(): + pidfile = '${pidfile}' + with open(pidfile, 'w') as f: + f.write(str(os.getpid())) + + def cleanup(sig, frame): + try: os.remove(pidfile) + except: pass + sys.exit(0) + signal.signal(signal.SIGTERM, cleanup) + signal.signal(signal.SIGINT, cleanup) + + while True: + try: + sock = socket.create_connection((HOST, PORT), timeout=5) + # Register + msg = json.dumps({'op': 'register', 'agent_id': AGENT_ID}) + '\n' + sock.sendall(msg.encode()) + sock.settimeout(5) + resp = sock.recv(4096) + print(f'[pluto-daemon] Registered as {AGENT_ID}', flush=True) + + # Heartbeat loop + while True: + time.sleep(HB_INTERVAL) + ping = json.dumps({'op': 'ping'}) + '\n' + sock.sendall(ping.encode()) + data = sock.recv(4096) + if not data: + break + except Exception as e: + print(f'[pluto-daemon] Connection lost: {e}, reconnecting...', flush=True) + time.sleep(3) + +main() +" > /tmp/pluto/daemon_${agent_id}.log 2>&1 & + + local daemon_pid=$! + ok "Daemon started (PID ${daemon_pid}), log: /tmp/pluto/daemon_${agent_id}.log" + ok "To stop: kill ${daemon_pid}" + return 0 + fi + + if [[ "$mode" == "http" || "$mode" == "stateless" ]]; then + # Solutions 1, 2, 4: HTTP-based registration + local ttl_ms=$((ttl * 1000)) + local body="{\"agent_id\":\"${agent_id}\",\"mode\":\"${mode}\",\"ttl_ms\":${ttl_ms}}" + + info "Registering agent '${agent_id}' via HTTP (mode=${mode}, ttl=${ttl}s)..." 
+ + local response + response=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "${body}" \ + "http://${host}:${http_port}/agents/register" 2>&1) + + if [[ $? -ne 0 ]]; then + err "Failed to connect to http://${host}:${http_port}" + exit 1 + fi + + local status + status=$(echo "$response" | python -c "import sys,json; print(json.loads(sys.stdin.read()).get('status',''))" 2>/dev/null) + + if [[ "$status" == "ok" ]]; then + local token + token=$(echo "$response" | python -c "import sys,json; print(json.loads(sys.stdin.read()).get('token',''))" 2>/dev/null) + local actual_id + actual_id=$(echo "$response" | python -c "import sys,json; print(json.loads(sys.stdin.read()).get('agent_id',''))" 2>/dev/null) + ok "Registered successfully!" + ok " Agent ID : ${actual_id}" + ok " Token : ${token}" + ok " Mode : ${mode}" + ok " TTL : ${ttl}s" + ok "" + ok "To keep alive, periodically call:" + ok " curl -X POST -H 'Content-Type: application/json' \\" + ok " -d '{\"token\":\"${token}\"}' \\" + ok " http://${host}:${http_port}/agents/heartbeat" + ok "" + ok "To poll messages:" + ok " curl http://${host}:${http_port}/agents/poll?token=${token}" + else + err "Registration failed: ${response}" + exit 1 + fi + return 0 + fi + + # Default: TCP registration (foreground, exits when done) + info "Registering agent '${agent_id}' via TCP..." + exec python "${PYTHON_SCRIPT}" --host "$host" --port "$port" --agent-id "$agent_id" ping +} + +# Check if register subcommand is being used +for arg in "$@"; do + if [[ "$arg" == "register" ]]; then + handle_register "$@" + exit $? 
+ fi +done + # Run the Python client with all provided arguments exec python "${PYTHON_SCRIPT}" "$@" diff --git a/VERSION.md b/VERSION.md index 1474d00..22c08f7 100644 --- a/VERSION.md +++ b/VERSION.md @@ -1 +1 @@ -v0.2.0 +v0.2.1 diff --git a/src_erl/include/pluto.hrl b/src_erl/include/pluto.hrl index d795ad3..f49ac78 100644 --- a/src_erl/include/pluto.hrl +++ b/src_erl/include/pluto.hrl @@ -16,7 +16,7 @@ -define(APP, pluto). %% ── Version ───────────────────────────────────────────────────────────────── --define(VERSION, "0.2.0"). +-define(VERSION, "0.2.1"). %% ── Default configuration values ──────────────────────────────────────────── -define(DEFAULT_TCP_PORT, 9000). @@ -27,6 +27,8 @@ -define(DEFAULT_HEARTBEAT_TIMEOUT_MS, 30000). -define(DEFAULT_RECONNECT_GRACE_MS, 30000). -define(DEFAULT_MAX_WAIT_MS, 60000). +-define(DEFAULT_HTTP_SESSION_TTL_MS, 300000). %% 5 minutes for HTTP agents +-define(DEFAULT_HTTP_SESSION_SWEEP_MS, 10000). %% sweep HTTP sessions every 10s -define(DEFAULT_FLUSH_INTERVAL, 60000). -define(DEFAULT_PERSISTENCE_DIR, "/tmp/pluto/state"). -define(DEFAULT_EVENT_LOG_DIR, "/tmp/pluto/events"). @@ -42,6 +44,7 @@ -define(ETS_LIVENESS, pluto_liveness). %% session_id -> last_seen_ms -define(ETS_TASKS, pluto_tasks). %% task_id -> task record -define(ETS_MSG_INBOX, pluto_msg_inbox). %% {agent_id, seq} -> message map +-define(ETS_HTTP_SESSIONS, pluto_http_sessions). %% token -> #http_session{} %% ── Maximum line length for TCP reads (1 MB) ─────────────────────────────── -define(MAX_LINE_LENGTH, 1048576). diff --git a/src_erl/include/pluto_records.hrl b/src_erl/include/pluto_records.hrl index 61ea1e9..002ee6d 100644 --- a/src_erl/include/pluto_records.hrl +++ b/src_erl/include/pluto_records.hrl @@ -66,6 +66,7 @@ %% last_seen — System time (ms) of last heartbeat/message %% custom_status— Custom agent status (e.g. 
<<"busy">>, <<"idle">>) %% subscriptions— List of topic names this agent subscribes to +%% session_type — Connection type: 'tcp' | 'http' | 'stateless' %% -record(agent, { agent_id :: binary(), @@ -76,7 +77,8 @@ attributes :: map(), last_seen :: integer(), custom_status :: binary(), - subscriptions :: [binary()] + subscriptions :: [binary()], + session_type :: tcp | http | stateless }). %% ── Session record ────────────────────────────────────────────────────────── @@ -92,4 +94,24 @@ session_pid :: pid() }). +%% ── HTTP session record ───────────────────────────────────────────────────── +%% Stored in the ETS_HTTP_SESSIONS table, keyed by token. +%% Represents a stateless/HTTP agent session maintained via periodic heartbeats. +%% +%% token — Opaque session token returned at HTTP registration (binary) +%% agent_id — The agent_id bound to this HTTP session (binary) +%% session_id — Server-generated session identifier (binary) +%% ttl_ms — Time-to-live; session expires if no heartbeat within this window +%% last_seen — System time (ms) of last HTTP heartbeat or request +%% mode — 'http' | 'stateless' +%% +-record(http_session, { + token :: binary(), + agent_id :: binary(), + session_id :: binary(), + ttl_ms :: non_neg_integer(), + last_seen :: integer(), + mode :: http | stateless +}). + -endif. 
%% PLUTO_RECORDS_HRL diff --git a/src_erl/src/pluto.app.src b/src_erl/src/pluto.app.src index a5ef5d3..7c55ec2 100644 --- a/src_erl/src/pluto.app.src +++ b/src_erl/src/pluto.app.src @@ -1,6 +1,6 @@ {application, pluto, [ {description, "Pluto — Agent Coordination Server"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, [ pluto_sup, pluto_lock_mgr, diff --git a/src_erl/src/pluto_app.erl b/src_erl/src/pluto_app.erl index ac6582a..3cc6700 100644 --- a/src_erl/src/pluto_app.erl +++ b/src_erl/src/pluto_app.erl @@ -57,6 +57,8 @@ create_ets_tables() -> ets:new(?ETS_LIVENESS, [named_table, set, public]), ets:new(?ETS_TASKS, [named_table, set, public]), ets:new(?ETS_MSG_INBOX, [named_table, ordered_set, public]), + ets:new(?ETS_HTTP_SESSIONS, [named_table, set, public, + {keypos, #http_session.token}]), ok. %% @private Print the Pluto ASCII art banner on startup. diff --git a/src_erl/src/pluto_heartbeat.erl b/src_erl/src/pluto_heartbeat.erl index 2185ee2..f1cf263 100644 --- a/src_erl/src/pluto_heartbeat.erl +++ b/src_erl/src/pluto_heartbeat.erl @@ -93,6 +93,9 @@ handle_info(sweep, #state{sweep_ms = SweepMs, timeout_ms = TimeoutMs} = State) - end end, AllEntries), + %% Also sweep HTTP sessions with per-session TTL + sweep_http_sessions(Now), + erlang:send_after(SweepMs, self(), sweep), {noreply, State}; @@ -101,3 +104,25 @@ handle_info(_Info, State) -> terminate(_Reason, _State) -> ok. + +%%==================================================================== +%% Internal +%%==================================================================== + +%% @private Sweep HTTP sessions that have exceeded their individual TTL. 
+sweep_http_sessions(Now) -> + AllHttpSessions = ets:tab2list(?ETS_HTTP_SESSIONS), + lists:foreach(fun(#http_session{token = Token, agent_id = AgentId, + session_id = SessId, ttl_ms = TtlMs, + last_seen = LastSeen}) -> + case Now - LastSeen > TtlMs of + true -> + ?LOG_WARN("HTTP session ~s (agent ~s) expired (TTL ~wms)", + [SessId, AgentId, TtlMs]), + ets:delete(?ETS_HTTP_SESSIONS, Token), + ets:delete(?ETS_SESSIONS, SessId), + pluto_msg_hub:unregister_agent(AgentId); + false -> + ok + end + end, AllHttpSessions). diff --git a/src_erl/src/pluto_http_listener.erl b/src_erl/src/pluto_http_listener.erl index c5bc5a8..463ba7a 100644 --- a/src_erl/src/pluto_http_listener.erl +++ b/src_erl/src/pluto_http_listener.erl @@ -363,10 +363,209 @@ route('POST', <<"/agents/find">>, Body) -> {400, #{<<"error">> => <<"missing filter map">>}} end; +%% ── Detailed agent listing ────────────────────────────────────────── +route('GET', <<"/agents/list/detailed">>, _Body) -> + AgentMaps = pluto_msg_hub:list_agents_detailed(), + {200, #{<<"status">> => <<"ok">>, <<"agents">> => AgentMaps}}; + +%%==================================================================== +%% HTTP Session Registration (Solutions 1, 2, 4) +%%==================================================================== +%% +%% POST /agents/register — Register an agent via HTTP, returns a session +%% token. The session stays alive as long as HTTP heartbeats arrive +%% within the TTL window. 
+%% +%% Body: {"agent_id": "my-agent", "attributes": {...}, +%% "mode": "http"|"stateless", "ttl_ms": 300000} +%% +%% Returns: {"status":"ok", "token":"PLUTO-...", "session_id":"...", +%% "agent_id":"...", "ttl_ms": 300000} + +route('POST', <<"/agents/register">>, Body) -> + case decode_body(Body) of + {ok, #{<<"agent_id">> := AgentId} = Msg} + when is_binary(AgentId), AgentId =/= <<>> -> + Token = maps:get(<<"token">>, Msg, undefined), + Attrs = maps:get(<<"attributes">>, Msg, #{}), + ModeStr = maps:get(<<"mode">>, Msg, <<"http">>), + Mode = case ModeStr of + <<"stateless">> -> stateless; + _ -> http + end, + DefaultTtl = pluto_config:get(http_session_ttl_ms, + ?DEFAULT_HTTP_SESSION_TTL_MS), + TtlMs = maps:get(<<"ttl_ms">>, Msg, DefaultTtl), + %% Auth check + case pluto_policy:check_auth(AgentId, Token) of + ok -> + case pluto_msg_hub:register_http_agent( + AgentId, Attrs, Mode, TtlMs, #{}) of + {ok, SessToken, SessId} -> + {200, #{<<"status">> => <<"ok">>, + <<"token">> => SessToken, + <<"session_id">> => SessId, + <<"agent_id">> => AgentId, + <<"mode">> => ModeStr, + <<"ttl_ms">> => TtlMs}}; + {ok, SessToken, SessId, ActualAgentId} -> + %% Name was taken — got a unique suffix + {200, #{<<"status">> => <<"ok">>, + <<"token">> => SessToken, + <<"session_id">> => SessId, + <<"agent_id">> => ActualAgentId, + <<"requested_id">> => AgentId, + <<"mode">> => ModeStr, + <<"ttl_ms">> => TtlMs}} + end; + {error, unauthorized} -> + {401, #{<<"status">> => <<"error">>, + <<"reason">> => <<"unauthorized">>}} + end; + _ -> + {400, #{<<"error">> => <<"missing agent_id">>}} + end; + +%% POST /agents/heartbeat — Keep HTTP session alive +%% Body: {"token": "PLUTO-..."} +route('POST', <<"/agents/heartbeat">>, Body) -> + case decode_body(Body) of + {ok, #{<<"token">> := Token}} when is_binary(Token) -> + case pluto_msg_hub:touch_http_agent(Token) of + ok -> + Now = erlang:system_time(millisecond), + {200, #{<<"status">> => <<"ok">>, <<"ts">> => Now}}; + {error, not_found} -> + {404, 
#{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + _ -> + {400, #{<<"error">> => <<"missing token">>}} + end; + +%% POST /agents/unregister — Remove HTTP session +%% Body: {"token": "PLUTO-..."} +route('POST', <<"/agents/unregister">>, Body) -> + case decode_body(Body) of + {ok, #{<<"token">> := Token}} when is_binary(Token) -> + case pluto_msg_hub:unregister_http_agent(Token) of + ok -> + {200, #{<<"status">> => <<"ok">>}}; + {error, not_found} -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + _ -> + {400, #{<<"error">> => <<"missing token">>}} + end; + +%% GET /agents/poll?token=... — Poll for queued messages +route('GET', <<"/agents/poll?", Query/binary>>, _Body) -> + Params = parse_query(Query), + case maps:find(<<"token">>, Params) of + {ok, Token} -> + %% Touch session to keep alive + case pluto_msg_hub:touch_http_agent(Token) of + ok -> + %% Look up agent_id from token + case ets:lookup(?ETS_HTTP_SESSIONS, Token) of + [#http_session{agent_id = AgentId}] -> + {ok, Messages} = pluto_msg_hub:poll_inbox(AgentId), + {200, #{<<"status">> => <<"ok">>, + <<"messages">> => Messages, + <<"count">> => length(Messages)}}; + [] -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + {error, not_found} -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + error -> + {400, #{<<"error">> => <<"missing token query parameter">>}} + end; + +%% POST /agents/send — Send message as HTTP agent (with token auth) +%% Body: {"token": "PLUTO-...", "to": "agent-b", "payload": {...}} +route('POST', <<"/agents/send">>, Body) -> + case decode_body(Body) of + {ok, #{<<"token">> := Token, <<"to">> := To, + <<"payload">> := Payload} = Msg} -> + case pluto_msg_hub:touch_http_agent(Token) of + ok -> + case ets:lookup(?ETS_HTTP_SESSIONS, Token) of + [#http_session{agent_id = From}] -> + RequestId = maps:get(<<"request_id">>, Msg, 
undefined), + case pluto_msg_hub:send_msg(From, To, Payload, RequestId) of + {ok, MsgId} -> + {200, #{<<"status">> => <<"ok">>, + <<"msg_id">> => MsgId}}; + ok -> + {200, #{<<"status">> => <<"ok">>}}; + {error, unknown_target} -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"unknown_target">>}} + end; + [] -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + {error, not_found} -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + _ -> + {400, #{<<"error">> => <<"missing token, to, and payload">>}} + end; + +%% POST /agents/broadcast — Broadcast as HTTP agent (with token) +%% Body: {"token": "PLUTO-...", "payload": {...}} +route('POST', <<"/agents/broadcast">>, Body) -> + case decode_body(Body) of + {ok, #{<<"token">> := Token, <<"payload">> := Payload}} -> + case pluto_msg_hub:touch_http_agent(Token) of + ok -> + case ets:lookup(?ETS_HTTP_SESSIONS, Token) of + [#http_session{agent_id = From}] -> + pluto_msg_hub:broadcast(From, Payload), + {200, #{<<"status">> => <<"ok">>}}; + [] -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + {error, not_found} -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + _ -> + {400, #{<<"error">> => <<"missing token and payload">>}} + end; + +%% POST /agents/subscribe — Subscribe to topic as HTTP agent +route('POST', <<"/agents/subscribe">>, Body) -> + case decode_body(Body) of + {ok, #{<<"token">> := Token, <<"topic">> := Topic}} -> + case pluto_msg_hub:touch_http_agent(Token) of + ok -> + case ets:lookup(?ETS_HTTP_SESSIONS, Token) of + [#http_session{agent_id = AgentId}] -> + pluto_msg_hub:subscribe(AgentId, Topic), + {200, #{<<"status">> => <<"ok">>}}; + [] -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} + end; + {error, not_found} -> + {404, #{<<"status">> => <<"error">>, + <<"reason">> => <<"session_not_found">>}} 
+ end; + _ -> + {400, #{<<"error">> => <<"missing token and topic">>}} + end; + %% ── Agent status query ────────────────────────────────────────────── -%% Returns connection status, last-seen timestamp, custom status, and -%% attributes for a specific agent — useful for deciding whether to send -%% a direct message or fall back to broadcast. +%% Must come AFTER more specific /agents/* routes to avoid shadowing. route('GET', <<"/agents/", AgentId/binary>>, _Body) when AgentId =/= <<>> -> case pluto_msg_hub:agent_status(AgentId) of @@ -377,11 +576,6 @@ route('GET', <<"/agents/", AgentId/binary>>, _Body) <<"reason">> => <<"not_found">>}} end; -%% ── Detailed agent listing ────────────────────────────────────────── -route('GET', <<"/agents/list/detailed">>, _Body) -> - AgentMaps = pluto_msg_hub:list_agents_detailed(), - {200, #{<<"status">> => <<"ok">>, <<"agents">> => AgentMaps}}; - %% ── Task management via HTTP ──────────────────────────────────────── route('GET', <<"/tasks">>, _Body) -> Tasks = [T || {_Id, T} <- ets:tab2list(?ETS_TASKS)], diff --git a/src_erl/src/pluto_msg_hub.erl b/src_erl/src/pluto_msg_hub.erl index 616b988..9a75bb6 100644 --- a/src_erl/src/pluto_msg_hub.erl +++ b/src_erl/src/pluto_msg_hub.erl @@ -20,6 +20,7 @@ start_link/0, register_agent/3, register_agent/4, + register_http_agent/5, unregister_agent/1, send_msg/3, send_msg/4, @@ -33,7 +34,10 @@ publish/3, agent_status/1, set_agent_status/2, - deliver_inbox/1 + deliver_inbox/1, + touch_http_agent/1, + poll_inbox/1, + unregister_http_agent/1 ]). %% gen_server callbacks @@ -144,6 +148,49 @@ set_agent_status(AgentId, CustomStatus) -> deliver_inbox(AgentId) -> gen_server:cast(?MODULE, {deliver_inbox, AgentId}). +%% @doc Register an agent via HTTP (no persistent TCP session). +%% Returns `{ok, Token, SessionId}` or `{ok, Token, SessionId, ActualAgentId}` +%% when the requested name was taken. 
+-spec register_http_agent(binary(), map(), http | stateless, non_neg_integer(), map()) -> + {ok, binary(), binary()} | {ok, binary(), binary(), binary()}. +register_http_agent(AgentId, Attrs, Mode, TtlMs, _Opts) -> + gen_server:call(?MODULE, {register_http, AgentId, Attrs, Mode, TtlMs}). + +%% @doc Touch an HTTP agent's liveness timestamp (heartbeat via HTTP). +-spec touch_http_agent(binary()) -> ok | {error, not_found}. +touch_http_agent(Token) -> + case ets:lookup(?ETS_HTTP_SESSIONS, Token) of + [HS] -> + Now = erlang:system_time(millisecond), + ets:insert(?ETS_HTTP_SESSIONS, HS#http_session{last_seen = Now}), + %% Also update the agent's last_seen + case ets:lookup(?ETS_AGENTS, HS#http_session.agent_id) of + [Agent] -> + ets:insert(?ETS_AGENTS, Agent#agent{last_seen = Now}); + [] -> ok + end, + ok; + [] -> + {error, not_found} + end. + +%% @doc Poll and return queued inbox messages for an HTTP agent. +-spec poll_inbox(binary()) -> {ok, [map()]}. +poll_inbox(AgentId) -> + gen_server:call(?MODULE, {poll_inbox, AgentId}). + +%% @doc Unregister an HTTP agent by token. +-spec unregister_http_agent(binary()) -> ok | {error, not_found}. +unregister_http_agent(Token) -> + case ets:lookup(?ETS_HTTP_SESSIONS, Token) of + [#http_session{agent_id = AgentId}] -> + ets:delete(?ETS_HTTP_SESSIONS, Token), + unregister_agent(AgentId), + ok; + [] -> + {error, not_found} + end. 
+ %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -157,34 +204,81 @@ handle_call({register, AgentId, SessionId, SessionPid, Attrs}, _From, State) -> Policy = pluto_config:get(session_conflict_policy, ?DEFAULT_SESSION_CONFLICT), case ets:lookup(?ETS_AGENTS, AgentId) of [#agent{status = connected, session_pid = OldPid}] when Policy =:= strict -> - case is_process_alive(OldPid) of + case is_pid(OldPid) andalso is_process_alive(OldPid) of true -> %% Name taken by a live agent — assign a unique suffixed name UniqueId = make_unique_agent_id(AgentId), - do_register(UniqueId, SessionId, SessionPid, Attrs), + do_register(UniqueId, SessionId, SessionPid, Attrs, tcp), {reply, {ok, SessionId, UniqueId}, State}; false -> - do_register(AgentId, SessionId, SessionPid, Attrs), + do_register(AgentId, SessionId, SessionPid, Attrs, tcp), {reply, {ok, SessionId}, State} end; + [#agent{status = connected, session_type = SType}] + when (SType =:= http orelse SType =:= stateless), Policy =:= strict -> + %% Name taken by an HTTP/stateless agent — assign a unique suffixed name + UniqueId = make_unique_agent_id(AgentId), + do_register(UniqueId, SessionId, SessionPid, Attrs, tcp), + {reply, {ok, SessionId, UniqueId}, State}; [#agent{status = connected, session_pid = OldPid}] when Policy =:= takeover -> - case is_process_alive(OldPid) of + case is_pid(OldPid) andalso is_process_alive(OldPid) of true -> OldPid ! {pluto_takeover, AgentId}; false -> ok end, - do_register(AgentId, SessionId, SessionPid, Attrs), + do_register(AgentId, SessionId, SessionPid, Attrs, tcp), {reply, {ok, SessionId}, State}; [#agent{status = disconnected}] -> %% Agent was disconnected — reconnect within grace period - do_register(AgentId, SessionId, SessionPid, Attrs), + do_register(AgentId, SessionId, SessionPid, Attrs, tcp), %% Deliver any queued inbox messages self() ! 
{deliver_inbox_sync, AgentId}, {reply, {ok, SessionId}, State}; [] -> - do_register(AgentId, SessionId, SessionPid, Attrs), + do_register(AgentId, SessionId, SessionPid, Attrs, tcp), {reply, {ok, SessionId}, State} end; +%% ── register_http — HTTP/stateless session registration ───────────── +handle_call({register_http, AgentId, Attrs, Mode, TtlMs}, _From, State) -> + Policy = pluto_config:get(session_conflict_policy, ?DEFAULT_SESSION_CONFLICT), + case ets:lookup(?ETS_AGENTS, AgentId) of + [#agent{status = connected, session_pid = OldPid}] when Policy =:= strict -> + IsAlive = is_pid(OldPid) andalso is_process_alive(OldPid), + %% Also check if it's an HTTP session still within TTL + IsHttpAlive = case ets:match_object(?ETS_HTTP_SESSIONS, + #http_session{agent_id = AgentId, _ = '_'}) of + [_|_] -> true; + [] -> false + end, + case IsAlive orelse IsHttpAlive of + true -> + %% Name taken — assign unique suffix + UniqueId = make_unique_agent_id(AgentId), + {Token, SessId} = do_register_http(UniqueId, Attrs, Mode, TtlMs), + {reply, {ok, Token, SessId, UniqueId}, State}; + false -> + {Token, SessId} = do_register_http(AgentId, Attrs, Mode, TtlMs), + {reply, {ok, Token, SessId}, State} + end; + [#agent{status = connected}] when Policy =:= takeover -> + %% Takeover: evict old session (TCP or HTTP) + evict_http_sessions(AgentId), + {Token, SessId} = do_register_http(AgentId, Attrs, Mode, TtlMs), + {reply, {ok, Token, SessId}, State}; + [#agent{status = disconnected}] -> + {Token, SessId} = do_register_http(AgentId, Attrs, Mode, TtlMs), + self() ! 
{deliver_inbox_sync, AgentId}, + {reply, {ok, Token, SessId}, State}; + [] -> + {Token, SessId} = do_register_http(AgentId, Attrs, Mode, TtlMs), + {reply, {ok, Token, SessId}, State} + end; + +%% ── poll_inbox — retrieve and clear queued messages ───────────────── +handle_call({poll_inbox, AgentId}, _From, State) -> + Messages = do_poll_inbox(AgentId), + {reply, {ok, Messages}, State}; + %% ── send direct message (with optional request_id for ack) ────────── handle_call({send, From, To, Payload, RequestId}, _From, #state{msg_seq = Seq} = State) -> @@ -204,7 +298,8 @@ handle_call({send, From, To, Payload, RequestId}, _From, %% Log message to event log for auditability pluto_event_log:log(message_sent, #{from => From, to => To, msg_id => MsgId}), case ets:lookup(?ETS_AGENTS, To) of - [#agent{status = connected, session_pid = Pid}] when is_pid(Pid) -> + [#agent{status = connected, session_pid = Pid, session_type = SType}] + when is_pid(Pid), SType =:= tcp -> Pid ! {pluto_event, Event2}, pluto_stats:inc(messages_sent), pluto_stats:inc(messages_received), @@ -229,6 +324,13 @@ handle_call({send, From, To, Payload, RequestId}, _From, end end, {reply, {ok, MsgId}, State#state{msg_seq = NewSeq}}; + [#agent{status = connected, session_type = SType}] + when SType =:= http; SType =:= stateless -> + %% HTTP/stateless agent — queue message in inbox for polling + queue_inbox_message(To, Event2), + pluto_stats:inc(messages_sent), + pluto_stats:inc_agent(From, messages_sent), + {reply, {ok, MsgId}, State#state{msg_seq = NewSeq}}; [#agent{status = disconnected}] -> %% Agent is disconnected — queue message in inbox queue_inbox_message(To, Event2), @@ -470,7 +572,7 @@ terminate(_Reason, _State) -> %% @private Perform the actual registration: insert agent and session records, %% update liveness, and broadcast the join event. 
-do_register(AgentId, SessionId, SessionPid, Attrs) -> +do_register(AgentId, SessionId, SessionPid, Attrs, SessionType) -> Now = pluto_lease:now_ms(), SysNow = erlang:system_time(millisecond), @@ -492,7 +594,8 @@ do_register(AgentId, SessionId, SessionPid, Attrs) -> attributes = MergedAttrs, last_seen = SysNow, custom_status = <<"online">>, - subscriptions = ExistingSubs + subscriptions = ExistingSubs, + session_type = SessionType }, ets:insert(?ETS_AGENTS, Agent), @@ -522,12 +625,19 @@ do_register(AgentId, SessionId, SessionPid, Attrs) -> ok. -%% @private Generate a unique agent_id by appending a unique integer suffix. +%% @private Generate a unique agent_id by appending a 6-character alphanumeric suffix. %% Called when the requested name is already taken by a live agent. make_unique_agent_id(BaseId) -> - Suffix = integer_to_binary(erlang:unique_integer([positive])), + Suffix = generate_alphanum_suffix(6), <>. +%% @private Generate N random characters from [A-Za-z0-9]. +generate_alphanum_suffix(N) -> + Chars = <<"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789">>, + Len = byte_size(Chars), + Bytes = crypto:strong_rand_bytes(N), + << <<(binary:at(Chars, B rem Len))>> || <> <= Bytes >>. + %% @private Send an event to all connected agents except `ExcludeAgentId`. broadcast_event(Event, ExcludeAgentId) -> AllAgents = ets:match_object(?ETS_AGENTS, #agent{status = connected, _ = '_'}), @@ -615,3 +725,106 @@ orphan_agent_tasks(AgentId) -> pluto_event_log:log(tasks_orphaned, #{agent_id => AgentId, task_ids => OrphanIds}) end. + +%% @private Register an agent via HTTP. Creates agent record and HTTP session. +%% Returns {Token, SessionId}. 
+do_register_http(AgentId, Attrs, Mode, TtlMs) -> + Token = generate_http_token(), + SessionId = generate_http_session_id(), + Now = pluto_lease:now_ms(), + SysNow = erlang:system_time(millisecond), + + %% Preserve existing attributes/subscriptions on reconnect + {MergedAttrs, ExistingSubs} = case ets:lookup(?ETS_AGENTS, AgentId) of + [#agent{attributes = OldAttrs, subscriptions = OldSubs}] -> + {maps:merge(OldAttrs, Attrs), OldSubs}; + [] -> + {Attrs, []} + end, + + %% Evict any previous HTTP sessions for this agent + evict_http_sessions(AgentId), + + %% Create agent record (no session_pid for HTTP agents) + Agent = #agent{ + agent_id = AgentId, + session_id = SessionId, + session_pid = undefined, + status = connected, + connected_at = Now, + attributes = MergedAttrs, + last_seen = SysNow, + custom_status = <<"online">>, + subscriptions = ExistingSubs, + session_type = Mode + }, + ets:insert(?ETS_AGENTS, Agent), + + %% Create HTTP session record + HttpSession = #http_session{ + token = Token, + agent_id = AgentId, + session_id = SessionId, + ttl_ms = TtlMs, + last_seen = SysNow, + mode = Mode + }, + ets:insert(?ETS_HTTP_SESSIONS, HttpSession), + + %% Insert session record + Session = #session{ + session_id = SessionId, + agent_id = AgentId, + session_pid = undefined + }, + ets:insert(?ETS_SESSIONS, Session), + + %% Track stats + pluto_stats:inc(agents_registered), + pluto_stats:inc_agent(AgentId, registrations), + + %% Broadcast agent_joined + broadcast_event(#{ + <<"event">> => ?EVT_AGENT_JOINED, + <<"agent_id">> => AgentId + }, AgentId), + + pluto_event_log:log(agent_registered, #{agent_id => AgentId, + session_id => SessionId, + mode => Mode}), + + {Token, SessionId}. + +%% @private Generate a cryptographically random HTTP session token. +generate_http_token() -> + Hex = binary:encode_hex(crypto:strong_rand_bytes(24)), + <<"PLUTO-", Hex/binary>>. + +%% @private Generate an HTTP session ID. 
+generate_http_session_id() -> + Hex = binary:encode_hex(crypto:strong_rand_bytes(8)), + <<"HTTP-SESS-", Hex/binary>>. + +%% @private Evict all HTTP sessions for a given agent_id. +evict_http_sessions(AgentId) -> + Sessions = ets:match_object(?ETS_HTTP_SESSIONS, + #http_session{agent_id = AgentId, _ = '_'}), + lists:foreach(fun(#http_session{token = T}) -> + ets:delete(?ETS_HTTP_SESSIONS, T) + end, Sessions). + +%% @private Poll inbox: retrieve and delete all queued messages for an agent. +do_poll_inbox(AgentId) -> + Keys = ets:match(?ETS_MSG_INBOX, {{AgentId, '$1'}, '_'}), + SortedSeqs = lists:sort([S || [S] <- Keys]), + Messages = lists:filtermap(fun(Seq) -> + Key = {AgentId, Seq}, + case ets:lookup(?ETS_MSG_INBOX, Key) of + [{_, Event}] -> + ets:delete(?ETS_MSG_INBOX, Key), + {true, Event}; + [] -> + false + end + end, SortedSeqs), + Messages. diff --git a/src_erl/test/pluto_v021_tests.erl b/src_erl/test/pluto_v021_tests.erl new file mode 100644 index 0000000..52aafeb --- /dev/null +++ b/src_erl/test/pluto_v021_tests.erl @@ -0,0 +1,557 @@ +%%% Integration tests for Pluto v0.2.1 features: +%%% - HTTP-based session registration (POST /agents/register) +%%% - Stateless agent mode +%%% - Configurable heartbeat TTL for HTTP agents +%%% - HTTP session heartbeat, poll, send, broadcast, unregister +%%% - Duplicate agent name prevention (6-char alphanumeric suffix) +%%% - HTTP session expiry via heartbeat sweeper +-module(pluto_v021_tests). +-include_lib("eunit/include/eunit.hrl"). +-include("pluto.hrl"). 
+
+%%====================================================================
+%% Test Fixtures
+%%====================================================================
+
+%% Configure the pluto application for this suite (dedicated ports and
+%% scratch directories, long heartbeat windows, no tokens, acl undefined),
+%% start it, and return the {TcpPort, HttpPort} pair for the tests.
+app_setup() ->
+    TcpPort = 19021,
+    HttpPort = 19022,
+    lists:foreach(
+        fun({Key, Value}) -> application:set_env(pluto, Key, Value) end,
+        [{persistence_dir, "/tmp/pluto/test_v021"},
+         {event_log_dir, "/tmp/pluto/test_v021_events"},
+         {tcp_port, TcpPort},
+         {http_port, HttpPort},
+         {heartbeat_interval_ms, 60000},
+         {heartbeat_timeout_ms, 120000},
+         {reconnect_grace_ms, 120000},
+         {http_session_ttl_ms, 300000}]),
+    application:unset_env(pluto, agent_tokens),
+    application:unset_env(pluto, admin_token),
+    application:set_env(pluto, acl, undefined),
+    {ok, _} = application:ensure_all_started(pluto),
+    %% Brief pause after start-up (presumably to let the listeners bind).
+    timer:sleep(300),
+    {TcpPort, HttpPort}.
+
+%% Stop the application and pause briefly; the port pair is not needed.
+app_teardown(_PortPair) ->
+    application:stop(pluto),
+    timer:sleep(100).
+
+%%====================================================================
+%% Test generators
+%%====================================================================
+
+%% EUnit generator: one application start/stop around the whole list of
+%% titled tests below.
+v021_test_() ->
+    Instantiate =
+        fun({TcpPort, HttpPort}) ->
+            %% Wrap a test function into the zero-arity fun eunit runs,
+            %% closing over the port(s) that test needs.
+            Http = fun(Test) -> fun() -> Test(HttpPort) end end,
+            Tcp  = fun(Test) -> fun() -> Test(TcpPort) end end,
+            Both = fun(Test) -> fun() -> Test(TcpPort, HttpPort) end end,
+            [
+             %% HTTP session registration
+             {"http register agent",
+              Http(fun t_http_register/1)},
+             {"http register stateless agent",
+              Http(fun t_http_register_stateless/1)},
+             {"http register with custom ttl",
+              Http(fun t_http_register_custom_ttl/1)},
+
+             %% HTTP session operations
+             {"http heartbeat",
+              Http(fun t_http_heartbeat/1)},
+             {"http poll messages",
+              Both(fun t_http_poll/2)},
+             {"http send message",
+              Both(fun t_http_send/2)},
+             {"http broadcast",
+              Both(fun t_http_broadcast/2)},
+             {"http unregister",
+              Http(fun t_http_unregister/1)},
+             {"http subscribe topic",
+              Http(fun t_http_subscribe/1)},
+
+             %% HTTP agent visible to TCP agents
+             {"http agent in list_agents",
+              Both(fun t_http_visible_in_list/2)},
+
+             %% Duplicate name prevention
+             {"duplicate name tcp gets suffix",
+              Tcp(fun t_duplicate_name_tcp/1)},
+             {"duplicate name http gets suffix",
+              Http(fun t_duplicate_name_http/1)},
+             {"duplicate name cross protocol",
+              Both(fun t_duplicate_name_cross/2)},
+
+             %% Session not found
+             {"http heartbeat bad token",
+              Http(fun t_http_bad_token/1)},
+             {"http poll bad token",
+              Http(fun t_http_poll_bad_token/1)},
+
+             %% Agent status shows session_type
+             {"http agent status",
+              Both(fun t_http_agent_status/2)}
+            ]
+        end,
+    {setup, fun app_setup/0, fun app_teardown/1, Instantiate}.
+
+%%====================================================================
+%% HTTP session registration tests
+%%====================================================================
+
+%% Default registration: the reply carries status, token, session_id, the
+%% requested agent_id and mode "http".
+t_http_register(HttpPort) ->
+    RequestedId = rand_agent(),
+    {ok, Reply} = http_post(HttpPort, "/agents/register",
+                            #{<<"agent_id">> => RequestedId}),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Reply)),
+    ?assert(is_binary(maps:get(<<"token">>, Reply))),
+    ?assert(is_binary(maps:get(<<"session_id">>, Reply))),
+    ?assertEqual(RequestedId, maps:get(<<"agent_id">>, Reply)),
+    ?assertEqual(<<"http">>, maps:get(<<"mode">>, Reply)),
+    %% Cleanup
+    http_post(HttpPort, "/agents/unregister",
+              #{<<"token">> => maps:get(<<"token">>, Reply)}).
+
+%% Registration with mode "stateless" is echoed back in the reply.
+t_http_register_stateless(HttpPort) ->
+    Request = #{<<"agent_id">> => rand_agent(),
+                <<"mode">> => <<"stateless">>},
+    {ok, Reply} = http_post(HttpPort, "/agents/register", Request),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Reply)),
+    ?assertEqual(<<"stateless">>, maps:get(<<"mode">>, Reply)),
+    http_post(HttpPort, "/agents/unregister",
+              #{<<"token">> => maps:get(<<"token">>, Reply)}).
+
+%% Registers a stateless agent with an explicit ttl_ms and checks that the
+%% same value comes back in the reply.
+t_http_register_custom_ttl(HttpPort) ->
+    TenMinutesMs = 600000,
+    Request = #{<<"agent_id">> => rand_agent(),
+                <<"mode">> => <<"stateless">>,
+                <<"ttl_ms">> => TenMinutesMs},
+    {ok, Reply} = http_post(HttpPort, "/agents/register", Request),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Reply)),
+    ?assertEqual(TenMinutesMs, maps:get(<<"ttl_ms">>, Reply)),
+    http_post(HttpPort, "/agents/unregister",
+              #{<<"token">> => maps:get(<<"token">>, Reply)}).
+
+%%====================================================================
+%% HTTP session operation tests
+%%====================================================================
+
+%% A heartbeat with a valid token answers "ok" and carries an integer "ts".
+t_http_heartbeat(HttpPort) ->
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Credentials = #{<<"token">> => maps:get(<<"token">>, Registration)},
+    %% Send heartbeat
+    {ok, Heartbeat} = http_post(HttpPort, "/agents/heartbeat", Credentials),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Heartbeat)),
+    ?assert(is_integer(maps:get(<<"ts">>, Heartbeat))),
+    http_post(HttpPort, "/agents/unregister", Credentials).
+
+%% A TCP agent sends a direct message to an HTTP agent; the HTTP agent must
+%% see it on its first poll and find the inbox empty on the second.
+t_http_poll(TcpPort, HttpPort) ->
+    %% Register HTTP agent
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Token = maps:get(<<"token">>, Registration),
+    Recipient = maps:get(<<"agent_id">>, Registration),
+    PollPath = "/agents/poll?token=" ++ binary_to_list(Token),
+
+    %% Register TCP agent and send message to HTTP agent
+    Sender = rand_agent(),
+    {ok, Sock} = gen_tcp:connect({127,0,0,1}, TcpPort,
+                                 [binary, {packet, line}, {active, false}],
+                                 2000),
+    send_on(Sock, #{<<"op">> => <<"register">>, <<"agent_id">> => Sender}),
+    {ok, _} = recv_on(Sock),
+    send_on(Sock, #{<<"op">> => <<"send">>,
+                    <<"to">> => Recipient,
+                    <<"payload">> => #{<<"msg">> => <<"hello from tcp">>}}),
+    {ok, SendReply} = recv_on(Sock),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, SendReply)),
+    gen_tcp:close(Sock),
+
+    %% Small delay to let message be queued
+    timer:sleep(100),
+
+    %% Poll messages from HTTP agent
+    {ok, FirstPoll} = http_get(HttpPort, PollPath),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, FirstPoll)),
+    Inbox = maps:get(<<"messages">>, FirstPoll),
+    ?assert(length(Inbox) >= 1),
+    %% Verify message content
+    Head = hd(Inbox),
+    ?assertEqual(<<"message">>, maps:get(<<"event">>, Head)),
+    ?assertEqual(Sender, maps:get(<<"from">>, Head)),
+
+    %% Second poll should have no messages (they were consumed)
+    {ok, SecondPoll} = http_get(HttpPort, PollPath),
+    ?assertEqual(0, maps:get(<<"count">>, SecondPoll)),
+
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).
+
+%% An HTTP agent sends a direct message to a TCP agent that keeps its
+%% connection open; the TCP side then drains pushed events looking for it.
+t_http_send(TcpPort, HttpPort) ->
+    %% Register TCP agent on persistent connection
+    Receiver = rand_agent(),
+    {ok, Sock} = gen_tcp:connect({127,0,0,1}, TcpPort,
+                                 [binary, {packet, line}, {active, false}],
+                                 2000),
+    send_on(Sock, #{<<"op">> => <<"register">>, <<"agent_id">> => Receiver}),
+    {ok, _} = recv_on(Sock),
+
+    %% Register HTTP agent
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Token = maps:get(<<"token">>, Registration),
+
+    %% Send message from HTTP agent to TCP agent
+    Outgoing = #{<<"token">> => Token,
+                 <<"to">> => Receiver,
+                 <<"payload">> => #{<<"msg">> => <<"hello from http">>}},
+    {ok, SendReply} = http_post(HttpPort, "/agents/send", Outgoing),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, SendReply)),
+
+    %% TCP agent should receive the message as a pushed event
+    %% Drain events (agent_joined, etc.) until we find the message
+    ok = drain_until_event(Sock, <<"message">>, 5),
+
+    gen_tcp:close(Sock),
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).
+
+%% An HTTP agent broadcasts while a TCP agent is connected; only the HTTP
+%% reply status is checked here.
+t_http_broadcast(TcpPort, HttpPort) ->
+    %% Register TCP listener
+    Listener = rand_agent(),
+    {ok, Sock} = gen_tcp:connect({127,0,0,1}, TcpPort,
+                                 [binary, {packet, line}, {active, false}],
+                                 2000),
+    send_on(Sock, #{<<"op">> => <<"register">>, <<"agent_id">> => Listener}),
+    {ok, _} = recv_on(Sock),
+
+    %% Register HTTP agent and broadcast
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Token = maps:get(<<"token">>, Registration),
+
+    Announcement = #{<<"token">> => Token,
+                     <<"payload">> => #{<<"msg">> => <<"broadcast from http">>}},
+    {ok, BroadcastReply} = http_post(HttpPort, "/agents/broadcast",
+                                     Announcement),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, BroadcastReply)),
+
+    gen_tcp:close(Sock),
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).
+
+%% After unregistering, the same token must be rejected by the heartbeat
+%% endpoint.
+t_http_unregister(HttpPort) ->
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Credentials = #{<<"token">> => maps:get(<<"token">>, Registration)},
+
+    %% Unregister
+    {ok, Goodbye} = http_post(HttpPort, "/agents/unregister", Credentials),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Goodbye)),
+
+    %% Heartbeat should fail now
+    {ok, Heartbeat} = http_post(HttpPort, "/agents/heartbeat", Credentials),
+    ?assertEqual(<<"error">>, maps:get(<<"status">>, Heartbeat)).
+
+%% Subscribing an HTTP agent to a topic answers "ok".
+t_http_subscribe(HttpPort) ->
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Token = maps:get(<<"token">>, Registration),
+
+    Subscription = #{<<"token">> => Token,
+                     <<"topic">> => <<"test-topic">>},
+    {ok, SubscribeReply} = http_post(HttpPort, "/agents/subscribe",
+                                     Subscription),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, SubscribeReply)),
+
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).
+
+%%====================================================================
+%% Cross-protocol visibility test
+%%====================================================================
+
+%% An agent registered over HTTP must show up in the list_agents reply
+%% obtained over TCP.
+t_http_visible_in_list(TcpPort, HttpPort) ->
+    %% Register HTTP agent
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Token = maps:get(<<"token">>, Registration),
+    AssignedId = maps:get(<<"agent_id">>, Registration),
+
+    %% TCP agent lists agents
+    Observer = rand_agent(),
+    {ok, [_, Listing]} = send_multi(TcpPort, [
+        #{<<"op">> => <<"register">>, <<"agent_id">> => Observer},
+        #{<<"op">> => <<"list_agents">>}
+    ]),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Listing)),
+    Known = maps:get(<<"agents">>, Listing),
+    ?assert(lists:member(AssignedId, Known)),
+
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).
+
+%%====================================================================
+%% Duplicate name prevention tests
+%%====================================================================
+
+%% Two TCP connections register the same agent_id while both stay open.
+%% The first keeps the requested name; the second must be given
+%% "<name>-<suffix>" where the suffix is exactly 6 bytes long.
+t_duplicate_name_tcp(TcpPort) ->
+    AgentId = <<"dup-tcp-", (rand_id())/binary>>,
+    %% Register first agent on persistent connection
+    {ok, Sock1} = gen_tcp:connect({127,0,0,1}, TcpPort,
+                                  [binary, {packet, line}, {active, false}], 2000),
+    send_on(Sock1, #{<<"op">> => <<"register">>, <<"agent_id">> => AgentId}),
+    {ok, Reg1} = recv_on(Sock1),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Reg1)),
+    ?assertEqual(AgentId, maps:get(<<"agent_id">>, Reg1)),
+
+    %% Register second agent with same name
+    {ok, Sock2} = gen_tcp:connect({127,0,0,1}, TcpPort,
+                                  [binary, {packet, line}, {active, false}], 2000),
+    send_on(Sock2, #{<<"op">> => <<"register">>, <<"agent_id">> => AgentId}),
+    {ok, Reg2} = recv_on(Sock2),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, Reg2)),
+    %% Should have gotten a different agent_id with suffix
+    Reg2AgentId = maps:get(<<"agent_id">>, Reg2),
+    ?assertNotEqual(AgentId, Reg2AgentId),
+    %% Should start with original name followed by a dash
+    PrefixLen = byte_size(AgentId) + 1, %% original + "-"
+    ?assertEqual(<<AgentId/binary, "-">>,
+                 binary:part(Reg2AgentId, 0, PrefixLen)),
+    %% Suffix should be 6 chars
+    SuffixLen = byte_size(Reg2AgentId) - PrefixLen,
+    ?assertEqual(6, SuffixLen),
+
+    gen_tcp:close(Sock1),
+    gen_tcp:close(Sock2).
+
+%% Two HTTP registrations with the same agent_id: the first keeps the name,
+%% the second gets a different id that still contains the original name.
+t_duplicate_name_http(HttpPort) ->
+    Wanted = <<"dup-http-", (rand_id())/binary>>,
+    Request = #{<<"agent_id">> => Wanted},
+    %% Register first HTTP agent
+    {ok, First} = http_post(HttpPort, "/agents/register", Request),
+    FirstToken = maps:get(<<"token">>, First),
+    ?assertEqual(Wanted, maps:get(<<"agent_id">>, First)),
+
+    %% Register second HTTP agent with same name
+    {ok, Second} = http_post(HttpPort, "/agents/register", Request),
+    SecondToken = maps:get(<<"token">>, Second),
+    SecondId = maps:get(<<"agent_id">>, Second),
+    ?assertNotEqual(Wanted, SecondId),
+    %% Should contain the original name
+    ?assertNotEqual(nomatch, binary:match(SecondId, Wanted)),
+
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => FirstToken}),
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => SecondToken}).
+
+%% A name held by a connected TCP agent cannot be reused verbatim by an
+%% HTTP registration: the HTTP agent is given a different id.
+t_duplicate_name_cross(TcpPort, HttpPort) ->
+    Wanted = <<"dup-cross-", (rand_id())/binary>>,
+    %% Register via TCP first
+    {ok, Sock} = gen_tcp:connect({127,0,0,1}, TcpPort,
+                                 [binary, {packet, line}, {active, false}],
+                                 2000),
+    send_on(Sock, #{<<"op">> => <<"register">>, <<"agent_id">> => Wanted}),
+    {ok, _} = recv_on(Sock),
+
+    %% Try registering same name via HTTP
+    {ok, HttpReply} = http_post(HttpPort, "/agents/register",
+                                #{<<"agent_id">> => Wanted}),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, HttpReply)),
+    ?assertNotEqual(Wanted, maps:get(<<"agent_id">>, HttpReply)),
+
+    gen_tcp:close(Sock),
+    http_post(HttpPort, "/agents/unregister",
+              #{<<"token">> => maps:get(<<"token">>, HttpReply)}).
+
+%%====================================================================
+%% Error handling tests
+%%====================================================================
+
+%% A heartbeat carrying an unknown token answers with status "error".
+t_http_bad_token(HttpPort) ->
+    BogusCredentials = #{<<"token">> => <<"PLUTO-nonexistent">>},
+    {ok, Reply} = http_post(HttpPort, "/agents/heartbeat", BogusCredentials),
+    ?assertEqual(<<"error">>, maps:get(<<"status">>, Reply)).
+
+%% Polling with an unknown token answers with status "error".
+t_http_poll_bad_token(HttpPort) ->
+    PollPath = "/agents/poll?token=PLUTO-nonexistent",
+    {ok, Reply} = http_get(HttpPort, PollPath),
+    ?assertEqual(<<"error">>, maps:get(<<"status">>, Reply)).
+
+%%====================================================================
+%% HTTP agent status test
+%%====================================================================
+
+%% agent_status queried over TCP for an HTTP-registered agent answers "ok"
+%% and reports the agent as online.
+t_http_agent_status(TcpPort, HttpPort) ->
+    %% Register HTTP agent
+    {ok, Registration} = http_post(HttpPort, "/agents/register",
+                                   #{<<"agent_id">> => rand_agent()}),
+    Token = maps:get(<<"token">>, Registration),
+    AssignedId = maps:get(<<"agent_id">>, Registration),
+
+    %% Query status from TCP
+    Observer = rand_agent(),
+    {ok, [_, StatusReply]} = send_multi(TcpPort, [
+        #{<<"op">> => <<"register">>, <<"agent_id">> => Observer},
+        #{<<"op">> => <<"agent_status">>, <<"agent_id">> => AssignedId}
+    ]),
+    ?assertEqual(<<"ok">>, maps:get(<<"status">>, StatusReply)),
+    ?assertEqual(true, maps:get(<<"online">>, StatusReply)),
+
+    http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).
+
+%%====================================================================
+%% HTTP helpers
+%%====================================================================
+
+%% POST Body (a map, JSON-encoded here) to Path on 127.0.0.1:HttpPort.
+%% Returns what read_http_response/1 returns, {error, {connect, Reason}}
+%% when the connection cannot be opened, or {error, {send, Reason}} when
+%% the request cannot be written.
+http_post(HttpPort, Path, Body) ->
+    JsonBody = pluto_protocol_json:encode(Body),
+    Request = [
+        <<"POST ">>, list_to_binary(Path), <<" HTTP/1.1\r\n">>,
+        <<"Host: localhost\r\n">>,
+        <<"Content-Type: application/json\r\n">>,
+        <<"Content-Length: ">>, integer_to_binary(byte_size(JsonBody)), <<"\r\n">>,
+        <<"Connection: close\r\n">>,
+        <<"\r\n">>,
+        JsonBody
+    ],
+    http_request(HttpPort, Request).
+
+%% GET Path from 127.0.0.1:HttpPort. Same return values as http_post/3.
+http_get(HttpPort, Path) ->
+    Request = [
+        <<"GET ">>, list_to_binary(Path), <<" HTTP/1.1\r\n">>,
+        <<"Host: localhost\r\n">>,
+        <<"Connection: close\r\n">>,
+        <<"\r\n">>
+    ],
+    http_request(HttpPort, Request).
+
+%% @private Send a pre-built request over a fresh connection and read the
+%% reply. The socket is closed in an `after' clause so it is released even
+%% when reading the response raises.
+http_request(HttpPort, Request) ->
+    case gen_tcp:connect({127, 0, 0, 1}, HttpPort,
+                         [binary, {packet, http_bin}, {active, false}], 2000) of
+        {ok, Sock} ->
+            try
+                %% Switch to raw for sending
+                inet:setopts(Sock, [{packet, raw}]),
+                case gen_tcp:send(Sock, Request) of
+                    ok ->
+                        %% Back to HTTP packet parsing for the status line
+                        %% and headers
+                        inet:setopts(Sock, [{packet, http_bin}]),
+                        read_http_response(Sock);
+                    {error, SendReason} ->
+                        {error, {send, SendReason}}
+                end
+            after
+                gen_tcp:close(Sock)
+            end;
+        {error, Reason} ->
+            {error, {connect, Reason}}
+    end.
+
+%% Read one HTTP response from Sock (expected in http_bin packet mode).
+%% The status code is ignored. A non-empty body is handed to
+%% pluto_protocol_json:decode/1 and that result is returned; an empty body
+%% yields {ok, #{}}. Anything other than a status line as the first packet
+%% is reported as {error, {unexpected_packet, Packet}} instead of crashing.
+read_http_response(Sock) ->
+    case gen_tcp:recv(Sock, 0, 5000) of
+        {ok, {http_response, _, _StatusCode, _}} ->
+            Headers = read_resp_headers(Sock, []),
+            ContentLen = resp_content_length(Headers),
+            %% The body is plain bytes, not HTTP packets
+            inet:setopts(Sock, [{packet, raw}]),
+            case read_resp_body(Sock, ContentLen) of
+                Body when byte_size(Body) > 0 ->
+                    pluto_protocol_json:decode(Body);
+                _ ->
+                    {ok, #{}}
+            end;
+        {ok, Unexpected} ->
+            {error, {unexpected_packet, Unexpected}};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%% Collect response headers as {NameBinary, Value} pairs in arrival order.
+%% Stops at end-of-headers, and also on any other packet or receive error.
+read_resp_headers(Sock, Collected) ->
+    case gen_tcp:recv(Sock, 0, 5000) of
+        {ok, {http_header, _, Name, _, Value}} ->
+            read_resp_headers(Sock, [{header_name(Name), Value} | Collected]);
+        _EndOfHeadersOrOther ->
+            lists:reverse(Collected)
+    end.
+
+%% Value of the Content-Length header as an integer; 0 when the header is
+%% absent or its value is not an integer.
+resp_content_length(Headers) ->
+    case lists:keyfind(<<"Content-Length">>, 1, Headers) of
+        false ->
+            0;
+        {_, Raw} ->
+            try
+                binary_to_integer(Raw)
+            catch
+                _:_ -> 0
+            end
+    end.
+
+%% Read exactly Len body bytes; an empty binary for Len = 0 or on any
+%% receive failure.
+read_resp_body(_Sock, 0) ->
+    <<>>;
+read_resp_body(Sock, Len) when Len > 0 ->
+    case gen_tcp:recv(Sock, Len, 5000) of
+        {ok, Bytes} -> Bytes;
+        _ -> <<>>
+    end.
+
+%% Header names arrive either as atoms or as binaries; normalise to binary.
+header_name(Name) when is_atom(Name) -> atom_to_binary(Name, utf8);
+header_name(Name) when is_binary(Name) -> Name.
+
+%%====================================================================
+%% TCP helpers (same as integration tests)
+%%====================================================================
+
+%% Open one connection, write every request, then read one reply line per
+%% request. Returns {ok, Replies}; a reply that cannot be received or
+%% decoded is replaced by a map whose "status" is "recv_error" or
+%% "decode_error".
+send_multi(Port, Reqs) ->
+    ConnectOpts = [binary, {packet, line}, {active, false}],
+    case gen_tcp:connect({127, 0, 0, 1}, Port, ConnectOpts, 2000) of
+        {error, Reason} ->
+            {error, {connect, Reason}};
+        {ok, Sock} ->
+            _ = [gen_tcp:send(Sock, pluto_protocol_json:encode_line(Req))
+                 || Req <- Reqs],
+            ReadReply =
+                fun() ->
+                    case gen_tcp:recv(Sock, 0, 2000) of
+                        {error, _} ->
+                            #{<<"status">> => <<"recv_error">>};
+                        {ok, Line} ->
+                            case pluto_protocol_json:decode(string:trim(Line)) of
+                                {ok, Decoded} -> Decoded;
+                                {error, _} -> #{<<"status">> => <<"decode_error">>}
+                            end
+                    end
+                end,
+            Replies = [ReadReply() || _ <- Reqs],
+            gen_tcp:close(Sock),
+            {ok, Replies}
+    end.
+
+%% Write one request as a single encoded line on an open socket.
+send_on(Sock, Req) ->
+    Line = pluto_protocol_json:encode_line(Req),
+    gen_tcp:send(Sock, Line).
+
+%% Read one line (2 s timeout) and decode it; receive errors pass through.
+recv_on(Sock) ->
+    case gen_tcp:recv(Sock, 0, 2000) of
+        {error, Reason} ->
+            {error, Reason};
+        {ok, Line} ->
+            pluto_protocol_json:decode(string:trim(Line))
+    end.
+
+%% Random agent id of the form "v021-" followed by rand_id/0.
+rand_agent() ->
+    Suffix = rand_id(),
+    <<"v021-", Suffix/binary>>.
+
+%% Eight lowercase hex characters from four random bytes.
+rand_id() ->
+    string:lowercase(binary:encode_hex(crypto:strong_rand_bytes(4))).
+
+%% @private Drain events from a TCP socket until finding the target event type.
+%% Reads at most N lines (2 s timeout each). Returns ok as soon as a decoded
+%% line has "event" equal to EventType; {error, not_found} when N lines were
+%% read without seeing it; {error, Reason} when a receive fails (timeout,
+%% closed, ...). Lines that do not decode, or carry another event, count
+%% against N and are skipped. Callers asserting `ok = drain_until_event(...)'
+%% therefore fail when the event never arrives.
+drain_until_event(_Sock, _EventType, 0) ->
+    {error, not_found};
+drain_until_event(Sock, EventType, N) ->
+    case gen_tcp:recv(Sock, 0, 2000) of
+        {ok, Data} ->
+            case pluto_protocol_json:decode(string:trim(Data)) of
+                {ok, #{<<"event">> := EventType}} ->
+                    ok;
+                _OtherOrUndecodable ->
+                    drain_until_event(Sock, EventType, N - 1)
+            end;
+        {error, Reason} ->
+            {error, Reason}
+    end.
diff --git a/src_py/agent_guide_template.md b/src_py/agent_guide_template.md index 1064acd..76f2356 100644 --- a/src_py/agent_guide_template.md +++ b/src_py/agent_guide_template.md @@ -1,658 +1,382 @@ # Pluto Agent Guide > **Generated:** {{generated_at}} -> **Server:** {{host}}:{{port}} +> **Server:** {{host}}:{{port}} (TCP) · {{host}}:9001 (HTTP) --- -## Overview +## What Is Pluto? -Pluto is a centralized coordination server for AI agents. It enables agents to -**discover each other, collaborate proactively, coordinate shared resources, -assign and track tasks, and communicate in real time**. +Pluto is a coordination server for AI agents. Connect, discover peers, lock +shared resources, exchange messages, and track tasks — all through JSON. -### Core Capabilities - -| Capability | Description | +| Feature | What It Does | |---|---| -| **Agent Discovery** | Find agents by role, capability, or custom attributes — never hardcode agent IDs | -| **Resource Locking** | Exclusive and shared locks with TTL, fencing tokens, and deadlock detection | -| **Messaging** | Point-to-point, broadcast, and topic-based pub/sub with offline inbox | -| **Task Management** | Assign, update, batch-distribute, and track tasks across agents | -| **Presence & Status** | Query which agents are online, their status, and last-seen timestamps | -| **Lease Management** | Time-bounded leases with automatic expiration and renewal | - -### Your Role as an Agent - -As a Pluto-connected agent, you should: - -1.
**Register with descriptive attributes** — declare your role, capabilities, - and any metadata so other agents can discover you. -2. **Discover peers proactively** — on startup (and periodically), query for - other agents using `find_agents` to understand who is available and what - they can do. -3. **Subscribe to relevant topics** — join channels that match your role so - you receive targeted updates without polling. -4. **Collaborate actively** — when you receive a task or message, respond - promptly. When you complete work, notify interested parties. -5. **Coordinate resources** — always acquire locks before modifying shared - resources and release them promptly when done. -6. **Report your status** — set a meaningful custom status so others know - whether you are idle, busy, or waiting. - -The user can guide and fine-tune your collaborative behavior (e.g., which topics -to subscribe to, how aggressively to discover peers, which task types to accept). +| **Locking** | Exclusive/shared locks with TTL, fencing tokens, deadlock detection | +| **Messaging** | Direct messages, broadcast, topic pub/sub, offline inbox | +| **Discovery** | Find agents by role or attributes; presence & status tracking | +| **Tasks** | Assign, update, batch-distribute, and monitor tasks across agents | --- -## Access Methods - -Three access methods (in order of preference): - -1. **TCP JSON** (port {{port}}) — newline-delimited JSON over a persistent TCP - socket. Full-featured: supports async push events, heartbeat, real-time - lock grants. **Use this if you can open TCP sockets.** -2. **Python client** — a thin wrapper around the TCP protocol. Use if you have - Python available but prefer a higher-level API. -3. **HTTP/REST** (port 9001) — stateless JSON endpoints. Use when TCP sockets - are unavailable (e.g. sandboxed environments). Note: no push events. 
- ---- +## Quick Start — Pick Your Access Method -## Step 0 — Test Connectivity +Test connectivity first, then use the method that works: -Before choosing an approach, verify the server is reachable. - -**TCP ping** (preferred — if this works, use TCP JSON directly): ```bash +# Try TCP (full-featured, preferred) echo '{"op":"ping"}' | nc {{host}} {{port}} -``` -Expected response: -```json -{"status":"pong","ts":1711234567890,"heartbeat_interval_ms":15000} -``` -**HTTP ping** (fallback): -```bash +# Try HTTP (works everywhere) curl -s http://{{host}}:9001/ping ``` -Expected: `{"status":"pong","ts":...}` -If the TCP ping succeeds → follow **Approach 1 (TCP JSON)** below. -If only the HTTP ping succeeds → skip to **Approach 3 (HTTP/REST)**. -If neither works, ensure the Pluto server is running. +| Method | Port | When to Use | +|---|---|---| +| **TCP** | {{port}} | You can open TCP sockets — gives push events, real-time lock grants | +| **HTTP Sessions** | 9001 | Cannot maintain TCP (e.g. Claude Code, sandboxed envs) — register once, poll for events | +| **HTTP One-Shot** | 9001 | Quick lock/message operations without registering | +| **Python `PlutoClient`** | {{port}} | High-level wrapper over TCP with heartbeat and event dispatch | +| **Python `PlutoHttpClient`** | 9001 | High-level wrapper over HTTP sessions | --- -## Approach 1 — TCP JSON (Preferred) +## Method 1 — TCP (Full-Featured) -Pluto speaks **newline-delimited JSON** over TCP. Each message is a single JSON -object terminated by `\n`. This approach gives you full access to all features -including server-pushed async events. +Newline-delimited JSON over a persistent TCP connection to `{{host}}:{{port}}`. 
-### 1.1 Register with Attributes +### Register -Connect to `{{host}}:{{port}}` via TCP and **declare your capabilities**: -```json -{"op":"register","agent_id":"my-agent","attributes":{"role":"coder","lang":["python","erlang"],"capabilities":["refactor","test"]}} -``` -Response: ```json -{"status":"ok","session_id":"sess-a3f2...","heartbeat_interval_ms":15000} +{"op":"register","agent_id":"my-agent","attributes":{"role":"coder","capabilities":["refactor","test"]}} ``` +→ `{"status":"ok","session_id":"sess-a3f2...","heartbeat_interval_ms":15000}` -Optional authentication: -```json -{"op":"register","agent_id":"my-agent","token":"secret","attributes":{"role":"reviewer"}} -``` +If `my-agent` is already taken, the server assigns a unique name (e.g. +`my-agent-k8Xm2p`) and returns it in `agent_id`. Always use the returned +`agent_id` for subsequent operations. + +### Heartbeat -**Important:** Always set meaningful `attributes` so other agents can discover -you. At minimum, include a `role` field. +Send `{"op":"ping"}` every 15 s. Sessions expire after 30 s of silence. -### 1.2 Discover Other Agents (Do This Early) +### Discovery -Immediately after registering, discover who else is available: ```json {"op":"find_agents","filter":{"role":"reviewer"}} ``` -Response: -```json -{"status":"ok","agents":["reviewer-1","reviewer-3"]} -``` +→ `{"status":"ok","agents":["reviewer-1","reviewer-3"]}` -You can filter by any attribute key-value pair. 
Use this to: -- Find agents with specific capabilities before assigning tasks -- Check who is available before sending messages -- Build a local map of team capabilities - -**Query agent details:** ```json {"op":"agent_status","agent_id":"reviewer-1"} ``` -Response: -```json -{"status":"ok","agent_id":"reviewer-1","online":true,"custom_status":"idle","last_seen":1711234567890,"attributes":{"role":"reviewer","lang":["python"]}} -``` +→ `{"status":"ok","agent_id":"reviewer-1","online":true,"custom_status":"idle","attributes":{"role":"reviewer"}}` -**List all agents with full details:** ```json {"op":"list_agents","detailed":true} ``` -### 1.3 Subscribe to Topics - -Subscribe to channels relevant to your role: -```json -{"op":"subscribe","topic":"code-reviews"} -``` -```json -{"status":"ok"} -``` - -Now you will receive events when anyone publishes to that topic: -```json -{"event":"topic_message","topic":"code-reviews","from":"coder-1","payload":{"file":"main.py","action":"ready_for_review"}} -``` - -Unsubscribe when no longer interested: -```json -{"op":"unsubscribe","topic":"code-reviews"} -``` - -### 1.4 Set Your Status - -Let others know what you are doing: +Set your own status: ```json {"op":"agent_status","custom_status":"reviewing main.py"} ``` -```json -{"status":"ok"} -``` - -Update your status as your work changes so collaborators can make informed -decisions about whom to contact or assign work to. -### 1.5 Heartbeat +### Locking -Send a `ping` every `heartbeat_interval_ms` (default 15 s) to keep the session -alive. Sessions expire after 30 s of silence. 
- -```json -{"op":"ping"} -``` -```json -{"status":"pong","ts":1711234567890,"heartbeat_interval_ms":15000} -``` - -### 1.6 Acquire a Lock - -Before modifying a shared resource, always acquire a lock: ```json {"op":"acquire","resource":"file:/src/main.py","mode":"write","ttl_ms":30000} ``` +→ Immediate: `{"status":"ok","lock_ref":"LOCK-42","fencing_token":17}` +→ Queued: `{"status":"wait","wait_ref":"WAIT-99"}` (then async `lock_granted` event) -**Immediate grant:** -```json -{"status":"ok","lock_ref":"LOCK-42","fencing_token":17} -``` +| Mode | Behavior | +|------|----------| +| `write` | Exclusive — no other locks allowed | +| `read` | Shared — multiple readers OK, writers wait | -**Queued (resource is busy):** -```json -{"status":"wait","wait_ref":"WAIT-99"} -``` -Later, the server pushes asynchronously: -```json -{"event":"lock_granted","wait_ref":"WAIT-99","lock_ref":"LOCK-43","fencing_token":18} -``` - -Lock modes: -- `"write"` — exclusive. No other agent can hold any lock on the resource. -- `"read"` — shared. Multiple readers allowed; writers block until all readers - release. - -Optional fields: -- `"max_wait_ms"` — maximum time to wait in the queue before timeout. - -**Non-blocking probe (try-acquire):** ```json {"op":"try_acquire","resource":"file:/src/main.py","mode":"write","ttl_ms":30000} ``` -Returns immediately without queuing: -```json -{"status":"ok","lock_ref":"LOCK-44","fencing_token":19} -``` -or: -```json -{"status":"unavailable","resource":"file:/src/main.py"} -``` - -### 1.7 Release a Lock +→ Returns immediately: `ok` with lock_ref, or `{"status":"unavailable"}`. 
```json {"op":"release","lock_ref":"LOCK-42"} -``` -```json -{"status":"ok"} -``` - -### 1.8 Renew a Lock (Extend TTL) - -```json {"op":"renew","lock_ref":"LOCK-42","ttl_ms":30000} ``` -```json -{"status":"ok"} -``` -### 1.9 Send a Direct Message (with Delivery Tracking) +### Messaging +**Direct message:** ```json {"op":"send","to":"reviewer-1","payload":{"type":"ready","file":"main.py"},"request_id":"req-001"} ``` -```json -{"status":"ok","msg_id":"MSG-42"} -``` +→ `{"status":"ok","msg_id":"MSG-42"}` — recipient gets `{"event":"message",...}`. +Offline recipients receive queued messages on reconnect. -The recipient receives a push event: +**Broadcast (all agents):** ```json -{"event":"message","from":"my-agent","payload":{"type":"ready","file":"main.py"},"msg_id":"MSG-42"} +{"op":"broadcast","payload":{"text":"build complete"}} ``` -If the recipient is **offline**, the message is queued in their inbox and -delivered automatically when they reconnect. - -The sender receives a delivery confirmation: +**Pub/sub (subscribers only):** ```json -{"event":"delivery_ack","msg_id":"MSG-42","request_id":"req-001","to":"reviewer-1"} +{"op":"subscribe","topic":"code-reviews"} +{"op":"publish","topic":"code-reviews","payload":{"file":"main.py","action":"ready"}} +{"op":"unsubscribe","topic":"code-reviews"} ``` -**Acknowledge receipt** (lets the sender know you processed it): +**Acknowledge receipt:** ```json {"op":"ack","msg_id":"MSG-42"} ``` -### 1.10 Broadcast +### Tasks ```json -{"op":"broadcast","payload":{"type":"announcement","text":"build complete"}} -``` -```json -{"status":"ok"} -``` - -All other agents receive: -```json -{"event":"broadcast","from":"my-agent","payload":{"type":"announcement","text":"build complete"}} -``` - -### 1.11 Publish to a Topic - -More targeted than broadcast — only subscribers receive it: -```json -{"op":"publish","topic":"build-status","payload":{"status":"green","commit":"abc123"}} -``` -```json -{"status":"ok"} 
+{"op":"task_assign","assignee":"coder-2","description":"Fix bug #42","payload":{"file":"main.py"}} ``` +→ `{"status":"ok","task_id":"TASK-1"}` — assignee gets `{"event":"task_assigned",...}`. -Subscribers to `"build-status"` receive: -```json -{"event":"topic_message","topic":"build-status","from":"my-agent","payload":{"status":"green","commit":"abc123"}} -``` - -### 1.12 Task Management - -#### Assign a task: -```json -{"op":"task_assign","assignee":"coder-2","description":"Fix bug #42","payload":{"file":"main.py","line":17}} -``` -```json -{"status":"ok","task_id":"TASK-1"} -``` - -The assignee receives: -```json -{"event":"task_assigned","task_id":"TASK-1","from":"my-agent","description":"Fix bug #42","payload":{"file":"main.py","line":17}} -``` - -#### Update task status: ```json {"op":"task_update","task_id":"TASK-1","status":"completed","result":{"fix":"applied patch"}} -``` -```json -{"status":"ok"} -``` - -All agents are notified: -```json -{"event":"task_updated","task_id":"TASK-1","agent_id":"coder-2","status":"completed"} -``` - -Task statuses: `"pending"`, `"in_progress"`, `"completed"`, `"failed"`, `"orphaned"`. 
- -#### List tasks: -```json -{"op":"task_list"} -``` -```json -{"status":"ok","tasks":[{"task_id":"TASK-1","assignee":"coder-2","assigner":"my-agent","status":"completed","description":"Fix bug #42"}]} -``` - -Filter by assignee or status: -```json {"op":"task_list","assignee":"coder-2","status":"pending"} -``` - -#### Batch assign tasks: -```json -{"op":"task_batch","tasks":[{"assignee":"coder-1","description":"Fix module A"},{"assignee":"coder-2","description":"Fix module B"}]} -``` -```json -{"status":"ok","task_ids":["TASK-2","TASK-3"]} -``` - -#### View global progress: -```json +{"op":"task_batch","tasks":[{"assignee":"coder-1","description":"A"},{"assignee":"coder-2","description":"B"}]} {"op":"task_progress"} ``` -```json -{"status":"ok","total":10,"by_status":{"pending":3,"in_progress":4,"completed":2,"failed":1},"by_agent":{"coder-1":{"pending":1,"in_progress":2},"coder-2":{"completed":2,"failed":1}}} -``` - -#### Orphaned tasks: -When an agent disconnects with unfinished tasks, all agents are notified: -```json -{"event":"tasks_orphaned","agent_id":"coder-2","tasks":["TASK-5","TASK-6"]} -``` -This lets another agent pick up the abandoned work. - -### 1.13 Acknowledge Events - -Report the highest event sequence number you have processed: -```json -{"op":"ack_events","last_seq":42} -``` -```json -{"status":"ok"} -``` -This helps the server track which events you have seen, enabling reliable -event replay on reconnection. +Task statuses: `pending` → `in_progress` → `completed` | `failed` | `orphaned`. +When an agent disconnects with unfinished tasks, all agents get a `tasks_orphaned` event. -### 1.14 List Agents +### Server-Pushed Events -```json -{"op":"list_agents"} -``` -```json -{"status":"ok","agents":["coder-1","reviewer-2"]} -``` +Events have an `"event"` key (responses have `"status"`). Handle them as they arrive: -### 1.15 Stats - -```json -{"op":"stats"} -``` -Returns server statistics (locks, messages, deadlocks, per-agent counters). 
- -### 1.16 Server-Pushed Events - -Events arrive asynchronously on the TCP socket at any time. They are identified -by the `"event"` key (no `"status"` key). Route them accordingly: - -| Event | Description | -|---------------------|-------------| -| `lock_granted` | A queued lock was granted to you | -| `lock_expired` | One of your locks expired (TTL elapsed) | -| `lock_released` | A lock you were waiting for was released | -| `message` | Direct message from another agent | -| `broadcast` | Broadcast message from another agent | -| `topic_message` | Message published to a topic you subscribed to | -| `delivery_ack` | Confirmation that your message was delivered | -| `task_assigned` | A task was assigned to you | -| `task_updated` | A task's status changed | -| `tasks_orphaned` | An agent disconnected with unfinished tasks | -| `agent_joined` | Another agent connected | -| `agent_left` | Another agent disconnected | -| `deadlock_detected` | Server resolved a deadlock involving you | -| `wait_timeout` | Your lock wait timed out | - -**Routing rule:** if the received JSON has an `"event"` key → async event; -otherwise → response to your last request. - -### 1.17 Recommended Startup Sequence +| Event | Trigger | +|---|---| +| `message` | Direct message received | +| `broadcast` | Broadcast received | +| `topic_message` | Published to a topic you subscribed to | +| `lock_granted` | Queued lock was granted | +| `lock_expired` | Your lock's TTL elapsed | +| `task_assigned` | Task assigned to you | +| `task_updated` | Task status changed | +| `tasks_orphaned` | Agent disconnected with unfinished tasks | +| `agent_joined` / `agent_left` | Agent connected/disconnected | +| `delivery_ack` | Your message was delivered | +| `deadlock_detected` | Server broke a deadlock involving you | -Follow this pattern every time you connect: +### Recommended Startup Sequence ``` -1. Register → {"op":"register","agent_id":"...","attributes":{...}} -2. 
Set status → {"op":"agent_status","custom_status":"initializing"} -3. Discover peers → {"op":"find_agents","filter":{}} -4. Subscribe → {"op":"subscribe","topic":"..."} (one per topic) -5. Check tasks → {"op":"task_list","assignee":"","status":"pending"} -6. Set status → {"op":"agent_status","custom_status":"ready"} -7. Begin work → process pending tasks, respond to messages +1. {"op":"register","agent_id":"...","attributes":{...}} +2. {"op":"find_agents","filter":{}} +3. {"op":"subscribe","topic":"..."} — for each relevant topic +4. {"op":"task_list","assignee":"<your-agent-id>","status":"pending"} +5. {"op":"agent_status","custom_status":"ready"} +6. Begin work — process tasks, respond to messages, ping every 15s ``` -### 1.18 Full TCP Session Example +### Session Example ``` -→ {"op":"register","agent_id":"coder-1","attributes":{"role":"coder","lang":["python"]}} +→ {"op":"register","agent_id":"coder-1","attributes":{"role":"coder"}} ← {"status":"ok","session_id":"sess-abc","heartbeat_interval_ms":15000} - → {"op":"find_agents","filter":{"role":"reviewer"}} -← {"status":"ok","agents":["reviewer-1","reviewer-3"]} - -→ {"op":"subscribe","topic":"code-reviews"} -← {"status":"ok"} - -→ {"op":"agent_status","custom_status":"ready"} -← {"status":"ok"} - +← {"status":"ok","agents":["reviewer-1"]} → {"op":"acquire","resource":"file:/src/main.py","mode":"write","ttl_ms":30000} ← {"status":"ok","lock_ref":"LOCK-1","fencing_token":1} - ... do work ...
- → {"op":"release","lock_ref":"LOCK-1"} ← {"status":"ok"} - -→ {"op":"publish","topic":"code-reviews","payload":{"file":"main.py","action":"ready"}} -← {"status":"ok"} - -→ {"op":"send","to":"reviewer-1","payload":{"type":"review_request","file":"main.py"},"request_id":"req-1"} +→ {"op":"send","to":"reviewer-1","payload":{"type":"review_request","file":"main.py"}} ← {"status":"ok","msg_id":"MSG-5"} -← {"event":"delivery_ack","msg_id":"MSG-5","request_id":"req-1","to":"reviewer-1"} - → {"op":"ping"} ← {"status":"pong","ts":1711234567890,"heartbeat_interval_ms":15000} ``` -### 1.19 Response Statuses +--- -| Status | Meaning | -|---------------|------------------------------------------------| -| `ok` | Request succeeded. | -| `wait` | Lock is queued; expect a `lock_granted` event. | -| `error` | Request failed; see the `reason` field. | -| `pong` | Reply to `ping`. | -| `unavailable` | try_acquire: resource is already locked. | +## Method 2 — HTTP Sessions (No Persistent Connection) -### 1.20 Error Reasons +For agents that **cannot maintain a TCP socket** (CLI tools, sandboxed +environments, serverless functions). You register via HTTP and receive a +**token** for subsequent requests. Messages are retrieved by polling. -`bad_request`, `unknown_op`, `unknown_target`, `conflict`, `not_found`, -`expired`, `wait_timeout`, `deadlock`, `already_registered`, `unauthorized`, -`not_registered`, `internal_error`. +### Register ---- +```bash +curl -s -X POST http://{{host}}:9001/agents/register \ + -H "Content-Type: application/json" \ + -d '{"agent_id":"my-agent","mode":"http"}' +``` +→ `{"status":"ok","token":"PLUTO-A37...","session_id":"sess-...","agent_id":"my-agent","mode":"http","ttl_ms":300000}` -## Approach 2 — Python Client (Wraps TCP) +Modes: +- `http` — standard HTTP session (default TTL: 5 min) +- `stateless` — declares the agent as stateless, for fire-and-forget use (same default TTL as `http`) -Use this if you have Python available and prefer a high-level API.
The client -manages the TCP socket, heartbeat, and event dispatch internally. +Custom TTL: add `"ttl_ms":600000` to the request body. -### 2.1 Quick Start +**Save the returned `token`** — it authenticates all subsequent requests. -```python -from pluto_client import PlutoClient +### Heartbeat -client = PlutoClient(host="{{host}}", port={{port}}, agent_id="my-agent") -client.connect() +```bash +curl -s -X POST http://{{host}}:9001/agents/heartbeat \ + -H "Content-Type: application/json" \ + -d '{"token":"PLUTO-A37..."}' +``` +Call periodically (before TTL expires) to keep the session alive. + +### Poll Messages -# Acquire an exclusive lock -lock_ref = client.acquire("file:/repo/src/model.py", ttl_ms=30000) +```bash +curl -s "http://{{host}}:9001/agents/poll?token=PLUTO-A37..." +``` +→ `{"status":"ok","count":2,"messages":[{"event":"message","from":"coder-1","payload":{...}},...]}` -# ... do work ... +Each poll also acts as a heartbeat. Messages are delivered once and removed from the inbox. -# Release the lock when done -client.release(lock_ref) +### Send / Broadcast / Subscribe -# Send a message to another agent -client.send("reviewer-1", {"type": "ready", "file": "model.py"}) +```bash +# Direct message +curl -s -X POST http://{{host}}:9001/agents/send \ + -H "Content-Type: application/json" \ + -d '{"token":"PLUTO-A37...","to":"coder-1","payload":{"text":"hello"}}' -client.disconnect() +# Broadcast +curl -s -X POST http://{{host}}:9001/agents/broadcast \ + -H "Content-Type: application/json" \ + -d '{"token":"PLUTO-A37...","payload":{"text":"announcement"}}' + +# Subscribe to topic +curl -s -X POST http://{{host}}:9001/agents/subscribe \ + -H "Content-Type: application/json" \ + -d '{"token":"PLUTO-A37...","topic":"build-status"}' ``` -Or use the context manager: +### Unregister -```python -with PlutoClient(host="{{host}}", port={{port}}, agent_id="my-agent") as client: - lock_ref = client.acquire("workspace:experiment-1") - # ... do work ... 
- client.release(lock_ref) +```bash +curl -s -X POST http://{{host}}:9001/agents/unregister \ + -H "Content-Type: application/json" \ + -d '{"token":"PLUTO-A37..."}' ``` -### 2.2 API Reference +### Shell Registration (PlutoClient.sh) -**Connection:** +```bash +# HTTP registration +./PlutoClient.sh register --http --agent-id my-agent -| Method | Description | -|----------------|---------------------------------------------------| -| `connect()` | Open TCP connection and register with the server. | -| `disconnect()` | Close connection gracefully. | +# Stateless with custom TTL +./PlutoClient.sh register --stateless --ttl 600 --agent-id my-agent -**Locking:** +# Background TCP daemon (maintains heartbeat for you) +./PlutoClient.sh register --daemon --agent-id my-agent +``` -| Method | Signature | Returns | -|---------------|----------------------------------------------------------------------|-------------------------------------| -| `acquire` | `(resource, mode="write", ttl_ms=30000)` | `lock_ref` (granted) or `wait_ref` (queued) | -| `try_acquire` | `(resource, mode="write", ttl_ms=30000)` | `lock_ref` or `None` if unavailable | -| `release` | `(lock_ref)` | None | -| `renew` | `(lock_ref, ttl_ms=30000)` | None | +--- -**Messaging:** +## Method 3 — HTTP One-Shot (No Registration) -| Method | Signature | Description | -|-------------|------------------------------|-------------------------------------| -| `send` | `(to, payload, request_id=)` | Send a direct message to one agent. | -| `broadcast` | `(payload)` | Broadcast a message to all agents. | -| `publish` | `(topic, payload)` | Publish to a topic channel. | -| `ack` | `(msg_id)` | Acknowledge receipt of a message. | +For quick operations without registering. Use `agent_id` or `from` in the +request body to identify yourself. 
-**Discovery & Collaboration:** +### Health & Info -| Method | Signature | Description | -|----------------|--------------------------|-------------------------------------------| -| `find_agents` | `(filter={})` | Find agents matching attribute filter. | -| `list_agents` | `(detailed=False)` | List connected agent IDs (or full details).| -| `agent_status` | `(agent_id)` | Query a specific agent's status. | -| `set_status` | `(custom_status)` | Set your own status string. | +```bash +GET /ping → {"status":"pong","ts":...} +GET /health → {"status":"ok","version":"0.2.1"} +GET /agents → {"status":"ok","agents":["coder-1",...]} +GET /agents/list/detailed → full agent details with attributes +GET /agents/<agent_id> → single agent status +GET /locks → active locks +GET /tasks → all tasks +GET /tasks/progress → task counts by status and agent +GET /events?since_token=0 → event history (polling) +``` +All URLs are prefixed with `http://{{host}}:9001`. -**Topics:** +### Lock Operations -| Method | Signature | Description | -|---------------|-----------------|--------------------------------------| -| `subscribe` | `(topic)` | Subscribe to a named topic channel. | -| `unsubscribe` | `(topic)` | Unsubscribe from a topic channel. | +```bash +POST /locks/acquire {"agent_id":"me","resource":"file:/x","mode":"write","ttl_ms":30000} +POST /locks/release {"lock_ref":"LOCK-42","agent_id":"me"} +POST /locks/renew {"lock_ref":"LOCK-42","ttl_ms":30000} +``` -**Task Management:** +### Messaging (One-Shot) -| Method | Signature | Description | -|-----------------|-------------------------------------|----------------------------------------| -| `task_assign` | `(assignee, description, payload=)` | Assign a task to an agent. | -| `task_update` | `(task_id, status, result=)` | Update task status. | -| `task_list` | `(assignee=, status=)` | List tasks with optional filters. | -| `task_batch` | `(tasks: list)` | Batch-assign tasks. | -| `task_progress` | `()` | View global task progress.
| +```bash +POST /messages/send {"from":"me","to":"coder-1","payload":{"text":"hello"}} +POST /messages/broadcast {"from":"me","payload":{"text":"announcement"}} +``` -**Stats:** +### Agent Discovery -| Method | Signature | Description | -|---------------|--------------------|----------------------------------------| -| `stats` | `() -> dict` | Query server statistics. | +```bash +POST /agents/find {"filter":{"role":"reviewer"}} +``` -**Event handlers:** +### Admin -```python -client.on_message(lambda e: print("Direct message:", e["payload"])) -client.on_broadcast(lambda e: print("Broadcast:", e["payload"])) -client.on_lock_granted(lambda e: print("Lock granted:", e["lock_ref"])) -client.on("task_assigned", lambda e: print("New task:", e["task_id"])) -client.on("topic_message", lambda e: print(f"[{e['topic']}]", e["payload"])) -client.on("tasks_orphaned", lambda e: print("Orphaned:", e["tasks"])) -client.on("delivery_ack", lambda e: print("Delivered:", e["msg_id"])) +```bash +GET /admin/fencing_seq +GET /admin/deadlock_graph +POST /admin/force_release {"lock_ref":"LOCK-42"} +POST /selftest ``` -### 2.3 Collaborative Agent Pattern +--- + +## Python Clients + +### PlutoClient (TCP) ```python from pluto_client import PlutoClient -with PlutoClient(host="{{host}}", port={{port}}, agent_id="coder-1") as client: - # 1. Discover peers +with PlutoClient(host="{{host}}", port={{port}}, agent_id="my-agent") as client: + # Discover peers reviewers = client.find_agents({"role": "reviewer"}) - testers = client.find_agents({"role": "tester"}) - # 2. Subscribe to relevant topics - client.subscribe("build-status") - client.subscribe("code-reviews") - - # 3. Set up event handlers - client.on("task_assigned", handle_new_task) - client.on("topic_message", handle_topic_update) - client.on("tasks_orphaned", maybe_pick_up_orphan) - - # 4. Set ready status - client.set_status("ready") - - # 5. 
Check for pending tasks - my_tasks = client.task_list(assignee="coder-1", status="pending") - for task in my_tasks: - process_task(task) + # Lock → work → release + lock_ref = client.acquire("file:/src/main.py", ttl_ms=30000) + # ... do work ... + client.release(lock_ref) - # 6. After completing work, notify team - if reviewers: - client.send(reviewers[0], {"type": "review_request", "file": "main.py"}) - client.publish("build-status", {"status": "green", "commit": "abc123"}) + # Notify + client.send(reviewers[0], {"type": "review_request", "file": "main.py"}) ``` -### 2.4 Threaded Message Waiting & Multi-Turn Conversations - -The `on_message` callback fires on the client's internal reader thread. -To **block your main thread** until a specific message arrives, use a -thread-safe helper like the one below. Every Pluto demo relies on this -pattern for turn-based or request/response coordination. +**Event handlers** (install before `connect()`): +```python +client.on_message(lambda e: print("Message:", e["payload"])) +client.on_broadcast(lambda e: print("Broadcast:", e["payload"])) +client.on_lock_granted(lambda e: print("Lock granted:", e["lock_ref"])) +client.on("task_assigned", lambda e: print("Task:", e["task_id"])) +client.on("topic_message", lambda e: print(f"[{e['topic']}]", e["payload"])) +``` +**Blocking wait for a message** (useful for turn-based coordination): ```python import threading, time -from pluto_client import PlutoClient -# ── shared message queue ────────────────────────────────────────── messages = [] msg_event = threading.Event() -_msg_lock = threading.Lock() +_lock = threading.Lock() def on_msg(event): - """Callback — runs on PlutoClient's reader thread.""" - with _msg_lock: + with _lock: messages.append(event) msg_event.set() def wait_msg(from_agent, timeout=30): - """Block until a message from `from_agent` arrives (or timeout).""" deadline = time.time() + timeout while time.time() < deadline: - with _msg_lock: + with _lock: for i, m in 
enumerate(messages): if m.get("from") == from_agent: return messages.pop(i) @@ -660,227 +384,80 @@ def wait_msg(from_agent, timeout=30): msg_event.wait(timeout=1) raise TimeoutError(f"No message from {from_agent} within {timeout}s") -# ── connect with the handler installed BEFORE connect() ─────────── client = PlutoClient(host="{{host}}", port={{port}}, agent_id="agent-a") -client.on_message(on_msg) # must be set before connect() +client.on_message(on_msg) client.connect() -``` -#### Handshake / Ready Synchronisation - -Two agents can synchronise startup before beginning coordinated work: - -```python -# Both agents run this (with each other's ID as PEER): -PEER = "agent-b" -client.send(PEER, {"type": "ready", "agent": client.agent_id}) -wait_msg(PEER, timeout=30) # blocks until peer sends ready +# Handshake with peer +client.send("agent-b", {"type": "ready"}) +wait_msg("agent-b", timeout=30) ``` -#### Multi-Turn Conversation Loop - -```python -TURNS = [ - (1, "What is the capital of France?", "Paris"), - (2, "What is 2 + 2?", "4"), - # ... more turns ... -] - -for turn, question, expected_answer in TURNS: - if my_turn_to_ask(turn): - client.send(PEER, {"type": "question", "turn": turn, "text": question}) - answer_msg = wait_msg(PEER, timeout=30) - answer_text = answer_msg["payload"]["text"] - # Optionally lock a shared resource to log the exchange - lock = client.acquire("shared:transcript", mode="write", ttl_ms=10000) - log_exchange(turn, question, answer_text) - client.release(lock) - else: - q_msg = wait_msg(PEER, timeout=30) - client.send(PEER, {"type": "answer", "turn": turn, "text": expected_answer}) -``` - -This pattern scales to any number of turns or agents. The key rules: -- Install `on_message` **before** calling `connect()`. -- Use a `threading.Lock` around the shared message list. -- Always set a reasonable `timeout` to avoid permanent hangs. 
- -### 2.5 CLI Reference - -```bash -# Verify server connectivity -python pluto_client.py ping --host {{host}} --port {{port}} - -# List connected agents -python pluto_client.py list --host {{host}} --port {{port}} - -# Query server statistics -python pluto_client.py stats --host {{host}} --port {{port}} - -# Generate this guide -python pluto_client.py guide --host {{host}} --port {{port}} -``` - ---- - -## Approach 3 — HTTP/REST (Fallback) - -Use when TCP sockets are not available. The HTTP API (port 9001) is stateless — -no persistent connection, no push events. Poll `/events` for event history. - -### 3.1 Health & Discovery - -```bash -GET http://{{host}}:9001/ping -# → {"status":"pong","ts":...} - -GET http://{{host}}:9001/health -# → {"status":"ok","version":"0.2.0"} - -GET http://{{host}}:9001/agents -# → {"status":"ok","agents":["coder-1","reviewer-2"]} +### PlutoHttpClient (HTTP Sessions) -GET http://{{host}}:9001/agents/list/detailed -# → {"status":"ok","agents":[{"agent_id":"coder-1","status":"connected","attributes":{...},...}]} +For agents that cannot maintain TCP connections: -GET http://{{host}}:9001/agents/coder-1 -# → {"status":"ok","agent_id":"coder-1","online":true,"attributes":{...},"last_seen":...} - -GET http://{{host}}:9001/locks -# → {"status":"ok","locks":[{"lock_ref":"LOCK-1","resource":"...","agent_id":"...","mode":"write","fencing_token":5}]} -``` - -### 3.2 Agent Discovery - -```bash -POST http://{{host}}:9001/agents/find - -d '{"filter":{"role":"reviewer"}}' -# → {"status":"ok","agents":["reviewer-1","reviewer-3"]} -``` - -### 3.3 Lock Operations - -**Acquire:** -```bash -curl -s -X POST http://{{host}}:9001/locks/acquire \ - -H "Content-Type: application/json" \ - -d '{"agent_id":"my-agent","resource":"file:/src/main.py","mode":"write","ttl_ms":30000}' -# → {"status":"ok","lock_ref":"LOCK-42","fencing_token":17} -# or {"status":"wait","wait_ref":"WAIT-99"} -``` +```python +from pluto_client import PlutoHttpClient -**Release:** -```bash 
-curl -s -X POST http://{{host}}:9001/locks/release \ - -H "Content-Type: application/json" \ - -d '{"lock_ref":"LOCK-42","agent_id":"my-agent"}' -# → {"status":"ok"} +with PlutoHttpClient(host="{{host}}", http_port=9001, agent_id="my-agent") as client: + # Already registered — token managed automatically + agents = client.list_agents() + client.send("coder-1", {"text": "hello from HTTP"}) + messages = client.poll() # also heartbeats + client.heartbeat() # explicit keepalive ``` -**Renew:** -```bash -curl -s -X POST http://{{host}}:9001/locks/renew \ - -H "Content-Type: application/json" \ - -d '{"lock_ref":"LOCK-42","ttl_ms":30000}' -# → {"status":"ok"} -``` +### API Quick Reference -### 3.4 Messaging (One-Shot HTTP) +| Category | PlutoClient (TCP) | PlutoHttpClient (HTTP) | +|---|---|---| +| Connect | `connect()` / context manager | `register()` / context manager | +| Lock | `acquire()`, `try_acquire()`, `release()`, `renew()` | — (use one-shot HTTP) | +| Message | `send()`, `broadcast()`, `publish()` | `send()`, `broadcast()` | +| Subscribe | `subscribe()`, `unsubscribe()` | `subscribe()` | +| Discovery | `find_agents()`, `list_agents()`, `agent_status()` | `list_agents()`, `agent_status()` | +| Tasks | `task_assign()`, `task_update()`, `task_list()`, `task_batch()`, `task_progress()` | — (use one-shot HTTP) | +| Status | `set_status()` | — | +| Events | Push callbacks: `on_message()`, `on_broadcast()`, `on()` | `poll()` | +| Keepalive | Automatic heartbeat thread | `heartbeat()` or `poll()` | +| Stats | `stats()` | — | -Send a message without maintaining a TCP session: -```bash -curl -s -X POST http://{{host}}:9001/messages/send \ - -H "Content-Type: application/json" \ - -d '{"from":"http-agent","to":"coder-1","payload":{"text":"hello from HTTP"}}' -# → {"status":"ok","msg_id":"MSG-15"} -``` +### CLI -Broadcast: ```bash -curl -s -X POST http://{{host}}:9001/messages/broadcast \ - -H "Content-Type: application/json" \ - -d 
'{"from":"http-agent","payload":{"text":"announcement"}}' -# → {"status":"ok"} +./PlutoClient.sh ping # test connectivity +./PlutoClient.sh list # list agents +./PlutoClient.sh stats # server statistics +./PlutoClient.sh guide --output agent_guide.md # generate this guide +./PlutoClient.sh register --http --agent-id X # HTTP session +./PlutoClient.sh register --daemon --agent-id X # TCP daemon ``` -### 3.5 Task Management - -```bash -GET http://{{host}}:9001/tasks -# → {"status":"ok","tasks":[...]} - -GET http://{{host}}:9001/tasks/progress -# → {"status":"ok","total":10,"by_status":{...},"by_agent":{...}} -``` +--- -### 3.6 Events (Polling) +## Key Rules -```bash -GET http://{{host}}:9001/events?since_token=0&limit=50 -# → {"status":"ok","events":[...]} -``` +1. **Always release locks.** Use try/finally. Default TTL: 30 s. +2. **Heartbeat.** TCP: ping every 15 s. HTTP: call heartbeat or poll before TTL expires. +3. **Resource naming.** Use `file:/path/to/file` for files, `workspace:` for logical scopes. +4. **Prefer topics over broadcast.** Subscribe to specific channels instead of broadcasting everything. +5. **Use task primitives** for work assignment — they are tracked and generate lifecycle events. +6. **Handle `tasks_orphaned` events** — pick up abandoned work when agents disconnect. +7. **Duplicate names.** If your `agent_id` is taken, the server appends a 6-character suffix. Always use the returned `agent_id`. 
-### 3.7 Admin +## Response Statuses -```bash -GET http://{{host}}:9001/admin/fencing_seq -GET http://{{host}}:9001/admin/deadlock_graph -POST http://{{host}}:9001/admin/force_release {"lock_ref":"LOCK-42"} -POST http://{{host}}:9001/selftest -``` +| Status | Meaning | +|---|---| +| `ok` | Success | +| `wait` | Lock queued — expect `lock_granted` event | +| `error` | Failed — see `reason` field | +| `pong` | Reply to ping | +| `unavailable` | `try_acquire`: resource is locked | ---- +## Error Reasons -## Collaborative Best Practices - -### Resource Coordination -1. **Always release locks** when your work is done. Use try/finally or ensure - release on every code path to avoid leaking locks. -2. **Set reasonable TTLs.** A TTL that is too short risks expiration mid-work; - too long delays other agents. 30 seconds is a good default. -3. **Renew before expiry** if an operation takes longer than expected. -4. **Use try_acquire for optional work** — if a resource is busy, move on to - something else instead of blocking. -5. **Use resource naming conventions** — e.g., `file:/path/to/file` for file - locks, `workspace:` for logical workspaces. -6. **Use fencing tokens** to detect stale locks — each lock grant includes a - monotonically increasing `fencing_token`. - -### Communication -7. **Prefer topics over broadcast** — subscribe to specific channels rather than - broadcasting everything to everyone. -8. **Use direct messages for targeted requests** — send review requests, task - completions, and questions to specific agents. -9. **Track delivery with request_id** — include a `request_id` when sending - messages to get `delivery_ack` confirmation. -10. **Acknowledge important messages** — send an `ack` for messages that require - a response to close the feedback loop. - -### Discovery & Collaboration -11. **Register with rich attributes** — include role, capabilities, supported - languages, or any metadata that helps peers discover you. -12. 
**Discover peers on startup** — call `find_agents` with relevant filters - before beginning work. Re-discover periodically. -13. **React to agent_joined/agent_left events** — update your local peer map - when agents come and go. -14. **Check agent status before sending** — use `agent_status` to verify an - agent is online before sending critical requests. -15. **Pick up orphaned tasks** — listen for `tasks_orphaned` events and - volunteer to take over abandoned work if you have the capacity. - -### Session Management -16. **Send heartbeats** (TCP only) — ping every 15 s to keep the session alive. -17. **Handle reconnection gracefully** — offline messages are delivered - automatically on reconnect. Check for pending tasks. -18. **Set meaningful status** — update your `custom_status` as your work - changes (e.g., "idle", "reviewing main.py", "running tests"). - -### Task Workflow -19. **Use task primitives** instead of ad-hoc messages for work assignment. - Tasks are tracked, filterable, and generate lifecycle events. -20. **Update task progress** — move tasks through statuses: pending → - in_progress → completed/failed. -21. **Use task_batch for parallel work** — atomically assign multiple tasks - to distribute work efficiently. -22. **Monitor task_progress** — periodically check global progress to - understand team velocity and bottlenecks. 
+`bad_request` · `unknown_op` · `unknown_target` · `conflict` · `not_found` · +`expired` · `wait_timeout` · `deadlock` · `already_registered` · `unauthorized` · +`not_registered` · `internal_error` diff --git a/src_py/pluto_client.py b/src_py/pluto_client.py index 7b9cc6f..ba21b95 100644 --- a/src_py/pluto_client.py +++ b/src_py/pluto_client.py @@ -37,11 +37,15 @@ import socket import sys import threading +import time +import urllib.request +import urllib.error from typing import Callable, Dict, List, Optional from pluto_client_def import ( DEFAULT_HOST, DEFAULT_PORT, + DEFAULT_HTTP_PORT, DEFAULT_TIMEOUT, DEFAULT_AGENT_ID, DEFAULT_GUIDE_OUTPUT_PATH, @@ -425,6 +429,152 @@ def _dispatch_line(self, line: str): self._response_queue.put(msg) +# ── HTTP Client ─────────────────────────────────────────────────────────────── + +class PlutoHttpClient: + """ + HTTP-based client for the Pluto coordination server. + + Unlike PlutoClient (TCP), this client uses stateless HTTP requests and + does not maintain a persistent socket. Ideal for CLI agents (like Claude + Code) that execute one-shot commands. + + Supports two modes: + - "http": Standard HTTP session with token-based auth + - "stateless": Declares the agent as stateless with a configurable TTL + + Usage: + client = PlutoHttpClient(host="localhost", http_port=9001, agent_id="claude-1") + client.register() + # ... do work, poll for messages ... 
+ client.heartbeat() # keep alive + messages = client.poll() + client.unregister() + """ + + def __init__( + self, + host: str = DEFAULT_HOST, + http_port: int = DEFAULT_HTTP_PORT, + agent_id: str = DEFAULT_AGENT_ID, + timeout: float = DEFAULT_TIMEOUT, + attributes: Optional[Dict] = None, + mode: str = "http", + ttl_ms: int = 300000, + ): + self.host = host + self.http_port = http_port + self.agent_id = agent_id + self.timeout = timeout + self.attributes = attributes or {} + self.mode = mode + self.ttl_ms = ttl_ms + self.base_url = f"http://{host}:{http_port}" + self.token: Optional[str] = None + self.session_id: Optional[str] = None + + def _post(self, path: str, body: dict) -> dict: + """Send a POST request and return the parsed JSON response.""" + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + f"{self.base_url}{path}", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + def _get(self, path: str) -> dict: + """Send a GET request and return the parsed JSON response.""" + req = urllib.request.Request( + f"{self.base_url}{path}", + method="GET", + ) + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + def register(self) -> dict: + """Register this agent via HTTP. 
Returns server response with token.""" + body = { + "agent_id": self.agent_id, + "mode": self.mode, + "ttl_ms": self.ttl_ms, + } + if self.attributes: + body["attributes"] = self.attributes + resp = self._post("/agents/register", body) + if resp.get("status") == "ok": + self.token = resp.get("token") + self.session_id = resp.get("session_id") + # Server may have assigned a different name + if resp.get("agent_id"): + self.agent_id = resp["agent_id"] + return resp + + def heartbeat(self) -> dict: + """Send a heartbeat to keep the HTTP session alive.""" + if not self.token: + raise PlutoError("not registered (no token)") + return self._post("/agents/heartbeat", {"token": self.token}) + + def poll(self) -> List[dict]: + """Poll for queued messages. Also acts as a heartbeat.""" + if not self.token: + raise PlutoError("not registered (no token)") + resp = self._get(f"/agents/poll?token={self.token}") + return resp.get("messages", []) + + def send(self, to: str, payload: dict, request_id: Optional[str] = None) -> dict: + """Send a direct message to another agent.""" + if not self.token: + raise PlutoError("not registered (no token)") + body = {"token": self.token, "to": to, "payload": payload} + if request_id: + body["request_id"] = request_id + return self._post("/agents/send", body) + + def broadcast(self, payload: dict) -> dict: + """Broadcast a message to all agents.""" + if not self.token: + raise PlutoError("not registered (no token)") + return self._post("/agents/broadcast", {"token": self.token, "payload": payload}) + + def subscribe(self, topic: str) -> dict: + """Subscribe to a topic channel.""" + if not self.token: + raise PlutoError("not registered (no token)") + return self._post("/agents/subscribe", {"token": self.token, "topic": topic}) + + def unregister(self) -> dict: + """Unregister and remove the HTTP session.""" + if not self.token: + raise PlutoError("not registered (no token)") + resp = self._post("/agents/unregister", {"token": self.token}) + 
self.token = None + self.session_id = None + return resp + + def list_agents(self) -> List[str]: + """List all connected agents via HTTP.""" + resp = self._get("/agents") + return resp.get("agents", []) + + def agent_status(self, agent_id: str) -> dict: + """Query a specific agent's status.""" + return self._get(f"/agents/{agent_id}") + + def __enter__(self): + self.register() + return self + + def __exit__(self, *_): + try: + self.unregister() + except Exception: + pass + + # ── Agent guide generation ──────────────────────────────────────────────────── def generate_agent_guide( diff --git a/src_py/pluto_client_def.py b/src_py/pluto_client_def.py index 6c51482..20712f5 100644 --- a/src_py/pluto_client_def.py +++ b/src_py/pluto_client_def.py @@ -24,6 +24,7 @@ DEFAULT_HOST: str = "localhost" DEFAULT_PORT: int = 9000 +DEFAULT_HTTP_PORT: int = 9001 DEFAULT_TIMEOUT: float = 10.0 # seconds DEFAULT_AGENT_ID: str = "pluto-cli" diff --git a/tests/test_v021_http_sessions.py b/tests/test_v021_http_sessions.py new file mode 100644 index 0000000..d912b93 --- /dev/null +++ b/tests/test_v021_http_sessions.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python3 +""" +test_v021_http_sessions.py — Integration tests for Pluto v0.2.1 HTTP session features. + +Tests all 4 solutions: + 1. HTTP-based session registration (POST /agents/register with token) + 2. Stateless agent mode (mode=stateless, configurable TTL) + 3. PlutoClient.sh register --daemon (tested separately via shell) + 4. 
Configurable heartbeat TTL for HTTP agents + +Also tests: + - Duplicate agent name prevention + - Cross-protocol visibility (HTTP agents visible to TCP agents) + - HTTP message send/receive via polling + - HTTP broadcast + - HTTP heartbeat keeps session alive + - Session expiry on missing heartbeats + +Requires: Pluto server running on localhost:9000 (TCP) / :9001 (HTTP) +""" + +import json +import os +import socket +import sys +import time +import urllib.request +import urllib.error + +# Add src_py to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src_py")) + +from pluto_client import PlutoClient, PlutoHttpClient, PlutoError + +HOST = "127.0.0.1" +TCP_PORT = 9000 +HTTP_PORT = 9001 +BASE_URL = f"http://{HOST}:{HTTP_PORT}" + +passed = 0 +failed = 0 +errors = [] + + +def test(name): + """Decorator for test functions.""" + def decorator(func): + global passed, failed + try: + func() + passed += 1 + print(f" ✓ {name}") + except Exception as e: + failed += 1 + errors.append((name, str(e))) + print(f" ✗ {name}: {e}") + return decorator + + +def http_post(path, body): + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + f"{BASE_URL}{path}", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=5) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as e: + return json.loads(e.read().decode("utf-8")) + + +def http_get(path): + req = urllib.request.Request(f"{BASE_URL}{path}", method="GET") + try: + with urllib.request.urlopen(req, timeout=5) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as e: + return json.loads(e.read().decode("utf-8")) + + +def tcp_send_recv(msg): + """Send a JSON message via TCP and get the response.""" + sock = socket.create_connection((HOST, TCP_PORT), timeout=5) + line = (json.dumps(msg) + "\n").encode("utf-8") + sock.sendall(line) + data = b"" + 
def rand_id():
    """Return 8 random hexadecimal characters (4 random bytes).

    Used to build agent names that do not collide between test runs.
    """
    from secrets import token_hex

    return token_hex(nbytes=4)
@test("Solution 1: HTTP agent can poll messages")
def _():
    """A message sent over TCP to an HTTP-registered agent is delivered by polling.

    Steps: register an HTTP agent, send it one message from a short-lived TCP
    agent, check the first poll returns that message and a second poll
    returns none. The TCP socket and the HTTP session are released even when
    an assertion fails, so a failing run leaves nothing behind on the server.
    """
    # Register HTTP agent; address it by the ID the server echoes back.
    http_agent = f"http-poll-{rand_id()}"
    reg = http_post("/agents/register", {"agent_id": http_agent})
    token = reg["token"]
    actual_id = reg["agent_id"]

    try:
        # Send message via TCP to HTTP agent. The `with` block closes the
        # socket even if sendall/recv raises.
        tcp_agent = f"tcp-sender-{rand_id()}"
        with socket.create_connection((HOST, TCP_PORT), timeout=5) as sock:
            sock.sendall((json.dumps({"op": "register", "agent_id": tcp_agent}) + "\n").encode())
            sock.recv(4096)
            sock.sendall((json.dumps({"op": "send", "to": actual_id,
                                      "payload": {"data": "test-poll"}}) + "\n").encode())
            sock.recv(4096)

        # Short pause before polling, kept from the original test.
        time.sleep(0.2)

        # Poll messages
        poll = http_get(f"/agents/poll?token={token}")
        assert poll["status"] == "ok", f"Poll failed: {poll}"
        assert poll["count"] >= 1, f"Expected at least 1 message, got {poll['count']}"
        msgs = poll["messages"]
        found = any(m.get("from") == tcp_agent for m in msgs)
        assert found, f"Message from {tcp_agent} not found in {msgs}"

        # Second poll should be empty
        poll2 = http_get(f"/agents/poll?token={token}")
        assert poll2["count"] == 0, f"Expected 0 messages on second poll, got {poll2['count']}"
    finally:
        # Always drop the HTTP session, including on assertion failure.
        http_post("/agents/unregister", {"token": token})
@test("Duplicate name: TCP session gets unique suffix")
def _():
    """Two TCP registrations under one name: the second gets a suffixed ID.

    The first connection must keep the requested ID. The second must be
    accepted with ``status == "ok"`` under ``<agent_id>-<suffix>``, where the
    suffix is 6 characters long. Both sockets are closed on every exit path.
    """
    def read_reply(sock):
        # Read until the first newline and decode that line as JSON.
        # recv() returning b"" means the peer closed the connection; without
        # this check the loop would spin forever instead of failing the test.
        buf = b""
        while b"\n" not in buf:
            chunk = sock.recv(4096)
            if not chunk:
                raise ConnectionError("server closed the connection before replying")
            buf += chunk
        first_line = buf.split(b"\n", 1)[0]
        return json.loads(first_line.decode().strip())

    agent_id = f"dup-tcp-{rand_id()}"
    register_line = (json.dumps({"op": "register", "agent_id": agent_id}) + "\n").encode()

    # First TCP agent on persistent connection; it stays open while the
    # second registration is attempted.
    with socket.create_connection((HOST, TCP_PORT), timeout=5) as sock1:
        sock1.sendall(register_line)
        resp1 = read_reply(sock1)
        assert resp1["agent_id"] == agent_id

        # Second TCP agent with same name
        with socket.create_connection((HOST, TCP_PORT), timeout=5) as sock2:
            sock2.sendall(register_line)
            resp2 = read_reply(sock2)
            assert resp2["status"] == "ok"
            assert resp2["agent_id"] != agent_id
            assert resp2["agent_id"].startswith(agent_id + "-")
            suffix = resp2["agent_id"][len(agent_id) + 1:]
            assert len(suffix) == 6, f"Expected 6-char suffix, got '{suffix}'"
@test("PlutoHttpClient: send to TCP agent")
def _():
    """PlutoHttpClient.send() to a TCP-registered agent is accepted by the server.

    Only the ``status`` of the send response is checked; delivery to the TCP
    socket is not verified here. The TCP socket is closed on every exit path,
    including a failed send or assertion.
    """
    # TCP agent: stays connected while the HTTP client sends to it.
    tcp_agent = f"tcp-target-{rand_id()}"
    with socket.create_connection((HOST, TCP_PORT), timeout=5) as sock:
        sock.sendall((json.dumps({"op": "register", "agent_id": tcp_agent}) + "\n").encode())
        sock.recv(4096)

        # HTTP sends; the client context manager registers on entry and
        # unregisters on exit.
        with PlutoHttpClient(
            host=HOST, http_port=HTTP_PORT,
            agent_id=f"http-src-{rand_id()}",
        ) as client:
            resp = client.send(tcp_agent, {"msg": "from PlutoHttpClient"})
            assert resp["status"] == "ok"
if __name__ == "__main__":
    # The decorated tests have already executed by the time this block runs
    # (each @test call runs its function immediately), so this only prints
    # the banner and the summary, then sets the exit status.
    rule = "=" * 60

    print(f"\n{rule}")
    print(" Pluto v0.2.1 HTTP Session Integration Tests")
    print(f" Server: {HOST}:{TCP_PORT} (TCP) / {HOST}:{HTTP_PORT} (HTTP)")
    print(f"{rule}\n")

    print(f"\n{rule}")
    print(f" Results: {passed} passed, {failed} failed")
    if errors:
        print("\n Failures:")
        for label, reason in errors:
            print(f" ✗ {label}: {reason}")
    print(f"{rule}\n")

    # Non-zero exit status when at least one test failed.
    exit_code = 1 if failed > 0 else 0
    sys.exit(exit_code)