Merge pull request #257 from onefinestay/polling_queue_consumer
refactor the polling queue consumer
davidszotten committed May 26, 2015
2 parents 994b83f + 3ade58e commit 62aa5bb
Showing 7 changed files with 118 additions and 201 deletions.
8 changes: 8 additions & 0 deletions CHANGES
@@ -6,6 +6,14 @@ are in form of *headline.major.minor* numbers. Backwards-compatible changes
increment the minor version number only.


Version 2.1.2
-------------

Released 2015-05-26

* Refactor the standalone queue consumer for more extensibility


Version 2.1.1
-------------

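The component being refactored here backs nameko's standalone RPC proxies. As a rough usage sketch, assuming a broker at the placeholder URI below and that the proxy accepts a timeout keyword in this release ('foobar' and 'spam' mirror the service used in the tests further down):

from nameko.standalone.rpc import ServiceRpcProxy

config = {'AMQP_URI': 'amqp://guest:guest@localhost:5672/'}  # placeholder URI

# The proxy's reply listener delegates to PollingQueueConsumer, which blocks
# on the reply queue until a message with the matching correlation id arrives.
with ServiceRpcProxy('foobar', config, timeout=5) as proxy:
    print(proxy.spam(ham=1))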
49 changes: 0 additions & 49 deletions nameko/kombu_helpers.py

This file was deleted.
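The removed helper itself is not shown in this view. Judging only from its call site in the old _poll_messages code further down (queue_iterator(self.queue, timeout=self.timeout) yielding (body, msg) pairs and raising RpcTimeout), it was roughly a generator of the following shape. This is a speculative reconstruction for orientation, not the deleted file's actual contents:

import socket

from kombu.messaging import Consumer

from nameko.exceptions import RpcTimeout


def queue_iterator(queue, timeout=None):
    # Speculative sketch: collect deliveries via a consumer callback and yield
    # (body, message) pairs, translating the socket timeout into the RpcTimeout
    # that the old _poll_messages loop expects.
    pending = []
    consumer = Consumer(queue.channel, queues=[queue], no_ack=False)
    consumer.callbacks = [lambda body, message: pending.append((body, message))]
    with consumer:
        while True:
            try:
                queue.channel.connection.client.drain_events(timeout=timeout)
            except socket.timeout:
                raise RpcTimeout(timeout)
            while pending:
                yield pending.pop(0)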

134 changes: 69 additions & 65 deletions nameko/standalone/rpc.py
@@ -1,17 +1,18 @@
from __future__ import absolute_import

import logging
import socket

from amqp.exceptions import ConnectionError
from kombu import Connection
from kombu.common import maybe_declare
from kombu.messaging import Consumer

from nameko.amqp import verify_amqp_uri
from nameko.constants import AMQP_URI_CONFIG_KEY
from nameko.containers import WorkerContext
from nameko.extensions import Entrypoint
from nameko.exceptions import RpcConnectionError, RpcTimeout
from nameko.kombu_helpers import queue_iterator
from nameko.rpc import ServiceProxy, ReplyListener


@@ -47,7 +48,7 @@ def wait(self):
        if self.exception:
            raise self.exception

        if self.queue_consumer.channel.connection is None:
        if self.queue_consumer.consumer.connection is None:
            raise RuntimeError(
                "This consumer has been stopped, and can no longer be used"
            )
@@ -65,87 +66,90 @@ class PollingQueueConsumer(object):
    separate thread it provides a polling method to block until a message with
    the same correlation ID of the RPC-proxy call arrives.
    """
    consumer = None

    def __init__(self, timeout=None):
        self.timeout = timeout
        self.replies = {}

    def _setup_queue(self):
        self.channel = self.connection.channel()
    def _setup_consumer(self):
        if self.consumer is not None:
            try:
                self.consumer.cancel()
            except socket.error:  # pragma: no cover
                # On some systems (e.g. os x) we need to explicitly cancel the
                # consumer here. However, e.g. on ubuntu 14.04, the
                # disconnection has already closed the socket. We try to
                # cancel, and ignore any socket errors.
                pass

        channel = self.connection.channel()
        # queue.bind returns a bound copy
        self.queue = self.queue.bind(self.channel)
        maybe_declare(self.queue, self.channel)
        self.queue = self.queue.bind(channel)
        maybe_declare(self.queue, channel)
        consumer = Consumer(channel, queues=[self.queue], no_ack=False)
        consumer.callbacks = [self.on_message]
        consumer.consume()
        self.consumer = consumer

    def register_provider(self, provider):
        self.provider = provider
        amqp_uri = provider.container.config[AMQP_URI_CONFIG_KEY]
        verify_amqp_uri(amqp_uri)
        self.connection = Connection(amqp_uri)
        self.queue = provider.queue
        self._setup_queue()
        message_iterator = self._poll_messages()
        message_iterator.send(None)  # start generator
        self.get_message = message_iterator.send
        self._setup_consumer()

    def unregister_provider(self, provider):
        self.connection.close()

    def ack_message(self, msg):
        msg.ack()

    def _poll_messages(self):
        replies = {}

        correlation_id = yield

        while True:
            try:
                for body, msg in queue_iterator(
                    self.queue, timeout=self.timeout
                ):
                    msg_correlation_id = msg.properties.get('correlation_id')

                    if msg_correlation_id not in self.provider._reply_events:
                        _logger.debug(
                            "Unknown correlation id: %s", msg_correlation_id)
                        continue

                    replies[msg_correlation_id] = (body, msg)

                    # Here, and every time we re-enter this coroutine (at the
                    # `yield` statement below) we check if we already have the
                    # data for the new correlation_id before polling for new
                    # messages.
                    while correlation_id in replies:
                        body, msg = replies.pop(correlation_id)
                        self.provider.handle_message(body, msg)
                        correlation_id = yield

            except RpcTimeout as exc:
                event = self.provider._reply_events.pop(correlation_id)
                event.send_exception(exc)

                # timeout is implemented using socket timeout, so when it
                # fires the connection is closed, causing the reply queue
                # to be deleted
                self._setup_queue()
                correlation_id = yield

            except ConnectionError as exc:
                for event in self.provider._reply_events.values():
                    rpc_connection_error = RpcConnectionError(
                        'Disconnected while waiting for reply: %s', exc)
                    event.send_exception(rpc_connection_error)
                self.provider._reply_events.clear()
                # In case this was a temporary error, attempt to reconnect. If
                # we fail, the connection error will bubble.
                self._setup_queue()
                correlation_id = yield

            except KeyboardInterrupt as exc:
                event = self.provider._reply_events.pop(correlation_id)
                event.send_exception(exc)
                # exception may have killed the connection
                self._setup_queue()
                correlation_id = yield
    def on_message(self, body, message):
        msg_correlation_id = message.properties.get('correlation_id')
        if msg_correlation_id not in self.provider._reply_events:
            _logger.debug(
                "Unknown correlation id: %s", msg_correlation_id)

        self.replies[msg_correlation_id] = (body, message)

    def get_message(self, correlation_id):

        try:
            while correlation_id not in self.replies:
                self.consumer.channel.connection.client.drain_events(
                    timeout=self.timeout
                )

            body, message = self.replies.pop(correlation_id)
            self.provider.handle_message(body, message)

        except socket.timeout:
            timeout_error = RpcTimeout(self.timeout)
            event = self.provider._reply_events.pop(correlation_id)
            event.send_exception(timeout_error)

            # timeout is implemented using socket timeout, so when it
            # fires the connection is closed, causing the reply queue
            # to be deleted
            self._setup_consumer()

        except ConnectionError as exc:
            for event in self.provider._reply_events.values():
                rpc_connection_error = RpcConnectionError(
                    'Disconnected while waiting for reply: %s', exc)
                event.send_exception(rpc_connection_error)
            self.provider._reply_events.clear()
            # In case this was a temporary error, attempt to reconnect. If
            # we fail, the connection error will bubble.
            self._setup_consumer()

        except KeyboardInterrupt as exc:
            event = self.provider._reply_events.pop(correlation_id)
            event.send_exception(exc)
            # exception may have killed the connection
            self._setup_consumer()


class SingleThreadedReplyListener(ReplyListener):
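In short, the refactor replaces the _poll_messages generator (driven through message_iterator.send) with a persistent kombu Consumer whose on_message callback files replies by correlation id, plus a get_message method that drains events until the wanted reply arrives. A condensed, self-contained sketch of that pattern, with a placeholder broker URI and queue name rather than nameko's actual reply-queue setup:

from kombu import Connection, Queue
from kombu.common import maybe_declare
from kombu.messaging import Consumer

replies = {}


def on_message(body, message):
    # file every delivery under its correlation id
    replies[message.properties.get('correlation_id')] = (body, message)


connection = Connection('amqp://guest:guest@localhost:5672/')  # placeholder URI
channel = connection.channel()
queue = Queue('rpc.reply-example', exclusive=True).bind(channel)  # placeholder queue
maybe_declare(queue, channel)

consumer = Consumer(channel, queues=[queue], no_ack=False)
consumer.callbacks = [on_message]
consumer.consume()


def get_message(correlation_id, timeout=None):
    # Block on the underlying connection until on_message has seen the reply
    # we are waiting for; socket.timeout propagates if nothing arrives in time.
    while correlation_id not in replies:
        consumer.channel.connection.client.drain_events(timeout=timeout)
    return replies.pop(correlation_id)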
2 changes: 1 addition & 1 deletion setup.py
@@ -4,7 +4,7 @@

setup(
    name='nameko',
    version='2.1.1',
    version='2.1.2',
    description='A microservices framework for Python that lets service '
                'developers concentrate on application logic and encourages '
                'testability.',
41 changes: 39 additions & 2 deletions test/standalone/test_rpc_proxy.py
@@ -373,8 +373,8 @@ def test_recover_from_keyboardinterrupt(
    def call():
        return proxy.spam(ham=0)

    with patch('nameko.standalone.rpc.queue_iterator') as iterator:
        iterator.side_effect = KeyboardInterrupt('killing from test')
    with patch('kombu.connection.Connection.drain_events') as drain_events:
        drain_events.side_effect = KeyboardInterrupt('killing from test')
        with pytest.raises(KeyboardInterrupt):
            proxy.spam(ham=0)

@@ -383,3 +383,40 @@ def call():

    # proxy should still work
    assert proxy.spam(ham=1) == 1


def test_consumer_replacing(container_factory, rabbit_manager, rabbit_config):
    container = container_factory(FooService, rabbit_config)
    container.start()

    class FakeRepliesDict(dict):
        # act like the internal replies dict, but keep a list of messages
        # passing through for later inspection
        def __init__(self):
            self.messages = []

        def __setitem__(self, key, value):
            self.messages.append(value)
            super(FakeRepliesDict, self).__setitem__(key, value)

    fake_replies = FakeRepliesDict()

    with ServiceRpcProxy('foobar', rabbit_config) as proxy:
        # extra setup, as after e.g. connection error
        proxy.reply_listener.queue_consumer._setup_consumer()

        with patch.object(
            proxy.reply_listener.queue_consumer,
            'replies',
            fake_replies
        ):
            count = 10
            replies = [proxy.spam.async('hello') for _ in range(count)]
            assert [reply.result() for reply in replies] == ['hello'] * count

    consumer_tags = set()
    # there should only be a single consumer. we check by looking at the
    # consumer tag on the received messages
    for _, message in fake_replies.messages:
        consumer_tags.add(message.delivery_info['consumer_tag'])
    assert len(consumer_tags) == 1
83 changes: 0 additions & 83 deletions test/test_kombu_helpers.py

This file was deleted.
