Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions PlutoClient.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Commands:
stats Show server statistics (locks, messages, deadlocks,
per-agent counters). No registration required.
guide Generate the Pluto agent guide file.
register Register an agent and maintain presence.

Global options (place before the command):
--host HOST Pluto server host (default: 127.0.0.1)
Expand All @@ -98,6 +99,13 @@ Global options (place before the command):
Guide-specific options:
--output PATH Output path for the generated guide file.

Register-specific options:
--daemon Run as background daemon maintaining TCP heartbeat
--http Use HTTP registration (no persistent TCP needed)
--stateless Register as stateless agent with longer TTL
--ttl SECONDS TTL in seconds for HTTP/stateless mode (default: 300)
--http-port PORT HTTP port (default: 9001)

Examples:
${GREEN}# Check if the server is reachable${NC}
$(basename "$0") ping
Expand All @@ -111,6 +119,15 @@ Examples:
${GREEN}# Generate the agent coordination guide${NC}
$(basename "$0") guide --output ./agent_guide.md

${GREEN}# Register via HTTP (for CLI agents like Claude Code)${NC}
$(basename "$0") register --http --agent-id claude-workspace

${GREEN}# Register as stateless with 5-min TTL${NC}
$(basename "$0") register --stateless --ttl 300 --agent-id my-agent

${GREEN}# Spawn a TCP daemon that maintains heartbeat${NC}
$(basename "$0") register --daemon --agent-id my-agent

Requires: Python 3, a running Pluto server (see ${CYAN}./PlutoServer.sh --help${NC}).

${YELLOW}Starting an Agent with Pluto:${NC}
Expand Down Expand Up @@ -148,5 +165,155 @@ fi
PYTHON_BIN=$(find_python)
ensure_venv "${PYTHON_BIN}"

# ── Register subcommand special handling ──────────────────────────────────────

handle_register() {
local host="127.0.0.1"
local port="9000"
local http_port="9001"
local agent_id="pluto-cli"
local mode=""
local daemon=false
local ttl=300
local args=()

# Parse global options and register-specific flags
while [[ $# -gt 0 ]]; do
case "$1" in
--host) host="$2"; shift 2 ;;
--port) port="$2"; shift 2 ;;
--http-port) http_port="$2"; shift 2 ;;
--agent-id) agent_id="$2"; shift 2 ;;
--daemon) daemon=true; shift ;;
--http) mode="http"; shift ;;
--stateless) mode="stateless"; shift ;;
--ttl) ttl="$2"; shift 2 ;;
register) shift ;; # skip the command itself
*) args+=("$1"); shift ;;
esac
done

if [[ "$daemon" == true ]]; then
# Solution 3: Spawn a background daemon that maintains TCP connection
info "Starting TCP daemon for agent '${agent_id}'..."
local pidfile="/tmp/pluto/daemon_${agent_id}.pid"
mkdir -p /tmp/pluto

# Check if daemon is already running
if [[ -f "$pidfile" ]] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then
warn "Daemon for '${agent_id}' already running (PID $(cat "$pidfile"))"
exit 0
fi

nohup python -c "
import sys, os, time, socket, json, signal

HOST = '${host}'
PORT = int('${port}')
AGENT_ID = '${agent_id}'
HB_INTERVAL = 10 # seconds

def main():
pidfile = '${pidfile}'
with open(pidfile, 'w') as f:
f.write(str(os.getpid()))

def cleanup(sig, frame):
try: os.remove(pidfile)
except: pass
sys.exit(0)
signal.signal(signal.SIGTERM, cleanup)
signal.signal(signal.SIGINT, cleanup)

while True:
try:
sock = socket.create_connection((HOST, PORT), timeout=5)
# Register
msg = json.dumps({'op': 'register', 'agent_id': AGENT_ID}) + '\n'
sock.sendall(msg.encode())
sock.settimeout(5)
resp = sock.recv(4096)
print(f'[pluto-daemon] Registered as {AGENT_ID}', flush=True)

# Heartbeat loop
while True:
time.sleep(HB_INTERVAL)
ping = json.dumps({'op': 'ping'}) + '\n'
sock.sendall(ping.encode())
data = sock.recv(4096)
if not data:
break
except Exception as e:
print(f'[pluto-daemon] Connection lost: {e}, reconnecting...', flush=True)
time.sleep(3)

main()
" > /tmp/pluto/daemon_${agent_id}.log 2>&1 &

local daemon_pid=$!
ok "Daemon started (PID ${daemon_pid}), log: /tmp/pluto/daemon_${agent_id}.log"
ok "To stop: kill ${daemon_pid}"
return 0
fi

if [[ "$mode" == "http" || "$mode" == "stateless" ]]; then
# Solutions 1, 2, 4: HTTP-based registration
local ttl_ms=$((ttl * 1000))
local body="{\"agent_id\":\"${agent_id}\",\"mode\":\"${mode}\",\"ttl_ms\":${ttl_ms}}"

info "Registering agent '${agent_id}' via HTTP (mode=${mode}, ttl=${ttl}s)..."

local response
response=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "${body}" \
"http://${host}:${http_port}/agents/register" 2>&1)

if [[ $? -ne 0 ]]; then
err "Failed to connect to http://${host}:${http_port}"
exit 1
fi

local status
status=$(echo "$response" | python -c "import sys,json; print(json.loads(sys.stdin.read()).get('status',''))" 2>/dev/null)

if [[ "$status" == "ok" ]]; then
local token
token=$(echo "$response" | python -c "import sys,json; print(json.loads(sys.stdin.read()).get('token',''))" 2>/dev/null)
local actual_id
actual_id=$(echo "$response" | python -c "import sys,json; print(json.loads(sys.stdin.read()).get('agent_id',''))" 2>/dev/null)
ok "Registered successfully!"
ok " Agent ID : ${actual_id}"
ok " Token : ${token}"
ok " Mode : ${mode}"
ok " TTL : ${ttl}s"
ok ""
ok "To keep alive, periodically call:"
ok " curl -X POST -H 'Content-Type: application/json' \\"
ok " -d '{\"token\":\"${token}\"}' \\"
ok " http://${host}:${http_port}/agents/heartbeat"
ok ""
ok "To poll messages:"
ok " curl http://${host}:${http_port}/agents/poll?token=${token}"
else
err "Registration failed: ${response}"
exit 1
fi
return 0
fi

# Default: TCP registration (foreground, exits when done)
info "Registering agent '${agent_id}' via TCP..."
exec python "${PYTHON_SCRIPT}" --host "$host" --port "$port" --agent-id "$agent_id" ping
}

# Check if register subcommand is being used
for arg in "$@"; do
if [[ "$arg" == "register" ]]; then
handle_register "$@"
exit $?
fi
done

# Run the Python client with all provided arguments
exec python "${PYTHON_SCRIPT}" "$@"
2 changes: 1 addition & 1 deletion VERSION.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.2.0
v0.2.1
5 changes: 4 additions & 1 deletion src_erl/include/pluto.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-define(APP, pluto).

%% ── Version ─────────────────────────────────────────────────────────────────
-define(VERSION, "0.2.0").
-define(VERSION, "0.2.1").

%% ── Default configuration values ────────────────────────────────────────────
-define(DEFAULT_TCP_PORT, 9000).
Expand All @@ -27,6 +27,8 @@
-define(DEFAULT_HEARTBEAT_TIMEOUT_MS, 30000).
-define(DEFAULT_RECONNECT_GRACE_MS, 30000).
-define(DEFAULT_MAX_WAIT_MS, 60000).
-define(DEFAULT_HTTP_SESSION_TTL_MS, 300000). %% 5 minutes for HTTP agents
-define(DEFAULT_HTTP_SESSION_SWEEP_MS, 10000). %% sweep HTTP sessions every 10s
-define(DEFAULT_FLUSH_INTERVAL, 60000).
-define(DEFAULT_PERSISTENCE_DIR, "/tmp/pluto/state").
-define(DEFAULT_EVENT_LOG_DIR, "/tmp/pluto/events").
Expand All @@ -42,6 +44,7 @@
-define(ETS_LIVENESS, pluto_liveness). %% session_id -> last_seen_ms
-define(ETS_TASKS, pluto_tasks). %% task_id -> task record
-define(ETS_MSG_INBOX, pluto_msg_inbox). %% {agent_id, seq} -> message map
-define(ETS_HTTP_SESSIONS, pluto_http_sessions). %% token -> #http_session{}

%% ── Maximum line length for TCP reads (1 MB) ───────────────────────────────
-define(MAX_LINE_LENGTH, 1048576).
Expand Down
24 changes: 23 additions & 1 deletion src_erl/include/pluto_records.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
%% last_seen — System time (ms) of last heartbeat/message
%% custom_status— Custom agent status (e.g. <<"busy">>, <<"idle">>)
%% subscriptions— List of topic names this agent subscribes to
%% session_type — Connection type: 'tcp' | 'http' | 'stateless'
%%
-record(agent, {
agent_id :: binary(),
Expand All @@ -76,7 +77,8 @@
attributes :: map(),
last_seen :: integer(),
custom_status :: binary(),
subscriptions :: [binary()]
subscriptions :: [binary()],
session_type :: tcp | http | stateless
}).

%% ── Session record ──────────────────────────────────────────────────────────
Expand All @@ -92,4 +94,24 @@
session_pid :: pid()
}).

%% ── HTTP session record ─────────────────────────────────────────────────────
%% Stored in the ETS_HTTP_SESSIONS table, keyed by token.
%% Represents a stateless/HTTP agent session maintained via periodic heartbeats.
%%
%% token — Opaque session token returned at HTTP registration (binary)
%% agent_id — The agent_id bound to this HTTP session (binary)
%% session_id — Server-generated session identifier (binary)
%% ttl_ms — Time-to-live; session expires if no heartbeat within this window
%% last_seen — System time (ms) of last HTTP heartbeat or request
%% mode — 'http' | 'stateless'
%%
-record(http_session, {
token :: binary(),
agent_id :: binary(),
session_id :: binary(),
ttl_ms :: non_neg_integer(),
last_seen :: integer(),
mode :: http | stateless
}).

-endif. %% PLUTO_RECORDS_HRL
2 changes: 1 addition & 1 deletion src_erl/src/pluto.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, pluto, [
{description, "Pluto — Agent Coordination Server"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, [
pluto_sup,
pluto_lock_mgr,
Expand Down
2 changes: 2 additions & 0 deletions src_erl/src/pluto_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ create_ets_tables() ->
ets:new(?ETS_LIVENESS, [named_table, set, public]),
ets:new(?ETS_TASKS, [named_table, set, public]),
ets:new(?ETS_MSG_INBOX, [named_table, ordered_set, public]),
ets:new(?ETS_HTTP_SESSIONS, [named_table, set, public,
{keypos, #http_session.token}]),
ok.

%% @private Print the Pluto ASCII art banner on startup.
Expand Down
25 changes: 25 additions & 0 deletions src_erl/src/pluto_heartbeat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ handle_info(sweep, #state{sweep_ms = SweepMs, timeout_ms = TimeoutMs} = State) -
end
end, AllEntries),

%% Also sweep HTTP sessions with per-session TTL
sweep_http_sessions(Now),

erlang:send_after(SweepMs, self(), sweep),
{noreply, State};

Expand All @@ -101,3 +104,25 @@ handle_info(_Info, State) ->

terminate(_Reason, _State) ->
ok.

%%====================================================================
%% Internal
%%====================================================================

%% @private Sweep HTTP sessions that have exceeded their individual TTL.
sweep_http_sessions(Now) ->
AllHttpSessions = ets:tab2list(?ETS_HTTP_SESSIONS),
lists:foreach(fun(#http_session{token = Token, agent_id = AgentId,
session_id = SessId, ttl_ms = TtlMs,
last_seen = LastSeen}) ->
case Now - LastSeen > TtlMs of
true ->
?LOG_WARN("HTTP session ~s (agent ~s) expired (TTL ~wms)",
[SessId, AgentId, TtlMs]),
ets:delete(?ETS_HTTP_SESSIONS, Token),
ets:delete(?ETS_SESSIONS, SessId),
pluto_msg_hub:unregister_agent(AgentId);
false ->
ok
end
end, AllHttpSessions).
Loading
Loading