Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CQ shared message store improvements #6090

Merged
merged 23 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
32816c0
CQ: Improve reading from shared message store
lhoguin Oct 21, 2022
4e4e6e4
CQ: Remove mechanism for closing FHC FDs in queues
lhoguin Feb 3, 2023
ba6c075
CQ: Fix a Dialyzer warning
lhoguin Feb 3, 2023
608bd7c
Tweak recovery following compaction changes
lhoguin Feb 21, 2023
06ada80
Messages were reversed so... whoops! Fixed
lhoguin Feb 21, 2023
f286bfc
Rework the top-level comment
lhoguin Feb 21, 2023
c22fc46
Cleanup todos
lhoguin Feb 23, 2023
f66be29
Remove a couple values no longer used in the state
lhoguin Feb 23, 2023
4f06544
Remove another value and related functions
lhoguin Feb 23, 2023
c031f04
Remove unused fields in client_msstate
lhoguin Feb 23, 2023
468d9f9
Remove very old comment referring to a dedup cache
lhoguin Feb 23, 2023
e35a75c
Remove another old comment that was only relevant in 2016 if ever
lhoguin Feb 23, 2023
12a63a4
Remove todo
lhoguin Feb 23, 2023
4b87ded
Remove pointless todos
lhoguin Feb 23, 2023
ef7c68a
CQ shared store: rework the flying optimisation
lhoguin Mar 10, 2023
af21e80
Fix compilation following rebase
lhoguin May 22, 2023
76ee654
Keep fd open in msg store client single reads
lhoguin May 24, 2023
8c0fd4f
Close fd open in msg store client before hibernating
lhoguin May 24, 2023
54e04a1
Set read_many threshold for shared store to 12k bytes
lhoguin May 26, 2023
9b7cbfd
CQ: Fix crash when queue is not durable
lhoguin May 26, 2023
93eccae
Cleanup todos and unnecessary comments
lhoguin May 30, 2023
7a8d110
Fix Dialyzer warning
lhoguin May 30, 2023
59259b2
CQ: Don't read message data when purging the queue
lhoguin Jun 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion deps/rabbit/src/rabbit_msg_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

-module(rabbit_msg_file).

-export([append/3, read/2, scan/4]).
-export([append/3, read/2, pread/2, pread/3, scan/4]).

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -39,6 +39,9 @@

append(FileHdl, MsgId, MsgBody)
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
%% @todo I think we are actually writing MsgId TWICE: once in
%% the header, once in the body. Might be better to reduce
%% the size of the body...
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = size(MsgBodyBin),
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
Expand Down Expand Up @@ -67,6 +70,44 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.

-spec pread(io_device(), position(), msg_size()) ->
          rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
                                    any()).

%% Read one message entry at the given file offset.
%%
%% TotalSize is the full on-disk footprint of the entry (size prefix,
%% msg id, body, write-OK marker); the size field stored inside the
%% entry covers only msg id + body. Matching the stored size and the
%% trailing marker doubles as an integrity check. Any other result
%% from file:pread/3 (including an {ok, Bin} that fails the match)
%% is returned to the caller unchanged.
pread(FileHdl, Offset, TotalSize) ->
    ExpectedSize = TotalSize - ?FILE_PACKING_ADJUSTMENT,
    BodySize = ExpectedSize - ?MSG_ID_SIZE_BYTES,
    case file:pread(FileHdl, Offset, TotalSize) of
        {ok, <<ExpectedSize:?INTEGER_SIZE_BITS,
               MsgId:?MSG_ID_SIZE_BYTES/binary,
               BodyBin:BodySize/binary,
               ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
            {ok, {MsgId, binary_to_term(BodyBin)}};
        Other ->
            Other
    end.

-spec pread(io_device(), [{position(), msg_size()}]) ->
          {ok, [msg()]} | {error, any()} | eof.

%% Read a batch of message entries in a single scatter/gather
%% file:pread/2 call, one {Offset, TotalSize} pair per entry, and
%% decode the bodies via pread_parse/1. Errors and eof from
%% file:pread/2 are passed straight through.
pread(FileHdl, LocNums) ->
    case file:pread(FileHdl, LocNums) of
        {ok, Bins} -> {ok, pread_parse(Bins)};
        Other      -> Other
    end.

%% Parse the list of binaries returned by file:pread/2 (one element per
%% requested {Offset, TotalSize} pair) into the list of decoded message
%% bodies, preserving order. The msg id embedded in each entry is
%% skipped; only the body is decoded with binary_to_term/1.
%%
%% The recursion re-enqueues the unconsumed remainder of the current
%% element ([Rest|Tail]), so an element containing several consecutive
%% entries is handled as well as the common one-entry-per-element case.
%%
%% Clause order matters: [<<>>] (last element fully consumed) must be
%% tried before [<<>>|Tail], otherwise the latter would recurse on []
%% and crash with function_clause.
%%
%% NOTE(review): if file:pread/2 returned 'eof' for one of the
%% locations, that list element is an atom rather than a binary and
%% this function crashes with function_clause — presumably locations
%% always come from the index and are valid; confirm with callers.
pread_parse([<<Size:?INTEGER_SIZE_BITS,
               _MsgId:?MSG_ID_SIZE_BYTES/binary,
               Rest0/bits>>|Tail]) ->
    %% Size covers msg id + body; what follows the id is the body and
    %% then the write-OK marker, whose match doubles as an integrity
    %% check on the entry.
    BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
    <<MsgBodyBin:BodyBinSize/binary,
      ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS,
      Rest/bits>> = Rest0,
    [binary_to_term(MsgBodyBin)|pread_parse([Rest|Tail])];
pread_parse([<<>>]) ->
    [];
pread_parse([<<>>|Tail]) ->
    pread_parse(Tail).

-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
{'ok', A, position()}.

Expand Down
1,610 changes: 646 additions & 964 deletions deps/rabbit/src/rabbit_msg_store.erl

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion deps/rabbit/src/rabbit_msg_store_ets_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
-behaviour(rabbit_msg_store_index).

-export([new/1, recover/1,
lookup/2, insert/2, update/2, update_fields/3, delete/2,
lookup/2, select_from_file/3, select_all_from_file/2, insert/2, update/2, update_fields/3, delete/2,
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).

-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
Expand Down Expand Up @@ -42,6 +42,18 @@ lookup(Key, State) ->
[Entry] -> Entry
end.

%% Return the index entries for MsgIds that live in File, in MsgIds
%% order. Ids that are unknown (lookup -> not_found) or whose entry
%% lives in another file are dropped.
%%
%% @todo We currently do one index lookup per message id and then
%% filter by file. This might lead to too many lookups... How to best
%% optimize this? ets:select didn't seem great.
select_from_file(MsgIds, File, State) ->
    lists:filtermap(
        fun (MsgId) ->
            case lookup(MsgId, State) of
                %% File is bound, so this clause only matches
                %% entries located in the requested file.
                #msg_location{file = File} = MsgLoc -> {true, MsgLoc};
                _                                   -> false
            end
        end, MsgIds).

%% Return every index entry located in File, via a full-table
%% ets:match_object/2 scan. Note that this function is not terribly
%% efficient and should only be used for compaction or similar.
select_all_from_file(File, State) ->
    Pattern = #msg_location { file = File, _ = '_' },
    ets:match_object(State #state.table, Pattern).

%% Insert a brand-new index entry. ets:insert_new/2 returns false if
%% the key already exists, so the match on 'true' makes any attempt to
%% overwrite an existing entry crash (badmatch) rather than silently
%% replace it.
insert(Obj, State) ->
    Table = State #state.table,
    true = ets:insert_new(Table, Obj),
    ok.
Expand Down
107 changes: 64 additions & 43 deletions deps/rabbit/src/rabbit_msg_store_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@

-behaviour(gen_server2).

-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).

-export([set_maximum_since_use/2]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/3]).

-record(state,
{ pending_no_readers,
on_action,
{ pending,
timer_ref,
msg_store_state
}).

Expand All @@ -33,22 +33,21 @@ start_link(MsgStoreState) ->
gen_server2:start_link(?MODULE, [MsgStoreState],
[{timeout, infinity}]).

-spec combine(pid(), rabbit_msg_store:file_num(),
rabbit_msg_store:file_num()) -> 'ok'.
-spec compact(pid(), rabbit_msg_store:file_num()) -> 'ok'.

combine(Server, Source, Destination) ->
gen_server2:cast(Server, {combine, Source, Destination}).
%% Ask the GC process, asynchronously, to compact File.
compact(Server, File) ->
    Request = {compact, File},
    gen_server2:cast(Server, Request).

-spec truncate(pid(), rabbit_msg_store:file_num(), non_neg_integer(), integer()) -> 'ok'.

%% Ask the GC process, asynchronously, to truncate File down to
%% TruncateSize. ThresholdTimestamp travels with the request and is
%% handed to rabbit_msg_store:truncate_file/4 when the action runs.
truncate(Server, File, TruncateSize, ThresholdTimestamp) ->
    Request = {truncate, File, TruncateSize, ThresholdTimestamp},
    gen_server2:cast(Server, Request).

-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.

%% Ask the GC process, asynchronously, to delete File.
delete(Server, File) ->
    Request = {delete, File},
    gen_server2:cast(Server, Request).

-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.

no_readers(Server, File) ->
gen_server2:cast(Server, {no_readers, File}).

-spec stop(pid()) -> 'ok'.

stop(Server) ->
Expand All @@ -64,8 +63,7 @@ set_maximum_since_use(Pid, Age) ->
init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #state { pending_no_readers = #{},
on_action = [],
{ok, #state { pending = #{},
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

Expand All @@ -75,28 +73,44 @@ prioritise_cast(_Msg, _Len, _State) -> 0.
%% 'stop' is the only synchronous request this server accepts; reply
%% ok and terminate normally.
handle_call(stop, _From, CurrentState) ->
    {stop, normal, ok, CurrentState}.

handle_cast({combine, Source, Destination}, State) ->
{noreply, attempt_action(combine, [Source, Destination], State), hibernate};

handle_cast({delete, File}, State) ->
{noreply, attempt_action(delete, [File], State), hibernate};

handle_cast({no_readers, File},
State = #state { pending_no_readers = Pending }) ->
{noreply, case maps:find(File, Pending) of
error ->
State;
{ok, {Action, Files}} ->
Pending1 = maps:remove(File, Pending),
attempt_action(
Action, Files,
State #state { pending_no_readers = Pending1 })
end, hibernate};
handle_cast({compact, File}, State) ->
    %% Since we don't compact files that have a valid size of 0, a
    %% delete cannot be queued for this file at the same time, so
    %% compacting is always safe here.
    {noreply, attempt_action(compact, [File], State), hibernate};

handle_cast({truncate, File, TruncateSize, ThresholdTimestamp},
            State = #state{pending = Pending}) ->
    case maps:find(File, Pending) of
        {ok, {delete, _}} ->
            %% The file is pending deletion; truncating it would be
            %% wasted work, so drop the request.
            {noreply, State, hibernate};
        _ ->
            %% Attempt to truncate. Any truncate already scheduled for
            %% this file is superseded by this newer request.
            State1 = State#state{pending = maps:remove(File, Pending)},
            {noreply,
             attempt_action(truncate, [File, TruncateSize, ThresholdTimestamp], State1),
             hibernate}
    end;

handle_cast({delete, File}, State = #state{pending = Pending}) ->
    %% Deletion takes precedence: drop whatever action was pending
    %% for this file before attempting the delete.
    State1 = State#state{pending = maps:remove(File, Pending)},
    {noreply, attempt_action(delete, [File], State1), hibernate};

handle_cast({set_maximum_since_use, Age}, State) ->
    ok = file_handle_cache:set_maximum_since_use(Age),
    {noreply, State, hibernate}.

%% The pending timer fired: retry every deferred action. Matching
%% TimerRef both in the message and in our state asserts the timeout
%% came from the timer we armed in ensure_pending_timer/1.
handle_info({timeout, TimerRef, do_pending},
            State = #state{pending = Pending,
                           timer_ref = TimerRef}) ->
    %% Clear the queue and the timer first; attempt_action/3 will
    %% re-queue (and re-arm the timer for) anything still deferred.
    Cleared = State#state{pending = #{}, timer_ref = undefined},
    Run = fun (_File, {Action, Args}, StateAcc) ->
                  attempt_action(Action, Args, StateAcc)
          end,
    {noreply, maps:fold(Run, Cleared, Pending), hibernate};

%% Any other message is unexpected: stop so the crash is visible.
handle_info(Info, State) ->
    {stop, {unhandled_info, Info}, State}.

Expand All @@ -106,20 +120,27 @@ terminate(_Reason, State) ->
%% No state migration is needed on code upgrade; keep the state as-is.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
on_action = Thunks,
attempt_action(Action, Args,
State = #state { pending = Pending,
msg_store_state = MsgStoreState }) ->
case do_action(Action, Files, MsgStoreState) of
{ok, OkThunk} ->
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
[OkThunk | Thunks])};
{defer, [File | _]} ->
Pending1 = maps:put(File, {Action, Files}, Pending),
State #state { pending_no_readers = Pending1 }
case do_action(Action, Args, MsgStoreState) of
ok ->
State;
defer ->
[File|_] = Args,
Pending1 = maps:put(File, {Action, Args}, Pending),
ensure_pending_timer(State #state { pending = Pending1 })
end.

do_action(combine, [Source, Destination], MsgStoreState) ->
rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
%% Dispatch one GC action to the message store. Per attempt_action/3,
%% each call is expected to return 'ok' when done, or 'defer' when the
%% store cannot run the action yet.
do_action(compact, [FileNum], StoreState) ->
    rabbit_msg_store:compact_file(FileNum, StoreState);
do_action(truncate, [FileNum, TruncateSize, ThresholdTimestamp], StoreState) ->
    rabbit_msg_store:truncate_file(FileNum, TruncateSize,
                                   ThresholdTimestamp, StoreState);
do_action(delete, [FileNum], StoreState) ->
    rabbit_msg_store:delete_file(FileNum, StoreState).

%% Arm the do_pending retry timer if it is not already running; at most
%% one timer is outstanding at any time. 5000 ms between retry rounds.
ensure_pending_timer(#state{timer_ref = undefined} = State) ->
    State#state{timer_ref = erlang:start_timer(5000, self(), do_pending)};
ensure_pending_timer(AlreadyArmed) ->
    AlreadyArmed.
Loading