Skip to content

Commit

Permalink
CQ: Make dirty recovery of shared store more efficient
Browse files Browse the repository at this point in the history
This only applies to v2 because modifying this part of the v1
code is seen as too risky considering v1 will soon get removed.
  • Loading branch information
lhoguin committed May 14, 2024
1 parent 5740ba0 commit 21054d4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
20 changes: 13 additions & 7 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1127,8 +1127,11 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
empty ->
ok = gatherer:stop(Gatherer),
finished;
%% From v1 index walker. @todo Remove when no longer possible to convert from v1.
{value, {MsgId, Count}} ->
{MsgId, Count, {next, Gatherer}}
{MsgId, Count, {next, Gatherer}};
{value, MsgIds} ->
{MsgIds, {next, Gatherer}}
end.

queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) ->
Expand All @@ -1155,27 +1158,30 @@ queue_index_walker_segment(F, Gatherer) ->
{ok, <<?MAGIC:32,?VERSION:8,
FromSeqId:64/unsigned,ToSeqId:64/unsigned,
_/bits>>} ->
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId);
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId, []);
_ ->
%% Invalid segment file. Skip.
ok
end,
ok = file:close(Fd).

queue_index_walker_segment(_, _, N, N) ->
queue_index_walker_segment(_, Gatherer, N, N, Acc) ->
%% We reached the end of the segment file.
gatherer:sync_in(Gatherer, Acc),
ok;
queue_index_walker_segment(Fd, Gatherer, N, Total) ->
queue_index_walker_segment(Fd, Gatherer, N, Total, Acc) ->
case file:read(Fd, ?ENTRY_SIZE) of
%% We found a non-ack persistent entry. Gather it.
{ok, <<1,_:7,1:1,_,1,Id:16/binary,_/bits>>} ->
gatherer:sync_in(Gatherer, {Id, 1}),
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, [Id|Acc]);
%% We found an ack, a transient entry or a non-entry. Skip it.
{ok, _} ->
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, Acc);
%% We reached the end of a partial segment file.
eof when Acc =:= [] ->
ok;
eof ->
gatherer:sync_in(Gatherer, Acc),
ok
end.

Expand Down
58 changes: 26 additions & 32 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,15 @@

-include_lib("rabbit_common/include/rabbit_msg_store.hrl").

%% We flush to disk when the write buffer gets above the max size,
%% or at an interval to make sure we don't keep the data in memory
%% too long. Confirms are sent after the data is flushed to disk.
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB.
-define(SYNC_INTERVAL, 200). %% Milliseconds.
%% We flush to disk at an interval to make sure we don't keep
%% the data in memory too long. Confirms are sent after the
%% data is flushed to disk.
-define(SYNC_INTERVAL, 200). %% Milliseconds.

-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").

-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
-define(WRITE_MODE, [write]).

-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").

%% We keep track of flying messages for writes and removes. The idea is that
%% when a remove comes in before we could process the write, we skip the
Expand Down Expand Up @@ -1575,24 +1569,22 @@ count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
finished ->
ok;
%% @todo This is currently required by tests but can't happen otherwise?
{_MsgId, 0, Next} ->
count_msg_refs(Gen, Next, State);
{MsgId, Delta, Next} ->
ok = case index_lookup(MsgId, State) of
not_found ->
index_insert(#msg_location { msg_id = MsgId,
file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
NewRefCount = RefCount + Delta,
case NewRefCount of
0 -> index_delete(MsgId, State);
_ -> index_update(StoreEntry #msg_location {
ref_count = NewRefCount },
State)
end
end,
%% This clause is kept for v1 compatibility purposes.
%% It can be removed once we no longer support converting from v1 data.
{MsgId, 1, Next} ->
%% @todo Remove index_module and avoid this element/2.
_ = ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
#msg_location{msg_id=MsgId, file=undefined, ref_count=1}),
count_msg_refs(Gen, Next, State);
{MsgIds, Next} ->
lists:foreach(fun(MsgId) ->
%% @todo Remove index_module and avoid this element/2.
ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
#msg_location{msg_id=MsgId, file=undefined, ref_count=1})
end, MsgIds),
count_msg_refs(Gen, Next, State)
end.

Expand Down Expand Up @@ -1621,15 +1613,17 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
FileName = filenum_to_name(File),
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
[form_filename(Dir, FileName), length(Files)]),
{ok, Messages0, FileSize} =
%% The scan function has already dealt with duplicate messages
%% within the file. It gives us the messages in reverse file order.
{ok, Messages, FileSize} =
scan_file_for_valid_messages(Dir, FileName),
%% The scan gives us messages end-of-file first so we reverse the list
%% in case a compaction had occurred before shutdown to not have to repeat it.
Messages = lists:reverse(Messages0),
%% Valid messages are in file order so the last message in the
%% file is the last element of the list.
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
%% We only keep the first message in the file. Duplicates (due to compaction) get ignored.
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(MsgId, State) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
Expand All @@ -1649,7 +1643,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
[] -> case ValidMessages of
[] -> 0;
_ -> {_MsgId, TotalSize, Offset} =
hd(ValidMessages),
lists:last(ValidMessages),
Offset + TotalSize
end;
[_|_] -> FileSize
Expand Down

0 comments on commit 21054d4

Please sign in to comment.