Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Publisher queue parameter with queues_to_declare #398

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 36 additions & 19 deletions nameko/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from kombu import Connection
from kombu.common import maybe_declare
from kombu.mixins import ConsumerMixin
from six.moves import queue

from nameko.amqp import (
UndeliverableMessage, get_connection, get_producer, verify_amqp_uri)
Expand Down Expand Up @@ -75,22 +74,28 @@ def unpack_message_headers(self, worker_ctx_cls, message):

class Publisher(DependencyProvider, HeaderEncoder):

def __init__(self, exchange=None, queue=None):
def __init__(self, exchange=None, queue=None, queues_to_declare=None):
""" Provides an AMQP message publisher method via dependency injection.

In AMQP messages are published to *exchanges* and routed to bound
*queues*. This dependency accepts either an `exchange` or a bound
`queue`, and will ensure both are declared before publishing.
In AMQP, messages are published to *exchanges* and routed to
bound *queues*. This dependency accepts both an `exchange`
and `queues_to_declare`, a list of Kombu queues, and will ensure
that all of them are declared before publishing.

It also accepts a `queue` and will ensure it's declared before
publishing. (**Deprecated**)

:Parameters:
exchange : :class:`kombu.Exchange`
Destination exchange
queue : :class:`kombu.Queue`
Bound queue. The event will be published to this queue's
exchange.
(**Deprecated**) Bound queue. The event will be published to
this queue's exchange.
queues_to_declare : list
A list of :class:`kombu.Queue` to be declared.

If neither `queue` nor `exchange` are provided, the message will be
published to the default exchange.
If neither `queue` nor `exchange` are provided, the message will
be published to the default exchange.

Example::

Expand All @@ -102,7 +107,22 @@ def spam(self, data):
self.publish('spam:' + data)
"""
self.exchange = exchange
self.queue = queue
self.queues_to_declare = list(queues_to_declare or [])

if queue is not None:
warnings.warn(
"The signature of `Publisher` has changed. The `queue` kwarg "
"is now deprecated. You can use the `queues_to_declare` kwarg "
"to provide a list of Kombu queues to be declared. "
"See CHANGES, version 2.5.2 for more details. This warning "
"will be removed in version 2.7.0.",
DeprecationWarning
)

self.queues_to_declare.append(queue)

if exchange is None:
self.exchange = queue.exchange

@property
def amqp_uri(self):
Expand Down Expand Up @@ -149,26 +169,23 @@ def retry_policy(self):
return DEFAULT_RETRY_POLICY

def setup(self):

exchange = self.exchange
queue = self.queue
queues_to_declare = self.queues_to_declare

verify_amqp_uri(self.amqp_uri)

with get_connection(self.amqp_uri) as conn:
if queue is not None:
maybe_declare(queue, conn)
elif exchange is not None:
for queue in queues_to_declare:
if queue is not None:
maybe_declare(queue, conn)
if exchange is not None:
maybe_declare(exchange, conn)

def get_dependency(self, worker_ctx):
def publish(msg, **kwargs):
exchange = self.exchange
serializer = self.serializer

if exchange is None and self.queue is not None:
exchange = self.queue.exchange

retry = kwargs.pop('retry', self.retry)
retry_policy = kwargs.pop('retry_policy', self.retry_policy)
mandatory = kwargs.pop('mandatory', False)
Expand All @@ -192,7 +209,7 @@ def publish(msg, **kwargs):
try:
returned_messages = producer.channel.returned_messages
returned = returned_messages.get_nowait()
except queue.Empty:
except six.moves.queue.Empty:
pass
else:
raise UndeliverableMessage(returned)
Expand Down
145 changes: 128 additions & 17 deletions test/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
foobar_ex = Exchange('foobar_ex', durable=False)
foobar_queue = Queue('foobar_queue', exchange=foobar_ex, durable=False)

baz_ex = Exchange('baz_ex', durable=False)
baz_queue = Queue('baz_queue', exchange=baz_ex, durable=False)

qux_ex = Exchange('qux_ex', durable=False)
qux_queue = Queue('qux_queue', exchange=qux_ex, durable=False)

CONSUME_TIMEOUT = 1


Expand Down Expand Up @@ -119,19 +125,61 @@ def test_publish_to_exchange(

# test publish
msg = "msg"
headers = {'nameko.call_id_stack': ['srcservice.publish.0']}
service.publish = publisher.get_dependency(worker_ctx)
service.publish(msg, publish_kwarg="value")

mock_producer.publish.assert_called_once_with(
msg, headers=headers, exchange=foobar_ex, retry=True,
serializer=container.serializer, mandatory=False,
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value"
)


@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_queue_exchange(
maybe_declare, mock_producer, mock_connection, mock_container
):
container = mock_container
container.shared_extensions = {}
container.service_name = "srcservice"

ctx_data = {'language': 'en'}
service = Mock()
worker_ctx = WorkerContext(
container, service, DummyProvider("publish"), data=ctx_data)

publisher = Publisher(
queue=qux_queue, queues_to_declare=[foobar_queue, baz_queue]
).bind(container, "publish")

# test declarations
publisher.setup()
assert maybe_declare.call_args_list == [
call(foobar_queue, mock_connection),
call(baz_queue, mock_connection),
call(qux_queue, mock_connection),
call(qux_ex, mock_connection),
]

# test publish
msg = "msg"
headers = {
'nameko.call_id_stack': ['srcservice.publish.0']
'nameko.language': 'en',
'nameko.call_id_stack': ['srcservice.publish.0'],
}
service.publish = publisher.get_dependency(worker_ctx)
service.publish(msg, publish_kwarg="value")

mock_producer.publish.assert_called_once_with(
msg, headers=headers, exchange=foobar_ex, retry=True,
msg, headers=headers, exchange=qux_ex, retry=True,
serializer=container.serializer, mandatory=False,
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value")
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value"
)


@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_queue(
def test_publish_to_default_exchange(
maybe_declare, mock_producer, mock_connection, mock_container
):
container = mock_container
Expand All @@ -143,11 +191,16 @@ def test_publish_to_queue(
worker_ctx = WorkerContext(
container, service, DummyProvider("publish"), data=ctx_data)

publisher = Publisher(queue=foobar_queue).bind(container, "publish")
publisher = Publisher(
queues_to_declare=[foobar_queue, baz_queue]
).bind(container, "publish")

# test declarations
publisher.setup()
maybe_declare.assert_called_once_with(foobar_queue, mock_connection)
assert maybe_declare.call_args_list == [
call(foobar_queue, mock_connection),
call(baz_queue, mock_connection),
]

# test publish
msg = "msg"
Expand All @@ -157,10 +210,53 @@ def test_publish_to_queue(
}
service.publish = publisher.get_dependency(worker_ctx)
service.publish(msg, publish_kwarg="value")

mock_producer.publish.assert_called_once_with(
msg, headers=headers, exchange=None, retry=True,
serializer=container.serializer, mandatory=False,
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value"
)


@pytest.mark.usefixtures("predictable_call_ids")
def test_publish_to_provided_exchange_takes_precedence(
maybe_declare, mock_producer, mock_connection, mock_container
):
container = mock_container
container.shared_extensions = {}
container.service_name = "srcservice"

ctx_data = {'language': 'en'}
service = Mock()
worker_ctx = WorkerContext(
container, service, DummyProvider("publish"), data=ctx_data)

publisher = Publisher(
exchange=foobar_ex, queue=qux_queue, queues_to_declare=[baz_queue]
).bind(container, "publish")

# test declarations
publisher.setup()
assert maybe_declare.call_args_list == [
call(baz_queue, mock_connection),
call(qux_queue, mock_connection),
call(foobar_ex, mock_connection),
]

# test publish
msg = "msg"
headers = {
'nameko.language': 'en',
'nameko.call_id_stack': ['srcservice.publish.0'],
}
service.publish = publisher.get_dependency(worker_ctx)
service.publish(msg, publish_kwarg="value")

mock_producer.publish.assert_called_once_with(
msg, headers=headers, exchange=foobar_ex, retry=True,
serializer=container.serializer, mandatory=False,
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value")
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value"
)


@pytest.mark.usefixtures("predictable_call_ids")
Expand All @@ -177,7 +273,9 @@ def test_publish_custom_headers(
container, service, DummyProvider('method'), data=ctx_data
)

publisher = Publisher(queue=foobar_queue).bind(container, "publish")
publisher = Publisher(
queues_to_declare=[foobar_queue]
).bind(container, "publish")

# test declarations
publisher.setup()
Expand All @@ -190,10 +288,12 @@ def test_publish_custom_headers(
'nameko.call_id_stack': ['srcservice.method.0']}
service.publish = publisher.get_dependency(worker_ctx)
service.publish(msg, publish_kwarg="value")

mock_producer.publish.assert_called_once_with(
msg, headers=headers, exchange=foobar_ex, retry=True,
msg, headers=headers, exchange=None, retry=True,
serializer=container.serializer, mandatory=False,
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value")
retry_policy=DEFAULT_RETRY_POLICY, publish_kwarg="value"
)


def test_header_encoder(empty_config):
Expand Down Expand Up @@ -264,19 +364,29 @@ def test_publish_to_rabbit(rabbit_manager, rabbit_config, mock_container):
)

publisher = Publisher(
exchange=foobar_ex, queue=foobar_queue).bind(container, "publish")
exchange=foobar_ex, queues_to_declare=[foobar_queue, baz_queue]
).bind(container, "publish")

# test queue, exchange and binding created in rabbit
# test queues, exchange and binding created in rabbit
publisher.setup()
publisher.start()

exchanges = rabbit_manager.get_exchanges(vhost)
queues = rabbit_manager.get_queues(vhost)
bindings = rabbit_manager.get_queue_bindings(vhost, foobar_queue.name)
foobar_bindings = rabbit_manager.get_queue_bindings(
vhost, foobar_queue.name
)
baz_bindings = rabbit_manager.get_queue_bindings(vhost, baz_queue.name)

assert "foobar_ex" in [exchange['name'] for exchange in exchanges]
assert "foobar_queue" in [queue['name'] for queue in queues]
assert "foobar_ex" in [binding['source'] for binding in bindings]
exchange_names = [exchange['name'] for exchange in exchanges]
queue_names = [queue['name'] for queue in queues]

assert "foobar_ex" in exchange_names
assert "baz_ex" in exchange_names
assert "foobar_queue" in queue_names
assert "baz_queue" in queue_names
assert "foobar_ex" in [binding['source'] for binding in foobar_bindings]
assert "baz_ex" in [binding['source'] for binding in baz_bindings]

# test message published to queue
service.publish = publisher.get_dependency(worker_ctx)
Expand Down Expand Up @@ -309,7 +419,8 @@ def test_unserialisable_headers(rabbit_manager, rabbit_config, mock_container):
)

publisher = Publisher(
exchange=foobar_ex, queue=foobar_queue).bind(container, "publish")
exchange=foobar_ex, queues_to_declare=[foobar_queue]
).bind(container, "publish")

publisher.setup()
publisher.start()
Expand Down