Skip to content

Commit

Permalink
Merge pull request #52 from mokshaproject/explicit-nacks
Browse files Browse the repository at this point in the history
Allow configuration of explicit nacks.
  • Loading branch information
ralphbean committed Mar 9, 2018
2 parents aa66ac6 + 317838f commit 1c2c923
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
18 changes: 18 additions & 0 deletions docs/main/Consumers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,21 @@ you're using a broker with a *durable queue*), then set
to block on each message received off the bus, ensuring that the consumer
handles it before signalling the broker that moksha is ready for another
message.

Other options that influence interaction with durable queues include
``stomp_ack_mode`` and ``stomp_send_explicit_nacks``.

.. code-block::
stomp_ack_mode = auto
stomp_send_explicit_nacks = True
When using a STOMP backend, the ``stomp_ack_mode`` variable (which defaults to
'auto') is passed through to the protocol subscription. It may take any of the
valid values in the `STOMP specification
<https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_ack_Header>`_.
Closely related, the ``stomp_send_explicit_nacks`` setting (which defaults to
True) can be used to turn off explicit nacks to the broker. When set to
``True``, explicit nacks are sent when exceptions are raised from any running
Consumer. When set to ``False``, raised exceptions will result in neither an
ACK nor a NACK being sent.
2 changes: 1 addition & 1 deletion moksha.hub/moksha/hub/api/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _do_work(self, message):
# Record how long it took to process this message (for stats)
self._times.append(time.time() - start)

self.debug("Going back to waiting on the incoming queue.")
self.debug("Going back to waiting on the incoming queue. Message handled: %r" % handled)
return handled

def validate(self, message):
Expand Down
14 changes: 13 additions & 1 deletion moksha.hub/moksha/hub/stomp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging

from moksha.common.lib.converters import asbool

try:
# stomper is not ready for py3
Expand Down Expand Up @@ -127,6 +128,14 @@ def dataReceived(self, data):
# Otherwise, see if we need to turn a naive 'ack' from stomper into
# a 'nack' if our consumers failed to do their jobs.
if handled is False and response.startswith("ACK\n"):

send_nacks = asbool(self.client.hub.config.get('stomp_send_explicit_nacks', True))
if not send_nacks:
log.warn("Message handling failed. stomp_send_explicit_nacks=%r. "
"Sending no reply to the broker.", send_nacks)
# Return, so as not to send an erroneous ack.
return

if stomper.STOMP_VERSION != '1.1':
log.error("Unable to NACK stomp %r" % stomper.STOMP_VERSION)
# Also, not sending an erroneous ack.
Expand All @@ -138,5 +147,8 @@ def dataReceived(self, data):
response = stomper.nack(message_id, subscription, transaction_id)

# Finally, send our response (ACK or NACK) back to the broker.
log.debug(response)
if not handled:
log.warn("handled=%r. Responding with %s" % (handled, response))
else:
log.debug("handled=%r. Responding with %s" % (handled, response))
self.transport.write(response)

0 comments on commit 1c2c923

Please sign in to comment.