Skip to content

Commit

Permalink
Merge pull request #57 from mikebonnet/consumer-times-leak
Browse files Browse the repository at this point in the history
prevent a possible memory leak in Consumer
  • Loading branch information
ralphbean committed May 25, 2018
2 parents 8809a98 + 44fbc08 commit b5cadff
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 4 deletions.
7 changes: 4 additions & 3 deletions moksha.hub/moksha/hub/api/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
log = logging.getLogger('moksha.hub')

import six.moves.queue as queue
from collections import deque

from kitchen.iterutils import iterate
from moksha.common.lib.helpers import create_app_engine
Expand Down Expand Up @@ -60,7 +61,7 @@ def __init__(self, hub):
# the queue to do "consume" work.
self.incoming = queue.Queue()
self.headcount_in = self.headcount_out = 0
self._times = []
self._times = deque(maxlen=1024)

callback = self._consume
if self.jsonify:
Expand Down Expand Up @@ -99,7 +100,7 @@ def __json__(self):
backlog = self.incoming.qsize()
headcount_out = self.headcount_out
headcount_in = self.headcount_in
times = self._times
times = list(self._times)
else:
backlog = None
headcount_out = headcount_in = 0
Expand All @@ -120,7 +121,7 @@ def __json__(self):
# Reset these counters before returning.
self.headcount_out = self.headcount_in = 0
self._exception_count = 0
self._times = []
self._times.clear()
return results

def debug(self, message):
Expand Down
100 changes: 99 additions & 1 deletion moksha.hub/moksha/hub/tests/test_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,15 @@ def fake_register_consumer(self, cons):
I'm not sure how to do that, so we're going to fake it and manually
add this consumer to the list of consumers of which the Hub is aware.
"""
consume = cons(self.hub).consume
consumer = cons(self.hub)
consume = consumer.consume
for topic in iterate(cons.topic):
self.hub.topics[topic] = self.hub.topics.get(topic, [])
if consume not in self.hub.topics[topic]:
print('registering fake topic %r' % topic)
self.hub.topics[topic].append(consume)
sleep(sleep_duration)
return consumer

@testutils.crosstest
def test_abstract(self):
Expand Down Expand Up @@ -388,6 +390,102 @@ def _consume(self, message):
central = CentralMokshaHub(config, [TestConsumer], [])
central.close()

@testutils.crosstest
def test_consumer_stats_queued(self):
""" Verify that message processing stats are set for queued messages. """

class TestConsumer(moksha.hub.api.consumer.Consumer):
topic = self.a_topic

def consume(self, message):
pass

cons = self.fake_register_consumer(TestConsumer)

for i in range(5):
self.hub.send_message(topic=self.a_topic, message=secret)

simulate_reactor(sleep_duration)
sleep(sleep_duration)

eq_(cons.headcount_in, 5)
eq_(cons.headcount_out, 0)
eq_(cons._exception_count, 0)
eq_(len(cons._times), 0)

@testutils.crosstest
def test_consumer_stats_processed(self):
""" Verify that message processing stats are set for processed messages. """

class TestConsumer(moksha.hub.api.consumer.Consumer):
topic = self.a_topic

def consume(self, message):
pass

self.hub.config['moksha.blocking_mode'] = True
cons = self.fake_register_consumer(TestConsumer)

for i in range(5):
self.hub.send_message(topic=self.a_topic, message=secret)

simulate_reactor(sleep_duration)
sleep(sleep_duration)

eq_(cons.headcount_in, 5)
eq_(cons.headcount_out, 5)
eq_(cons._exception_count, 0)
eq_(len(cons._times), 5)

@testutils.crosstest
def test_consumer_stats_exceptions(self):
""" Verify that message processing stats are set for messages that generate exceptions. """

class TestConsumer(moksha.hub.api.consumer.Consumer):
topic = self.a_topic

def consume(self, message):
if message['body'] % 2:
raise RuntimeError()

self.hub.config['moksha.blocking_mode'] = True
cons = self.fake_register_consumer(TestConsumer)

for i in range(5):
self.hub.send_message(topic=self.a_topic, message=i)

simulate_reactor(sleep_duration)
sleep(sleep_duration)

eq_(cons.headcount_in, 5)
eq_(cons.headcount_out, 5)
eq_(cons._exception_count, 2)
eq_(len(cons._times), 5)

@testutils.crosstest
def test_consumer_stats_overflow(self):
""" Verify that Consumer._times doesn't grow beyond a maximum size. """

class TestConsumer(moksha.hub.api.consumer.Consumer):
topic = self.a_topic

def consume(self, message):
pass

self.hub.config['moksha.blocking_mode'] = True
cons = self.fake_register_consumer(TestConsumer)

for i in range(1500):
self.hub.send_message(topic=self.a_topic, message=secret)

simulate_reactor(sleep_duration)
sleep(sleep_duration)

eq_(cons.headcount_in, 1500)
eq_(cons.headcount_out, 1500)
eq_(cons._exception_count, 0)
eq_(len(cons._times), 1024)


class TestProducer:
def _setUp(self):
Expand Down

0 comments on commit b5cadff

Please sign in to comment.