Skip to content

Commit

Permalink
Global counters per protocol + protocol AND queue_type
Browse files Browse the repository at this point in the history
This way we can show how many messages were received via a certain
protocol (stream is the second real protocol besides the default amqp091
one), as well as by queue type, which is something that many asked for a
really long time.

The most important aspect is that we can also see them by protocol AND
queue_type, which becomes very important for Streams, which have
different rules from regular queues (e.g. for example, consuming
messages is non-destructive, and deep queue backlogs - think billions of
messages - are normal). Alerting and consumer scaling due to deep
backlogs will now work correctly, as we can distinguish between regular
queues & streams.

This has gone through a few cycles, with @mkuratczyk & @dcorbacho
covering most of the ground. @dcorbacho had most of this in
#3045, but the main
branch went through a few changes in the meantime. Rather than resolving
all the conflicts, and then making the necessary changes, we (@gerhard +
@kjnilsson) took all learnings and started re-applying a lot of the
existing code from #3045. We are confident in this approach and would
like to see it through. We continued working on this with @dumbbell, and
the most important changes are captured in
rabbitmq/seshat#1.

We expose these global counters in rabbitmq_prometheus via a new
collector. We don't want to keep modifying the existing collector, which
grew really complex in parts, especially since we introduced
aggregation, but start with a new namespace, `rabbitmq_global_`, and
continue building on top of it. The idea is to build in parallel, and
slowly transition to the new metrics, because semantically the changes
are too big since streams, and we have been discussing protocol-specific
metrics with @kjnilsson, which makes me think that this approach is
least disruptive and... simple.

While at this, we removed redundant empty return value handling in the
channel. The function called no longer returns this.

Also removed all DONE / TODO & other comments - we'll handle them when
the time comes, no need to leave TODO reminders.

Pairs @kjnilsson @dcorbacho @dumbbell
(this is multiple commits squashed into one)

Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
  • Loading branch information
gerhard committed Jun 22, 2021
1 parent 0cbef99 commit c797125
Show file tree
Hide file tree
Showing 14 changed files with 578 additions and 197 deletions.
1 change: 1 addition & 0 deletions deps/rabbit/BUILD.bazel
Expand Up @@ -155,6 +155,7 @@ RUNTIME_DEPS = [
"@observer_cli//:bazel_erlang_lib",
"@osiris//:bazel_erlang_lib",
"@recon//:bazel_erlang_lib",
"@seshat//:bazel_erlang_lib",
"@sysmon_handler//:bazel_erlang_lib",
"@systemd//:bazel_erlang_lib",
]
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/Makefile
Expand Up @@ -136,14 +136,15 @@ APPS_DIR := $(CURDIR)/apps
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl

BUILD_DEPS = rabbitmq_cli
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd seshat
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper

PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris master
dep_systemd = hex 0.6.1
dep_seshat = git https://github.com/rabbitmq/seshat main

define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit.erl
Expand Up @@ -127,6 +127,13 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).

-rabbit_boot_step({rabbit_global_counters,
[{description, "global counters"},
{mfa, {rabbit_global_counters, boot_step,
[]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).

-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
Expand Down
93 changes: 62 additions & 31 deletions deps/rabbit/src/rabbit_channel.erl
Expand Up @@ -1285,6 +1285,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
confirm_enabled = ConfirmEnabled,
delivery_flow = Flow
}) ->
rabbit_global_counters:messages_received(amqp091, 1),
check_msg_size(Content, MaxMessageSize, GCThreshold),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User, AuthzContext),
Expand All @@ -1302,7 +1303,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{MsgSeqNo, State1} =
case DoConfirm orelse Mandatory of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
true -> rabbit_global_counters:messages_received_confirm(amqp091, 1),
SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
Expand All @@ -1314,9 +1316,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Username, TraceState),
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
none ->
deliver_to_queues(DQ, State1);
{Msgs, Acks} ->
Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
Expand Down Expand Up @@ -1360,14 +1364,14 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
DeliveryTag, QueueStates0)
end) of
{ok, MessageCount, Msg, QueueStates} ->
{ok, QueueType} = rabbit_queue_type:module(QueueName, QueueStates),
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Msg,
State#ch{queue_states = QueueStates});
QueueType, State#ch{queue_states = QueueStates});
{empty, QueueStates} ->
{ok, QueueType} = rabbit_queue_type:module(QueueName, QueueStates),
rabbit_global_counters:messages_get_empty(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
empty ->
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State};
{error, {unsupported, single_active_consumer}} ->
rabbit_misc:protocol_error(
resource_locked,
Expand Down Expand Up @@ -1692,9 +1696,9 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");

handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
limiter = Limiter}) ->
State1 = queue_fold(fun deliver_to_queues/2, State, Msgs),
State1 = queue_fold(fun deliver_to_queues/2, State, Deliveries),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
{State2, Actions2} =
lists:foldl(fun ({ack, A}, {Acc, Actions}) ->
Expand Down Expand Up @@ -1954,7 +1958,7 @@ internal_reject(Requeue, Acked, Limiter,
ok = notify_limiter(Limiter, Acked),
{State#ch{queue_states = QueueStates}, Actions}.

record_sent(Type, Tag, AckRequired,
record_sent(Type, QueueType, Tag, AckRequired,
Msg = {QName, _QPid, MsgId, Redelivered, _Message},
State = #ch{cfg = #conf{channel = ChannelNum,
trace_state = TraceState,
Expand All @@ -1964,15 +1968,28 @@ record_sent(Type, Tag, AckRequired,
unacked_message_q = UAMQ,
next_tag = DeliveryTag
}) ->
?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
{get, true} -> get;
{get, false} -> get_no_ack;
{deliver, true} -> deliver;
{deliver, false} -> deliver_no_ack
end, State),
rabbit_global_counters:messages_delivered(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QName, 1,
case {Type, AckRequired} of
{get, true} ->
rabbit_global_counters:messages_delivered_get_manual_ack(amqp091, QueueType, 1),
get;
{get, false} ->
rabbit_global_counters:messages_delivered_get_auto_ack(amqp091, QueueType, 1),
get_no_ack;
{deliver, true} ->
rabbit_global_counters:messages_delivered_consume_manual_ack(amqp091, QueueType, 1),
deliver;
{deliver, false} ->
rabbit_global_counters:messages_delivered_consume_auto_ack(amqp091, QueueType, 1),
deliver_no_ack
end, State),
case Redelivered of
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
true ->
rabbit_global_counters:messages_redelivered(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false ->
ok
end,
DeliveredAt = os:system_time(millisecond),
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
Expand Down Expand Up @@ -2034,8 +2051,14 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) ->
ok = notify_limiter(State#ch.limiter, Acked),
{State#ch{queue_states = QueueStates}, Actions}.

incr_queue_stats(QName, MsgIds, State) ->
incr_queue_stats(QName, MsgIds, State = #ch{queue_states = QueueStates}) ->
Count = length(MsgIds),
case rabbit_queue_type:module(QName, QueueStates) of
{ok, QueueType} ->
rabbit_global_counters:messages_acknowledged(amqp091, QueueType, Count);
_ ->
noop
end,
?INCR_STATS(queue_stats, QName, Count, ack, State).

%% {Msgs, Acks}
Expand Down Expand Up @@ -2108,15 +2131,16 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
_RoutedToQueueNames = []}, State) -> %% optimisation
_RoutedToQueueNames = []}, State) -> %% optimisation when there are no queues
?INCR_STATS(exchange_stats, XName, 1, publish, State),
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{exchange_name = XName},
mandatory = Mandatory,
confirm = Confirm,
msg_seq_no = MsgSeqNo},
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) ->
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
AllNames = case rabbit_amqqueue:lookup(QName) of
{ok, Q0} ->
case amqqueue:get_options(Q0) of
Expand All @@ -2128,6 +2152,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
Qs = rabbit_amqqueue:lookup(AllNames),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
Expand Down Expand Up @@ -2164,6 +2189,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
end,
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
Expand Down Expand Up @@ -2213,11 +2239,13 @@ infer_extra_bcc(Qs) ->
process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
Msg, State) ->
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
ok = basic_return(Msg, State, no_route),
ok;
process_routing_mandatory(_Mandatory = false,
_RoutedToQs = [],
#basic_message{exchange_name = ExchangeName}, State) ->
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
ok;
process_routing_mandatory(_, _, _, _) ->
Expand Down Expand Up @@ -2245,6 +2273,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
ok ->
Confirms = lists:append(C),
rabbit_global_counters:messages_confirmed(amqp091, length(Confirms)),
Rejects = lists:append(R),
ConfirmMsgSeqNos =
lists:foldl(
Expand Down Expand Up @@ -2721,8 +2750,9 @@ handle_deliver0(ConsumerTag, AckRequired,
redelivered = Redelivered,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
case rabbit_queue_type:module(QName, Qs) of
{ok, rabbit_classic_queue} ->
{ok, QueueType} = rabbit_queue_type:module(QName, Qs),
case QueueType of
rabbit_classic_queue ->
ok = rabbit_writer:send_command_and_notify(
WriterPid, QPid, self(), Deliver, Content);
_ ->
Expand All @@ -2732,13 +2762,14 @@ handle_deliver0(ConsumerTag, AckRequired,
undefined -> ok;
_ -> rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
end,
record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
record_sent(deliver, QueueType, ConsumerTag, AckRequired, Msg, State).

handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}, State) ->
content = Content}},
QueueType, State) ->
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
Expand All @@ -2747,7 +2778,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State)}.
{noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg, State)}.

init_tick_timer(State = #ch{tick_timer = undefined}) ->
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
Expand Down Expand Up @@ -2783,10 +2814,10 @@ get_operation_timeout_and_deadline() ->
Deadline = now_millis() + Timeout,
{Timeout, Deadline}.

queue_fold(Fun, Init, Q) ->
case ?QUEUE:out(Q) of
{empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
queue_fold(Fun, Acc, Queue) ->
case ?QUEUE:out(Queue) of
{empty, _Queue} -> Acc;
{{value, Item}, Queue1} -> queue_fold(Fun, Fun(Item, Acc), Queue1)
end.

evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
Expand Down

0 comments on commit c797125

Please sign in to comment.