diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 54b28f8a31d4..83ff3c8edafa 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1172,7 +1172,9 @@ query_messages_total(State) -> messages_total(State). query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) -> - Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0), + Cons = maps:fold(fun(_, ?CONSUMER_PID(P) = V, S) -> + S#{P => V} + end, #{}, Cons0), maps:keys(maps:merge(Enqs, Cons)). diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index 1a8d4d7adc9c..9d5d3695684c 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -93,8 +93,8 @@ basics(Config) -> ConsumerTag = UId, ok = start_cluster(ClusterName, [ServerId]), FState0 = rabbit_fifo_client:init([ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(ConsumerTag, {simple_prefetch, 1}, - #{}, FState0), + {ok, _, FState1} = rabbit_fifo_client:checkout(ConsumerTag, {simple_prefetch, 1}, + #{}, FState0), rabbit_quorum_queue:wal_force_roll_over(node()), % create segment the segment will trigger a snapshot @@ -184,7 +184,7 @@ duplicate_delivery(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init([ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), + {ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), {ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, corr1, msg1, F1), Fun = fun Loop(S0) -> receive @@ -219,7 +219,7 @@ usage(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init([ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), + {ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), {ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, corr1, msg1, F1), {ok, F3, []} = rabbit_fifo_client:enqueue(ClusterName, corr2, msg2, F2), {_, _, _} = process_ra_events(receive_ra_events(2, 2), ClusterName, F3), @@ -272,7 +272,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init([ServerId]), {ok, F00, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F000), {_, _, F0} = process_ra_events(receive_ra_events(1, 0), ClusterName, F00), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), + {ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), {ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F1), {ok, F3, []} = rabbit_fifo_client:enqueue(ClusterName, msg3, F2), % lose first delivery @@ -301,9 +301,9 @@ returns_after_down(Config) -> Self = self(), _Pid = spawn(fun () -> F = rabbit_fifo_client:init([ServerId]), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, - {simple_prefetch, 10}, - #{}, F), + {ok, _, _} = rabbit_fifo_client:checkout(<<"tag">>, + {simple_prefetch, 10}, + #{}, F), Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, @@ -382,7 +382,7 @@ discard(Config) -> _ = ra:members(ServerId), F0 = rabbit_fifo_client:init([ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, + {ok, _, F1} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F0), {ok, F2, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F1), F3 = discard_next_delivery(ClusterName, F2, 5000), @@ -405,8 +405,8 @@ cancel_checkout(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init([ServerId], 4), {ok, F1, []} = rabbit_fifo_client:enqueue(ClusterName, m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, - #{}, F1), + {ok, _, F2} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, + #{}, F1), {_, _, F3} = process_ra_events(receive_ra_events(1, 1), ClusterName, F2, [], [], fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), @@ -424,7 +424,7 @@ lost_delivery(Config) -> {_, _, F2} = process_ra_events( receive_ra_events(1, 0), ClusterName, F1, [], [], fun (_, S) -> S end), - {ok, F3} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F2), + {ok, _, F3} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 10}, #{}, F2), %% drop a delivery, simulating e.g. a full distribution buffer receive {ra_event, _, Evt} -> @@ -449,6 +449,7 @@ lost_delivery(Config) -> ok. credit_api_v1(Config) -> + meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> false end), ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), @@ -458,7 +459,7 @@ credit_api_v1(Config) -> {_, _, F3} = process_ra_events(receive_ra_events(2, 0), ClusterName, F2), %% checkout with 0 prefetch CTag = <<"my-tag">>, - {ok, F4} = rabbit_fifo_client:checkout(CTag, credited, #{}, F3), + {ok, _, F4} = rabbit_fifo_client:checkout(CTag, {credited, 0}, #{}, F3), %% assert no deliveries {_, _, F5} = process_ra_events(receive_ra_events(), ClusterName, F4, [], [], fun @@ -505,9 +506,9 @@ credit_api_v2(Config) -> CTag = <<"my-tag">>, DC0 = 16#ff_ff_ff_ff, DC1 = 0, %% = DC0 + 1 using 32 bit serial number arithmetic - {ok, F4} = rabbit_fifo_client:checkout( + {ok, _, F4} = rabbit_fifo_client:checkout( %% initial_delivery_count in consumer meta means credit API v2. - CTag, credited, #{initial_delivery_count => DC0}, F3), + CTag, {credited, DC0}, #{}, F3), %% assert no deliveries {_, _, F5} = process_ra_events(receive_ra_events(), ClusterName, F4, [], [], fun @@ -609,7 +610,7 @@ test_queries(Config) -> exit(ready_timeout) end, F0 = rabbit_fifo_client:init([ServerId], 4), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 1}, #{}, F0), + {ok, _, _} = rabbit_fifo_client:checkout(<<"tag">>, {simple_prefetch, 1}, #{}, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, fun rabbit_fifo:query_messages_ready/1), ?assertEqual(1, Ready), @@ -637,8 +638,8 @@ dequeue(Config) -> {ok, F2_, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F1b), {_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F2_), - % {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2), - {ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2), + {ok, _, {_, _, 0, _, msg1}, F3} = + rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F2), {ok, F4_, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F3), {_, _, F4} = process_ra_events(receive_ra_events(1, 0), ClusterName, F4_), {ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(ClusterName, Tag, unsettled, F4), @@ -698,7 +699,7 @@ receive_ra_events(Acc) -> end. process_ra_events(Events, ClusterName, State) -> - DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) -> + DeliveryFun = fun ({deliver, Tag, _, Msgs}, S) -> MsgIds = [element(1, M) || M <- Msgs], {S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S), S0