Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src_erl/src/pluto_msg_hub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,32 @@ init([]) ->
handle_call({register, AgentId, SessionId, SessionPid, Attrs}, _From, State) ->
Policy = pluto_config:get(session_conflict_policy, ?DEFAULT_SESSION_CONFLICT),
case ets:lookup(?ETS_AGENTS, AgentId) of
[#agent{status = connected, session_type = SType}]
when (SType =:= http orelse SType =:= stateless), Policy =:= strict ->
%% Name taken by an HTTP/stateless agent — check if session is still alive
case ets:match_object(?ETS_HTTP_SESSIONS,
#http_session{agent_id = AgentId, _ = '_'}) of
[_|_] ->
%% HTTP session still active — assign unique suffix
UniqueId = make_unique_agent_id(AgentId),
do_register(UniqueId, SessionId, SessionPid, Attrs, tcp),
{reply, {ok, SessionId, UniqueId}, State};
[] ->
%% HTTP session expired — safe to take the name
do_register(AgentId, SessionId, SessionPid, Attrs, tcp),
{reply, {ok, SessionId}, State}
end;
[#agent{status = connected, session_pid = OldPid}] when Policy =:= strict ->
case is_pid(OldPid) andalso is_process_alive(OldPid) of
true ->
%% Name taken by a live agent — assign a unique suffixed name
%% Name taken by a live TCP agent — assign a unique suffixed name
UniqueId = make_unique_agent_id(AgentId),
do_register(UniqueId, SessionId, SessionPid, Attrs, tcp),
{reply, {ok, SessionId, UniqueId}, State};
false ->
do_register(AgentId, SessionId, SessionPid, Attrs, tcp),
{reply, {ok, SessionId}, State}
end;
[#agent{status = connected, session_type = SType}]
when (SType =:= http orelse SType =:= stateless), Policy =:= strict ->
%% Name taken by an HTTP/stateless agent — assign a unique suffixed name
UniqueId = make_unique_agent_id(AgentId),
do_register(UniqueId, SessionId, SessionPid, Attrs, tcp),
{reply, {ok, SessionId, UniqueId}, State};
[#agent{status = connected, session_pid = OldPid}] when Policy =:= takeover ->
case is_pid(OldPid) andalso is_process_alive(OldPid) of
true -> OldPid ! {pluto_takeover, AgentId};
Expand All @@ -255,7 +264,7 @@ handle_call({register, AgentId, SessionId, SessionPid, Attrs}, _From, State) ->
do_register(AgentId, SessionId, SessionPid, Attrs, tcp),
{reply, {ok, SessionId}, State};
[#agent{status = disconnected}] ->
%% Agent was disconnected — reconnect within grace period
%% Agent was disconnected — safe to reclaim (same agent reconnecting)
do_register(AgentId, SessionId, SessionPid, Attrs, tcp),
%% Deliver any queued inbox messages
self() ! {deliver_inbox_sync, AgentId},
Expand Down
68 changes: 67 additions & 1 deletion src_erl/test/pluto_v022_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ v022_test_() ->

%% Set status via HTTP
{"http set agent status",
fun() -> t_http_set_status(HttpPort) end}
fun() -> t_http_set_status(HttpPort) end},

%% Duplicate name prevention (cross-type)
{"http then tcp duplicate name gets suffix",
fun() -> t_http_then_tcp_dup_name(TcpPort, HttpPort) end},
{"tcp then http duplicate name gets suffix",
fun() -> t_tcp_then_http_dup_name(TcpPort, HttpPort) end}
]
end}.

Expand Down Expand Up @@ -475,6 +481,66 @@ t_http_set_status(HttpPort) ->

http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).

%%====================================================================
%% Duplicate name prevention tests
%%====================================================================

%% HTTP agent registers first, then TCP agent tries the same name — must get a suffix
t_http_then_tcp_dup_name(TcpPort, HttpPort) ->
Agent = rand_agent(),
%% Register via HTTP first
{ok, RegResp} = http_post(HttpPort, "/agents/register",
#{<<"agent_id">> => Agent}),
Token = maps:get(<<"token">>, RegResp),
?assertEqual(Agent, maps:get(<<"agent_id">>, RegResp)),

%% Now register via TCP with the same name
{ok, Sock} = gen_tcp:connect({127,0,0,1}, TcpPort,
[binary, {active, false}, {packet, line}], 2000),
RegMsg = pluto_protocol_json:encode_line(#{
<<"op">> => <<"register">>,
<<"agent_id">> => Agent
}),
ok = gen_tcp:send(Sock, RegMsg),
{ok, RawResp} = gen_tcp:recv(Sock, 0, 5000),
{ok, TcpResp} = pluto_protocol_json:decode(string:trim(RawResp)),
?assertEqual(<<"ok">>, maps:get(<<"status">>, TcpResp)),
%% Must have received a different agent_id (suffixed)
TcpAgent = maps:get(<<"agent_id">>, TcpResp),
?assertNotEqual(Agent, TcpAgent),

%% Cleanup
gen_tcp:close(Sock),
http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).

%% TCP agent registers first, then HTTP agent tries the same name — must get a suffix
t_tcp_then_http_dup_name(TcpPort, HttpPort) ->
Agent = rand_agent(),
%% Register via TCP first
{ok, Sock} = gen_tcp:connect({127,0,0,1}, TcpPort,
[binary, {active, false}, {packet, line}], 2000),
RegMsg = pluto_protocol_json:encode_line(#{
<<"op">> => <<"register">>,
<<"agent_id">> => Agent
}),
ok = gen_tcp:send(Sock, RegMsg),
{ok, RawResp} = gen_tcp:recv(Sock, 0, 5000),
{ok, TcpResp} = pluto_protocol_json:decode(string:trim(RawResp)),
?assertEqual(<<"ok">>, maps:get(<<"status">>, TcpResp)),
?assertEqual(Agent, maps:get(<<"agent_id">>, TcpResp)),

%% Now register via HTTP with the same name — should get a suffix
{ok, HttpResp} = http_post(HttpPort, "/agents/register",
#{<<"agent_id">> => Agent}),
?assertEqual(<<"ok">>, maps:get(<<"status">>, HttpResp)),
HttpAgent = maps:get(<<"agent_id">>, HttpResp),
?assertNotEqual(Agent, HttpAgent),

%% Cleanup
gen_tcp:close(Sock),
Token = maps:get(<<"token">>, HttpResp),
http_post(HttpPort, "/agents/unregister", #{<<"token">> => Token}).

%%====================================================================
%% HTTP helpers
%%====================================================================
Expand Down
Loading