Skip to content

Commit

Permalink
WAL: drop entries for UId that are lower than snapshot index.
Browse files Browse the repository at this point in the history
As noone needs these entries, best not write them at all.
  • Loading branch information
kjnilsson committed May 24, 2024
1 parent 3a4dacb commit acd158e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ handle_event({segments, Tid, NewSegs},
handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
#?MODULE{cfg = Cfg,
first_index = FstIdx,
last_index = LstIdx,
last_written_index_term = {LastWrittenIdx, _} = LWIdx,
snapshot_state = SnapState0} = State0)
%% only update snapshot if it is newer than the last snapshot
when SnapIdx >= FstIdx ->
Expand All @@ -590,7 +592,16 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0,
%% do not set last written index here as the snapshot may
%% be for a past index
LWIdxTerm = case LastWrittenIdx > SnapIdx of
true ->
LWIdx;
false ->
Snap
end,

{State#?MODULE{first_index = SnapIdx + 1,
last_index = max(LstIdx, SnapIdx),
last_written_index_term = LWIdxTerm,
snapshot_state = SnapState}, Effects};
checkpoint ->
put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx),
Expand Down
12 changes: 12 additions & 0 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,11 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc,

handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry},
#state{writers = Writers} = State0) ->
SnapIdx = snap_idx(UId),
case maps:find(UId, Writers) of
_ when Idx =< SnapIdx ->
%% a snapshot already exists that is higher - just drop the write
State0#state{writers = Writers#{UId => {in_seq, SnapIdx}}};
{ok, {_, PrevIdx}} when Idx =< PrevIdx + 1 ->
write_data(Id, Idx, Term, Entry, false, State0);
error ->
Expand Down Expand Up @@ -1004,3 +1008,11 @@ table_start(false, Idx, TblStart) ->
min(TblStart, Idx);
table_start(true, Idx, _TblStart) ->
Idx.

snap_idx(ServerUId) ->
case ets:lookup(ra_log_snapshot_state, ServerUId) of
[{_, SnapIdx}] ->
SnapIdx;
[] ->
-1
end.
25 changes: 24 additions & 1 deletion test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ all_tests() ->
recover_with_last_entry_corruption_pre_allocate,
checksum_failure_in_middle_of_file_should_fail,
recover_with_partial_last_entry,
sys_get_status
sys_get_status,
drop_writes_if_snapshot_has_higher_index
].

groups() ->
Expand Down Expand Up @@ -108,6 +109,7 @@ init_per_testcase(TestCase, Config) ->
max_size_bytes => ?MAX_SIZE_BYTES},
_ = ets:new(ra_open_file_metrics, [named_table, public, {write_concurrency, true}]),
_ = ets:new(ra_io_metrics, [named_table, public, {write_concurrency, true}]),
_ = ets:new(ra_log_snapshot_state, [named_table, public, {write_concurrency, true}]),
[{ra_log_ets, Ets},
{writer_id, {UId, self()}},
{test_case, TestCase},
Expand Down Expand Up @@ -872,6 +874,27 @@ checksum_failure_in_middle_of_file_should_fail(Config) ->
meck:unload(),
ok.

drop_writes_if_snapshot_has_higher_index(Config) ->
ok = logger:set_primary_config(level, all),
Conf = ?config(wal_conf, Config),
{UId, _} = WriterId = ?config(writer_id, Config),
{ok, Pid} = ra_log_wal:start_link(Conf),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"),
{12, 1, "value"} = await_written(WriterId, {12, 12, 1}),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"),
{13, 1, "value2"} = await_written(WriterId, {13, 13, 1}),

ets:insert(ra_log_snapshot_state, {UId, 20}),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 14, 1, "value2"),
timer:sleep(500),

undefined = mem_tbl_read(UId, 14),
ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)),
proc_lib:stop(Pid),
[{_, _, _, Tid}] = ets:lookup(ra_log_open_mem_tables, UId),
?assert(not ets:info(Tid, compressed)),
ok.

empty_mailbox() ->
receive
_ ->
Expand Down

0 comments on commit acd158e

Please sign in to comment.