Skip to content

Commit

Permalink
More test refactoring and new API fixes
Browse files Browse the repository at this point in the history
rabbit_fifo_prop_SUITE refactoring and other fixes.

fixss

bzl

bzl

fixes
  • Loading branch information
kjnilsson committed May 1, 2024
1 parent 3425233 commit 1271cee
Show file tree
Hide file tree
Showing 13 changed files with 818 additions and 1,139 deletions.
1 change: 1 addition & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ rabbitmq_suite(
deps = [
"//deps/rabbit_common:erlang_app",
"@proper//:erlang_app",
"@meck//:erlang_app",
"@ra//:erlang_app",
],
)
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
hdrs = ["src/rabbit_fifo.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
)
erlang_bytecode(
name = "rabbit_fifo_dlx_SUITE_beam_files",
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,6 @@
{quorum_queues_v4,
#{desc => "Unlocks QQ v4 goodies",
stability => stable,
depends_on => [quorum_queue]
depends_on => [quorum_queue,
credit_api_v2]
}}).
159 changes: 90 additions & 69 deletions deps/rabbit/src/rabbit_fifo.erl

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
{simple_prefetch, MaxCredit :: non_neg_integer()}.
%% determines how credit is replenished

-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
credited,
simple_prefetch} |
-type checkout_spec() :: {once | auto,
Num :: non_neg_integer(),
credited | simple_prefetch} |

{dequeue, settled | unsettled} |
cancel |
Expand Down
37 changes: 27 additions & 10 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
-define(SOFT_LIMIT, 32).
-define(TIMER_TIME, 10000).
-define(COMMAND_TIMEOUT, 30000).
-define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra

-type seq() :: non_neg_integer().

Expand Down Expand Up @@ -118,6 +119,9 @@ enqueue(QName, Correlation, Msg,
cfg = #cfg{servers = Servers,
timeout = Timeout}} = State0) ->
%% the first publish, register and enqueuer for this process.
%% TODO: we _only_ need to pre-register an enqueuer to discover if the
%% queue overflow is `reject_publish` and the queue can accept new messages
%% if the queue does not have `reject_publish` set we can skip this step
Reg = rabbit_fifo:make_register_enqueuer(self()),
case ra:process_command(Servers, Reg, Timeout) of
{ok, reject_publish, Leader} ->
Expand Down Expand Up @@ -335,19 +339,32 @@ discard(ConsumerTag, [_|_] = MsgIds,
state()) ->
{ok, ConsumerInfos :: map(), state()} |
{error | timeout, term()}.
checkout(ConsumerTag, CreditMode, Meta,
checkout(ConsumerTag, CreditMode, #{} = Meta,
#state{consumers = CDels0} = State0)
when is_binary(ConsumerTag) ->
when is_binary(ConsumerTag) andalso
is_tuple(CreditMode) ->
Servers = sorted_servers(State0),
ConsumerId = consumer_id(ConsumerTag),
NumUnsettled = case CreditMode of
credited -> 0;
Spec = case rabbit_fifo:is_v4() of
true ->
case CreditMode of
{simple_prefetch, 0} ->
{auto, {simple_prefetch,
?UNLIMITED_PREFETCH_COUNT}};
_ ->
{auto, CreditMode}
end;
false ->
case CreditMode of
{credited, _} ->
{auto, 0, credited};
{simple_prefetch, 0} ->
{auto, ?UNLIMITED_PREFETCH_COUNT, simple_prefetch};
{simple_prefetch, Num} ->
Num
end,
Cmd = rabbit_fifo:make_checkout(ConsumerId,
{auto, NumUnsettled, CreditMode},
Meta),
{auto, Num, simple_prefetch}
end
end,
Cmd = rabbit_fifo:make_checkout(ConsumerId, Spec, Meta),
%% ???
Ack = maps:get(ack, Meta, true),

Expand All @@ -369,7 +386,7 @@ checkout(ConsumerTag, CreditMode, Meta,
NextMsgId - 1
end
end,
DeliveryCount = case maps:is_key(initial_delivery_count, Meta) of
DeliveryCount = case rabbit_fifo:is_v4() of
true -> credit_api_v2;
false -> {credit_api_v1, 0}
end,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
-opaque state() :: #?STATE{}.

%% Delete atom 'credit_api_v1' when feature flag credit_api_v2 becomes required.
-type consume_mode() :: {simple_prefetch, non_neg_integer()} |
-type consume_mode() :: {simple_prefetch, Prefetch :: non_neg_integer()} |
{credited, Initial :: delivery_count() | credit_api_v1}.
-type consume_spec() :: #{no_ack := boolean(),
channel_pid := pid(),
Expand Down
42 changes: 16 additions & 26 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -849,29 +849,19 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
ConsumerTag = quorum_ctag(ConsumerTag0),
%% consumer info is used to describe the consumer properties
AckRequired = not NoAck,
{CreditMode, DeclaredPrefetch, ConsumerMeta0} =
case Mode of
{credited, C} ->
Meta = if C =:= credit_api_v1 ->
#{};
is_integer(C) ->
#{initial_delivery_count => C}
end,
{credited, 0, Meta};
{simple_prefetch = M, Declared} ->
Effective = case Declared of
0 -> ?UNLIMITED_PREFETCH_COUNT;
_ -> Declared
end,
{{M, Effective}, Declared, #{}}
end,
ConsumerMeta = maps:merge(ConsumerMeta0,
#{ack => AckRequired,
prefetch => DeclaredPrefetch,
args => Args,
username => ActingUser}),
Prefetch = case Mode of
{simple_prefetch, Declared} ->
Declared;
_ ->
0
end,

ConsumerMeta = #{ack => AckRequired,
prefetch => Prefetch,
args => Args,
username => ActingUser},
{ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag,
CreditMode, ConsumerMeta,
Mode, ConsumerMeta,
QState0),
case single_active_consumer_on(Q) of
true ->
Expand All @@ -887,10 +877,10 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName,
DeclaredPrefetch, ActivityStatus == single_active, %% Active
Prefetch, ActivityStatus == single_active, %% Active
ActivityStatus, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, DeclaredPrefetch,
AckRequired, QName, Prefetch,
Args, none, ActingUser),
{ok, QState};
{error, Error} ->
Expand All @@ -902,10 +892,10 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName,
DeclaredPrefetch, true, %% Active
Prefetch, true, %% Active
up, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, DeclaredPrefetch,
AckRequired, QName, Prefetch,
Args, none, ActingUser),
{ok, QState}
end.
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/test/amqp_credit_api_v2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ credit_api_v2(Config) ->

?assertEqual(ok,
rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME)),
?assertEqual(ok,
rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queues_v4)),
flush(enabled_feature_flag),

%% Consume with credit API v2
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3306,12 +3306,14 @@ cancel_consumer_gh_3729(Config) ->
ct:fail("basic.cancel_ok timeout")
end,

D = #'queue.declare'{queue = QQ, passive = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
D = #'queue.declare'{queue = QQ, passive = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]},

F = fun() ->
#'queue.declare_ok'{queue = QQ,
message_count = MC,
consumer_count = CC} = amqp_channel:call(Ch, D),
ct:pal("Mc ~b CC ~b", [MC, CC]),
MC =:= 1 andalso CC =:= 0
end,
rabbit_ct_helpers:await_condition(F, 30000),
Expand Down

0 comments on commit 1271cee

Please sign in to comment.