Skip to content

Commit

Permalink
Merge pull request #39 from mokshaproject/feature/blocking-mode
Browse files Browse the repository at this point in the history
Introduce a new "blocking mode" to properly support durable queues.
  • Loading branch information
ralphbean committed May 31, 2017
2 parents 733d8d7 + ee83727 commit 7870415
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 32 deletions.
26 changes: 26 additions & 0 deletions docs/main/Consumers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,29 @@ After modifying your entry-points, you'll need to re-generate your project's `eg
Moksha will now automatically detect, instantiate, and feed your consumer.

Configuring
-----------

A few configuration options can affect the behavior of your consumers.

.. code-block::
moksha.workers_per_consumer = 3
By default, moksha will consume all messages off the bus as they're available
and store them in an internal queue. A number of threads (workers) are spawned
that handle messages off of that queue in parallel. If you're having problems
scaling your consumer, try increasing or decreasing ``moksha.workers_per_consumer``.

.. code-block::
moksha.blocking_mode = False
As stated above, by default moksha will consumer all messages off the bus and
store them in an internal queue. If you don't want this behavior (say, if
you're using a broker with a *durable queue*), then set
``moksha.blocking_mode`` to True in your configuration. This will cause moksha
to block on each message received off the bus, ensuring that the consumer
handles it before signalling the broker that moksha is ready for another
message.
79 changes: 47 additions & 32 deletions moksha.hub/moksha/hub/api/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from kitchen.iterutils import iterate
from moksha.common.lib.helpers import create_app_engine
from moksha.common.lib.converters import asbool
import moksha.hub.reactor


Expand Down Expand Up @@ -82,9 +83,17 @@ def __init__(self, hub):
self.engine = create_app_engine(app, hub.config)
self.DBSession = sessionmaker(bind=self.engine)()

self.N = int(self.hub.config.get('moksha.workers_per_consumer', 1))
for i in range(self.N):
moksha.hub.reactor.reactor.callInThread(self._work)
self.blocking_mode = asbool(self.hub.config.get('moksha.blocking_mode', False))
if self.blocking_mode:
log.info("Blocking mode true for %r. "
"Messages handled as they arrive." % self)
else:
self.N = int(self.hub.config.get('moksha.workers_per_consumer', 1))
log.info("Blocking mode false for %r. "
"Messages to be queued and distributed to %r threads." % (
self, self.N))
for i in range(self.N):
moksha.hub.reactor.reactor.callInThread(self._work_loop)

self._initialized = True

Expand Down Expand Up @@ -162,49 +171,55 @@ def _consume_json(self, message):

def _consume(self, message):
self.headcount_in += 1
self.incoming.put(message)
if self.blocking_mode:
# Do the work right now
self._do_work(message)
else:
# Otherwise, put the message in a queue for other threads to handle
self.incoming.put(message)

def _work(self):
def _work_loop(self):
while True:
# This is a blocking call. It waits until a message is available.
message = self.incoming.get()
self.headcount_out += 1
start = time.time()

# Then we are being asked to quit
if message is StopIteration:
break
self._do_work(message)
self.debug("Worker thread exiting.")

self.debug("Worker thread picking a message.")
try:
self.validate(message)
except Exception as e:
log.warn("Received invalid message %r" % e)
continue
def _do_work(self, message):
self.headcount_out += 1
start = time.time()

try:
self.pre_consume(message)
except Exception as e:
self.log.exception(message)
self.debug("Worker thread picking a message.")
try:
self.validate(message)
except Exception as e:
log.warn("Received invalid message %r" % e)
return

try:
self.consume(message)
except Exception as e:
self.log.exception(message)
# Keep track of how many exceptions we've hit in a row
self._exception_count += 1
try:
self.pre_consume(message)
except Exception as e:
self.log.exception(message)

try:
self.post_consume(message)
except Exception as e:
self.log.exception(message)
try:
self.consume(message)
except Exception as e:
self.log.exception(message)
# Keep track of how many exceptions we've hit in a row
self._exception_count += 1

# Record how long it took to process this message (for stats)
self._times.append(time.time() - start)
try:
self.post_consume(message)
except Exception as e:
self.log.exception(message)

self.debug("Going back to waiting on the incoming queue.")
# Record how long it took to process this message (for stats)
self._times.append(time.time() - start)

self.debug("Worker thread exiting.")
self.debug("Going back to waiting on the incoming queue.")

def validate(self, message):
""" Override to implement your own validation scheme. """
Expand Down

0 comments on commit 7870415

Please sign in to comment.