Skip to content

Commit

Permalink
Merge pull request #447 from rabbitmq/leader-await-condition-bug-fixes
Browse files (browse the repository at this point in the history)
Fix bug where cluster_change_permitted flag is incorrect
  • Loading branch information
kjnilsson committed Jun 7, 2024
2 parents 1bc3968 + dbc9f0d commit e17bc59
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 19 deletions.
34 changes: 23 additions & 11 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
handle_receive_snapshot/2,
handle_await_condition/2,
handle_aux/4,
handle_state_enter/2,
handle_state_enter/3,
tick/1,
log_tick/1,
overview/1,
Expand Down Expand Up @@ -1486,11 +1486,12 @@ log_tick(#{cfg := #cfg{},
State#{log => Log}.


%% Called when the server enters RaftState. Returns a two-tuple: the server
%% state as updated by become/3, and the result of ra_machine:state_enter/3
%% for the effective machine module and the current machine state.
%% OldRaftState is only forwarded to become/3 here.
%% NOTE(review): this is a rendered diff without +/- markers - the /2 spec
%% and function head below are the superseded lines, the /3 ones replace them.
-spec handle_state_enter(ra_state() | eol, ra_server_state()) ->
-spec handle_state_enter(ra_state() | eol, ra_state(), ra_server_state()) ->
{ra_server_state() | eol, effects()}.
handle_state_enter(RaftState, #{cfg := #cfg{effective_machine_module = MacMod},
machine_state := MacState} = State) ->
{become(RaftState, State),
handle_state_enter(RaftState, OldRaftState,
#{cfg := #cfg{effective_machine_module = MacMod},
machine_state := MacState} = State) ->
{become(RaftState, OldRaftState, State),
ra_machine:state_enter(MacMod, RaftState, MacState)}.


Expand All @@ -1499,7 +1500,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
log := Log,
machine_state := MacState,
aux_state := Aux,
queries_waiting_heartbeats := Queries
queries_waiting_heartbeats := Queries,
pending_consistent_queries := PendingQueries
} = State) ->
NumQueries = queue:len(Queries),
O0 = maps:with([current_term,
Expand All @@ -1519,7 +1521,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
O#{log => LogOverview,
aux => Aux,
machine => MacOverview,
num_waiting_queries => NumQueries}.
num_waiting_queries => NumQueries,
num_pending_queries => length(PendingQueries)}.

cfg_to_map(Cfg) ->
element(2, lists:foldl(
Expand Down Expand Up @@ -1668,15 +1671,24 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod},

% Internal

%% Adjusts the server state map for the raft state being entered; invoked
%% from handle_state_enter.
%% NOTE(review): this is a rendered diff without +/- markers - each become/2
%% head (and the hard-coded `false` update) is the superseded line, directly
%% followed by its become/3 replacement.
become(leader, #{cluster := Cluster, log := Log0} = State) ->
become(leader, OldRaftState, #{cluster := Cluster,
cluster_change_permitted := CCP0,
log := Log0} = State) ->
Log = ra_log:release_resources(maps:size(Cluster) + 2, random, Log0),
%% keep the current cluster_change_permitted value only when returning to
%% leader from await_condition; entering from any other state resets it
%% to false
CCP = case OldRaftState of
await_condition ->
CCP0;
_ ->
false
end,

State#{log => Log,
cluster_change_permitted => false};
cluster_change_permitted => CCP};
become(follower, #{log := Log0} = State) ->
become(follower, _, #{log := Log0} = State) ->
%% followers should only ever need a single segment open at any one
%% time
State#{log => ra_log:release_resources(1, random, Log0)};
%% every other raft state leaves the server state untouched
become(_RaftState, State) ->
become(_RaftState, _, State) ->
State.

follower_catchup_cond_fun(OriginalReason) ->
Expand Down
18 changes: 10 additions & 8 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ candidate(_, tick_timeout, State0) ->
{keep_state, handle_tick_metrics(State), set_tick_timer(State, [])};
candidate({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
candidate(EventType, Msg, #state{pending_commands = Pending} = State0) ->
candidate(EventType, Msg, State0) ->
case handle_candidate(Msg, State0) of
{candidate, State1, Effects} ->
{State2, Actions0} = ?HANDLE_EFFECTS(Effects, EventType, State1),
Expand All @@ -616,15 +616,11 @@ candidate(EventType, Msg, #state{pending_commands = Pending} = State0) ->
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
next_state(follower, State, Actions);
{leader, State1, Effects} ->
{State2, Actions0} = ?HANDLE_EFFECTS(Effects, EventType, State1),
State = State2#state{pending_commands = []},
{State, Actions0} = ?HANDLE_EFFECTS(Effects, EventType, State1),
%% reset the tick timer to avoid it triggering early after a leader
%% change
Actions = set_tick_timer(State2, Actions0),
% inject a bunch of command events to be processed when node
% becomes leader
NextEvents = [{next_event, {call, F}, Cmd} || {F, Cmd} <- Pending],
next_state(leader, State, Actions ++ NextEvents)
Actions = set_tick_timer(State, Actions0),
next_state(leader, State, Actions)
end.

pre_vote(enter, OldState, #state{leader_monitor = MRef} = State0) ->
Expand Down Expand Up @@ -1090,6 +1086,7 @@ handle_enter(RaftState, OldRaftState,
Membership = ra_server:get_membership(ServerState0),
true = ets:insert(ra_state, {Name, RaftState, Membership}),
{ServerState, Effects} = ra_server:handle_state_enter(RaftState,
OldRaftState,
ServerState0),
case RaftState == leader orelse OldRaftState == leader of
true ->
Expand Down Expand Up @@ -1743,6 +1740,11 @@ maybe_set_election_timeout(TimeoutLen, State, Actions) ->
{State#state{election_timeout_set = true},
[election_timeout_action(TimeoutLen, State) | Actions]}.

next_state(leader, #state{pending_commands = Pending} = State, Actions) ->
NextEvents = [{next_event, {call, F}, Cmd} || {F, Cmd} <- Pending],
{next_state, leader, State#state{election_timeout_set = false,
pending_commands = []},
Actions ++ NextEvents};
next_state(Next, State, Actions) ->
%% as changing states will always cancel the state timeout we need
%% to set our own state tracking to false here
Expand Down
16 changes: 16 additions & 0 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ all() ->
is_new,
command,
command_notify,
leader_enters_from_await_condition,
leader_noop_operation_enables_cluster_change,
leader_noop_increments_machine_version,
follower_machine_version,
Expand Down Expand Up @@ -1798,6 +1799,21 @@ command(_Config) ->
ra_server:handle_leader({command, Cmd}, State),
ok.

%% Verifies how ra_server:handle_state_enter/3 sets cluster_change_permitted
%% in the returned server state when the state being entered is leader.
leader_enters_from_await_condition(_Config) ->
State = (base_state(3, ?FUNCTION_NAME))#{cluster_change_permitted => true},
%% when re-entering leader state from await_condition the
%% cluster change permitted value should be retained
?assertMatch({#{cluster_change_permitted := true}, _},
ra_server:handle_state_enter(leader, await_condition, State)),

%% a false value is likewise carried over unchanged from await_condition
?assertMatch({#{cluster_change_permitted := false}, _},
ra_server:handle_state_enter(leader, await_condition,
State#{cluster_change_permitted => false})),

%% entering leader from candidate must yield false even though State
%% has the flag set to true
?assertMatch({#{cluster_change_permitted := false}, _},
ra_server:handle_state_enter(leader, candidate, State)),
ok.

command_notify(_Config) ->
N1 = ?N1, N2 = ?N2, N3 = ?N3,
State0 = base_state(3, ?FUNCTION_NAME),
Expand Down

0 comments on commit e17bc59

Please sign in to comment.