Skip to content

Commit

Permalink
Merge pull request #15 from juntalis/master
Browse files Browse the repository at this point in the history
Exchange 'internal' option & exchange-to-exchange bindings
  • Loading branch information
mosquito committed May 1, 2017
2 parents 8900eb9 + ac4ff40 commit 77b3365
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 6 deletions.
9 changes: 5 additions & 4 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, connection,
ExchangeType.DIRECT,
durable=None,
auto_delete=None,
internal=None,
arguments=None,
loop=self.loop,
future_store=self._futures.get_child(),
Expand Down Expand Up @@ -100,7 +101,7 @@ def _on_delivery_confirmation(self, method_frame):
@asyncio.coroutine
def declare_exchange(self, name: str, type: ExchangeType = ExchangeType.DIRECT,
durable: bool = None, auto_delete: bool = False,
arguments: dict = None, timeout: int = None) -> Exchange:
internal: bool = False, arguments: dict = None, timeout: int = None) -> Exchange:

if auto_delete and durable is None:
durable = False
Expand All @@ -110,15 +111,15 @@ def declare_exchange(self, name: str, type: ExchangeType = ExchangeType.DIRECT,
self.__channel.exchange_declare(
f.set_result,
name, ExchangeType(type).value, durable=durable,
auto_delete=auto_delete, arguments=arguments
auto_delete=auto_delete, internal=internal, arguments=arguments
)

yield from f

exchange = Exchange(
self.__channel, self._publish, name, type,
durable=durable, auto_delete=auto_delete, arguments=arguments,
loop=self.loop, future_store=self._futures.get_child(),
durable=durable, auto_delete=auto_delete, internal=internal,
arguments=arguments, loop=self.loop, future_store=self._futures.get_child(),
)

log.debug("Exchange declared %r", exchange)
Expand Down
73 changes: 71 additions & 2 deletions aio_pika/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ class ExchangeType(Enum):
class Exchange(BaseChannel):
""" Exchange abstraction """

__slots__ = 'name', '__type', '__publish_method', 'arguments', 'durable', 'auto_delete', '_channel'
__slots__ = 'name', '__type', '__publish_method', 'arguments', 'durable', 'auto_delete', 'internal', '_channel'

def __init__(self, channel: Channel, publish_method, name: str,
type: ExchangeType=ExchangeType.DIRECT, *, auto_delete: bool,
durable: bool, arguments: dict, loop: asyncio.AbstractEventLoop, future_store: FutureStore):
durable: bool, internal: bool, arguments: dict, loop: asyncio.AbstractEventLoop,
future_store: FutureStore):

super().__init__(loop, future_store)

Expand All @@ -34,6 +35,7 @@ def __init__(self, channel: Channel, publish_method, name: str,
self.name = name
self.auto_delete = auto_delete
self.durable = durable
self.internal = internal
self.arguments = arguments

def __str__(self):
Expand All @@ -44,6 +46,70 @@ def __repr__(self):
self, self.auto_delete, self.durable, self.arguments
)

@BaseChannel._ensure_channel_is_open
def bind(self, exchange,
routing_key: str='', *, arguments=None, timeout: int = None) -> asyncio.Future:

""" A binding can also be a relationship between two exchanges. This can be
simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion
with a basic_publish parameter we're going to call it a binding key.
:param exchange: :class:`aio_pika.exchange.Exchange` instance
:param routing_key: routing key
:param arguments: additional arguments (will be passed to `pika`)
:param timeout: execution timeout
:return: :class:`None`
"""

log.debug(
"Binding exchange %r to exchange %r, routing_key=%r, arguments=%r",
self, exchange, routing_key, arguments
)

f = self._create_future(timeout)

self._channel.exchange_bind(
f.set_result,
self.name,
exchange.name,
routing_key=routing_key,
arguments=arguments
)

return f

@BaseChannel._ensure_channel_is_open
def unbind(self, exchange, routing_key: str = '',
arguments: dict = None, timeout: int = None) -> asyncio.Future:

""" Remove exchange-to-exchange binding for this :class:`Exchange` instance
:param exchange: :class:`aio_pika.exchange.Exchange` instance
:param routing_key: routing key
:param arguments: additional arguments (will be passed to `pika`)
:param timeout: execution timeout
:return: :class:`None`
"""

log.debug(
"Unbinding exchange %r from exchange %r, routing_key=%r, arguments=%r",
self, exchange, routing_key, arguments
)

f = self._create_future(timeout)

self._channel.exchange_unbind(
f.set_result,
self.name,
exchange.name,
routing_key=routing_key,
arguments=arguments
)

return f

@BaseChannel._ensure_channel_is_open
@asyncio.coroutine
def publish(self, message: Message, routing_key, *, mandatory=True, immediate=False):
Expand All @@ -55,6 +121,9 @@ def publish(self, message: Message, routing_key, *, mandatory=True, immediate=Fa
"""

log.debug("Publishing message via exchange %s: %r", self, message)
if self.internal:
# Caught on the client side to prevent channel closure
raise ValueError("cannot publish to internal exchange: '%s'!" % self.name)

return (
yield from self.__publish_method(
Expand Down
65 changes: 65 additions & 0 deletions tests/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,35 @@ def test_temporary_queue(self):

yield from wait((client.close(), client.closing), loop=self.loop)

@pytest.mark.asyncio
def test_internal_exchange(self):
client = yield from connect(AMQP_URL, loop=self.loop)

routing_key = self.get_random_name()
exchange_name = self.get_random_name("internal", "exchange")

channel = yield from client.channel()
exchange = yield from channel.declare_exchange(exchange_name, auto_delete=True, internal=True)
queue = yield from channel.declare_queue(auto_delete=True)

yield from queue.bind(exchange, routing_key)

body = bytes(shortuuid.uuid(), 'utf-8')

with self.assertRaises(ValueError):
f = exchange.publish(
Message(
body, content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)
yield from f

yield from queue.unbind(exchange, routing_key)
yield from queue.delete()
yield from wait((client.close(), client.closing), loop=self.loop)

@pytest.mark.asyncio
def test_simple_publish_and_receive(self):
client = yield from connect(AMQP_URL, loop=self.loop)
Expand Down Expand Up @@ -159,6 +188,42 @@ def test_simple_publish_and_receive_delivery_mode_explicitly_none(self):
yield from queue.delete()
yield from wait((client.close(), client.closing), loop=self.loop)

@pytest.mark.asyncio
def test_simple_publish_and_receive_to_bound_exchange(self):
client = yield from connect(AMQP_URL, loop=self.loop)

routing_key = self.get_random_name()
src_name = self.get_random_name("source", "exchange")
dest_name = self.get_random_name("destination", "exchange")

channel = yield from client.channel()
src_exchange = yield from channel.declare_exchange(src_name, auto_delete=True)
dest_exchange = yield from channel.declare_exchange(dest_name, auto_delete=True)
queue = yield from channel.declare_queue(auto_delete=True)

yield from queue.bind(dest_exchange, routing_key)
yield from dest_exchange.bind(src_exchange, routing_key)

body = bytes(shortuuid.uuid(), 'utf-8')

yield from src_exchange.publish(
Message(
body, content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)

incoming_message = yield from queue.get(timeout=5)
incoming_message.ack()

self.assertEqual(incoming_message.body, body)

yield from dest_exchange.unbind(src_exchange, routing_key)
yield from queue.unbind(dest_exchange, routing_key)
yield from queue.delete()
yield from wait((client.close(), client.closing), loop=self.loop)

@pytest.mark.asyncio
def test_incoming_message_info(self):
client = yield from connect(AMQP_URL, loop=self.loop)
Expand Down

0 comments on commit 77b3365

Please sign in to comment.