Skip to content

Commit

Permalink
Merge pull request #448 from rabbitmq/snapshot-installed-callback
Browse files Browse the repository at this point in the history
Add ra_machine:snapshot_installed/2 callback.
  • Loading branch information
the-mikedavis authored Jun 12, 2024
2 parents 4f633ad + da873f6 commit 9cb281a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
10 changes: 8 additions & 2 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@
%% Optional: implements a lookup from version to the module implementing the
%% machine logic for that version.


-module(ra_machine).

-compile({no_auto_import, [apply/3]}).

-include("ra.hrl").


-export([init/2,
apply/4,
tick/3,
snapshot_installed/3,
state_enter/3,
overview/2,
query/3,
Expand Down Expand Up @@ -210,6 +209,7 @@
command_meta_data/0]).

-optional_callbacks([tick/2,
snapshot_installed/2,
state_enter/2,
init_aux/1,
handle_aux/5,
Expand Down Expand Up @@ -241,6 +241,8 @@

-callback tick(TimeMs :: milliseconds(), state()) -> effects().

-callback snapshot_installed(ra_snapshot:meta(), state()) -> effects().

-callback init_aux(Name :: atom()) -> AuxState :: term().

-callback handle_aux(ra_server:ra_state(),
Expand Down Expand Up @@ -300,6 +302,10 @@ apply(Mod, Metadata, Cmd, State) ->
tick(Mod, TimeMs, State) ->
?OPT_CALL(Mod:tick(TimeMs, State), []).

-spec snapshot_installed(module(), ra_snapshot:meta(), state()) -> effects().
snapshot_installed(Mod, Meta, State) ->
?OPT_CALL(Mod:snapshot_installed(Meta, State), []).

%% @doc called when the ra_server_proc enters a new state
-spec state_enter(module(), ra_server:ra_state() | eol, state()) ->
effects().
Expand Down
12 changes: 9 additions & 3 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@ handle_follower(Msg, State) ->
handle_receive_snapshot(#install_snapshot_rpc{term = Term,
meta = #{index := SnapIndex,
machine_version := SnapMacVer,
term := SnapTerm},
term := SnapTerm} = SnapMeta,
chunk_state = {Num, ChunkFlag},
data = Data},
#{cfg := #cfg{id = Id,
Expand All @@ -1358,10 +1358,10 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
SnapState, Log0),
%% if the machine version of the snapshot is higher
%% we also need to update the current effective machine configuration
EffMacMod = ra_machine:which_module(Machine, SnapMacVer),
Cfg = case SnapMacVer > CurEffMacVer of
true ->
put_counter(Cfg0, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, SnapMacVer),
EffMacMod = ra_machine:which_module(Machine, SnapMacVer),
Cfg0#cfg{effective_machine_version = SnapMacVer,
machine_versions = [{SnapIndex, SnapMacVer}
| MachineVersions],
Expand All @@ -1373,6 +1373,11 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
end,

{#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log),

SnapInstalledEffs = ra_machine:snapshot_installed(EffMacMod,
SnapMeta,
MacState),

State = update_term(Term,
State0#{cfg => Cfg,
log => Log,
Expand All @@ -1383,7 +1388,8 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
machine_state => MacState}),
%% it was the last snapshot chunk so we can revert back to
%% follower status
{follower, persist_last_applied(State), [{reply, Reply} | Effs]};
{follower, persist_last_applied(State), [{reply, Reply} |
Effs ++ SnapInstalledEffs]};
next ->
Log = ra_log:set_snapshot_state(SnapState, Log0),
State = update_term(Term, State0#{log => Log}),
Expand Down
33 changes: 32 additions & 1 deletion test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,21 @@ disconnected_node_catches_up(Config) ->
<<Tag:2/binary, _/binary>> -> binary_to_atom(Tag, utf8)
end,

start_follower(DownServerNodeName, PrivDir),

DownNode = start_follower(DownServerNodeName, PrivDir),

Self = self(),
SPid = erlang:spawn(DownNode,
fun () ->
erlang:register(snapshot_installed_proc, self()),
receive
{snapshot_installed, _Meta} = Evt ->
Self ! Evt,
ok
after 10000 ->
ok
end
end),
await_condition(
fun () ->
ok == ra:restart_server(?SYS, DownServerId)
Expand All @@ -497,6 +510,16 @@ disconnected_node_catches_up(Config) ->
SI /= undefined
end, 200),

receive
{snapshot_installed, Meta} ->
ct:pal("snapshot installed receive ~p", [Meta]),
ok
after 10000 ->
erlang:exit(SPid, kill),
ct:fail("snapshot_installed not received"),
ok
end,

stop_nodes(ServerIds),
ok.

Expand Down Expand Up @@ -1265,6 +1288,14 @@ apply(#{index := _Idx}, {segment_writer_or_wal_crash_follower, _}, State) ->
apply(#{index := Idx}, _Cmd, State) ->
{State, ok, [{release_cursor, Idx, State}]}.

snapshot_installed(Meta, _State) ->
case whereis(snapshot_installed_proc) of
undefined ->
[];
Pid ->
[{send_msg, Pid, {snapshot_installed, Meta}}]
end.

node_setup(DataDir) ->
ok = ra_lib:make_dir(DataDir),
% NodeDir = filename:join(DataDir, atom_to_list(node())),
Expand Down

0 comments on commit 9cb281a

Please sign in to comment.