Skip to content

Commit

Permalink
checkpoint: bring in enhancements from nameko#398/#7
Browse files Browse the repository at this point in the history
  • Loading branch information
mattbennett committed Feb 8, 2017
1 parent f668cf8 commit f24395b
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 59 deletions.
Binary file added .cagoule.db
Binary file not shown.
15 changes: 12 additions & 3 deletions nameko/amqp/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,17 @@ class Publisher(object):
See :attr:`self.retry`.
"""

declare = []
"""
"""

def __init__(
self, amqp_uri, use_confirms=None, serializer=None, compression=None,
delivery_mode=None, mandatory=None, priority=None, expiration=None,
retry=None, retry_policy=None, **publish_kwargs
declare=None, retry=None, retry_policy=None, **publish_kwargs
):
self.amqp_uri = amqp_uri

# MYB: accept exchange and/or routing_key here? if not, justify

# publish confirms
self.use_confirms = use_confirms or self.use_confirms

Expand All @@ -116,6 +118,9 @@ def __init__(
self.retry = retry or self.retry
self.retry_policy = retry_policy or self.retry_policy

# declarations
self.declare = declare or self.declare

# other publish arguments
self.publish_kwargs = publish_kwargs

Expand All @@ -139,6 +144,9 @@ def publish(self, msg, **kwargs):
retry = kwargs.pop('retry', self.retry)
retry_policy = kwargs.pop('retry_policy', self.retry_policy)

declare = self.declare[:]
declare.extend(kwargs.pop('declare', ()))

publish_kwargs = self.publish_kwargs.copy()
publish_kwargs.update(kwargs) # publish-time kwargs win

Expand All @@ -152,6 +160,7 @@ def publish(self, msg, **kwargs):
priority=priority,
expiration=expiration,
compression=compression,
declare=declare,
retry=retry,
retry_policy=retry_policy,
serializer=serializer,
Expand Down
7 changes: 3 additions & 4 deletions nameko/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ def emit_spam(self):
"""
def __init__(self, **defaults):
super(EventDispatcher, self).__init__(**defaults)
super(EventDispatcher, self).__init__(exchange=None, **defaults) # TODO: test/raise to say cannot provide exchange as default?

def setup(self):
self.service_name = self.container.service_name
self.config = self.container.config
self.exchange = get_event_exchange(self.service_name)
self.exchange = get_event_exchange(self.container.service_name)
self.declare.append(self.exchange)
super(EventDispatcher, self).setup()

def get_dependency(self, worker_ctx):
Expand Down
61 changes: 33 additions & 28 deletions nameko/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from functools import partial
from itertools import count
from logging import getLogger
import warnings

import eventlet
import six
Expand Down Expand Up @@ -79,19 +80,23 @@ class Publisher(DependencyProvider, HeaderEncoder):
def __init__(self, exchange=None, queue=None, **defaults):
""" Provides an AMQP message publisher method via dependency injection.
In AMQP messages are published to *exchanges* and routed to bound
*queues*. This dependency accepts either an `exchange` or a bound
`queue`, and will ensure both are declared before publishing.
In AMQP, messages are published to *exchanges* and routed to bound
*queues*. This dependency accepts the `exchange` to publish to and
will ensure that it is declared before publishing.
Optionally, you may use the `declare` keyword argument to pass a list
of other :class:`kombu.Exchange` or :class:`kombu.Queue` objects to
declare before publishing.
:Parameters:
exchange : :class:`kombu.Exchange`
Destination exchange
queue : :class:`kombu.Queue`
Bound queue. The event will be published to this queue's
exchange.
**Deprecated**: Bound queue. The event will be published to
this queue's exchange.
If neither `queue` nor `exchange` are provided, the message will be
published to the default exchange.
If `exchange` is not provided, the message will be published to the
default exchange.
Example::
Expand All @@ -103,12 +108,30 @@ def spam(self, data):
self.publish('spam:' + data)
"""
self.exchange = exchange
self.queue = queue
self.defaults = defaults

self.declare = []

if self.exchange:
self.declare.append(self.exchange)

if queue is not None:
warnings.warn(
"The signature of `Publisher` has changed. The `queue` kwarg "
"is now deprecated. You can use the `declare` kwarg "
"to provide a list of Kombu queues to be declared. "
"See CHANGES, version 2.5.2 for more details. This warning "
"will be removed in version 2.7.0.",
DeprecationWarning
)
if exchange is None:
self.exchange = queue.exchange
self.declare.append(queue)

# backwards compat
# TODO: should put serializer here too?
for compat_attr in ('retry', 'retry_policy', 'use_confirms'):
# TODO: warn
if hasattr(self, compat_attr):
self.defaults[compat_attr] = getattr(self, compat_attr)

Expand All @@ -133,41 +156,23 @@ def serializer(self):

def setup(self):

exchange = self.exchange
queue = self.queue

verify_amqp_uri(self.amqp_uri)

self.publisher = self.Publisher(
self.amqp_uri,
serializer=self.serializer,
exchange=self.exchange,
declare=self.declare,
**self.defaults
)

with get_connection(self.amqp_uri) as conn:
if queue is not None:
maybe_declare(queue, conn)
elif exchange is not None:
maybe_declare(exchange, conn)

def get_dependency(self, worker_ctx):
propagate_headers = self.get_message_headers(worker_ctx)

def publish(msg, **kwargs):

# backwards compat: if bound queue was privided but no exchange,
# publish to its exchange
exchange = self.exchange
queue = self.queue
if exchange is None and queue is not None:
exchange = queue.exchange

self.publisher.publish(
msg,
exchange=exchange,
extra_headers=propagate_headers,
**kwargs
msg, extra_headers=propagate_headers, **kwargs
)

return publish
Expand Down
8 changes: 7 additions & 1 deletion test/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ def test_event_dispatcher(mock_container, mock_producer):
'exchange': ANY,
'routing_key': 'eventtype',
'headers': headers,
'declare': event_dispatcher.declare,
'retry': event_dispatcher.Publisher.retry,
'retry_policy': custom_retry_policy,
'compression': event_dispatcher.Publisher.compression,
'mandatory': event_dispatcher.Publisher.mandatory,
'expiration': event_dispatcher.Publisher.expiration,
'delivery_mode': event_dispatcher.Publisher.delivery_mode,
'priority': event_dispatcher.Publisher.priority,
'serializer': event_dispatcher.serializer
'serializer': event_dispatcher.serializer,
}

assert mock_producer.publish.call_count == 1
Expand Down Expand Up @@ -664,6 +665,10 @@ def test_dispatch_to_rabbit(rabbit_manager, rabbit_config, mock_container):
dispatcher.setup()
dispatcher.start()

# dispatch a message to make declarations
service.dispatch = dispatcher.get_dependency(worker_ctx)
service.dispatch("eventtype", "msg")

# we should have an exchange but no queues
exchanges = rabbit_manager.get_exchanges(vhost)
queues = rabbit_manager.get_queues(vhost)
Expand All @@ -675,6 +680,7 @@ def test_dispatch_to_rabbit(rabbit_manager, rabbit_config, mock_container):
rabbit_manager.create_queue_binding(
vhost, "srcservice.events", "event-sink", routing_key="eventtype")

# dispatch another message
service.dispatch = dispatcher.get_dependency(worker_ctx)
service.dispatch("eventtype", "msg")

Expand Down
61 changes: 38 additions & 23 deletions test/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from kombu.compression import get_encoder
from kombu.connection import Connection
from kombu.serialization import registry
from mock import Mock, call, patch
from mock import Mock, call, patch, ANY
from nameko.amqp import UndeliverableMessage, get_connection, get_producer
from nameko.constants import AMQP_URI_CONFIG_KEY, HEARTBEAT_CONFIG_KEY
from nameko.containers import WorkerContext
Expand All @@ -32,12 +32,6 @@
CONSUME_TIMEOUT = 1


@pytest.yield_fixture
def patch_maybe_declare():
with patch('nameko.messaging.maybe_declare', autospec=True) as patched:
yield patched


def test_consume_provider(mock_container):

container = mock_container
Expand Down Expand Up @@ -102,7 +96,7 @@ def test_consume_provider(mock_container):

@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_exchange(
patch_maybe_declare, mock_connection, mock_producer, mock_container
mock_connection, mock_producer, mock_container
):
container = mock_container
container.service_name = "srcservice"
Expand All @@ -111,10 +105,7 @@ def test_publish_to_exchange(
worker_ctx = WorkerContext(container, service, DummyProvider("publish"))

publisher = Publisher(exchange=foobar_ex).bind(container, "publish")

# test declarations
publisher.setup()
patch_maybe_declare.assert_called_once_with(foobar_ex, mock_connection)

# test publish
msg = "msg"
Expand All @@ -129,6 +120,7 @@ def test_publish_to_exchange(
'publish_kwarg': "value",
'exchange': foobar_ex,
'headers': headers,
'declare': publisher.declare,
'retry': publisher.Publisher.retry,
'retry_policy': publisher.Publisher.retry_policy,
'compression': publisher.Publisher.compression,
Expand All @@ -146,7 +138,7 @@ def test_publish_to_exchange(

@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_queue(
patch_maybe_declare, mock_producer, mock_connection, mock_container
mock_producer, mock_connection, mock_container
):
container = mock_container
container.shared_extensions = {}
Expand All @@ -158,10 +150,7 @@ def test_publish_to_queue(
container, service, DummyProvider("publish"), data=ctx_data)

publisher = Publisher(queue=foobar_queue).bind(container, "publish")

# test declarations
publisher.setup()
patch_maybe_declare.assert_called_once_with(foobar_queue, mock_connection)

# test publish
msg = "msg"
Expand All @@ -177,6 +166,7 @@ def test_publish_to_queue(
'publish_kwarg': "value",
'exchange': foobar_ex,
'headers': headers,
'declare': publisher.declare,
'retry': publisher.Publisher.retry,
'retry_policy': publisher.Publisher.retry_policy,
'compression': publisher.Publisher.compression,
Expand All @@ -194,7 +184,7 @@ def test_publish_to_queue(

@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_custom_headers(
mock_container, patch_maybe_declare, mock_producer, mock_connection
mock_container, mock_producer, mock_connection
):

container = mock_container
Expand All @@ -207,10 +197,7 @@ def test_publish_custom_headers(
)

publisher = Publisher(queue=foobar_queue).bind(container, "publish")

# test declarations
publisher.setup()
patch_maybe_declare.assert_called_once_with(foobar_queue, mock_connection)

# test publish
msg = "msg"
Expand All @@ -225,6 +212,7 @@ def test_publish_custom_headers(
'publish_kwarg': "value",
'exchange': foobar_ex,
'headers': headers,
'declare': publisher.declare,
'retry': publisher.Publisher.retry,
'retry_policy': publisher.Publisher.retry_policy,
'compression': publisher.Publisher.compression,
Expand Down Expand Up @@ -308,12 +296,16 @@ def test_publish_to_rabbit(rabbit_manager, rabbit_config, mock_container):
)

publisher = Publisher(
exchange=foobar_ex, queue=foobar_queue).bind(container, "publish")
exchange=foobar_ex, queue=foobar_queue).bind(container, "publish"
)

# test queue, exchange and binding created in rabbit
publisher.setup()
publisher.start()

service.publish = publisher.get_dependency(worker_ctx)
service.publish("msg")

# test queue, exchange and binding created in rabbit
exchanges = rabbit_manager.get_exchanges(vhost)
queues = rabbit_manager.get_queues(vhost)
bindings = rabbit_manager.get_queue_bindings(vhost, foobar_queue.name)
Expand All @@ -323,8 +315,6 @@ def test_publish_to_rabbit(rabbit_manager, rabbit_config, mock_container):
assert "foobar_ex" in [binding['source'] for binding in bindings]

# test message published to queue
service.publish = publisher.get_dependency(worker_ctx)
service.publish("msg")
messages = rabbit_manager.get_messages(vhost, foobar_queue.name)
assert ['"msg"'] == [msg['payload'] for msg in messages]

Expand Down Expand Up @@ -1063,6 +1053,31 @@ def proxy(self, *args, **kwargs):

return Service

@patch('kombu.messaging.maybe_declare', wraps=maybe_declare)
def test_declare(
self, maybe_declare, container_factory, rabbit_config,
get_message_from_queue, exchange, queue, service_base, routing_key
):
container = container_factory(service_base, rabbit_config)
container.start()

declare = [
Queue(name="q1", exchange=exchange, routing_key=routing_key),
Queue(name="q2", exchange=exchange, routing_key=routing_key)
]

with entrypoint_hook(container, "proxy") as publish:
publish("payload", routing_key=routing_key, declare=declare)

assert maybe_declare.call_args_list == [
call(exchange, ANY, ANY),
call(declare[0], ANY, ANY),
call(declare[1], ANY, ANY)
]

assert get_message_from_queue(declare[0].name).payload == "payload"
assert get_message_from_queue(declare[1].name).payload == "payload"

@pytest.mark.parametrize("option,value,expected", [
('delivery_mode', 1, 1),
('priority', 10, 10),
Expand Down

0 comments on commit f24395b

Please sign in to comment.