Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ end}.
{mapping, "prometheus.ssl.max_keepalive", "rabbitmq_prometheus.ssl_config.cowboy_opts.max_keepalive",
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.

%% This option is left here for compatibility reasons. It will make no effect.
{mapping, "prometheus.filter_aggregated_queue_metrics_pattern", "rabbitmq_prometheus.filter_aggregated_queue_metrics_pattern", [{datatype, string}]}.

%% Authentication options ========================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,26 +246,26 @@ register() ->
deregister_cleanup(_) -> ok.

collect_mf('detailed', Callback) ->
collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback),
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback),
collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback),
%% identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
emit_identity_info(Callback),
ok;
collect_mf('per-object', Callback) ->
collect(true, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok;
collect_mf(_Registry, Callback) ->
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok.

collect(PerObjectMetrics, Prefix, VHostsFilter, QueuesFilter, IncludedMFs, Callback) ->
collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
_ = [begin
Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueuesFilter),
Data = get_data(Table, PerObjectMetrics, VHostsFilter),
mf(Callback, Prefix, Contents, Data)
end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)],
ok.
Expand Down Expand Up @@ -492,7 +492,7 @@ emit_gauge_metric_if_defined(Labels, Value) ->
gauge_metric(Labels, Value)
end.

get_data(connection_metrics = Table, false, _, _) ->
get_data(connection_metrics = Table, false, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(recv_cnt, Props), A1),
Expand All @@ -501,7 +501,7 @@ get_data(connection_metrics = Table, false, _, _) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
get_data(channel_metrics = Table, false, _, _) ->
get_data(channel_metrics = Table, false, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
{T,
Expand All @@ -516,40 +516,22 @@ get_data(channel_metrics = Table, false, _, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, Props, _}, {T, A1} = Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
{T,
sum(proplists:get_value(consumers, Props), A1)
}
end;
({_, Props, _}, {T, A1}) ->
{T,
sum(proplists:get_value(consumers, Props), A1)
}
end, empty(MF), Table),
[{Table, [{consumers, A1}]}];
get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
get_data(queue_metrics = Table, false, VHostsFilter) ->
{Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17} =
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, Props, _}, Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
sum_queue_metrics(Props, Acc)
end;
({_, Props, _}, Acc) ->
sum_queue_metrics(Props, Acc)
end, empty(Table), Table),
Expand All @@ -560,7 +542,7 @@ get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
{messages_paged_out, A13}, {message_bytes_paged_out, A14},
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics;
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Expand All @@ -571,14 +553,6 @@ get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchang
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc)
when is_list(QueuesFilter) ->
case re:run(Name, QueuesFilter, [{capture, none}]) of
match ->
Acc;
nomatch ->
{T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}
end;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
Expand All @@ -605,31 +579,31 @@ get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchang
_ ->
[Result]
end;
get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) ->
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
[Row|Acc];
(_, Acc) ->
Acc
end, [], Table);
get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
Table = queue_metrics,
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
[Row|Acc];
(_, Acc) ->
Acc
end, [], Table);
get_data(queue_consumer_count, true, _, _) ->
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(vhost_status, _, _, _) ->
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
true -> 1;
false -> 0
end}
|| VHost <- rabbit_vhost:list() ];
get_data(exchange_bindings, _, _, _) ->
get_data(exchange_bindings, _, _) ->
Exchanges = lists:foldl(fun
(#exchange{internal = true}, Acc) ->
Acc;
Expand All @@ -653,7 +627,7 @@ get_data(exchange_bindings, _, _, _) ->
[{<<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", Type/binary, "\"">>,
Bindings}|Acc]
end, [], WithCount);
get_data(exchange_names, _, _, _) ->
get_data(exchange_names, _, _) ->
lists:foldl(fun
(#exchange{internal = true}, Acc) ->
Acc;
Expand All @@ -663,7 +637,7 @@ get_data(exchange_names, _, _, _) ->
Label = <<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\",type=\"", (atom_to_binary(EType))/binary, "\"">>,
[{Label, 1}|Acc]
end, [], rabbit_exchange:list());
get_data(Table, _, _, _) ->
get_data(Table, _, _) ->
ets:tab2list(Table).


Expand Down Expand Up @@ -737,10 +711,3 @@ vhosts_filter_from_pdict() ->
maps:merge(All, Enabled)
end.

queues_filter_from_pdict() ->
case get(prometheus_queue_filter) of
undefined ->
false;
Pattern ->
Pattern
end.
6 changes: 0 additions & 6 deletions deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ put_filtering_options_into_process_dictionary(Request) ->
put(prometheus_mf_filter, Fs);
_ -> ok
end,
case application:get_env(rabbitmq_prometheus, filter_aggregated_queue_metrics_pattern, undefined) of
undefined -> ok;
Pattern ->
{ok, CompiledPattern} = re:compile(Pattern),
put(prometheus_queue_filter, CompiledPattern)
end,
ok.

parse_vhosts(N) when is_binary(N) ->
Expand Down