Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
sync/1, needs_sync/1, flush/1,
bounds/2, next_segment_boundary/1]).

%% Only used by tests
-export([bounds/1]).

%% Used to upgrade/downgrade from/to the v1 index.
-export([init_for_conversion/3]).
-export([init_args/1]).
Expand Down Expand Up @@ -482,7 +479,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
{LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1State),
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
%% v2 index that are about to be written to it).
{_, V2HiSeqId, _} = bounds(State0, undefined),
SkipFun = fun
(SeqId, FunState0) when SeqId < V2HiSeqId ->
Expand Down Expand Up @@ -1191,13 +1188,6 @@ flush_pre_publish_cache(TargetRamCount, State) ->
%% the test suite to pass. This can probably be made more accurate
%% in the future.

%% `bounds/1` is only used by tests
-spec bounds(State) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State) ->
bounds(State, undefined).

-spec bounds(State, non_neg_integer() | undefined) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
Expand Down
20 changes: 11 additions & 9 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -801,33 +801,35 @@ bq_queue_index1(_Config) ->
TwoSegs = SegmentSize + SegmentSize,
MostOfASegment = trunc(SegmentSize*0.75),
SeqIdsA = lists:seq(0, MostOfASegment-1),
NextSeqIdA = MostOfASegment,
SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment),
NextSeqIdB = 2 * MostOfASegment + 1,
SeqIdsC = lists:seq(0, trunc(SegmentSize/2)),
SeqIdsD = lists:seq(0, SegmentSize*4),

VerifyReadWithPublishedFun = fun verify_read_with_published_v2/3,

with_empty_test_queue(
fun (Qi0, QName) ->
{0, 0, Qi1} = IndexMod:bounds(Qi0),
{0, 0, Qi1} = IndexMod:bounds(Qi0, undefined),
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = IndexMod:bounds(Qi2),
{0, SegmentSize, Qi3} = IndexMod:bounds(Qi2, NextSeqIdA),
{ReadA, Qi4} = IndexMod:read(0, SegmentSize, Qi3),
ok = VerifyReadWithPublishedFun(false, ReadA,
lists:reverse(SeqIdsMsgIdsA)),
%% should get length back as 0, as all the msgs were transient
{0, 0, Qi6} = restart_test_queue(Qi4, QName),
{0, 0, Qi7} = IndexMod:bounds(Qi6),
{NextSeqIdA, NextSeqIdA, Qi7} = IndexMod:bounds(Qi6, NextSeqIdA),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = IndexMod:bounds(Qi8),
{0, TwoSegs, Qi9} = IndexMod:bounds(Qi8, NextSeqIdB),
{ReadB, Qi10} = IndexMod:read(0, SegmentSize, Qi9),
ok = VerifyReadWithPublishedFun(true, ReadB,
lists:reverse(SeqIdsMsgIdsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
BytesB = LenB * 10,
{LenB, BytesB, Qi12} = restart_test_queue(Qi10, QName),
{0, TwoSegs, Qi13} = IndexMod:bounds(Qi12),
{0, TwoSegs, Qi13} = IndexMod:bounds(Qi12, NextSeqIdB),
Qi15 = case IndexMod of
rabbit_queue_index ->
Qi14 = IndexMod:deliver(SeqIdsB, Qi13),
Expand All @@ -841,7 +843,7 @@ bq_queue_index1(_Config) ->
{_DeletedSegments, Qi16} = IndexMod:ack(SeqIdsB, Qi15),
Qi17 = IndexMod:flush(Qi16),
%% Everything will have gone now because #pubs == #acks
{0, 0, Qi18} = IndexMod:bounds(Qi17),
{NextSeqIdB, NextSeqIdB, Qi18} = IndexMod:bounds(Qi17, NextSeqIdB),
%% should get length back as 0 because all persistent
%% msgs have been acked
{0, 0, Qi19} = restart_test_queue(Qi18, QName),
Expand Down Expand Up @@ -996,7 +998,7 @@ v2_delete_segment_file_completely_acked1(_Config) ->
%% Publish a full segment file.
{Qi1, SeqIdsMsgIds} = queue_index_publish(SeqIds, true, Qi0),
SegmentSize = length(SeqIdsMsgIds),
{0, SegmentSize, Qi2} = IndexMod:bounds(Qi1),
{0, SegmentSize, Qi2} = IndexMod:bounds(Qi1, undefined),
%% Confirm that the file exists on disk.
Path = IndexMod:segment_file(0, Qi2),
true = filelib:is_file(Path),
Expand Down Expand Up @@ -1024,7 +1026,7 @@ v2_delete_segment_file_partially_acked1(_Config) ->
%% Publish a partial segment file.
{Qi1, SeqIdsMsgIds} = queue_index_publish(SeqIds, true, Qi0),
SeqIdsLen = length(SeqIdsMsgIds),
{0, SegmentSize, Qi2} = IndexMod:bounds(Qi1),
{0, SegmentSize, Qi2} = IndexMod:bounds(Qi1, undefined),
%% Confirm that the file exists on disk.
Path = IndexMod:segment_file(0, Qi2),
true = filelib:is_file(Path),
Expand Down Expand Up @@ -1054,7 +1056,7 @@ v2_delete_segment_file_partially_acked_with_holes1(_Config) ->
{Qi1, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, true, Qi0),
{Qi2, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi1),
SeqIdsLen = length(SeqIdsMsgIdsA) + length(SeqIdsMsgIdsB),
{0, SegmentSize, Qi3} = IndexMod:bounds(Qi2),
{0, SegmentSize, Qi3} = IndexMod:bounds(Qi2, undefined),
%% Confirm that the file exists on disk.
Path = IndexMod:segment_file(0, Qi3),
true = filelib:is_file(Path),
Expand Down
Loading