Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports #30708

Merged
merged 1 commit into from Mar 30, 2019

Conversation

Projects
None yet
8 participants
@weaverryan
Copy link
Member

commented Mar 26, 2019

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets Helps with #30699
License MIT
Doc PR TODO

Highlights:

  • messenger:consume can now consume messages from multiple transports with priority ❗️
bin/console messenger:consume amqp_high amqp_medium amqp_low
  • How long you want to sleep before checking more messages is now an option to messenger:consume
  • ReceiverInterface::receive() is replaced with ReceiverInterface::get()
  • Logic for looping & sleeping is moved into Worker

@weaverryan weaverryan requested a review from sroze as a code owner Mar 26, 2019

@weaverryan weaverryan force-pushed the weaverryan:receiver-no-handler branch from e7c770a to d5fd552 Mar 26, 2019

@nicolas-grekas nicolas-grekas added this to the next milestone Mar 26, 2019

@weaverryan weaverryan force-pushed the weaverryan:receiver-no-handler branch 5 times, most recently from 2ee34de to 357cdb9 Mar 27, 2019

}
$handled(null);
usleep(1000000);

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 27, 2019

Author Member

Two changes here:

  1. I changed from .2 seconds to 1 second. Waiting only 200ms between when there is not a message is quite rapid. For reference, in Laravel, this defaults to 3 and is configurable on the command/worker.

@weaverryan weaverryan force-pushed the weaverryan:receiver-no-handler branch from 357cdb9 to 2d1f17b Mar 27, 2019

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 27, 2019

This is ready!

tl;dr In recent changes, the ReceiverInterface is not really the layer that's handling the "looping and waiting for messages" layer - the Worker is doing that. This completes that transformation. It will also allow us to create a Worker that listens to multiple queues at once, in a priority order (#30699).

@sroze

This comment has been minimized.

Copy link
Member

commented Mar 27, 2019

The reasoning of having handle($handler) was for the transports to be able to catch the exceptions from the handlers and deal with the errors (i.e. ack, nack, etc...). Since we moved this to the worker, I'm happy to move back to get one message. Though, why wouldn't we return an iterable instead to match the behaviour of prefetching (i.e. some transports would have loaded X messages, not just one)?

@weaverryan weaverryan force-pushed the weaverryan:receiver-no-handler branch 2 times, most recently from 3864a11 to 8866804 Mar 28, 2019

@weaverryan weaverryan changed the title [Messenger] Changes ReceiverInterface::handle() to get() [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports Mar 28, 2019

* Stop receiving some messages.
*/
public function stop(): void;
public function get(): iterable;

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 28, 2019

Author Member

The iterable, like always, was kind of a pain, compared to just returning one. But, it is more flexible and end-users won't touch this method much anyways.

Show resolved Hide resolved src/Symfony/Component/Messenger/Worker.php

@weaverryan weaverryan force-pushed the weaverryan:receiver-no-handler branch from 8866804 to 9c9c4e0 Mar 28, 2019

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 28, 2019

This is ready for review! Description updated.

Better viewed (especially the Worker with ?w=1). The 3 StopWhen*** classes are not new - but were moved & changed enough (they decorate WorkerInterface instead of ReceiverInerface so they look new.

@Nyholm
Copy link
Member

left a comment

I really like this.

I am not a fan of priority in a message system, but this is priority of transports. That is a different thing. Im all 👍
(after my comments are addressed/answered)

Show resolved Hide resolved src/Symfony/Component/Messenger/Worker.php
Show resolved Hide resolved src/Symfony/Component/Messenger/Worker.php Outdated
Show resolved Hide resolved src/Symfony/Component/Messenger/Worker.php
Show resolved Hide resolved src/Symfony/Component/Messenger/Worker.php

@weaverryan weaverryan force-pushed the weaverryan:receiver-no-handler branch from f5ebe36 to 8356476 Mar 29, 2019

@Nyholm

Nyholm approved these changes Mar 29, 2019

Copy link
Member

left a comment

Good. I’m happy with the updated doc block on ReceiverInterface::get().

Thank you

@fabpot

fabpot approved these changes Mar 30, 2019

Copy link
Member

left a comment

minor typos

@fabpot fabpot force-pushed the weaverryan:receiver-no-handler branch from 8356476 to e800bd5 Mar 30, 2019

@fabpot

This comment has been minimized.

Copy link
Member

commented Mar 30, 2019

Thank you @weaverryan.

@fabpot fabpot merged commit e800bd5 into symfony:master Mar 30, 2019

1 of 3 checks passed

continuous-integration/appveyor/pr Waiting for AppVeyor build to complete
Details
continuous-integration/travis-ci/pr The Travis CI build is in progress
Details
fabbot.io Your code looks good.
Details

fabpot added a commit that referenced this pull request Mar 30, 2019

feature #30708 [Messenger] ReceiverInterface::handle() to get() & Wor…
…ker with prioritized transports (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30708).

Discussion
----------

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Helps with #30699
| License       | MIT
| Doc PR        | TODO

Highlights:

* `messenger:consume` can now consume messages from multiple transports with priority ❗️

```
bin/console messenger:consume amqp_high amqp_medium amqp_low
```

* How long you want to sleep before checking more messages is now an option to `messenger:consume`
* `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()`
* Logic for looping & sleeping is moved into `Worker`

Commits
-------

e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
@fabpot

This comment has been minimized.

Copy link
Member

commented Mar 30, 2019

typos fixed in f4176b0

@@ -25,21 +25,25 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.

This comment has been minimized.

Copy link
@Tobion

Tobion Mar 30, 2019

Member

This doc is outdated: There is no "given handler"

fabpot added a commit that referenced this pull request Mar 31, 2019

minor #30793 [Messenger] Remove the mention of handler in the `Receiv…
…erInterface::get` phpdoc. (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Remove the mention of handler in the `ReceiverInterface::get` phpdoc.

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #30708 (review)
| License       | MIT
| Doc PR        | ø

As spotted by @Tobion, we don't have an handler as an argument anymore.

Commits
-------

9c63112 Remove the mention of handler in the phpdoc.

@weaverryan weaverryan deleted the weaverryan:receiver-no-handler branch Mar 31, 2019

fabpot added a commit that referenced this pull request Mar 31, 2019

feature #30754 [Messenger] New messenger:stop-workers Command (weaver…
…ryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30754).

Discussion
----------

[Messenger] New messenger:stop-workers Command

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Kinda of #29451
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

o/ me again.

This requires and is built on top of #30708

When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :).

Now run:

```
bin/console messenger:stop-workers
```

And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`.

Cheers!

Commits
-------

5897162 [Messenger] New messenger:stop-workers Command

@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019

@Tobion

This comment has been minimized.

Copy link
Member

commented May 6, 2019

Just thinking out loud for reference: The prioritized transports can be used to prioritize messages by publishing messages to different transports and consuming them in order. But it's a different way than rabbitmq uses message priority. If you want to use rabbitmq priorities instead, you can do that by setting the priority attribute on the AmqpStamp as introduced in #30913

@fabpot fabpot referenced this pull request May 9, 2019

Merged

Release v4.3.0-BETA1 #31435

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.