Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] AMQP Pubsub style exchange with individual queue per consumer #32171

Open
aurimasniekis opened this issue Jun 25, 2019 · 4 comments

Comments

@aurimasniekis
Copy link
Contributor

commented Jun 25, 2019

Description
At the moment Messenger only supports single handling of message per transport. AMQP supports sending message to exchange which has multiple queues binding to it (Every new consumer creates a queue with a random name (queue name is an empty string server generates random queue name for it) and binds to specific exchange that way whenever exchange receives a new message it will fan out to all queue (consumers) aka typical PubSub behavior.

Example

framework:
    messenger:
        transports:
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue:
                        name: 'pubsub_%rand(5)%'
                        binding_keys: pubsub
                    exchange:
                        name: pubsub
                        type: fanout
@Tobion

This comment has been minimized.

Copy link
Member

commented Jun 25, 2019

I don't see yet what exactly you cannot achieve. Can't you define a queue with an empty name? Also why do you define a binding key in your example when you use a fanout exchange? A fanout exchange does not care about routing keys.

@Tobion

This comment has been minimized.

Copy link
Member

commented Jun 25, 2019

Btw, if you didn't know, you can already define several queues per transport:

* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments

@aurimasniekis

This comment has been minimized.

Copy link
Contributor Author

commented Jun 26, 2019

@Tobion thanks for the review. First thing regarding the empty name AMQP ext doesn't allow you to call setName function with an empty string:

https://github.com/pdezwart/php-amqp/blob/dd7e90a940c4f9d0153d4cb9799025436eb0bcf7/amqp_queue.c#L118

Second, the idea is you create fanout exchange, and every time you spawn a consumer server would generate a random name and would bind that queue to that exchange. As I set the flag exclusive that queue will be deleted after channel/connection is closed. This way every consumer spawn will receive the same message which was dispatched to that queue.

Regarding binding key yes you are right it's not needed, it was my assumption it's not bound and used default exchange or etc.

@Tobion

This comment has been minimized.

Copy link
Member

commented Jul 30, 2019

So all we need to do here is to skip calling setName when the name is empty? This way a random queue name is generated by amqp.

@Tobion Tobion added this to Tobion's in Help Wanted Jul 30, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
3 participants
You can’t perform that action at this time.