Skip to content

Commit

Permalink
Merge pull request #31 from mokshaproject/feature/more-stats
Browse files Browse the repository at this point in the history
Export a list of the times taken to process each message.
  • Loading branch information
ralphbean committed Jul 27, 2015
2 parents eb9762c + ef61ce4 commit d777802
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion moksha.hub/moksha/hub/api/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
:meth:`Consumer.consume` method.
.. moduleauthor:: Luke Macken <lmacken@redhat.com>
.. moduleauthor:: Ralph Bean <rbean@redhat.com>
"""

import json
import threading
import time
import logging
log = logging.getLogger('moksha.hub')

Expand Down Expand Up @@ -60,6 +62,7 @@ def __init__(self, hub):
# the queue to do "consume" work.
self.incoming = queue.Queue()
self.headcount_in = self.headcount_out = 0
self._times = []

callback = self._consume
if self.jsonify:
Expand All @@ -86,14 +89,15 @@ def __init__(self, hub):
self._initialized = True

def __json__(self):

if self._initialized:
backlog = self.incoming.qsize()
headcount_out = self.headcount_out
headcount_in = self.headcount_in
times = self._times
else:
backlog = None
headcount_out = headcount_in = 0
times = []

results = {
"name": type(self).__name__,
Expand All @@ -105,10 +109,12 @@ def __json__(self):
"backlog": backlog,
"headcount_out": headcount_out,
"headcount_in": headcount_in,
"times": times,
}
# Reset these counters before returning.
self.headcount_out = self.headcount_in = 0
self._exception_count = 0
self._times = []
return results

def debug(self, message):
Expand Down Expand Up @@ -163,6 +169,7 @@ def _work(self):
# This is a blocking call. It waits until a message is available.
message = self.incoming.get()
self.headcount_out += 1
start = time.time()

# Then we are being asked to quit
if message is StopIteration:
Expand Down Expand Up @@ -192,6 +199,9 @@ def _work(self):
except Exception as e:
self.log.exception(message)

# Record how long it took to process this message (for stats)
self._times.append(time.time() - start)

self.debug("Going back to waiting on the incoming queue.")

self.debug("Worker thread exiting.")
Expand Down

0 comments on commit d777802

Please sign in to comment.