Skip to content

Commit

Permalink
Message ack regression (#512)
Browse files Browse the repository at this point in the history
* regression test

* Add missing disconnection tests for RpcConsumer

* regression test for rpc entrypoint too

* fix: only attempt to ack/requeue messages if their connection is alive

* remove debug experiment

* remove unused imports

* fix import

* don't use stdout=PIPE because it will deadlock with a large enough output;
delete the proxy again when we're done

* pragmatic solve: just wait for some grace period before removing the proxy

* pragmas in case sleep isn't required

* add coverage for requeue case

* make regression tests slightly more granular for clarity

* occasionally the ack/requeue is attempted before the connection is considered
dead

* protect against race-condition of connection closing after we check it

* don't bother deleting the proxy again

* remove unused import
  • Loading branch information
mattbennett committed Feb 17, 2018
1 parent b33a361 commit ccade35
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 11 deletions.
21 changes: 15 additions & 6 deletions nameko/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from logging import getLogger

import six
from amqp.exceptions import RecoverableConnectionError
from amqp.exceptions import ConnectionError
from eventlet.event import Event
from kombu import Connection
from kombu.common import maybe_declare
Expand All @@ -25,7 +25,6 @@
from nameko.extensions import (
DependencyProvider, Entrypoint, ProviderCollector, SharedExtension
)
from nameko.utils.retry import retry


_log = getLogger(__name__)
Expand Down Expand Up @@ -310,13 +309,23 @@ def unregister_provider(self, provider):

super(QueueConsumer, self).unregister_provider(provider)

@retry(for_exceptions=RecoverableConnectionError, max_attempts=3)
def ack_message(self, message):
message.ack()
# only attempt to ack if the message connection is alive;
# otherwise the message will already have been reclaimed by the broker
if message.channel.connection:
try:
message.ack()
except ConnectionError: # pragma: no cover
pass # ignore connection closing inside conditional

@retry(for_exceptions=RecoverableConnectionError, max_attempts=3)
def requeue_message(self, message):
message.requeue()
# only attempt to requeue if the message connection is alive;
# otherwise the message will already have been reclaimed by the broker
if message.channel.connection:
try:
message.requeue()
except ConnectionError: # pragma: no cover
pass # ignore connection closing inside conditional

def _cancel_consumers_if_requested(self):
provider_remove_events = self._pending_remove_providers.items()
Expand Down
3 changes: 1 addition & 2 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def toxiproxy_server():
host = TOXIPROXY_HOST
port = TOXIPROXY_PORT
server = subprocess.Popen(
['toxiproxy-server', '-port', str(port), '-host', host],
stdout=subprocess.PIPE
['toxiproxy-server', '-port', str(port), '-host', host]
)

class NotReady(Exception):
Expand Down
97 changes: 96 additions & 1 deletion test/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import eventlet
import pytest
from eventlet.semaphore import Semaphore
from kombu import Exchange, Queue
from kombu.connection import Connection
from mock import Mock, call, patch
Expand Down Expand Up @@ -489,16 +490,28 @@ def publish(msg):
)
return publish

@pytest.fixture
def lock(self):
return Semaphore()

@pytest.fixture
def tracker(self):
return Mock()

@pytest.fixture(autouse=True)
def container(
self, container_factory, rabbit_config, toxic_queue_consumer, queue
self, container_factory, rabbit_config, toxic_queue_consumer, queue,
lock, tracker
):

class Service(object):
name = "service"

@consume(queue)
def echo(self, arg):
lock.acquire()
lock.release()
tracker(arg)
return arg

# very fast heartbeat
Expand Down Expand Up @@ -657,6 +670,88 @@ def reset(args, kwargs, result, exc_info):
publish(msg)
assert result.get() == msg

def test_message_ack_regression(
self, container, publish, toxiproxy, lock, tracker
):
""" Regression for https://github.com/nameko/nameko/issues/511
"""
# prevent workers from completing
lock.acquire()

# fire entrypoint and block the worker;
# break connection while the worker is active, then release worker
with entrypoint_waiter(container, 'echo') as result:
publish('msg1')
while not lock._waiters:
eventlet.sleep() # pragma: no cover
toxiproxy.disable()
# allow connection to close before releasing worker
eventlet.sleep(.1)
lock.release()

# entrypoint will return and attempt to ack initiating message
assert result.get() == "msg1"

# enabling connection will re-deliver the initiating message
# and it will be processed again
with entrypoint_waiter(container, 'echo') as result:
toxiproxy.enable()
assert result.get() == "msg1"

# connection re-established, container should work again
with entrypoint_waiter(container, 'echo', timeout=1) as result:
publish('msg2')
assert result.get() == 'msg2'

def test_message_requeue_regression(
self, container, publish, toxiproxy, lock, tracker
):
""" Regression for https://github.com/nameko/nameko/issues/511
"""
# turn on requeue_on_error
consumer = get_extension(container, Consumer)
consumer.requeue_on_error = True

# make entrypoint raise the first time it's called so that
# we attempt to requeue it
class Boom(Exception):
pass

def error_once():
yield Boom("error")
while True:
yield
tracker.side_effect = error_once()

# prevent workers from completing
lock.acquire()

# fire entrypoint and block the worker;
# break connection while the worker is active, then release worker
with entrypoint_waiter(container, 'echo') as result:
publish('msg1')
while not lock._waiters:
eventlet.sleep() # pragma: no cover
toxiproxy.disable()
# allow connection to close before releasing worker
eventlet.sleep(.1)
lock.release()

# entrypoint will return and attempt to requeue initiating message
with pytest.raises(Boom):
result.get()

# enabling connection will re-deliver the initiating message
# and it will be processed again
with entrypoint_waiter(container, 'echo', timeout=1) as result:
toxiproxy.enable()
assert result.get() == 'msg1'

# connection re-established, container should work again
with entrypoint_waiter(container, 'echo', timeout=1) as result:
publish('msg2')
assert result.get() == 'msg2'


@skip_if_no_toxiproxy
class TestPublisherDisconnections(object):
Expand Down

0 comments on commit ccade35

Please sign in to comment.