Permalink
Browse files

Trace messages going to the queues so we can get the number of messag…

…es delivered
  • Loading branch information...
1 parent 48511b3 commit a0fd1cef7062f67c56274b33e13e0afd61425bc6 @epicads-scott epicads-scott committed Feb 11, 2010
Showing with 106 additions and 7 deletions.
  1. +10 −2 snmp/RABBITMQ-MIB.mib
  2. +8 −2 src/rabbit_snmp_sup.erl
  3. +76 −0 src/rabbit_snmp_tracer.erl
  4. +12 −3 src/rabbit_snmp_worker.erl
View
12 snmp/RABBITMQ-MIB.mib
@@ -105,7 +105,8 @@ QueueStatisticsEntry ::= SEQUENCE {
queueAcksUncommitted Gauge32,
queueConsumers Gauge32,
queueTransactions Gauge32,
- queueMemory Counter64
+ queueMemory Counter64,
+ queueMessagesDelivered Counter64
}
queueVhost OBJECT-TYPE
@@ -192,6 +193,13 @@ queueMemory OBJECT-TYPE
DESCRIPTION "Amount of memory used"
::= { queueTableEntry 12 }
+queueMessagesDelivered OBJECT-TYPE
+ SYNTAX Counter64
+ MAX-ACCESS read-only
+ STATUS current
+ DESCRIPTION "Number of messages delivered to this queue"
+ ::= { queueTableEntry 13 }
+
exchangeTable OBJECT-TYPE
SYNTAX SEQUENCE OF ExchangeEntry
MAX-ACCESS not-accessible
@@ -253,7 +261,7 @@ exchangeAutoDelete OBJECT-TYPE
rabbitGroup OBJECT-GROUP
OBJECTS {
- vhostQueueCount, vhostExchangeCount, vhostMessageCount, queueVhost, queueName, queueDurable, queueAutoDelete, queueMessages, queueUnAcknowledged, queueUnCommitted, queueReady, queueAcksUncommitted, queueConsumers, queueTransactions, queueMemory, vhostName, exchangeVhost, exchangeType, exchangeName, exchangeDurable, exchangeAutoDelete
+ vhostQueueCount, vhostExchangeCount, vhostMessageCount, queueVhost, queueName, queueDurable, queueAutoDelete, queueMessages, queueUnAcknowledged, queueUnCommitted, queueReady, queueAcksUncommitted, queueConsumers, queueTransactions, queueMemory, queueMessagesDelivered, vhostName, exchangeVhost, exchangeType, exchangeName, exchangeDurable, exchangeAutoDelete
}
STATUS current
DESCRIPTION "Groups"
View
10 src/rabbit_snmp_sup.erl
@@ -8,10 +8,16 @@ start_link() ->
init([]) ->
{ok, {{one_for_one, 3, 10},
- [{rabbit_snmp_worker,
+ [{rabbit_snmp_tracer,
+ {rabbit_snmp_tracer, start_link, []},
+ permanent,
+ 10000,
+ worker,
+ [rabbit_snmp_tracer]},
+ {rabbit_snmp_worker,
{rabbit_snmp_worker, start_link, []},
permanent,
10000,
worker,
[rabbit_snmp_worker]}
- ]}}.
+ ]}}.
View
76 src/rabbit_snmp_tracer.erl
@@ -0,0 +1,76 @@
+-module(rabbit_snmp_tracer).
+-behaviour(gen_server).
+
+-export([start/0, start/2, stop/0, stop/1, start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-export([start_trace/1, stop_trace/1]).
+
+-record(state, {table}).
+
+start() ->
+ start_link(),
+ ok.
+
+start(normal, []) ->
+ start_link().
+
+stop() ->
+ ok.
+
+stop(_State) ->
+ stop().
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ Table = ets:new(queue_throughput_stats, []),
+ {ok, #state{table = Table}}.
+
+handle_call({start_trace, Pid}, _From, State) ->
+ ets:insert(State#state.table, {Pid, 0}),
+ erlang:trace(Pid, true, [{tracer, self()}, 'receive']),
+ {reply, ok, State};
+
+handle_call({stop_trace, Pid}, _From, State) ->
+ ets:delete(State#state.table, Pid),
+ erlang:trace(Pid, false, [{tracer, self()}, 'receive']),
+ {reply, ok, State};
+
+handle_call({get_count, Pid}, _From, State) ->
+ Result = case ets:lookup(State#state.table, Pid) of
+ [] -> no_stats;
+ [{Pid, Count}] -> Count
+ end,
+ {reply, Result, State};
+
+handle_call(_Msg,_From,State) ->
+ %io:format("Call: ~p~nState: ~p~n", [Msg, State]),
+ {reply, unknown_command, State}.
+
+handle_cast(_Msg,State) ->
+ %io:format("Cast: ~p~nState: ~p~n", [Msg, State]),
+ {noreply, State}.
+
+handle_info({trace, Pid, 'receive', {'$gen_cast', {deliver,_,_,_}}}, State) ->
+ ets:update_counter(State#state.table, Pid, 1),
+ {noreply, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+start_trace(Pid) ->
+ io:format("Starting Trace of: ~p~n", [Pid]),
+ gen_server:call(?MODULE, {start_trace, Pid}).
+
+stop_trace(Pid) ->
+ io:format("Stopping Trace of: ~p~n", [Pid]),
+ gen_server:call(?MODULE, {stop_trace, Pid}).
+
+terminate(_,_State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
View
15 src/rabbit_snmp_worker.erl
@@ -21,11 +21,14 @@ stop(_State) ->
stop().
start_link() ->
- gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
application:start(snmp),
- {ok, UpdateInterval} = application:get_env(rabbit_snmp, update_interval),
+ UpdateInterval = case application:get_env(rabbit_snmp, update_interval) of
+ undefined -> 1000;
+ Value -> Value
+ end,
io:format("Started snmp state poller w/~pms interval~n", [UpdateInterval]),
erlang:send_after(1, self(), update_stats),
{ok, #state{update_interval = UpdateInterval}}.
@@ -60,6 +63,11 @@ create_queue_row([Row|Rest]) ->
{resource, Vhost, queue, QueueName} = proplists:get_value(name, Row),
ListVhost = binary_to_list(Vhost),
ListQueue = binary_to_list(QueueName),
+ QueuePid = proplists:get_value(pid, Row),
+ MessagesSent = case gen_server:call(rabbit_snmp_tracer, {get_count, QueuePid}) of
+ no_stats -> gen_server:call(rabbit_snmp_tracer, {start_trace, QueuePid}), 0;
+ Count -> Count
+ end,
SnmpRow = {ListVhost, ListQueue,
proplists:get_value(durable, Row),
@@ -71,7 +79,8 @@ create_queue_row([Row|Rest]) ->
proplists:get_value(acks_uncommitted, Row),
proplists:get_value(consumers, Row),
proplists:get_value(transactions, Row),
- proplists:get_value(memory, Row)
+ proplists:get_value(memory, Row),
+ MessagesSent
},
snmpa_local_db:table_create_row(queueTable, ListVhost ++ ListQueue, SnmpRow),

0 comments on commit a0fd1ce

Please sign in to comment.