From aa95ae9b2049f4888c16e4216a4ce26843274a78 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 26 Mar 2025 11:17:29 +0100 Subject: [PATCH 1/2] Fix crash when consuming from unavailable quorum queue Prior to this commit, when a client consumed from an unavailable quorum queue, the following crash occurred: ``` {badmatch,{error,noproc}} [{rabbit_quorum_queue,consume,3,[{file,\"rabbit_quorum_queue.erl\"},{line,993}]} ``` This commit fixes this bug by returning any error when registering a quorum queue consumer to rabbit_queue_type. This commit also refactors errors returned by rabbit_queue_type:consume/3 to simplify and ensure seperation of concerns. For example prior to this commit, the channel did error formatting specifically for consuming from streams. It's better if the channel is unaware of what queue type it consumes from and have each queue type implementation format their own errors. (cherry picked from commit ef1a595a134565aec01fa39454dd6226b15c3d59) --- deps/rabbit/src/rabbit_amqp_session.erl | 7 +- deps/rabbit/src/rabbit_amqqueue.erl | 3 +- deps/rabbit/src/rabbit_channel.erl | 80 +++++++---------- deps/rabbit/src/rabbit_classic_queue.erl | 8 +- deps/rabbit/src/rabbit_queue_type.erl | 8 +- deps/rabbit/src/rabbit_quorum_queue.erl | 86 +++++++++++-------- deps/rabbit/src/rabbit_stream_queue.erl | 24 ++++-- deps/rabbit/test/amqp_client_SUITE.erl | 52 ++++++++++- deps/rabbit/test/quorum_queue_SUITE.erl | 40 +++++++-- .../src/rabbit_mqtt_processor.erl | 7 +- 10 files changed, 190 insertions(+), 125 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index b31093dcceb6..d72a9666fe4f 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1494,12 +1494,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, topic_permission_cache = TopicPermCache}, rabbit_global_counters:consumer_created(?PROTOCOL), {ok, [A], State1}; - {error, Reason} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Consuming from ~s failed: ~tp", - [rabbit_misc:rs(QName), Reason]); - {protocol_error, _Type, Reason, Args} -> + {error, _Type, Reason, Args} -> protocol_error( ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Reason, Args) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 93e9d5c2f0b1..b6e9ede763f7 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1816,8 +1816,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) -> rabbit_framing:amqp_table(), any(), rabbit_types:username(), rabbit_queue_type:state()) -> {ok, rabbit_queue_type:state()} | - {error, term()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. basic_consume(Q, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, QStates) -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 0b913c406287..8ad4971d5377 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1354,39 +1354,23 @@ handle_method(#'basic.consume'{queue = QueueNameBin, CurrentConsumers = maps:size(ConsumerMapping), case maps:find(ConsumerTag, ConsumerMapping) of error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' - rabbit_misc:protocol_error( - not_allowed, "reached maximum (~B) of consumers per channel", [MaxConsumers]); + rabbit_misc:protocol_error( + not_allowed, + "reached maximum (~B) of consumers per channel", + [MaxConsumers]); error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User, AuthzContext), - ActualConsumerTag = - case ConsumerTag of - <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), - "amq.ctag"); - Other -> Other - end, - case basic_consume( - QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, - ExclusiveConsume, Args, NoWait, State) of - {ok, State1} -> - {noreply, State1}; - {error, exclusive_consume_unavailable} -> - rabbit_misc:protocol_error( - access_refused, "~ts in exclusive use", - [rabbit_misc:rs(QueueName)]); - {error, global_qos_not_supported_for_queue_type} -> - rabbit_misc:protocol_error( - not_implemented, "~ts does not support global qos", - [rabbit_misc:rs(QueueName)]); - {error, timeout} -> - rabbit_misc:protocol_error( - internal_error, "~ts timeout occurred during consume operation", - [rabbit_misc:rs(QueueName)]); - {error, no_local_stream_replica_available} -> - rabbit_misc:protocol_error( - resource_error, "~ts does not have a running local replica", - [rabbit_misc:rs(QueueName)]) - end; + ActualTag = case ConsumerTag of + <<>> -> + rabbit_guid:binary( + rabbit_guid:gen_secure(), "amq.ctag"); + _ -> + ConsumerTag + end, + basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualTag, + ExclusiveConsume, Args, NoWait, State); {ok, _} -> %% Attempted reuse of consumer tag. rabbit_misc:protocol_error( @@ -1685,11 +1669,11 @@ handle_method(_MethodRecord, _Content, _State) -> %% for why. basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ExclusiveConsume, Args, NoWait, - State = #ch{cfg = #conf{conn_pid = ConnPid, - user = #user{username = Username}}, - limiter = Limiter, - consumer_mapping = ConsumerMapping, - queue_states = QueueStates0}) -> + State0 = #ch{cfg = #conf{conn_pid = ConnPid, + user = #user{username = Username}}, + limiter = Limiter, + consumer_mapping = ConsumerMapping, + queue_states = QueueStates0}) -> case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> @@ -1710,22 +1694,16 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ActualConsumerTag, {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, ConsumerMapping), - - State1 = State#ch{consumer_mapping = CM1, - queue_states = QueueStates}, - {ok, case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end}; - {{error, exclusive_consume_unavailable} = E, _Q} -> - E; - {{error, global_qos_not_supported_for_queue_type} = E, _Q} -> - E; - {{error, no_local_stream_replica_available} = E, _Q} -> - E; - {{error, timeout} = E, _Q} -> - E; - {{protocol_error, Type, Reason, ReasonArgs}, _Q} -> + State1 = State0#ch{consumer_mapping = CM1, + queue_states = QueueStates}, + State = case NoWait of + true -> + consumer_monitor(ActualConsumerTag, State1); + false -> + State1 + end, + {noreply, State}; + {{error, Type, Reason, ReasonArgs}, _Q} -> rabbit_misc:protocol_error(Type, Reason, ReasonArgs) end. diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 2732e9819081..5c79b6804615 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -297,8 +297,12 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) -> %% TODO: track pids as they change State = ensure_monitor(QPid, QRef, State0), {ok, State#?STATE{pid = QPid}}; - Err -> - Err + {error, exclusive_consume_unavailable} -> + {error, access_refused, "~ts in exclusive use", + [rabbit_misc:rs(QRef)]}; + {error, Reason} -> + {error, internal_error, "failed consuming from classic ~ts: ~tp", + [rabbit_misc:rs(QRef), Reason]} end. %% Delete this function when feature flag rabbitmq_4.0.0 becomes required. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 498db95dc88d..709e7edc8386 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -211,8 +211,7 @@ consume_spec(), queue_state()) -> {ok, queue_state(), actions()} | - {error, term()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. -callback cancel(amqqueue:amqqueue(), cancel_spec(), @@ -516,15 +515,14 @@ new(Q, State) when ?is_amqqueue(Q) -> -spec consume(amqqueue:amqqueue(), consume_spec(), state()) -> {ok, state()} | - {error, term()} | - {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. + {error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}. consume(Q, Spec, State) -> #ctx{state = CtxState0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), case Mod:consume(Q, Spec, CtxState0) of {ok, CtxState} -> {ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)}; - Err -> + Err = {error, _Type, _Fmt, _FmtArgs} -> Err end. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 7056edab2485..d39a6e8f253f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -971,10 +971,12 @@ dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) -> rabbit_queue_type:consume_spec(), rabbit_fifo_client:state()) -> {ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} | - {error, global_qos_not_supported_for_queue_type | timeout}. + {error, atom(), Format :: string(), FormatArgs :: [term()]}. consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_quorum(Q) -> - {error, global_qos_not_supported_for_queue_type}; + {error, not_implemented, + "~ts does not support global qos", + [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> #{no_ack := NoAck, channel_pid := ChPid, @@ -1008,46 +1010,58 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> args => Args, username => ActingUser, priority => Priority}, - {ok, _Infos, QState} = rabbit_fifo_client:checkout(ConsumerTag, - Mode, ConsumerMeta, - QState0), - case single_active_consumer_on(Q) of - true -> - %% get the leader from state - case rabbit_fifo_client:query_single_active_consumer(QState) of - {ok, SacResult} -> - ActivityStatus = case SacResult of - {value, {ConsumerTag, ChPid}} -> - single_active; - _ -> - waiting - end, + case rabbit_fifo_client:checkout( + ConsumerTag, Mode, ConsumerMeta, QState0) of + {ok, _Infos, QState} -> + case single_active_consumer_on(Q) of + true -> + %% get the leader from state + case rabbit_fifo_client:query_single_active_consumer(QState) of + {ok, SacResult} -> + ActivityStatus = case SacResult of + {value, {ConsumerTag, ChPid}} -> + single_active; + _ -> + waiting + end, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, + Prefetch, ActivityStatus == single_active, %% Active + ActivityStatus, Args), + emit_consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, Prefetch, + Args, none, ActingUser), + {ok, QState}; + Err -> + consume_error(Err, QName) + end; + false -> rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - Prefetch, ActivityStatus == single_active, %% Active - ActivityStatus, Args), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), - {ok, QState}; - {error, Error} -> - Error; - {timeout, _} -> - {error, timeout} + Prefetch, true, %% Active + up, Args), + emit_consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, Prefetch, + Args, none, ActingUser), + {ok, QState} end; - false -> - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - Prefetch, true, %% Active - up, Args), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), - {ok, QState} + Err -> + consume_error(Err, QName) end. +consume_error({error, Reason}, QName) -> + {error, internal_error, + "failed consuming from quorum ~ts: ~tp", + [rabbit_misc:rs(QName), Reason]}; +consume_error({timeout, RaServerId}, QName) -> + {error, internal_error, + "timed out consuming from quorum ~ts: ~tp", + [rabbit_misc:rs(QName), RaServerId]}. + cancel(_Q, #{consumer_tag := ConsumerTag} = Spec, State) -> maybe_send_reply(self(), maps:get(ok_msg, Spec, undefined)), Reason = maps:get(reason, Spec, cancel), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a52897437c66..0b7c1c0bbba9 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -290,19 +290,23 @@ format(Q, Ctx) -> consume(Q, #{mode := {simple_prefetch, 0}}, _) when ?amqqueue_is_stream(Q) -> - {protocol_error, precondition_failed, "consumer prefetch count is not set for stream ~ts", + {error, precondition_failed, + "consumer prefetch count is not set for stream ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, #{no_ack := true, mode := {simple_prefetch, _}}, _) when ?amqqueue_is_stream(Q) -> - {protocol_error, not_implemented, + {error, not_implemented, "automatic acknowledgement not supported by stream ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_stream(Q) -> - {error, global_qos_not_supported_for_queue_type}; + {error, not_implemented, + "~ts does not support global qos", + [rabbit_misc:rs(amqqueue:get_name(Q))]}; consume(Q, Spec, #stream_client{} = QState0) when ?amqqueue_is_stream(Q) -> + QName = amqqueue:get_name(Q), %% Messages should include the offset as a custom header. case get_local_pid(QState0) of {LocalPid, QState} when is_pid(LocalPid) -> @@ -314,13 +318,10 @@ consume(Q, Spec, #stream_client{} = QState0) args := Args, ok_msg := OkMsg, acting_user := ActingUser} = Spec, - QName = amqqueue:get_name(Q), rabbit_log:debug("~s:~s Local pid resolved ~0p", [?MODULE, ?FUNCTION_NAME, LocalPid]), case parse_offset_arg( rabbit_misc:table_lookup(Args, <<"x-stream-offset">>)) of - {error, _} = Err -> - Err; {ok, OffsetSpec} -> ConsumerPrefetchCount = case Mode of {simple_prefetch, C} -> C; @@ -344,12 +345,17 @@ consume(Q, Spec, #stream_client{} = QState0) maybe_send_reply(ChPid, OkMsg), _ = rabbit_stream_coordinator:register_local_member_listener(Q), Filter = maps:get(filter, Spec, []), - begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, Filter, filter_spec(Args)) + begin_stream(QState, ConsumerTag, OffsetSpec, Mode, + AckRequired, Filter, filter_spec(Args)); + {error, Reason} -> + {error, precondition_failed, + "failed consuming from stream ~ts: ~tp", + [rabbit_misc:rs(QName), Reason]} end; {undefined, _} -> - {protocol_error, precondition_failed, + {error, precondition_failed, "stream ~ts does not have a running replica on the local node", - [rabbit_misc:rs(amqqueue:get_name(Q))]} + [rabbit_misc:rs(QName)]} end. -spec parse_offset_arg(undefined | diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 35f7c9d5c198..1c1ebf3bc1e4 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -201,7 +201,8 @@ groups() -> leader_transfer_stream_send, list_connections, detach_requeues_two_connections_classic_queue, - detach_requeues_two_connections_quorum_queue + detach_requeues_two_connections_quorum_queue, + attach_to_down_quorum_queue ]}, {metrics, [shuffle], @@ -6596,8 +6597,55 @@ bad_x_cc_annotation_exchange(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection). +%% Attach a receiver to an unavailable quorum queue. +attach_to_down_quorum_queue(Config) -> + QName = <<"q-down">>, + Address = rabbitmq_amqp_address:queue(QName), + + %% Create quorum queue with single replica on node 2. + {_, _, LinkPair2} = Init2 = init(2, Config), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair2, + QName, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-quorum-initial-group-size">> => {ulong, 1} + }}), + ok = close(Init2), + + %% Make quorum queue unavailable. + ok = rabbit_ct_broker_helpers:stop_broker(Config, 2), + + OpnConf = connection_config(0, Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session0} = amqp10_client:begin_session_sync(Connection), + flush(attaching_receiver), + {ok, _Receiver} = amqp10_client:attach_receiver_link( + Session0, <<"receiver">>, Address), + receive + {amqp10_event, + {session, Session0, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + description = {utf8, Desc}}}}} -> + ?assertMatch( + <<"failed consuming from quorum queue 'q-down' in vhost '/'", _Reason/binary>>, + Desc) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + + ok = rabbit_ct_broker_helpers:start_broker(Config, 2), + + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( + Session, <<"my link pair">>), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = close({Connection, Session, LinkPair}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% internal -%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% receive_all_messages(Receiver, Accept) -> receive_all_messages0(Receiver, Accept, []). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 629361c9eb3e..56e5f4a710c8 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -80,6 +80,7 @@ groups() -> metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, consume_in_minority, + get_in_minority, reject_after_leader_transfer, shrink_all, rebalance, @@ -1030,25 +1031,48 @@ publish_and_restart(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). consume_in_minority(Config) -> - [Server0, Server1, Server2] = - rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), QQ = ?config(queue_name, Config), - RaName = binary_to_atom(<<"%2F_", QQ/binary>>, utf8), + RaName = binary_to_atom(<<"%2F_", QQ/binary>>), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - rabbit_quorum_queue:stop_server({RaName, Server1}), - rabbit_quorum_queue:stop_server({RaName, Server2}), + ok = rabbit_quorum_queue:stop_server({RaName, Server1}), + ok = rabbit_quorum_queue:stop_server({RaName, Server2}), + + ?assertExit( + {{shutdown, + {connection_closing, + {server_initiated_close, 541, + <<"INTERNAL_ERROR - failed consuming from quorum queue " + "'consume_in_minority' in vhost '/'", _Reason/binary>>}}}, _}, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ}, self())), + + ok = rabbit_quorum_queue:restart_server({RaName, Server1}), + ok = rabbit_quorum_queue:restart_server({RaName, Server2}). + +get_in_minority(Config) -> + [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + RaName = binary_to_atom(<<"%2F_", QQ/binary>>), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_quorum_queue:stop_server({RaName, Server1}), + ok = rabbit_quorum_queue:stop_server({RaName, Server2}), ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})), - rabbit_quorum_queue:restart_server({RaName, Server1}), - rabbit_quorum_queue:restart_server({RaName, Server2}), - ok. + ok = rabbit_quorum_queue:restart_server({RaName, Server1}), + ok = rabbit_quorum_queue:restart_server({RaName, Server2}). single_active_consumer_priority_take_over(Config) -> check_quorum_queues_v4_compat(Config), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index b14decb18971..7ae0893a13eb 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1506,10 +1506,9 @@ consume(Q, QoS, #state{ State1 = State0#state{queue_states = QStates}, State = maybe_set_queue_qos1(QoS, State1), {ok, State}; - {error, Reason} = Err -> - ?LOG_ERROR("Failed to consume from ~s: ~p", - [rabbit_misc:rs(QName), Reason]), - Err + {error, Type, Fmt, Args} -> + ?LOG_ERROR(Fmt, Args), + {error, Type} end end) end; From 2009a21d96c445fa3bfc9fe83897030463f8aabd Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 27 Mar 2025 10:20:45 +0100 Subject: [PATCH 2/2] Apply PR formatting feedback https://github.com/rabbitmq/rabbitmq-server/pull/13625#discussion_r2016008850 https://github.com/rabbitmq/rabbitmq-server/pull/13625#discussion_r2016010107 (cherry picked from commit c151806f7c0860b04b2bc684dd66f3c7931a486b) --- deps/rabbit/src/rabbit_channel.erl | 17 +++++------ deps/rabbit/src/rabbit_queue_type.erl | 2 +- deps/rabbit/src/rabbit_quorum_queue.erl | 40 ++++++++++++------------- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 8ad4971d5377..86d71d7af902 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1354,10 +1354,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, CurrentConsumers = maps:size(ConsumerMapping), case maps:find(ConsumerTag, ConsumerMapping) of error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity' - rabbit_misc:protocol_error( - not_allowed, - "reached maximum (~B) of consumers per channel", - [MaxConsumers]); + rabbit_misc:protocol_error(not_allowed, + "reached maximum (~B) of consumers per channel", + [MaxConsumers]); error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User, AuthzContext), @@ -1368,13 +1367,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, _ -> ConsumerTag end, - basic_consume( - QueueName, NoAck, ConsumerPrefetch, ActualTag, - ExclusiveConsume, Args, NoWait, State); + basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualTag, + ExclusiveConsume, Args, NoWait, State); {ok, _} -> %% Attempted reuse of consumer tag. - rabbit_misc:protocol_error( - not_allowed, "attempt to reuse consumer tag '~ts'", [ConsumerTag]) + rabbit_misc:protocol_error(not_allowed, + "attempt to reuse consumer tag '~ts'", + [ConsumerTag]) end; handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 709e7edc8386..4ddf31780538 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -522,7 +522,7 @@ consume(Q, Spec, State) -> case Mod:consume(Q, Spec, CtxState0) of {ok, CtxState} -> {ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)}; - Err = {error, _Type, _Fmt, _FmtArgs} -> + {error, _Type, _Fmt, _FmtArgs} = Err-> Err end. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index d39a6e8f253f..0d99e9a8bd99 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1010,8 +1010,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> args => Args, username => ActingUser, priority => Priority}, - case rabbit_fifo_client:checkout( - ConsumerTag, Mode, ConsumerMeta, QState0) of + case rabbit_fifo_client:checkout(ConsumerTag, Mode, ConsumerMeta, QState0) of {ok, _Infos, QState} -> case single_active_consumer_on(Q) of true -> @@ -1024,29 +1023,30 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> _ -> waiting end, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - Prefetch, ActivityStatus == single_active, %% Active - ActivityStatus, Args), - emit_consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, + ExclusiveConsume, + AckRequired, QName, + Prefetch, + ActivityStatus == single_active, + ActivityStatus, Args), + emit_consumer_created(ChPid, ConsumerTag, + ExclusiveConsume, + AckRequired, QName, + Prefetch, Args, none, + ActingUser), {ok, QState}; Err -> consume_error(Err, QName) end; false -> - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - Prefetch, true, %% Active - up, Args), - emit_consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), + rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, + ExclusiveConsume, + AckRequired, QName, + Prefetch, true, + up, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, Prefetch, + Args, none, ActingUser), {ok, QState} end; Err ->