Skip to content
This repository has been archived by the owner on Nov 16, 2020. It is now read-only.

Periodic garbage collection of stats #27

Merged
merged 8 commits into from Feb 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/rabbit_mgmt_agent_sup.erl
Expand Up @@ -43,7 +43,9 @@ init([]) ->
ExternalStats = {rabbit_mgmt_external_stats,
{rabbit_mgmt_external_stats, start_link, []},
permanent, 5000, worker, [rabbit_mgmt_external_stats]},
{ok, {{one_for_one, 100, 10}, [ST, MD, ExternalStats | MC ++ MGC]}}.
GC = {rabbit_mgmt_gc, {rabbit_mgmt_gc, start_link, []},
permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_gc]},
{ok, {{one_for_one, 100, 10}, [ST, MD, ExternalStats, GC | MC ++ MGC]}}.

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
10 changes: 6 additions & 4 deletions src/rabbit_mgmt_external_stats.erl
Expand Up @@ -28,7 +28,6 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-define(REFRESH_RATIO, 5000).
-define(METRICS_KEYS, [fd_used, sockets_used, mem_used, disk_free, proc_used, gc_num,
gc_bytes_reclaimed, context_switches]).

Expand All @@ -46,7 +45,8 @@
fd_total,
fhc_stats,
node_owners,
last_ts
last_ts,
interval
}).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -336,9 +336,11 @@ format_mochiweb_option(_K, V) ->
%%--------------------------------------------------------------------

init([]) ->
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
State = #state{fd_total = file_handle_cache:ulimit(),
fhc_stats = file_handle_cache_stats:get(),
node_owners = sets:new()},
node_owners = sets:new(),
interval = Interval},
%% We can update stats straight away as they need to be available
%% when the mgmt plugin starts a collector
{ok, emit_update(State)}.
Expand Down Expand Up @@ -370,7 +372,7 @@ emit_update(State0) ->
rabbit_core_metrics:node_stats(coarse_metrics, MStats),
rabbit_core_metrics:node_stats(node_metrics, OStats0),
rabbit_event:notify(node_stats, PStats ++ MStats ++ OStats),
erlang:send_after(?REFRESH_RATIO, self(), emit_update),
erlang:send_after(State#state.interval, self(), emit_update),
emit_node_node_stats(State).

emit_node_node_stats(State = #state{node_owners = Owners}) ->
Expand Down
237 changes: 237 additions & 0 deletions src/rabbit_mgmt_gc.erl
@@ -0,0 +1,237 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_mgmt_gc).

-include_lib("rabbit_common/include/rabbit.hrl").

-record(state, {timer,
interval
}).

-spec start_link() -> rabbit_types:ok_pid_or_error().

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init(_) ->
Interval = rabbit_misc:get_env(rabbitmq_management_agent, metrics_gc_interval, 120000),
{ok, start_timer(#state{interval = Interval})}.

handle_call(test, _From, State) ->
{reply, ok, State}.

handle_cast(_Request, State) ->
{noreply, State}.

handle_info(start_gc, State) ->
gc_connections(),
gc_vhosts(),
gc_channels(),
gc_queues(),
gc_exchanges(),
gc_nodes(),
{noreply, start_timer(State)}.

terminate(_Reason, #state{timer = TRef}) ->
erlang:cancel_timer(TRef),
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

start_timer(#state{interval = Interval} = St) ->
TRef = erlang:send_after(Interval, self(), start_gc),
St#state{timer = TRef}.

gc_connections() ->
gc_process(connection_stats_coarse_conn_stats),
gc_process(connection_created_stats),
gc_process(connection_stats).

gc_vhosts() ->
VHosts = rabbit_vhost:list(),
GbSet = gb_sets:from_list(VHosts),
gc_entity(vhost_stats_coarse_conn_stats, GbSet),
gc_entity(vhost_stats_fine_stats, GbSet),
gc_entity(vhost_msg_stats, GbSet),
gc_entity(vhost_msg_rates, GbSet),
gc_entity(vhost_stats_deliver_stats, GbSet).

gc_channels() ->
gc_process(channel_created_stats),
gc_process(channel_stats),
gc_process(channel_stats_fine_stats),
gc_process(channel_process_stats),
gc_process(channel_stats_deliver_stats),
ok.

gc_queues() ->
Queues = rabbit_amqqueue:list_names(),
GbSet = gb_sets:from_list(Queues),
gc_entity(queue_stats_publish, GbSet),
gc_entity(queue_stats, GbSet),
gc_entity(queue_msg_stats, GbSet),
gc_entity(queue_process_stats, GbSet),
gc_entity(queue_msg_rates, GbSet),
gc_entity(queue_stats_deliver_stats, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, GbSet),
gc_process_and_entity(consumer_stats_queue_index, GbSet),
gc_process_and_entity(consumer_stats_channel_index, GbSet),
gc_process_and_entity(consumer_stats, GbSet),
gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, GbSet),
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
gc_entities(queue_exchange_stats_publish, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_stats_publish_queue_index, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_stats_publish_exchange_index, GbSet, ExchangeGbSet).

gc_exchanges() ->
Exchanges = rabbit_exchange:list_names(),
GbSet = gb_sets:from_list(Exchanges),
gc_entity(exchange_stats_publish_in, GbSet),
gc_entity(exchange_stats_publish_out, GbSet),
gc_entity(channel_exchange_stats_fine_stats_exchange_index, GbSet),
gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet).

gc_nodes() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
GbSet = gb_sets:from_list(Nodes),
gc_entity(node_stats, GbSet),
gc_entity(node_coarse_stats, GbSet),
gc_entity(node_persister_stats, GbSet),
gc_entity(node_node_coarse_stats_node_index, GbSet),
gc_entity(node_node_stats, GbSet),
gc_entity(node_node_coarse_stats, GbSet).

gc_process(Table) ->
ets:foldl(fun({{Pid, _} = Key, _}, none) ->
gc_process(Pid, Table, Key);
({Pid = Key, _}, none) ->
gc_process(Pid, Table, Key);
({Pid = Key, _, _}, none) ->
gc_process(Pid, Table, Key);
({{Pid, _} = Key, _, _, _, _}, none) ->
gc_process(Pid, Table, Key)
end, none, Table).

gc_process(Pid, Table, Key) ->
case erlang:is_process_alive(Pid) of
true ->
none;
false ->
ets:delete(Table, Key),
none
end.

gc_entity(Table, GbSet) ->
ets:foldl(fun({{_, Id} = Key, _}, none) when Table == node_node_stats ->
gc_entity(Id, Table, Key, GbSet);
({{{_, Id}, _} = Key, _}, none) when Table == node_node_coarse_stats ->
gc_entity(Id, Table, Key, GbSet);
({{Id, _} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({{Id, _} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).

gc_entity(Id, Table, Key, GbSet) ->
case gb_sets:is_member(Id, GbSet) of
true ->
none;
false ->
ets:delete(Table, Key),
none
end.

gc_process_and_entity(Table, GbSet) ->
ets:foldl(fun({{Id, Pid, _} = Key, _}, none) when Table == consumer_stats ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({Id = Key, {_, Pid, _}} = Object, none)
when Table == consumer_stats_queue_index ->
gc_object(Pid, Table, Object),
gc_entity(Id, Table, Key, GbSet);
({Pid = Key, {Id, _, _}} = Object, none)
when Table == consumer_stats_channel_index ->
gc_object(Id, Table, Object, GbSet),
gc_process(Pid, Table, Key);
({Id = Key, {{Pid, _}, _}} = Object, none)
when Table == channel_exchange_stats_fine_stats_exchange_index;
Table == channel_queue_stats_deliver_stats_queue_index ->
gc_object(Pid, Table, Object),
gc_entity(Id, Table, Key, GbSet);
({Pid = Key, {{_, Id}, _}} = Object, none)
when Table == channel_exchange_stats_fine_stats_channel_index;
Table == channel_queue_stats_deliver_stats_channel_index ->
gc_object(Id, Table, Object, GbSet),
gc_process(Pid, Table, Key);
({{{Pid, Id}, _} = Key, _}, none)
when Table == channel_queue_stats_deliver_stats;
Table == channel_exchange_stats_fine_stats ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
end, none, Table).

gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of
true ->
none;
false ->
ets:delete(Table, Key),
none
end.

gc_object(Pid, Table, Object) ->
case erlang:is_process_alive(Pid) of
true ->
none;
false ->
ets:delete_object(Table, Object),
none
end.

gc_object(Id, Table, Object, GbSet) ->
case gb_sets:is_member(Id, GbSet) of
true ->
none;
false ->
ets:delete_object(Table, Object),
none
end.

gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
ets:foldl(fun({{{Q, X}, _} = Key, _}, none)
when Table == queue_exchange_stats_publish ->
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet);
({Q, {{_, X}, _}} = Object, none)
when Table == queue_exchange_stats_publish_queue_index ->
gc_object(X, Table, Object, ExchangeGbSet),
gc_entity(Q, Table, Q, QueueGbSet);
({X, {{Q, _}, _}} = Object, none)
when Table == queue_exchange_stats_publish_exchange_index ->
gc_object(Q, Table, Object, QueueGbSet),
gc_entity(X, Table, X, ExchangeGbSet)
end, none, Table).
4 changes: 2 additions & 2 deletions src/rabbit_mgmt_metrics_collector.erl
Expand Up @@ -173,7 +173,7 @@ aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions}, NextStats,
policies = {BPolicies, _, GPolicies}} = State) ->
Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created_stats, Id}),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created, Id}),
TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies],
[begin
insert_entry(connection_stats_coarse_conn_stats, Id, TS,
Expand Down Expand Up @@ -421,7 +421,7 @@ vhost(#resource{virtual_host = VHost}) ->
vhost({queue_stats, #resource{virtual_host = VHost}}) ->
VHost;
vhost({TName, Pid}) ->
pget(vhost, lookup_element(TName, Pid, 3)).
pget(vhost, lookup_element(TName, Pid, 2)).

exchange_exists(Name) ->
case rabbit_exchange:lookup(Name) of
Expand Down