Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1830,8 +1830,15 @@ compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
%% Open the file.
FileName = filenum_to_name(File),
{ok, Fd} = file:open(form_filename(Dir, FileName), [read, write, binary, raw]),
%% Load the messages.
Messages = load_and_vacuum_message_file(File, State),
%% Load the messages. It's possible to get 0 messages here;
%% that's OK. That means we have little to do as the file is
%% about to be deleted.
{Messages, _} = scan_and_vacuum_message_file(File, State),
%% Blank holes. We must do this first otherwise the file is left
%% with data that may confuse the code (for example data that looks
%% like a message, isn't a message, but spans over a real message).
%% We blank more than is likely required but better safe than sorry.
blank_holes_in_file(Fd, Messages),
%% Compact the file.
{ok, TruncateSize, IndexUpdates} = do_compact_file(Fd, 0, Messages, lists:reverse(Messages), []),
%% Sync and close the file.
Expand Down Expand Up @@ -1876,6 +1883,32 @@ compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
garbage_collect(),
ok.

%% We must special case the blanking of the beginning of the file.
blank_holes_in_file(Fd, [#msg_location{ offset = Offset }|Tail])
when Offset =/= 0 ->
Bytes = <<0:Offset/unit:8>>,
ok = file:pwrite(Fd, 0, Bytes),
blank_holes_in_file1(Fd, Tail);
blank_holes_in_file(Fd, Messages) ->
blank_holes_in_file1(Fd, Messages).

blank_holes_in_file1(Fd, [
#msg_location{ offset = OneOffset, total_size = OneSize },
#msg_location{ offset = TwoOffset } = Two
|Tail]) when OneOffset + OneSize < TwoOffset ->
Offset = OneOffset + OneSize,
Size = TwoOffset - Offset,
Bytes = <<0:Size/unit:8>>,
ok = file:pwrite(Fd, Offset, Bytes),
blank_holes_in_file1(Fd, [Two|Tail]);
%% We either have only one message left, or contiguous messages.
blank_holes_in_file1(Fd, [_|Tail]) ->
blank_holes_in_file1(Fd, Tail);
%% No need to blank the hole past the last message as we will
%% not write there (no confusion possible) and truncate afterwards.
blank_holes_in_file1(_, []) ->
ok.

%% If the message at the end fits into the hole we have found, we copy it there.
%% We will do the ets:updates after the data is synced to disk.
do_compact_file(Fd, Offset, Start = [#msg_location{ offset = StartMsgOffset }|_],
Expand Down Expand Up @@ -1962,27 +1995,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
ok
end.

load_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc) ->
case index_lookup(MsgId, State) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = index_delete_object(Entry, State),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
[ Entry | Acc ];
_ ->
Acc
end
end, [], Messages).

scan_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
scan_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
Expand All @@ -1997,7 +2010,15 @@ scan_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
_ ->
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
Acc;
not_found ->
Acc
end
end, {[], 0}, Messages).
2 changes: 1 addition & 1 deletion deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
% This Source Code Form is subject to the terms of the Mozilla Public
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
Expand Down