Skip to content

Commit

Permalink
bring in ideas from nameko#398 but use kombu to perform the declarations
Browse files Browse the repository at this point in the history
  • Loading branch information
mattbennett committed Feb 2, 2017
1 parent c18c9ce commit 03f3efb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 48 deletions.
57 changes: 33 additions & 24 deletions nameko/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import six
from eventlet.event import Event
from kombu import Connection
from kombu.common import maybe_declare
from kombu.mixins import ConsumerMixin
from six.moves import queue as Queue

from nameko.amqp import (
UndeliverableMessage, get_connection, get_producer, verify_amqp_uri)
from nameko.amqp import UndeliverableMessage, get_producer, verify_amqp_uri
from nameko.constants import (
AMQP_URI_CONFIG_KEY, DEFAULT_HEARTBEAT, DEFAULT_RETRY_POLICY,
DEFAULT_SERIALIZER, HEARTBEAT_CONFIG_KEY, SERIALIZER_CONFIG_KEY)
Expand Down Expand Up @@ -78,19 +76,23 @@ class Publisher(DependencyProvider, HeaderEncoder):
def __init__(self, exchange=None, queue=None, **defaults):
""" Provides an AMQP message publisher method via dependency injection.
In AMQP messages are published to *exchanges* and routed to bound
*queues*. This dependency accepts either an `exchange` or a bound
`queue`, and will ensure both are declared before publishing.
In AMQP, messages are published to *exchanges* and routed to bound
*queues*. This dependency accepts the `exchange` to publish to and
will ensure that it is declared before publishing.
Optionally, you may use the `declare` keyword argument to pass a list
of other :class:`kombu.Exchange` or :class:`kombu.Queue` objects to
declare before publishing.
:Parameters:
exchange : :class:`kombu.Exchange`
Destination exchange
queue : :class:`kombu.Queue`
Bound queue. The event will be published to this queue's
exchange.
**Deprecated**: Bound queue. The event will be published to
this queue's exchange.
If neither `queue` nor `exchange` are provided, the message will be
published to the default exchange.
If `exchange` is not provided, the message will be published to the
default exchange.
Example::
Expand All @@ -102,8 +104,22 @@ def spam(self, data):
self.publish('spam:' + data)
"""
self.exchange = exchange
self.queue = queue
self.defaults = defaults
self.declare = self.defaults.pop('declare', [])

if queue is not None:
warnings.warn(
"The signature of `Publisher` has changed. The `queue` kwarg "
"is now deprecated. You can use the `declare` kwarg "
"to provide a list of Kombu queues to be declared. "
"See CHANGES, version 2.5.2 for more details. This warning "
"will be removed in version 2.7.0.",
DeprecationWarning
)
if exchange is None:
self.exchange = queue.exchange

self.declare.append(queue)

@property
def amqp_uri(self):
Expand Down Expand Up @@ -166,26 +182,15 @@ def retry_policy(self):
return DEFAULT_RETRY_POLICY

def setup(self):

exchange = self.exchange
queue = self.queue
if self.exchange is not None:
self.declare.append(self.exchange)

verify_amqp_uri(self.amqp_uri)

with get_connection(self.amqp_uri) as conn:
if queue is not None:
maybe_declare(queue, conn)
elif exchange is not None:
maybe_declare(exchange, conn)

def publish(self, propagating_headers, msg, **kwargs):
"""
"""
exchange = self.exchange
queue = self.queue

if exchange is None and queue is not None:
exchange = queue.exchange

# add any new headers to the existing ones we're propagating
headers = propagating_headers.copy()
Expand All @@ -194,6 +199,9 @@ def publish(self, propagating_headers, msg, **kwargs):
retry = kwargs.pop('retry', self.retry)
retry_policy = kwargs.pop('retry_policy', self.retry_policy)

declare = self.declare[:]
declare.extend(kwargs.pop('declare', ()))

for key in self.delivery_options:
if key not in kwargs:
kwargs[key] = self.delivery_options[key]
Expand All @@ -212,6 +220,7 @@ def publish(self, propagating_headers, msg, **kwargs):
retry=retry,
retry_policy=retry_policy,
mandatory=mandatory,
declare=declare,
**kwargs
)

Expand Down
6 changes: 6 additions & 0 deletions test/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def test_event_dispatcher(mock_container, mock_producer):
'headers': headers,
'retry': event_dispatcher.retry,
'retry_policy': custom_retry_policy,
'declare': event_dispatcher.declare,
'mandatory': False
}
expected_kwargs.update(event_dispatcher.delivery_options)
Expand Down Expand Up @@ -661,6 +662,10 @@ def test_dispatch_to_rabbit(rabbit_manager, rabbit_config, mock_container):
dispatcher.setup()
dispatcher.start()

# dispatch a message to make declarations
service.dispatch = dispatcher.get_dependency(worker_ctx)
service.dispatch("eventtype", "msg")

# we should have an exchange but no queues
exchanges = rabbit_manager.get_exchanges(vhost)
queues = rabbit_manager.get_queues(vhost)
Expand All @@ -672,6 +677,7 @@ def test_dispatch_to_rabbit(rabbit_manager, rabbit_config, mock_container):
rabbit_manager.create_queue_binding(
vhost, "srcservice.events", "event-sink", routing_key="eventtype")

# dispatch another message
service.dispatch = dispatcher.get_dependency(worker_ctx)
service.dispatch("eventtype", "msg")

Expand Down
34 changes: 10 additions & 24 deletions test/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@
CONSUME_TIMEOUT = 1


@pytest.yield_fixture
def patch_maybe_declare():
with patch('nameko.messaging.maybe_declare', autospec=True) as patched:
yield patched


@pytest.yield_fixture
def warnings():
with patch('nameko.messaging.warnings') as patched:
Expand Down Expand Up @@ -106,7 +100,7 @@ def test_consume_provider(mock_container):

@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_exchange(
patch_maybe_declare, mock_connection, mock_producer, mock_container
mock_connection, mock_producer, mock_container
):
container = mock_container
container.service_name = "srcservice"
Expand All @@ -116,10 +110,6 @@ def test_publish_to_exchange(

publisher = Publisher(exchange=foobar_ex).bind(container, "publish")

# test declarations
publisher.setup()
patch_maybe_declare.assert_called_once_with(foobar_ex, mock_connection)

# test publish
msg = "msg"
service.publish = publisher.get_dependency(worker_ctx)
Expand All @@ -135,6 +125,7 @@ def test_publish_to_exchange(
'headers': headers,
'retry': publisher.retry,
'retry_policy': publisher.retry_policy,
'declare': publisher.declare,
'mandatory': False
}
expected_kwargs.update(publisher.delivery_options)
Expand All @@ -147,7 +138,7 @@ def test_publish_to_exchange(

@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_queue(
patch_maybe_declare, mock_producer, mock_connection, mock_container
mock_producer, mock_connection, mock_container
):
container = mock_container
container.shared_extensions = {}
Expand All @@ -160,10 +151,6 @@ def test_publish_to_queue(

publisher = Publisher(queue=foobar_queue).bind(container, "publish")

# test declarations
publisher.setup()
patch_maybe_declare.assert_called_once_with(foobar_queue, mock_connection)

# test publish
msg = "msg"
headers = {
Expand All @@ -180,6 +167,7 @@ def test_publish_to_queue(
'headers': headers,
'retry': publisher.retry,
'retry_policy': publisher.retry_policy,
'declare': publisher.declare,
'mandatory': False
}
expected_kwargs.update(publisher.delivery_options)
Expand All @@ -192,7 +180,7 @@ def test_publish_to_queue(

@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_custom_headers(
mock_container, patch_maybe_declare, mock_producer, mock_connection
mock_container, mock_producer, mock_connection
):

container = mock_container
Expand All @@ -206,10 +194,6 @@ def test_publish_custom_headers(

publisher = Publisher(queue=foobar_queue).bind(container, "publish")

# test declarations
publisher.setup()
patch_maybe_declare.assert_called_once_with(foobar_queue, mock_connection)

# test publish
msg = "msg"
headers = {'nameko.language': 'en',
Expand All @@ -225,6 +209,7 @@ def test_publish_custom_headers(
'headers': headers,
'retry': publisher.retry,
'retry_policy': publisher.retry_policy,
'declare': publisher.declare,
'mandatory': False
}
expected_kwargs.update(publisher.delivery_options)
Expand Down Expand Up @@ -305,10 +290,13 @@ def test_publish_to_rabbit(rabbit_manager, rabbit_config, mock_container):
publisher = Publisher(
exchange=foobar_ex, queue=foobar_queue).bind(container, "publish")

# test queue, exchange and binding created in rabbit
publisher.setup()
publisher.start()

service.publish = publisher.get_dependency(worker_ctx)
service.publish("msg")

# test queue, exchange and binding created in rabbit
exchanges = rabbit_manager.get_exchanges(vhost)
queues = rabbit_manager.get_queues(vhost)
bindings = rabbit_manager.get_queue_bindings(vhost, foobar_queue.name)
Expand All @@ -318,8 +306,6 @@ def test_publish_to_rabbit(rabbit_manager, rabbit_config, mock_container):
assert "foobar_ex" in [binding['source'] for binding in bindings]

# test message published to queue
service.publish = publisher.get_dependency(worker_ctx)
service.publish("msg")
messages = rabbit_manager.get_messages(vhost, foobar_queue.name)
assert ['"msg"'] == [msg['payload'] for msg in messages]

Expand Down

0 comments on commit 03f3efb

Please sign in to comment.