Skip to content

Commit

Permalink
Support Nameko 2.11.0 and v3 prerelease (#26)
Browse files Browse the repository at this point in the history
* upgrade nameko
fix implementation, simplify failed delivery test

* with refactored impl we can support older namekos

* class path is different on python > 3.4

* make py37 work on travis

* use default value for application headers

* use amqp declare to inspect queues, not rest api
test against more modern rabbitmq

* test against prerelease nameko

* make implementation compatible

* allow 88 character lines

* add prerelease tests to travis matrix
  • Loading branch information
mattbennett committed Nov 23, 2018
1 parent 84fe9aa commit 67f3e42
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 132 deletions.
57 changes: 52 additions & 5 deletions .travis.yml
Expand Up @@ -5,7 +5,7 @@ services:
- docker

before_install:
- docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3.5.4-management
- docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3.7.8-management

install:
- pip install tox
Expand All @@ -18,13 +18,60 @@ matrix:
include:
- stage: lib
python: 2.7
env: TOX_ENV=py27-lib
env: TOX_ENV=py27-oldest-lib
- stage: lib
python: 2.7
env: TOX_ENV=py27-latest-lib
- stage: lib
python: 2.7
env: TOX_ENV=py27-prerelease-lib
- stage: lib
python: 3.4
env: TOX_ENV=py34-lib
- stage: examples
env: TOX_ENV=py34-oldest-lib
- stage: lib
python: 3.4
env: TOX_ENV=py34-latest-lib
- stage: lib
python: 3.4
env: TOX_ENV=py34-examples
env: TOX_ENV=py34-prerelease-lib
- stage: lib
python: 3.5
env: TOX_ENV=py35-oldest-lib
- stage: lib
python: 3.5
env: TOX_ENV=py35-latest-lib
- stage: lib
python: 3.5
env: TOX_ENV=py35-prerelease-lib
- stage: lib
python: 3.6
env: TOX_ENV=py36-oldest-lib
- stage: lib
python: 3.6
env: TOX_ENV=py36-latest-lib
- stage: lib
python: 3.6
env: TOX_ENV=py36-prerelease-lib
- stage: lib
python: 3.7
dist: xenial
sudo: true
env: TOX_ENV=py37-oldest-lib
- stage: lib
python: 3.7
dist: xenial
sudo: true
env: TOX_ENV=py37-latest-lib
- stage: lib
python: 3.7
dist: xenial
sudo: true
env: TOX_ENV=py37-prerelease-lib
- stage: examples
python: 3.7
dist: xenial
sudo: true
env: TOX_ENV=py37-examples
- stage: deploy
script: skip
deploy:
Expand Down
69 changes: 28 additions & 41 deletions nameko_amqp_retry/backoff.py
Expand Up @@ -4,11 +4,9 @@
from kombu import Connection
from kombu.common import maybe_declare
from kombu.messaging import Exchange, Queue
from nameko.amqp.publish import get_producer, UndeliverableMessage
from nameko.amqp.publish import Publisher
from nameko.constants import AMQP_URI_CONFIG_KEY, DEFAULT_RETRY_POLICY
from nameko.extensions import SharedExtension
from nameko.utils.retry import retry
from six.moves import queue as PyQueue

EXPIRY_GRACE_PERIOD = 5000 # ms

Expand Down Expand Up @@ -124,44 +122,33 @@ def republish(self, backoff_exc, message, target_queue):
expiration = backoff_exc.next(message, self.exchange.name)
queue = self.make_queue(expiration)

# republish to appropriate backoff queue
properties = message.properties.copy()
headers = properties.pop('application_headers', {})

headers['backoff'] = expiration
expiration_seconds = float(expiration) / 1000

amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
with get_producer(amqp_uri) as producer:

properties = message.properties.copy()
headers = properties.pop('application_headers')

headers['backoff'] = expiration
expiration_seconds = float(expiration) / 1000

# force redeclaration; the publisher will skip declaration if
# the entity has previously been declared by the same connection
# (see https://github.com/celery/kombu/pull/884)
conn = Connection(amqp_uri)
maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)

@retry(for_exceptions=UndeliverableMessage)
def publish():

producer.publish(
message.body,
headers=headers,
exchange=self.exchange,
routing_key=target_queue,
expiration=expiration_seconds,
mandatory=True,
retry=True,
retry_policy=DEFAULT_RETRY_POLICY,
declare=[queue.exchange, queue],
**properties
)

try:
returned_messages = producer.channel.returned_messages
returned = returned_messages.get_nowait()
except PyQueue.Empty:
pass
else:
raise UndeliverableMessage(returned)
# force redeclaration; the publisher will skip declaration if
# the entity has previously been declared by the same connection
# (see https://github.com/celery/kombu/pull/884)
conn = Connection(amqp_uri)
maybe_declare(
queue, conn.channel(), retry=True, **DEFAULT_RETRY_POLICY
)

publish()
# republish to appropriate backoff queue
publisher = Publisher(amqp_uri)
publisher.publish(
message.body,
headers=headers,
exchange=self.exchange,
routing_key=target_queue,
expiration=expiration_seconds,
mandatory=True,
retry=True,
retry_policy=DEFAULT_RETRY_POLICY,
declare=[queue.exchange, queue],
**properties
)
19 changes: 11 additions & 8 deletions nameko_amqp_retry/rpc.py
@@ -1,13 +1,11 @@
import sys

from nameko.messaging import HeaderEncoder, QueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

from nameko_amqp_retry import (
Backoff, BackoffPublisher, expect_backoff_exception)
from nameko_amqp_retry import Backoff, BackoffPublisher, expect_backoff_exception
from nameko_amqp_retry.constants import (
CALL_ID_STACK_HEADER_KEY, RPC_METHOD_ID_HEADER_KEY)
CALL_ID_STACK_HEADER_KEY, RPC_METHOD_ID_HEADER_KEY
)


class RpcConsumer(NamekoRpcConsumer):
Expand All @@ -27,10 +25,9 @@ def handle_message(self, body, message):
self.handle_result(message, None, exc_info)


class Rpc(NamekoRpc, HeaderEncoder):
class Rpc(NamekoRpc):

rpc_consumer = RpcConsumer()
queue_consumer = QueueConsumer()
backoff_publisher = BackoffPublisher()

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -65,7 +62,13 @@ def handle_result(self, message, worker_ctx, result, exc_info):
self.backoff_publisher.republish(
exc, message, target_queue
)
self.queue_consumer.ack_message(message)
try:
# pylint: disable=no-member
self.rpc_consumer.consumer.ack_message(message)
except AttributeError:
# nameko 2.x backwards compatibilty
# pylint: disable=no-member
self.rpc_consumer.queue_consumer.ack_message(message)
return result, exc_info

except Backoff.Expired:
Expand Down
6 changes: 6 additions & 0 deletions setup.cfg
@@ -0,0 +1,6 @@
[flake8]
exclude =
.git,
.tox,
__pycache__,
max-line-length = 88
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -10,7 +10,7 @@
packages=find_packages(exclude=['test', 'test.*']),
install_requires=[
"nameko>=2.7.0",
"kombu>=3.0.25,<4"
"kombu"
],
extras_require={
'dev': [
Expand Down
4 changes: 4 additions & 0 deletions test/__init__.py
@@ -1,3 +1,7 @@
import sys

import pkg_resources

PY3 = sys.version_info >= (3, 0)
PY34 = PY3 and sys.version_info[1] == 4
NAMEKO3 = pkg_resources.get_distribution("nameko").version.split('.')[0] == '3'
12 changes: 11 additions & 1 deletion test/conftest.py
Expand Up @@ -5,10 +5,10 @@
from kombu.messaging import Exchange, Queue
from kombu.pools import connections, producers
from mock import patch
from nameko.amqp.publish import get_connection
from nameko.constants import AMQP_URI_CONFIG_KEY
from nameko.standalone.events import event_dispatcher
from nameko.standalone.rpc import ClusterRpcProxy

from nameko_amqp_retry import Backoff
from nameko_amqp_retry.events import event_handler
from nameko_amqp_retry.messaging import consume
Expand Down Expand Up @@ -210,3 +210,13 @@ def cb(worker_ctx, res, exc_info):
if exc_info and exc_info[0] is Backoff.Expired:
return True
return cb


@pytest.fixture
def queue_info(amqp_uri):
def get_queue_info(queue_name):
with get_connection(amqp_uri) as conn:
queue = Queue(name=queue_name)
queue = queue.bind(conn)
return queue.queue_declare(passive=True)
return get_queue_info
11 changes: 6 additions & 5 deletions test/test_events.py
Expand Up @@ -4,11 +4,9 @@
import six
from mock import ANY
from nameko.testing.services import entrypoint_waiter, get_extension

from nameko_amqp_retry import Backoff
from nameko_amqp_retry.events import event_handler, EventHandler

from test import PY3
from nameko_amqp_retry.events import EventHandler, event_handler
from test import PY3, PY34


class TestEvents(object):
Expand Down Expand Up @@ -157,7 +155,10 @@ def method(self, arg):
stack = "".join(traceback.format_exception(exc_type, exc, tb))
assert "NotYet: try again later" in stack
assert "nameko_amqp_retry.backoff.Backoff" in stack
assert "nameko_amqp_retry.backoff.Expired" in stack
if PY34:
assert "nameko_amqp_retry.backoff.Expired" in stack
else:
assert "nameko_amqp_retry.backoff.Backoff.Expired" in stack

def test_multiple_services(
self, dispatch_event, container_factory, entrypoint_tracker,
Expand Down
28 changes: 14 additions & 14 deletions test/test_messaging.py
Expand Up @@ -7,8 +7,8 @@
from nameko.testing.services import entrypoint_waiter, get_extension
from nameko_amqp_retry import Backoff
from nameko_amqp_retry.backoff import get_backoff_queue_name
from nameko_amqp_retry.messaging import consume, Consumer
from test import PY3
from nameko_amqp_retry.messaging import Consumer, consume
from test import PY3, PY34


class TestMessaging(object):
Expand Down Expand Up @@ -159,12 +159,15 @@ def method(self, arg):
stack = "".join(traceback.format_exception(exc_type, exc, tb))
assert "NotYet: try again later" in stack
assert "nameko_amqp_retry.backoff.Backoff" in stack
assert "nameko_amqp_retry.backoff.Expired" in stack
if PY34:
assert "nameko_amqp_retry.backoff.Expired" in stack
else:
assert "nameko_amqp_retry.backoff.Backoff.Expired" in stack

def test_multiple_queues_with_same_exchange_and_routing_key(
self, container_factory, entrypoint_tracker, rabbit_manager, exchange,
wait_for_result, publish_message, counter, rabbit_config,
backoff_count, fast_backoff
backoff_count, fast_backoff, queue_info
):
""" Message consumption backoff works when there are muliple queues
receiving the published message
Expand Down Expand Up @@ -205,17 +208,14 @@ def method(self, payload):
publish_message(exchange, "msg", routing_key="message")

# ensure all messages are processed
vhost = rabbit_config['vhost']
for delay in fast_backoff:
backoff_queue = rabbit_manager.get_queue(
vhost, get_backoff_queue_name(delay)
)
assert backoff_queue['messages'] == 0

service_queue_one = rabbit_manager.get_queue(vhost, queue_one.name)
service_queue_two = rabbit_manager.get_queue(vhost, queue_two.name)
assert service_queue_one['messages'] == 0
assert service_queue_two['messages'] == 0
backoff_queue = queue_info(get_backoff_queue_name(delay))
assert backoff_queue.message_count == 0

service_queue_one = queue_info(queue_one.name)
service_queue_two = queue_info(queue_two.name)
assert service_queue_one.message_count == 0
assert service_queue_two.message_count == 0

assert result_one.get() == "one"
assert result_two.get() == "two"
Expand Down

0 comments on commit 67f3e42

Please sign in to comment.