Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2601,14 +2601,22 @@ maybe_deltas_to_betas(DelsAndAcksFun,
count = DeltaCount,
transient = Transient,
end_seq_id = DeltaSeqIdEnd } = Delta,
%% For v1 we always want to read messages up to the next segment boundary.
%% This is because v1 is not optimised for multiple reads from the same
%% segment: every time we read messages from a segment it has to read
%% and parse the entire segment from disk, filtering the messages we
%% requested afterwards.
%%
%% For v2 we want to limit the number of messages read at once to lower
%% the memory footprint. We use the consume rate to determine how many
%% messages we read.
DeltaSeqLimit = case Version of
1 -> DeltaSeqIdEnd;
2 -> DeltaSeqId + MemoryLimit
end,
DeltaSeqId1 =
lists:min([IndexMod:next_segment_boundary(DeltaSeqId),
%% We must limit the number of messages read at once
%% otherwise the queue will attempt to read up to segment_entry_count()
%% messages from the index each time. The value is determined
%% using the consuming rate.
DeltaSeqId + MemoryLimit,
DeltaSeqIdEnd]),
DeltaSeqLimit, DeltaSeqIdEnd]),
{List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState),
{List, StoreState2} = case Version of
1 -> {List0, StoreState};
Expand Down