Drop WAL appends that an existing snapshot already covers, and let a written
snapshot advance the log's last index / last written index.

src/ra_log.erl
  handle_event({snapshot_written, ...}): for SnapKind =:= snapshot the state
  update now also sets last_index to max(LstIdx, SnapIdx) and sets
  last_written_index_term to the snapshot's {Index, Term} unless the current
  last written index is already greater than SnapIdx. The old comment that
  said the last written index is not set here is replaced accordingly.

src/ra_log_wal.erl
  handle_msg({append, ...}): an append whose Idx =< snap_idx(UId) is not
  written; the writer entry is reset to {in_seq, SnapIdx} instead.
  snap_idx/1: new helper reading the ra_log_snapshot_state ETS table.

NOTE(review): leading whitespace of the context lines below was reconstructed;
confirm it against the target files (or apply with whitespace tolerance).

diff --git a/src/ra_log.erl b/src/ra_log.erl
index b4a2bf0d..4b340886 100644
--- a/src/ra_log.erl
+++ b/src/ra_log.erl
@@ -566,6 +566,8 @@ handle_event({segments, Tid, NewSegs},
 handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
              #?MODULE{cfg = Cfg,
                       first_index = FstIdx,
+                      last_index = LstIdx,
+                      last_written_index_term = {LastWrittenIdx, _} = LWIdx,
                       snapshot_state = SnapState0} = State0)
   %% only update snapshot if it is newer than the last snapshot
   when SnapIdx >= FstIdx ->
@@ -590,7 +592,17 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
             Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0,
-            %% do not set last written index here as the snapshot may
-            %% be for a past index
+            %% the snapshot may be for a past index, so only replace the
+            %% last written index/term with the snapshot's when the last
+            %% written index has not already moved beyond the snapshot index
+            LWIdxTerm = case LastWrittenIdx > SnapIdx of
+                            true ->
+                                LWIdx;
+                            false ->
+                                Snap
+                        end,
+
             {State#?MODULE{first_index = SnapIdx + 1,
+                           last_index = max(LstIdx, SnapIdx),
+                           last_written_index_term = LWIdxTerm,
                            snapshot_state = SnapState}, Effects};
         checkpoint ->
             put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx),
diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl
index 3affd577..7ae8b863 100644
--- a/src/ra_log_wal.erl
+++ b/src/ra_log_wal.erl
@@ -454,7 +454,13 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc,
 
 handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry},
            #state{writers = Writers} = State0) ->
+    SnapIdx = snap_idx(UId),
     case maps:find(UId, Writers) of
+        _ when Idx =< SnapIdx ->
+            %% a snapshot already exists at or above this index - just drop
+            %% the write and record the snapshot index as the writer's
+            %% previous index so that SnapIdx + 1 is the next in-sequence index
+            State0#state{writers = Writers#{UId => {in_seq, SnapIdx}}};
         {ok, {_, PrevIdx}} when Idx =< PrevIdx + 1 ->
             write_data(Id, Idx, Term, Entry, false, State0);
         error ->
@@ -1004,3 +1010,14 @@ table_start(false, Idx, TblStart) ->
     min(TblStart, Idx);
 table_start(true, Idx, _TblStart) ->
     Idx.
+
+%% Looks up the snapshot index stored for ServerUId in the
+%% ra_log_snapshot_state ETS table. Returns -1 when the table holds no
+%% entry for that server, so that no append index compares =< to it.
+snap_idx(ServerUId) ->
+    case ets:lookup(ra_log_snapshot_state, ServerUId) of
+        [{_, SnapIdx}] ->
+            SnapIdx;
+        [] ->
+            -1
+    end.
Test-suite changes for the WAL "drop append covered by a snapshot" behaviour.

test/ra_log_wal_SUITE.erl
  all_tests/0: registers drop_writes_if_snapshot_has_higher_index.
  init_per_testcase/2: creates the named, public ra_log_snapshot_state ETS
  table alongside the existing metrics tables.
  drop_writes_if_snapshot_has_higher_index/1: writes indexes 12 and 13 and
  awaits both, inserts {UId, 20} into ra_log_snapshot_state, writes index 14,
  sleeps 500 ms, then asserts mem_tbl_read(UId, 14) returns undefined. After
  stopping the WAL it also asserts the writer's mem table is not compressed.

NOTE(review): the test waits with a fixed timer:sleep(500) and keeps a
ra_lib:dump/1 call; presumably both mirror existing cases in this suite -
confirm before merging.
NOTE(review): leading whitespace of the context lines below was reconstructed;
confirm it against the target file (or apply with whitespace tolerance).

diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl
index 86ce5481..c00c14e1 100644
--- a/test/ra_log_wal_SUITE.erl
+++ b/test/ra_log_wal_SUITE.erl
@@ -47,7 +47,8 @@ all_tests() ->
      recover_with_last_entry_corruption_pre_allocate,
      checksum_failure_in_middle_of_file_should_fail,
      recover_with_partial_last_entry,
-     sys_get_status
+     sys_get_status,
+     drop_writes_if_snapshot_has_higher_index
     ].
 
 groups() ->
@@ -108,6 +109,7 @@ init_per_testcase(TestCase, Config) ->
                 max_size_bytes => ?MAX_SIZE_BYTES},
     _ = ets:new(ra_open_file_metrics, [named_table, public, {write_concurrency, true}]),
     _ = ets:new(ra_io_metrics, [named_table, public, {write_concurrency, true}]),
+    _ = ets:new(ra_log_snapshot_state, [named_table, public, {write_concurrency, true}]),
     [{ra_log_ets, Ets},
      {writer_id, {UId, self()}},
      {test_case, TestCase},
@@ -872,6 +874,27 @@ checksum_failure_in_middle_of_file_should_fail(Config) ->
     meck:unload(),
     ok.
 
+drop_writes_if_snapshot_has_higher_index(Config) ->
+    ok = logger:set_primary_config(level, all),
+    Conf = ?config(wal_conf, Config),
+    {UId, _} = WriterId = ?config(writer_id, Config),
+    {ok, Pid} = ra_log_wal:start_link(Conf),
+    {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"),
+    {12, 1, "value"} = await_written(WriterId, {12, 12, 1}),
+    {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"),
+    {13, 1, "value2"} = await_written(WriterId, {13, 13, 1}),
+
+    ets:insert(ra_log_snapshot_state, {UId, 20}),
+    {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 14, 1, "value2"),
+    timer:sleep(500),
+
+    undefined = mem_tbl_read(UId, 14),
+    ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)),
+    proc_lib:stop(Pid),
+    [{_, _, _, Tid}] = ets:lookup(ra_log_open_mem_tables, UId),
+    ?assert(not ets:info(Tid, compressed)),
+    ok.
+
 empty_mailbox() ->
     receive
         _ ->