Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Mar 21, 2024
1 parent 60f3f6e commit d9449e4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_fifo.erl
Expand Up @@ -1172,7 +1172,9 @@ query_messages_total(State) ->
messages_total(State).

query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
Cons = maps:fold(fun(_, ?CONSUMER_PID(P) = V, S) ->
S#{P => V}
end, #{}, Cons0),
maps:keys(maps:merge(Enqs, Cons)).


Expand Down
39 changes: 20 additions & 19 deletions deps/rabbit/test/rabbit_fifo_int_SUITE.erl
Expand Up @@ -93,8 +93,8 @@ basics(Config) ->
ConsumerTag = UId,
ok = start_cluster(ClusterName, [ServerId]),
FState0 = rabbit_fifo_client:init([ServerId]),
{ok, FState1} = rabbit_fifo_client:checkout(ConsumerTag, {simple_prefetch, 1},
#{}, FState0),
{ok, _, FState1} = rabbit_fifo_client:checkout(ConsumerTag, {simple_prefetch, 1},
#{}, FState0),

rabbit_quorum_queue:wal_force_roll_over(node()),
% create segment the segment will trigger a snapshot
Expand Down Expand Up @@ -184,7 +184,7 @@ duplicate_delivery(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init([ServerId]),
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, corr1, msg1, F1),
Fun = fun Loop(S0) ->
receive
Expand Down Expand Up @@ -219,7 +219,7 @@ usage(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init([ServerId]),
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, corr1, msg1, F1),
{ok, F3, []} = rabbit_fifo_client:enqueue(ClusterName, corr2, msg2, F2),
{_, _, _} = process_ra_events(receive_ra_events(2, 2), ClusterName, F3),
Expand Down Expand Up @@ -272,7 +272,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init([ServerId]),
{ok, F00, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F000),
{_, _, F0} = process_ra_events(receive_ra_events(1, 0), ClusterName, F00),
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0),
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F1),
{ok, F3, []} = rabbit_fifo_client:enqueue(ClusterName, msg3, F2),
% lose first delivery
Expand Down Expand Up @@ -301,9 +301,9 @@ returns_after_down(Config) ->
Self = self(),
_Pid = spawn(fun () ->
F = rabbit_fifo_client:init([ServerId]),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>,
{simple_prefetch, 10},
#{}, F),
{ok, _, _} = rabbit_fifo_client:checkout(<<"tag">>,
{simple_prefetch, 10},
#{}, F),
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
Expand Down Expand Up @@ -382,7 +382,7 @@ discard(Config) ->
_ = ra:members(ServerId),

F0 = rabbit_fifo_client:init([ServerId]),
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
{ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
#{}, F0),
{ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F1),
F3 = discard_next_delivery(ClusterName, F2, 5000),
Expand All @@ -405,8 +405,8 @@ cancel_checkout(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init([ServerId], 4),
{ok, F1, []} = rabbit_fifo_client:enqueue(ClusterName, m1, F0),
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
#{}, F1),
{ok, _, F2} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10},
#{}, F1),
{_, _, F3} = process_ra_events(receive_ra_events(1, 1), ClusterName, F2,
[], [], fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
Expand All @@ -424,7 +424,7 @@ lost_delivery(Config) ->
{_, _, F2} = process_ra_events(
receive_ra_events(1, 0), ClusterName, F1, [], [],
fun (_, S) -> S end),
{ok, F3} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F2),
{ok, _, F3} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F2),
%% drop a delivery, simulating e.g. a full distribution buffer
receive
{ra_event, _, Evt} ->
Expand All @@ -449,6 +449,7 @@ lost_delivery(Config) ->
ok.

credit_api_v1(Config) ->
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> false end),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
Expand All @@ -458,7 +459,7 @@ credit_api_v1(Config) ->
{_, _, F3} = process_ra_events(receive_ra_events(2, 0), ClusterName, F2),
%% checkout with 0 prefetch
CTag = <<"my-tag">>,
{ok, F4} = rabbit_fifo_client:checkout(CTag, credited, #{}, F3),
{ok, _, F4} = rabbit_fifo_client:checkout(CTag, {credited, 0}, #{}, F3),
%% assert no deliveries
{_, _, F5} = process_ra_events(receive_ra_events(), ClusterName, F4, [], [],
fun
Expand Down Expand Up @@ -505,9 +506,9 @@ credit_api_v2(Config) ->
CTag = <<"my-tag">>,
DC0 = 16#ff_ff_ff_ff,
DC1 = 0, %% = DC0 + 1 using 32 bit serial number arithmetic
{ok, F4} = rabbit_fifo_client:checkout(
{ok, _, F4} = rabbit_fifo_client:checkout(
%% initial_delivery_count in consumer meta means credit API v2.
CTag, credited, #{initial_delivery_count => DC0}, F3),
CTag, {credited, DC0}, #{}, F3),
%% assert no deliveries
{_, _, F5} = process_ra_events(receive_ra_events(), ClusterName, F4, [], [],
fun
Expand Down Expand Up @@ -609,7 +610,7 @@ test_queries(Config) ->
exit(ready_timeout)
end,
F0 = rabbit_fifo_client:init([ServerId], 4),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 1}, #{}, F0),
{ok, _, _} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 1}, #{}, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
fun rabbit_fifo:query_messages_ready/1),
?assertEqual(1, Ready),
Expand Down Expand Up @@ -637,8 +638,8 @@ dequeue(Config) ->
{ok, F2_, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F1b),
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F2_),

% {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2),
{ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2),
{ok, _, {_, _, 0, _, msg1}, F3} =
rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2),
{ok, F4_, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F3),
{_, _, F4} = process_ra_events(receive_ra_events(1, 0), ClusterName, F4_),
{ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(ClusterName, Tag, unsettled, F4),
Expand Down Expand Up @@ -698,7 +699,7 @@ receive_ra_events(Acc) ->
end.

process_ra_events(Events, ClusterName, State) ->
DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) ->
DeliveryFun = fun ({deliver, Tag, _, Msgs}, S) ->
MsgIds = [element(1, M) || M <- Msgs],
{S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S),
S0
Expand Down

0 comments on commit d9449e4

Please sign in to comment.