Skip to content

Commit

Permalink
Merge pull request #22 from nameko/mandatory-delivery
Browse files Browse the repository at this point in the history
Mandatory delivery
  • Loading branch information
mattbennett committed Jun 6, 2018
2 parents e643c0b + 8d7585a commit 5225192
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 35 deletions.
11 changes: 11 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ Release Notes

Semantic versioning is followed.

Version 0.6.0
-------------

Released 2018-06-05

* Backoff messages are published with mandatory delivery to ensure
they are delivered. Protects against races between declaration
of the backoff queue and publishing, in case they happen on
different nodes in a RabbitMQ cluster.


Version 0.5.0
-------------

Expand Down
42 changes: 29 additions & 13 deletions nameko_amqp_retry/backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from kombu import Connection
from kombu.common import maybe_declare
from kombu.messaging import Exchange, Queue
from nameko.amqp import get_producer
from nameko.amqp.publish import get_producer, UndeliverableMessage
from nameko.constants import AMQP_URI_CONFIG_KEY, DEFAULT_RETRY_POLICY
from nameko.extensions import SharedExtension

from nameko.utils.retry import retry
from six.moves import queue as PyQueue

EXPIRY_GRACE_PERIOD = 5000 # ms

Expand Down Expand Up @@ -135,17 +136,32 @@ def republish(self, backoff_exc, message, target_queue):

# force redeclaration; the publisher will skip declaration if
# the entity has previously been declared by the same connection
# (see https://github.com/celery/kombu/pull/884)
conn = Connection(amqp_uri)
maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)

producer.publish(
message.body,
headers=headers,
exchange=self.exchange,
routing_key=target_queue,
expiration=expiration_seconds,
retry=True,
retry_policy=DEFAULT_RETRY_POLICY,
declare=[queue.exchange, queue],
**properties
)
@retry(for_exceptions=UndeliverableMessage)
def publish():

producer.publish(
message.body,
headers=headers,
exchange=self.exchange,
routing_key=target_queue,
expiration=expiration_seconds,
mandatory=True,
retry=True,
retry_policy=DEFAULT_RETRY_POLICY,
declare=[queue.exchange, queue],
**properties
)

try:
returned_messages = producer.channel.returned_messages
returned = returned_messages.get_nowait()
except PyQueue.Empty:
pass
else:
raise UndeliverableMessage(returned)

publish()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name='nameko-amqp-retry',
version='0.5.0',
version='0.6.0',
description='Nameko extension allowing AMQP entrypoints to retry later',
author='Student.com',
url='http://github.com/nameko/nameko-amqp-retry',
Expand Down
86 changes: 65 additions & 21 deletions test/test_retry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from contextlib import contextmanager
import itertools
import json
import time
Expand All @@ -8,37 +9,21 @@
from kombu.pools import connections
from kombu.serialization import register, unregister
from mock import ANY, patch
from nameko.amqp.publish import UndeliverableMessage
from nameko.constants import AMQP_URI_CONFIG_KEY
from nameko.extensions import DependencyProvider
from nameko.testing.services import entrypoint_waiter
from nameko.testing.utils import get_extension
from nameko.testing.waiting import wait_for_call
from nameko.utils.retry import retry
from requests.exceptions import HTTPError

from nameko_amqp_retry import Backoff, BackoffPublisher
from nameko_amqp_retry.backoff import (
EXPIRY_GRACE_PERIOD, get_backoff_queue_name)
from nameko_amqp_retry.backoff import get_backoff_queue_name, get_producer
from nameko_amqp_retry.messaging import consume
from nameko_amqp_retry.rpc import rpc


def retry(fn):
""" Barebones retry decorator
"""
def wrapper(*args, **kwargs):
exceptions = AssertionError
max_retries = 3
delay = 1

counter = itertools.count()
while True:
try:
return fn(*args, **kwargs)
except exceptions:
if next(counter) == max_retries:
raise
time.sleep(delay)
return wrapper


class QuickBackoff(Backoff):
schedule = (100,)

Expand Down Expand Up @@ -187,6 +172,65 @@ def all_expired(worker_ctx, res, exc_info):
# was removed (50ms + fast_expire after the first publish)


class TestMandatoryDelivery:

@pytest.fixture
def container(self, container_factory, rabbit_config, queue):

class Service(object):
name = "service"

@consume(queue)
def backoff(self, delay):
raise Backoff()

container = container_factory(Service, rabbit_config)
container.start()
return container

def test_failed_delivery(
self, container, publish_message, exchange, queue, rabbit_config
):
backoff_publisher = get_extension(container, BackoffPublisher)
make_queue = backoff_publisher.make_queue

# patch make_queue so that the return value does not have
# a matching binding; this forces an unroutable messsage
with patch.object(backoff_publisher, 'make_queue') as patched:

patched.return_value = make_queue(999999)

# patch get_producer so we can wait until publish is called
# multiple times, demonstrating the retry
with patch('nameko_amqp_retry.backoff.get_producer') as patched:

# create a replacement producer that we can hook into
# and make our patched get_producer return that
amqp_uri = rabbit_config['AMQP_URI']
with get_producer(amqp_uri) as replacement_producer:

@contextmanager
def producer_context(*a, **k):
yield replacement_producer

patched.side_effect = producer_context

# fire entrypoint and wait for retry of the backoff publish
counter = itertools.count()
with wait_for_call(
replacement_producer, 'publish',
callback=lambda *a, **k: next(counter) == 2
):
publish_message(
exchange, "", routing_key=queue.routing_key
)

# when the retry also fails,
# the container is killed so that the request is requeued
with pytest.raises(UndeliverableMessage):
container.wait()


class TestMultipleMessages(object):

@pytest.fixture
Expand Down

0 comments on commit 5225192

Please sign in to comment.