Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Publisher queue parameter with queues_to_declare #398

Closed

Conversation

juliotrigo
Copy link

@juliotrigo juliotrigo commented Jan 22, 2017

The nameko.messging.Publisher class currently accepts two parameters: exchange and queue.

The two things the Publisher does with the provided queue is to declare it and use its internal Exchange as the exchange to publish messages to, in case no exchange parameter is provided.

Sometimes, there is a need to declare multiple queues (instead of just one) that will be used by the publisher, to ensure they will be correctly declared by the time messages are published. This is useful, for instance, when queues are federated across different regions and there's no consumer in the region where the publisher is instantiated, since messages published to those queues will be federated to another region, but queues do still need to be correctly declared and bound in all regions.

Additionally, the Publisher has the ability of publishing messages using the optional routing_key parameter, which could not be bound to the provided queue. So providing a queue to a Publisher will not ensure messages will be published to it.

Also, if a message needs to be published to an specific Exchange, it seems a good choice to provide the exchange explicitly, rather than providing it contained in the queue object. There is also a possibility where we could provide both exchange and queue.exchange and they are different.

With the change contained here, we'd ensure that we explicitly provide an exchange argument when messages should be published to a different exchange, rather than having the possibility of using queue.

Additionally, the Publisher would accept an optional list of queues_to_declare (instead of a single one) and would make sure that all of them get declared.

@mattbennett here you have as promised
cc @davidszotten

Copy link
Member

@mattbennett mattbennett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Definitely an enhancement.

I've requested one change regarding backwards compatibility, and there's another optional one to possibly simplify the implementation.

if queues_to_declare is not None:
for queue in queues_to_declare:
if queue is not None:
maybe_declare(queue, conn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that producer.publish accepts a declare kwarg, that will do this for you. It might be easier to test within nameko, but you could drop these declarations and allow kombu to do it. Up to you.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. As far as I see it, the big change here would be that we will ensure queues and exchanges are correctly bound before publishing a message rather than just once when the dependency is set up, so Kombu will have to declare those entities each time.

I am happy to change it and let Kombu do it for us when publishing, if you think those extra declarations are acceptable.

@@ -75,22 +74,22 @@ def unpack_message_headers(self, worker_ctx_cls, message):

class Publisher(DependencyProvider, HeaderEncoder):

def __init__(self, exchange=None, queue=None):
def __init__(self, exchange=None, queues_to_declare=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to continue to accept the queue argument for backwards compatibility.

If you pass exchange=None and queue=<some bound queue> to the current implementation, it'll declare the queue and publish to the exchange the queue is bound to. So to maintain compatibility you'd need to:

  • Accept a queue argument
  • If the queue is bound and no exchange is provided, set self.exchange to the queue's exchange
  • Add the queue to queues_to_declare
  • Ideally raise a warning noting that the queue arg is deprecated

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added the queue argument back with the changes that you suggested to maintain compatibility. I should have included those ones in the first place with my request.

We're now also working with a copy of the original queues_to_declare inside the Publisher to ensure the provided sequence is a list (so that we can add elements to it later) and to avoid side effects outside this class when adding queue to it.

The code now also raises a warning when queue is provided. Having a look at other examples in the codebase, I have assumed these changes could be included in version 2.5.2 and also that this warning would be removed in version 2.7.0. Please let me know if you are happy with this.

@mattbennett
Copy link
Member

Hi @juliotrigo,

This turns out to have some overlap with #400, which exposes the keyword arguments to publisher.publish to the Publisher DependencyProvider constructor.

If it's OK with you I'd like to cherry pick some of these changes (particularly the backwards compatibility shim and docs) and put it on top of #400.

mattbennett added a commit to mattbennett/nameko that referenced this pull request Feb 2, 2017
@juliotrigo
Copy link
Author

Hi @mattbennett

Sorry I am a bit late in my reply, of course you can cherry pick those changes. I was also preparing the changes for the publish declare as you suggested.

I've had a look at #400 and how keyword arguments have been added to the Publisher interface, including the declare parameter. How is this going to affect this current PR? Is it going to be replaced by #400?

@mattbennett
Copy link
Member

I made a couple of commits against #400 in a separate branch -- mattbennett#7. Would you mind reviewing it?

I would like to wait for #400 to be reviewed before merging mattbennett#7 though, since it's a related but slightly different concern.

@juliotrigo
Copy link
Author

I have reviewed mattbennett#7 and left a few comments.

mattbennett added a commit to mattbennett/nameko that referenced this pull request Feb 24, 2017
@mattbennett
Copy link
Member

This is now supported by the declare kwarg that you can pass to the Publisher constructor or Publisher.publish.

One difference between this implementation and the existing one is that the declare kwarg declares queues lazily on publish. I think this is sufficient -- there's no point in a queue existing if before it's either published to or consumed from (the consumer also declares queues). If the queue declaration is needed at startup publisher for some reason, you could subclass the Publisher and customise the setup() method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants