New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove queue consumer #542

Closed
wants to merge 104 commits into
base: master
from

Conversation

Projects
None yet
3 participants
@mattbennett
Copy link
Contributor

mattbennett commented May 4, 2018

On top of #536 and #537, this PR removes the QueueConsumer shared extension.

The changes affect the EventHandler and Consumer entrypoints and the RPC ReplyListener and RpcConsumer extensions. These now maintain their own AMQP consumers rather than delegating to the QueueConsumer. This removes a lot of complexity and allows more granular configuration. The refactor also allows us to detect lost RPC replies due to queue expiry and that functionality has been added.

Additional changes included here:

  1. Explicitly expose the prefetch_count via config, rather than using the max_workers value.

  2. Fixes a race condition introduced in 6f6182a where we could raise a "queue expired" error when the proxy was first starting up.

  3. Make the tests in tests/test_events.py more reliable and easier to understand.

Side-effects of this change are more connections to the RabbitMQ broker. In an ideal world, each entrypoint would maintain its own consumer but the channels and connections would be shared, but that requires more work and probably migration away from kombu. As implemented, we end up with a consumer, channel and connection for every queue from which we consume.

mattbennett added some commits Jan 22, 2018

add support for raising if reply queue expires with pending replies
now that the implementation supports it (standalone rpc proxy case)
@davidszotten
Copy link
Member

davidszotten left a comment

this all looks reasonable to me. a few small comments. would be great to get some production testing before releasing

Makefile Outdated
@@ -3,7 +3,7 @@
ENABLE_BRANCH_COVERAGE ?= 0
AUTO_FIX_IMPORTS ?= 0

ifneq ($(AUTO_FIX_IMPORTS), 1)
ifneq ($(FIX), 1)

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

what's going on here?

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

I got tired of typing AUTO_FIX_IMPORTS whenever I wanted make imports to actually fix the problem rather than just complaining. Didn't intend to commit it, will revert.



class HeaderDecoder(object):

header_prefix = HEADER_PREFIX

def _strip_header_name(self, key):

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

this isn't public api right? time to drop this class entirely and just call the functions?

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

Good idea. I will drop the class

)
)
accept = self.consumer_options.pop(
'accept', serialization.setup(config)[1]

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

could maybe have serialization.setup return a namedtuple

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

That would be nice. I tried not to make any changes outside the scope of the AMQP refactor (serialization.setup landed in #535) but now we're in principle happy with these changes I'll allow myself some out-of-scope improvements too :)

ident = u"{}.wait_for_worker_pool[{}.{}]".format(
type(self).__name__, service_name, method_name
)
self.container.spawn_managed_thread(spawn_worker, identifier=ident)

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

maybe a comment to explain what's going on?

from nameko.amqp.publish import Publisher as PublisherCore
from nameko.amqp.publish import get_connection
from nameko.amqp.utils import verify_amqp_uri
from nameko.constants import (
AMQP_URI_CONFIG_KEY, DEFAULT_HEARTBEAT, HEADER_PREFIX,

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

i'm less and less convinced of the benefits of these constants

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

They are cumbersome. What's the alternative?

This comment has been minimized.

@davidszotten

davidszotten May 29, 2018

Member

inline strings...

)
accept = self.consumer_options.pop(
'accept', serialization.setup(config)[1]
)

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

not possible to share in a nice way?

self.publish,
self.register_for_reply,
self.service_name or name,
self.service_name and name

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

possibly a bit too clever

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

Fair enough. I'll make it more explicit


class Proxy(object):
""" Helper object for making RPC calls.

This comment has been minimized.

@davidszotten
@@ -2,242 +2,154 @@

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

do you have any standalone rpc proxies in production? (for testing the pre-release of this)


class Proxy(object):
""" Helper object for making RPC calls.

This comment has been minimized.

@davidszotten

davidszotten May 20, 2018

Member

i wonder if we can/want to do something about the seemingly common case of wanting the dependency provider and the thing it provides have the same name

@kooba

This comment has been minimized.

Copy link
Member

kooba commented May 22, 2018

Would drop of QueueConsumer be considered a breaking change and this land in v3.x.x?

declare : list
List of :class:`kombu.Exchange` or :class:`kombu.Queue` objects
to declare before publishing.
**publisher_options
Options to configure the
:class:`~nameko.amqqp.publish.Publisher` that sends the

This comment has been minimized.

@kooba

kooba May 22, 2018

Member

amqqp/amqp

target_service : str
Target service name
**publisher_options
Options to configure the :class:`~nameko.amqqp.publish.Publisher`

This comment has been minimized.

@kooba

kooba May 22, 2018

Member

amqqp/amqp

class MethodProxy(HeaderEncoder):
The target service and method name may be specified at construction time
or by attibute or dict access, for example:

This comment has been minimized.

@kooba

kooba May 22, 2018

Member

attibute/attribute

This comment has been minimized.

@davidszotten

davidszotten May 22, 2018

Member

a bit surprised that the spellchecker (with all its false positives!) isn't catching these...

This comment has been minimized.

@kooba

kooba May 23, 2018

Member

Actually not sure if the sentence is correct. Is dict access really another way to specify target service and method name?

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

Yep, you can do n.rpc["service-name"]["method-name"]()

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

Spellchecker unfortunately only runs on the .rst files, not docstrings. Might be because we don't include the apidoc. I'll see if I can fix it.

proxy are converted into RPC calls to the service, with responses
returned directly.
Single-threaded RPC proxy to a cluster of services. The target service
and method are specified with attibutes.

This comment has been minimized.

@kooba

kooba May 23, 2018

Member

Maybe you taught your spellchecked to accept attibutes by now ;)

Single-threaded RPC proxy to a cluster of services. The target service
and method are specified with attibutes.
Method calls on the locsl object are converted into RPC calls to the

This comment has been minimized.

@kooba

kooba May 23, 2018

Member

locsl/local

assert service_rpc.echo(1) == 1
Attempting to read from the socket after it's closed raises a
socket.error and the connection will be re-established. If `timeout`
is longer than twice the heartbeat interval, the behaviour is the same

This comment has been minimized.

@kooba

kooba May 23, 2018

Member

Not a typo but I think Nameko settled on American English spelling? behaviour/behavior?

This comment has been minimized.

@mattbennett

mattbennett May 29, 2018

Contributor

My spelling always seems to end up as "transatlantic" 😀 Will fix.

This comment has been minimized.

@mattbennett

mattbennett May 31, 2018

Contributor

Actually spelling_lang = 'en_GB' in the sphinx config. Explains why these weren't flagged by the spell checker already.

service_rpc.echo(2)
assert "ECONNREFUSED" in str(exc_info.value)
This failure mode means that the socket between the consumer and the
rabbit broker times for out `timeout` milliseconds and then closes.

This comment has been minimized.

@kooba

kooba May 23, 2018

Member

times for out / times out for

@kooba
Copy link
Member

kooba left a comment

Wow, how did you wrap your head around this? Just few typos. Let's put it in prod!

@mattbennett

This comment has been minimized.

Copy link
Contributor

mattbennett commented May 29, 2018

@kooba Thanks for the review. I will fix all the typos.

Yes, I think this should land in a v3.x.x release. It is compatible in terms of the external API, but I think the internal changes (e.g. many more rabbit connections) warrant a version bump.

I would also like to have upgraded kombu as part of the final 3.x release, but we can possibly make an alpha 3.x release to test beforehand.

This was referenced May 30, 2018

@mattbennett

This comment has been minimized.

Copy link
Contributor

mattbennett commented Jun 1, 2018

@davidszotten @kooba comments addressed.

If you're both happy I'd like to create a 3.x branch that we can do some testing with.

@kooba
Copy link
Member

kooba left a comment

👍

@mattbennett

This comment has been minimized.

Copy link
Contributor

mattbennett commented Oct 2, 2018

Merged into v3.0.0-rc branch with #578

@mattbennett mattbennett closed this Oct 2, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment