Skip to content

Commit

Permalink
Make stream coordinator machine versioned
Browse files Browse the repository at this point in the history
In order to retain deterministic results of state machine applications
during upgrades we need to make the stream coordinator versioned such
that we only use the new logic once the stream coordinator switches to
machine version 1.
  • Loading branch information
kjnilsson committed Jan 7, 2022
1 parent e5ccf26 commit d9a232c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
50 changes: 36 additions & 14 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
state_enter/2,
init_aux/1,
handle_aux/6,
tick/2]).
tick/2,
version/0,
which_module/1]).

-export([recover/0,
add_replica/2,
Expand Down Expand Up @@ -311,6 +313,11 @@ all_coord_members() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
[{?MODULE, Node} || Node <- [node() | Nodes]].

%% Machine version implemented by this module. Logic that differs between
%% versions (see select_leader/2) switches to the new behaviour only when the
%% machine_version found in the command metadata is no longer 0.
%% NOTE(review): presumably the ra_machine version/0 callback - confirm.
version() -> 1.

%% Returns this module regardless of the argument, i.e. every machine version
%% is implemented by this same module.
%% NOTE(review): the ignored argument is presumably the machine version
%% being asked about (ra_machine which_module/1 callback) - confirm.
which_module(_) ->
?MODULE.

%% Builds the initial machine state: a #?MODULE{} record with every field
%% left at its declared default. The configuration argument is ignored.
init(_Conf) ->
#?MODULE{}.

Expand All @@ -320,7 +327,7 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0} = State0) ->
Stream0 = maps:get(StreamId, Streams0, undefined),
Meta = maps:without([term, machine_version], Meta0),
Meta = maps:without([term], Meta0),
case filter_command(Meta, Cmd, Stream0) of
ok ->
Stream1 = update_stream(Meta, Cmd, Stream0),
Expand Down Expand Up @@ -1040,7 +1047,8 @@ update_stream0(#{system_time := _Ts},
%% epochs?
Stream0
end;
update_stream0(#{system_time := _Ts},
update_stream0(#{system_time := _Ts,
machine_version := MachineVersion},
{member_stopped, _StreamId,
#{node := Node,
index := Idx,
Expand Down Expand Up @@ -1089,15 +1097,15 @@ update_stream0(#{system_time := _Ts},

Members1 = Members0#{Node => Member},

Offsets = [{N, T}
|| #member{state = {stopped, E, T},
target = running,
node = N} <- maps:values(Members1),
E == Epoch],
case is_quorum(length(Nodes), length(Offsets)) of
EpochOffsets = [{N, T}
|| #member{state = {stopped, E, T},
target = running,
node = N} <- maps:values(Members1),
E == Epoch],
case is_quorum(length(Nodes), length(EpochOffsets)) of
true ->
%% select leader
NewWriterNode = select_leader(Offsets),
NewWriterNode = select_leader(MachineVersion, EpochOffsets),
NextEpoch = Epoch + 1,
Members = maps:map(
fun (N, #member{state = {stopped, E, _}} = M)
Expand Down Expand Up @@ -1527,16 +1535,30 @@ find_leader(Members) ->
{undefined, Replicas}
end.

select_leader(Offsets) ->
[{Node, _} | _] = lists:sort(fun({_, {E, Ao}}, {_, {E, Bo}}) ->
select_leader(0, EpochOffsets) ->
%% this is the version 0 faulty version of this code,
%% retained for versioning
[{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) ->
Ao >= Bo;
({_, {Ae, _}}, {_, {Be, _}}) ->
({_, {_, Ae}}, {_, {_, Be}}) ->
Ae >= Be;
({_, empty}, _) ->
false;
(_, {_, empty}) ->
true
end, Offsets),
end, EpochOffsets),
Node;
%% Clause used for every machine version other than 0 (version 0 is handled
%% by the preceding clause). Returns the node chosen as the new writer.
%% EpochOffsets is a list of {Node, T} pairs where, as matched below, T is
%% either an {Epoch, Offset} tuple or the atom 'empty'.
select_leader(_Version, EpochOffsets) ->
%% Sort so that the preferred candidate ends up first and take the head.
%% The match on [{Node, _} | _] fails with badmatch if the list is empty.
[{Node, _} | _] = lists:sort(
%% both entries carry the same epoch (Epoch is bound twice):
%% the entry with the higher offset is ordered first
fun({_, {Epoch, OffsetA}}, {_, {Epoch, OffsetB}}) ->
OffsetA >= OffsetB;
%% epochs differ: the entry with the higher epoch is ordered first
({_, {EpochA, _}}, {_, {EpochB, _}}) ->
EpochA >= EpochB;
%% an 'empty' entry is never ordered before the other entry
({_, empty}, _) ->
false;
%% ... and any remaining entry is ordered before an 'empty' one
(_, {_, empty}) ->
true
end, EpochOffsets),
Node.

maybe_sleep({{nodedown, _}, _}) ->
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,7 @@ delete_replica_leader(_) ->

%% Builds the command metadata map used by the test cases for the given
%% integer index: the index itself, machine version 1 (so the version-aware
%% code paths run their non-zero-version logic) and a system time derived
%% from the index (index + 1000).
meta(Index) when is_integer(Index) ->
    SystemTime = Index + 1000,
    #{system_time => SystemTime,
      machine_version => 1,
      index => Index}.

started_stream(StreamId, LeaderPid, ReplicaPids) ->
Expand Down

0 comments on commit d9a232c

Please sign in to comment.