Skip to content

Commit

Permalink
First pass SAC consumer priority implementation.
Browse files Browse the repository at this point in the history
Single active consumers will be activated if they have a higher priority
than the currently active consumer. if the currently active consumer
has pending messages, no further messages will be assigned to the
consumer and the activation of the new consumer will happen once
all pending messages are settled. This is to ensure processing order.

Consumers with the same priority will internally be ordered to
favour those with credit then those that attached first.

QQ: add SAC consumer priority integration tests

Dialyzer fix

QQ: add check for ff in tests
  • Loading branch information
kjnilsson committed May 1, 2024
1 parent 1271cee commit 203b671
Show file tree
Hide file tree
Showing 6 changed files with 813 additions and 195 deletions.
247 changes: 162 additions & 85 deletions deps/rabbit/src/rabbit_fifo.erl

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_fifo.hrl
Expand Up @@ -87,7 +87,8 @@
-type consumer_meta() :: #{ack => boolean(),
username => binary(),
prefetch => non_neg_integer(),
args => list()
args => list(),
priority => non_neg_integer()
% %% set if and only if credit API v2 is in use
% initial_delivery_count => rabbit_queue_type:delivery_count()
}.
Expand Down Expand Up @@ -122,7 +123,7 @@

-record(consumer,
{cfg = #consumer_cfg{},
status = up :: up | suspected_down | cancelled | waiting,
status = up :: up | suspected_down | cancelled | fading,
next_msg_id = 0 :: msg_id(),
checked_out = #{} :: #{msg_id() => msg()},
%% max number of messages that can be sent
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Expand Up @@ -856,10 +856,16 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
0
end,

Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
{_Key, Value} ->
Value;
_ -> 0
end,
ConsumerMeta = #{ack => AckRequired,
prefetch => Prefetch,
args => Args,
username => ActingUser},
username => ActingUser,
priority => Priority},
{ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag,
Mode, ConsumerMeta,
QState0),
Expand Down
261 changes: 197 additions & 64 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Expand Up @@ -46,68 +46,70 @@ groups() ->
{unclustered, [], [
{uncluster_size_2, [], [add_member]}
]},
{clustered, [], [
{cluster_size_2, [], [add_member_2,
add_member_not_running,
add_member_classic,
add_member_wrong_type,
add_member_already_a_member,
add_member_not_found,
delete_member_not_running,
delete_member_classic,
delete_member_wrong_type,
delete_member_queue_not_found,
delete_member,
delete_member_not_a_member,
delete_member_member_already_deleted,
node_removal_is_quorum_critical]
++ memory_tests()},
{cluster_size_3, [], [
cleanup_data_dir,
channel_handles_ra_event,
declare_during_node_down,
simple_confirm_availability_on_leader_change,
publishing_to_unavailable_queue,
confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
leadership_takeover,
delete_declare,
delete_member_during_node_down,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
consume_in_minority,
reject_after_leader_transfer,
shrink_all,
rebalance,
file_handle_reservations,
file_handle_reservations_above_limit,
node_removal_is_not_quorum_critical,
leader_locator_client_local,
leader_locator_balanced,
leader_locator_balanced_maintenance,
leader_locator_balanced_random_maintenance,
leader_locator_policy,
status,
format,
add_member_2
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
quorum_cluster_size_7,
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down
]},
{clustered_with_partitions, [],
[
reconnect_consumer_and_publish,
reconnect_consumer_and_wait,
reconnect_consumer_and_wait_channel_down
]}
]}
{clustered, [],
[
{cluster_size_2, [], [add_member_2,
add_member_not_running,
add_member_classic,
add_member_wrong_type,
add_member_already_a_member,
add_member_not_found,
delete_member_not_running,
delete_member_classic,
delete_member_wrong_type,
delete_member_queue_not_found,
delete_member,
delete_member_not_a_member,
node_removal_is_quorum_critical]
++ memory_tests()},
{cluster_size_3, [], [
cleanup_data_dir,
channel_handles_ra_event,
declare_during_node_down,
simple_confirm_availability_on_leader_change,
publishing_to_unavailable_queue,
confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
leadership_takeover,
delete_declare,
delete_member_during_node_down,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
consume_in_minority,
reject_after_leader_transfer,
shrink_all,
rebalance,
file_handle_reservations,
file_handle_reservations_above_limit,
node_removal_is_not_quorum_critical,
leader_locator_client_local,
leader_locator_balanced,
leader_locator_balanced_maintenance,
leader_locator_balanced_random_maintenance,
leader_locator_policy,
status,
format,
add_member_2,
single_active_consumer_priority_take_over,
single_active_consumer_priority
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
quorum_cluster_size_7,
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down
]},
{clustered_with_partitions, [],
[
reconnect_consumer_and_publish,
reconnect_consumer_and_wait,
reconnect_consumer_and_wait_channel_down
]}
]}
].

all_tests() ->
Expand Down Expand Up @@ -923,6 +925,7 @@ publish_confirm(Ch, QName, Timeout) ->
ct:pal("NOT CONFIRMED! ~ts", [QName]),
fail
after Timeout ->
flush(1),
exit(confirm_timeout)
end.

Expand Down Expand Up @@ -971,6 +974,120 @@ consume_in_minority(Config) ->
ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server2),
ok.

single_active_consumer_priority_take_over(Config) ->
check_quorum_queues_v4_compat(Config),

[Server0, Server1, _Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
QName = ?config(queue_name, Config),
Q1 = <<QName/binary, "_1">>,
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-single-active-consumer">>, bool, true}],
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 1}]),
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
#'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
publish_confirm(Ch2, Q1),
%% higher priority consumer attaches
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 3}]),

%% Q1 should still have Ch1 as consumer as it has pending messages
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query,
[RaNameQ1, QueryFun])),

%% ack the message
receive
{#'basic.deliver'{consumer_tag = <<"ch1-ctag1">>,
delivery_tag = DeliveryTag}, _} ->
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
after 5000 ->
flush(1),
exit(basic_deliver_timeout)
end,

?awaitMatch({ok, {_, {value, {<<"ch2-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun]),
?DEFAULT_AWAIT),
ok.

single_active_consumer_priority(Config) ->
check_quorum_queues_v4_compat(Config),
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server2),
QName = ?config(queue_name, Config),
Q1 = <<QName/binary, "_1">>,
Q2 = <<QName/binary, "_2">>,
Q3 = <<QName/binary, "_3">>,
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-single-active-consumer">>, bool, true}],
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
?assertEqual({'queue.declare_ok', Q2, 0, 0}, declare(Ch2, Q2, Args)),
?assertEqual({'queue.declare_ok', Q3, 0, 0}, declare(Ch3, Q3, Args)),

ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 3}]),
ok = subscribe(Ch1, Q2, false, <<"ch1-ctag2">>, [{"x-priority", byte, 2}]),
ok = subscribe(Ch1, Q3, false, <<"ch1-ctag3">>, [{"x-priority", byte, 1}]),


ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 1}]),
ok = subscribe(Ch2, Q2, false, <<"ch2-ctag2">>, [{"x-priority", byte, 3}]),
ok = subscribe(Ch2, Q3, false, <<"ch2-ctag3">>, [{"x-priority", byte, 2}]),

ok = subscribe(Ch3, Q1, false, <<"ch3-ctag1">>, [{"x-priority", byte, 2}]),
ok = subscribe(Ch3, Q2, false, <<"ch3-ctag2">>, [{"x-priority", byte, 1}]),
ok = subscribe(Ch3, Q3, false, <<"ch3-ctag3">>, [{"x-priority", byte, 3}]),


RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
RaNameQ2 = binary_to_atom(<<"%2F", "_", Q2/binary>>, utf8),
RaNameQ3 = binary_to_atom(<<"%2F", "_", Q3/binary>>, utf8),
%% assert each queue has a different consumer
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,

%% Q1 should have the consumer on Ch1
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),

%% Q2 Ch2
?assertMatch({ok, {_, {value, {<<"ch2-ctag2">>, _}}}, _},
rpc:call(Server1, ra, local_query, [RaNameQ2, QueryFun])),

%% Q3 Ch3
?assertMatch({ok, {_, {value, {<<"ch3-ctag3">>, _}}}, _},
rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])),

%% close Ch3
_ = rabbit_ct_client_helpers:close_channel(Ch3),
flush(100),

%% assert Q3 has Ch2 (priority 2) as consumer
?assertMatch({ok, {_, {value, {<<"ch2-ctag3">>, _}}}, _},
rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])),

%% close Ch2
_ = rabbit_ct_client_helpers:close_channel(Ch2),
flush(100),

%% assert all queues as has Ch1 as consumer
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
?assertMatch({ok, {_, {value, {<<"ch1-ctag2">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ2, QueryFun])),
?assertMatch({ok, {_, {value, {<<"ch1-ctag3">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ3, QueryFun])),
ok.

reject_after_leader_transfer(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down Expand Up @@ -3649,13 +3766,20 @@ consume_empty(Ch, Queue, NoAck) ->
no_ack = NoAck})).

subscribe(Ch, Queue, NoAck) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>, []).

subscribe(Ch, Queue, NoAck, Tag, Args) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
consumer_tag = <<"ctag">>},
arguments = Args,
consumer_tag = Tag},
self()),
receive
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
#'basic.consume_ok'{consumer_tag = Tag} ->
ok
after 30000 ->
flush(100),
exit(subscribe_timeout)
end.

qos(Ch, Prefetch, Global) ->
Expand Down Expand Up @@ -3768,3 +3892,12 @@ basic_get(Ch, Q, NoAck, Attempt) ->
timer:sleep(100),
basic_get(Ch, Q, NoAck, Attempt - 1)
end.

check_quorum_queues_v4_compat(Config) ->
case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config,
quorum_queues_4) of
false ->
throw({skip, "test needs feature flag quorum_queues_v4"});
true ->
ok
end.

0 comments on commit 203b671

Please sign in to comment.