Skip to content

Commit

Permalink
Return an error if a call is unsupported
Browse files Browse the repository at this point in the history
[Why]
Currently, if a call to a Ra server is unsupported, the Ra server will
ignore the event and the call will eventually time out. This could
happen when Ra servers sport different versions of Ra for instance.

It would be nicer if the Ra server would reply immediatly with an error
to let the caller know about the actual problem.

[How]
The Ra server could use a `reply` effect in the catch-all clause, but
that effect will crash the Ra server if the initial event is not a
`call` (i.e. it doesn't have a `From` to reply to).

This patch introduces a new effect, `maybe_reply`, that has the same
overall behavior as `reply`, but the lack of a `From` isn't fatal.

The returned error is:

    {error, {unsupported_call, Call}}
  • Loading branch information
dumbbell committed Jun 19, 2024
1 parent 7175f0f commit 99c84ca
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 17 deletions.
12 changes: 7 additions & 5 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
ra_log:effect() |
{reply, ra_reply_body()} |
{reply, term(), ra_reply_body()} |
{maybe_reply, ra_reply_body()} |
{cast, ra_server_id(), term()} |
{send_vote_requests, [{ra_server_id(),
#request_vote_rpc{} | #pre_vote_rpc{}}]} |
Expand Down Expand Up @@ -830,7 +831,7 @@ handle_leader(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
handle_leader(Msg, State) ->
log_unhandled_msg(leader, Msg, State),
{leader, State, []}.
{leader, State, [{maybe_reply, {error, {unsupported_call, Msg}}}]}.


-spec handle_candidate(ra_msg() | election_timeout, ra_server_state()) ->
Expand Down Expand Up @@ -943,7 +944,7 @@ handle_candidate(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
handle_candidate(Msg, State) ->
log_unhandled_msg(candidate, Msg, State),
{candidate, State, []}.
{candidate, State, [{maybe_reply, {error, {unsupported_call, Msg}}}]}.

-spec handle_pre_vote(ra_msg(), ra_server_state()) ->
{ra_state(), ra_server_state(), effects()}.
Expand Down Expand Up @@ -1023,7 +1024,7 @@ handle_pre_vote(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
handle_pre_vote(Msg, State) ->
log_unhandled_msg(pre_vote, Msg, State),
{pre_vote, State, []}.
{pre_vote, State, [{maybe_reply, {error, {unsupported_call, Msg}}}]}.


-spec handle_follower(ra_msg(), ra_server_state()) ->
Expand Down Expand Up @@ -1327,7 +1328,7 @@ handle_follower(force_member_change,
call_for_election(pre_vote, State, [{reply, ok} | Effects]);
handle_follower(Msg, State) ->
log_unhandled_msg(follower, Msg, State),
{follower, State, []}.
{follower, State, [{maybe_reply, {error, {unsupported_call, Msg}}}]}.

handle_receive_snapshot(#install_snapshot_rpc{term = Term,
meta = #{index := SnapIndex,
Expand Down Expand Up @@ -1416,7 +1417,8 @@ handle_receive_snapshot(Msg, State) ->
log_unhandled_msg(receive_snapshot, Msg, State),
%% drop all other events??
%% TODO: work out what else to handle
{receive_snapshot, State, []}.
{receive_snapshot, State,
[{maybe_reply, {error, {unsupported_call, Msg}}}]}.

-spec handle_await_condition(ra_msg(), ra_server_state()) ->
{ra_state(), ra_server_state(), effects()}.
Expand Down
10 changes: 9 additions & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@

-export([send_rpc/3]).

-ifdef(TEST).
-export([leader_call/3,
local_call/3]).
-endif.

-define(DEFAULT_BROADCAST_TIME, 100).
-define(DEFAULT_ELECTION_MULT, 5).
-define(TICK_INTERVAL_MS, 1000).
Expand Down Expand Up @@ -1296,12 +1301,15 @@ handle_effect(_, {reply, From, Reply}, _, State, Actions) ->
% reply directly
ok = gen_statem:reply(From, Reply),
{State, Actions};
handle_effect(_, {reply, Reply}, {call, From}, State, Actions) ->
handle_effect(_, {ReplyOrMaybeReply, Reply}, {call, From}, State, Actions)
when ReplyOrMaybeReply =:= reply orelse ReplyOrMaybeReply =:= maybe_reply ->
% reply directly
ok = gen_statem:reply(From, Reply),
{State, Actions};
handle_effect(_, {reply, Reply}, EvtType, _, _) ->
exit({undefined_reply, Reply, EvtType});
handle_effect(_, {maybe_reply, _Reply}, _EvtType, State, Actions) ->
{State, Actions};
handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _,
#state{server_state = SS0,
monitors = Monitors,
Expand Down
49 changes: 48 additions & 1 deletion test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
%% `ra_server:command_reply_mode()' type:
-dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}).

%% The following testcases simulate an erroneous or unsupported call that is
%% outside of the spec.
-dialyzer({nowarn_function, [unknown_leader_call/1,
unknown_local_call/1]}).

all() ->
[
{group, tests}
Expand Down Expand Up @@ -68,7 +73,9 @@ all_tests() ->
transfer_leadership_two_node,
new_nonvoter_knows_its_status,
voter_gets_promoted_consistent_leader,
voter_gets_promoted_new_leader
voter_gets_promoted_new_leader,
unknown_leader_call,
unknown_local_call
].

groups() ->
Expand Down Expand Up @@ -1160,6 +1167,46 @@ voter_gets_promoted_new_leader(Config) ->
lists:map(fun({Name, _}) -> #{Name := #{membership := voter}} = Servers end, All),
ok.

unknown_leader_call(Config) ->
[A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
try
%% Query the leader and deduce a follower.
{ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
[Follower | _] = Cluster -- [Leader],
ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]),

Call = unknown_call,
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:leader_call(Leader, Call, ?DEFAULT_TIMEOUT)),
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:leader_call(Follower, Call, ?DEFAULT_TIMEOUT))
after
terminate_cluster(Cluster)
end.

unknown_local_call(Config) ->
[A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
try
%% Query the leader and deduce a follower.
{ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
[Follower | _] = Cluster -- [Leader],
ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]),

Call = unknown_call,
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:local_call(Leader, Call, ?DEFAULT_TIMEOUT)),
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:local_call(Follower, Call, ?DEFAULT_TIMEOUT))
after
terminate_cluster(Cluster)
end.

get_gen_statem_status(Ref) ->
{_, _, _, Items} = sys:get_status(Ref),
proplists:get_value(raft_state, lists:last(Items)).
Expand Down
20 changes: 10 additions & 10 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2384,11 +2384,11 @@ candidate_heartbeat_reply(_Config) ->

HeartbeatReply = #heartbeat_reply{term = Term, query_index = 2},
%% Same term is ignored
{candidate, State, []}
{candidate, State, [{maybe_reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_candidate({{no_peer, node()}, HeartbeatReply}, State),

%% Lower term is ignored
{candidate, State, []}
{candidate, State, [{maybe_reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_candidate({{no_peer, node()}, HeartbeatReply#heartbeat_reply{term = Term - 1}}, State),

%% Higher term updates term and changes to follower
Expand Down Expand Up @@ -2439,11 +2439,11 @@ pre_vote_heartbeat_reply(_Config) ->
query_index = 2},

%% Heartbeat reply with same term is ignored
{pre_vote, State, []}
{pre_vote, State, [{maybe_reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_pre_vote({{no_peer, node()}, HeartbeatReply}, State),

%% Heartbeat reply with lower term is ignored
{pre_vote, State, []}
{pre_vote, State, [{maybe_reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_pre_vote(
{{no_peer, node()}, HeartbeatReply#heartbeat_reply{term = Term - 1}},
State),
Expand Down Expand Up @@ -2748,13 +2748,13 @@ receive_snapshot_heartbeat_dropped(_Config) ->
Heartbeat = #heartbeat_rpc{term = Term,
query_index = QueryIndex,
leader_id = Id},
{receive_snapshot, State, []} =
{receive_snapshot, State, [{maybe_reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(Heartbeat, State),
%% Term does not matter
{receive_snapshot, State, []} =
{receive_snapshot, State, [{maybe_reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(Heartbeat#heartbeat_rpc{term = Term + 1},
State),
{receive_snapshot, State, []} =
{receive_snapshot, State, [{maybe_reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(Heartbeat#heartbeat_rpc{term = Term - 1},
State).

Expand All @@ -2765,13 +2765,13 @@ receive_snapshot_heartbeat_reply_dropped(_config) ->

HeartbeatReply = #heartbeat_reply{term = Term,
query_index = QueryIndex},
{receive_snapshot, State, []} =
{receive_snapshot, State, [{maybe_reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(HeartbeatReply, State),
%% Term does not matter
{receive_snapshot, State, []} =
{receive_snapshot, State, [{maybe_reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(HeartbeatReply#heartbeat_reply{term = Term + 1},
State),
{receive_snapshot, State, []} =
{receive_snapshot, State, [{maybe_reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(HeartbeatReply#heartbeat_reply{term = Term - 1},
State).

Expand Down

0 comments on commit 99c84ca

Please sign in to comment.