Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Worker events + global retry functionality #30557

Merged
merged 1 commit into from Mar 23, 2019

Conversation

@weaverryan
Copy link
Member

commented Mar 13, 2019

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? yes, on Messenger only
Deprecations? no
Tests pass? NEEDED
Fixed tickets #29132, #27008, #27215 and part of #30540
License MIT
Doc PR TODO

This is an alternative to #29132 and #27008. There are several big things:

  1. The messenger:consume does not die if a handler has an error
  2. Events are dispatched before, after and on error a message being handled
  3. Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.
  4. A generic retry system was added, which works with Amqp and future transports should support.
    It will work out of the box for users. Retrying works by putting the received Envelope back into the bus, but with the ReceivedStamp removed. The retry functionality has an integration test for AMQP.
  5. Added a new MessageDecodingFailedException that transport Serializers should throw if decode() fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.
  6. A new DelayStamp was added, which is the first of (later) more stamps for configuring the transport layer (see #30558).

BC breaks are documented in the CHANGELOG.

Thanks!

@weaverryan weaverryan requested a review from sroze as a code owner Mar 13, 2019

@weaverryan weaverryan referenced this pull request Mar 13, 2019

Closed

[Messenger] Making it Shine #30540

30 of 36 tasks complete

@nicolas-grekas nicolas-grekas added this to the next milestone Mar 14, 2019

@weaverryan weaverryan force-pushed the weaverryan:worker-events branch from 822b289 to a2d30ac Mar 14, 2019

@weaverryan weaverryan changed the title [WIP] Events for Messenger worker and not failing [WIP][Messenger] Worker events + global retry functionality Mar 15, 2019

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 15, 2019

I've now incorporated generic retry abilities from #27008. I think more details need to be considered, like #30558 (transport stamp) and how it will affect things.

But, generally speaking, what big issues do people see? I'm pushing much more control into the Worker, and making the transport a bit more agnostic to sending/receiving/acking/nacking/retrying.

Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php Outdated
Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php Outdated
@@ -24,6 +24,8 @@
*/
class Connection
{
public const ATTEMPT_COUNT_HEADER_NAME = 'symfony-messenger-attempts';

This comment has been minimized.

Copy link
@sroze

sroze Mar 18, 2019

Member

AMQP uses x-death when a message is dead-letter-ed. Maybe we can reuse the same? (not sure if it conflicts though, would need to try).

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 18, 2019

Author Member

Hmm. I don't know much about this, but I don't think it'll work. Each time we retry, we're re-using the headers from the existing message. This has a nice effect that the x-death header (which is an array) will have 0 items, then 1 item, then 2 items, etc - it'll increase with each "death". The "newest" death apparently (i've just tested) always becomes the 0 key - the others are "pushed back". If we try to insert an entry into x-death, it just looks like there was a "previous" death, and Rabbit pushes a new item onto the 0 index and our entry is pushed back to 0.

It seems like we shouldn't be setting values onto x-death.

Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php Outdated
Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php Outdated
Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php Outdated
Show resolved Hide resolved src/Symfony/Component/Messenger/Worker.php Outdated
@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 18, 2019

Thank you @sroze for the review! I was working concurrently with your review, so I pushed a bunch of changes as you were commenting. I would love another look. I've replied to a few of your comments with the changes I've made. Specifically, the QueuedMetadata thing is now gone, replaced with a Stamp. Also (and this is important), I've given the AmqpReceiver some temporary state. The issue is that the AMQPEnvelope is needed to do things like ack, nack, etc. And because these methods are now called by Worker, I previously was passing this "message" back out to the Worker... which was unfortunate, because this is really an internal detail to each transport. I fixed that by tracking the current AMQPEnvelope being handled inside AmqpReceiver, which allows us to drop passing the AMQPEnvelope object around.

I also made the retry stuff configurable on a transport-by-transport basis using a service.

Also, question: should we ever redeliver via nack() onto the same queue? Like, if, for some reason, redelivery onto the DLX fails, should we nack() retry? Or is that just madness - trying to gracefully fail when something has gone terribly wrong.

Thanks!

@weaverryan weaverryan force-pushed the weaverryan:worker-events branch from 2f66cb6 to 8c851ca Mar 18, 2019

@weaverryan weaverryan added the BC Break label Mar 18, 2019

*
* @throws TransportException If there is an issue communicating with the transport
*/
public function retryCurrentMessage(int $delay): void;

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 18, 2019

Author Member

These all rely on being called from inside the receive() loop, because that keeps temporary "state" about which message is currently being handled. That's a key change here, which simplifies a lot, but which I want to make sure won't cause issues.

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 18, 2019

This is ready for review! This represents a big change in how we handle the transports, so I really appreciate review!

$this->dispatchEvent(
WorkerMessageFailedEvent::class,
new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRequeue)
);

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 19, 2019

Author Member

The events are currently dispatched before ack/reject/retry on the queue. That's subjective, and either order could, in theory, cause a situation where one fails and so the other doesn't run (e.g. some listener throws an exception, so the retry never happens, or, if we reverse, the retry fails due to a network connection, then the event is never dispatched). Not sure if we need to be thinking about this level of failure. Catching exceptions makes things harder to debug/know about when they go wrong.

@@ -140,8 +145,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
}
if (!$this->retryStrategyLocator->has($receiverName)) {

This comment has been minimized.

Copy link
@sroze

sroze Mar 19, 2019

Member

Shouldn't we decorate the receiver instead of having this logic in the command & worker then? 🤔

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 19, 2019

Author Member

We probably could, but should we? The code reads really clearly inside Worker, and my thought is sort of that we're setting out the "core" logic that (unless you really want to) everyone gets. At this point, it includes a lot - event dispatching, retry logic (and a stamp being added for this).

So I guess I would say: someone needs to sell hard on the idea of making it a decorator.

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 19, 2019

Author Member

Or, to say it differently:

  • Do we really need this?
  • If we do want it, could we only move some parts out (e.g. logging decorator, or event listener, event decorator) and leave others
Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
try {
$this->currentAmqpEnvelope = $this->connection->get();

This comment has been minimized.

Copy link
@sroze

sroze Mar 19, 2019

Member

I'm not fond of the idea of forcing the receivers to have such local state at all. Can't you have another stamp (non-serialisable), AMQP-specific, that contains this \AMQPMessage? You'd call it AmqpStamp and that's it, you can just get it from the Envelope, no need of these *currentMessage methods anymore, just "normal" methods taking the Envelope as an argument.

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 19, 2019

Author Member

Indeed, I've gone back-and-forth on this. Your transport-specific stamp is a nice idea. It would leak this information out to userland, like middleware or event listeners. What do you think about that?

Also, in the latest commit, on retry, I re-send the Envelope for normal encoding/decoding. This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent. It would mean that this AMQP-specific stamp would be serialized & sent. You mentioned "non-serialisable"... do you basically mean: give it a "sleep" method so that if/when we serialize it, it'll just be an empty object (i.e. it won't cause an error).

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 19, 2019

Author Member

Side note: if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker - catching that and rejecting would need to remain the responsibility of each transport, because there wouldn't be any Envelope that the Worker could send back to the Receiver::reject() method.

This comment has been minimized.

Copy link
@sroze

sroze Mar 20, 2019

Member

It would leak this information out to userland, like middleware or event listeners. What do you think about that?

That's a good thing IMHO. This allows users to go deep into customising it.

This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent.

Indeed, hence my (non-serializable) comment. We need a way to make them non-transportable, it's definitely a valid use-case. The sleep method works but is only about the serialize method... I'd imagine a NonSerializableStampInterface actually.

if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker

Technically speaking we could actually create an empty envelope as part of the exception. It would make sense. But for the sake of this pull-request, let's keep it the responsibility of the transport :)

This comment has been minimized.

Copy link
@weaverryan

weaverryan Mar 20, 2019

Author Member

I've made the change to use a stamp (AmqpReceivedStamp) and it's really, really nice - good suggestion. However, I also found out that serializing that stamp works just fine. The AMQPEnvelope inside it is just a simple object that doesn't cause any problems.

So, should we still add this NonSerializableStampInterface? Serializing the AMQPEnvelope doesn't cause any problems, as we're always looking for $envelope->last(AMQPEnvelop::class). Also, to implement this, each serializer would actually need to create a new envelope with the NonSerializableStampInterface filtered out. Totally doable, but is this something we actually need to add?

Show resolved Hide resolved src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php Outdated
@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 19, 2019

One last update: "retrying" is no longer a special situation - we basically just call SenderInterface::send() with whatever the delay is, then "ack" the old message. That is basically what we were doing before, but the difference is that we're now re-serializing the Envelope on re-send, instead of sending a duplicate of the original message. That allows us to add stamps to the Envelope, which is really powerful because we can, for example, manage the RetryCountStamp in a way where the transport doesn't need to do anything with tracking how many retries have happened.

@weaverryan weaverryan force-pushed the weaverryan:worker-events branch from d5a83b4 to 76f6378 Mar 22, 2019

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 22, 2019

Ready to go again!

Last commits guarantee that redeliveries are only sent back to the same transport.

@Nyholm

This comment has been minimized.

Copy link
Member

commented Mar 22, 2019

There are still quite a few mentions of "queue" when we really mean "transport". I think we should try to be more technically correct.

@sroze
Copy link
Member

left a comment

Fabulous. Last thing is the event name I think.

@fabpot can you update your review? 🙏

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 23, 2019

Last changes look good to me. Thank you!

@fabpot

This comment has been minimized.

Copy link
Member

commented Mar 23, 2019

@weaverryan Can you squash so that I can merge? Thank you.

Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>

@weaverryan weaverryan force-pushed the weaverryan:worker-events branch from 895562b to a989384 Mar 23, 2019

@weaverryan

This comment has been minimized.

Copy link
Member Author

commented Mar 23, 2019

Squashed!

@fabpot

This comment has been minimized.

Copy link
Member

commented Mar 23, 2019

Thank you @weaverryan.

@fabpot fabpot merged commit a989384 into symfony:master Mar 23, 2019

2 of 3 checks passed

continuous-integration/appveyor/pr AppVeyor build failed
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
fabbot.io Your code looks good.
Details

fabpot added a commit that referenced this pull request Mar 23, 2019

feature #30557 [Messenger] Worker events + global retry functionality…
… (weaverryan)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Worker events + global retry functionality

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | yes, on Messenger only
| Deprecations? | no
| Tests pass?   | NEEDED
| Fixed tickets | #29132, #27008, #27215 and part of #30540
| License       | MIT
| Doc PR        | TODO

This is an alternative to #29132 and #27008. There are several big things:

1) The `messenger:consume` does not die if a handler has an error
2) Events are dispatched before, after and on error a message being handled
3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.
4) A generic retry system was added, which works with Amqp and future transports should support.
 It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP.
5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.
6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see #30558).

BC breaks are documented in the CHANGELOG.

Thanks!

Commits
-------

a989384 Adding global retry support, events & more to messenger transport

@weaverryan weaverryan deleted the weaverryan:worker-events branch Mar 23, 2019

sroze added a commit that referenced this pull request Mar 23, 2019

bug #30658 [Messenger] Ensure an exception is thrown when the AMQP co…
…nnect() does not work (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Ensure an exception is thrown when the AMQP connect() does not work

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #30557
| License       | MIT
| Doc PR        | ø

This `connectionCredentials` instance escaped the renaming in #30557.

Commits
-------

46b9476 Ensure an exception is thrown when the AMQP connect() does not work

fabpot added a commit that referenced this pull request Mar 23, 2019

feature #30650 Dispatching two events when a message is sent & handle…
…d (weaverryan)

This PR was merged into the 4.3-dev branch.

Discussion
----------

Dispatching two events when a message is sent & handled

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | none
| License       | MIT
| Doc PR        | TODO

Alternative to #30646. This uses a more generic system, so you could do anything when a message is sent. The main use-case is when a message is dispatched by a 3rd party.

I didn't try to add *exhaustive* events everywhere: I added an event for a very specific use-case:

When a message is dispatched by a 3rd party, being able to add stamps (e.g. `DelayStamp` or a future `AmqpRoutingKeyStamp` before the message is sent. Example:

```php
class MailerMessageSendToTransportEventSubscriber implements EventSubscriberInterface
{
    public function onSendMessage(SendMessageToTransportsEvent $event)
    {
        $envelope = $event->getEnvelope();
        if (!$envelope->getMessage() instanceof SomeMailerMessage) {
            return;
        }

        $event->setEnvelope($envelope->with(new AmpqRoutingKeyStamp('mailer-route')));
    }

    public static function getSubscribedEvents()
    {
        return [SendMessageToTransportsEvent::class => 'onSendMessage'];
    }
}
```

Along with #30557, we will now have the following events, regarding async messages:
* Event when a message is sent to transports (this PR)
* Event when a message is received from transport, but before handling it
* Event when a message is received from transport and after handling it

Commits
-------

a7ad1b4 Dispatching two events when a message is sent & handled

@weaverryan weaverryan referenced this pull request Mar 28, 2019

Open

[Messenger][4.3] Tracker for changes #11236

1 of 25 tasks complete

sroze added a commit that referenced this pull request Mar 31, 2019

feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet)
This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Add a Doctrine transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10616
| DoctrineBundle PR | doctrine/DoctrineBundle#868

As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.

Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).

# How it works

The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868

## Configuration

To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)
```yml
        # config/packages/messenger.yaml
        framework:
            messenger:
                transports:
                    my_transport: "doctrine://default?queue=important"
```

## Table schema

Dispatched messages are stored into a database table with the following schema:

| Column       | Type     | Options                  | Description                                                       |
|--------------|----------|--------------------------|-------------------------------------------------------------------|
| id           | bigint   | AUTO_INCREMENT, NOT NULL | Primary key                                                       |
| body         | text     | NOT NULL                 | Body of the message                                               |
| headers      | text     | NOT NULL                 | Headers of the message                                            |
| queue      | varchar(32)     | NOT NULL                 | Headers of the message                                            |
| created_at   | datetime | NOT NULL                 | When the message was inserted onto the table. (automatically set) |
| available_at       | datetime   | NOT NULL                 | When the message is available to be handled                      |
| delivered_at | datetime | NULL                     | When the message was delivered to a worker                        |

## Message dispatching

When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`

## Message consuming

The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.

### Getting the next message

* Start a transaction
* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)
* Update the message in database to update the delivered_at columns
* Commit the transaction

### Handling the message

The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.

If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.

## Message requeueing

It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a  `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)

# TODO

- [x] Add tests
- [x] Create DOC PR
- [x] PR on doctrine-bundle for transport factory
- [x] Add a `available_at` column
- [x] Add a `queue` column
- [x] Implement the retry functionnality : See #30557
- [x] Rebase after #29476

Commits
-------

88d008c [Messenger] Add a Doctrine transport

@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019

@fabpot fabpot referenced this pull request May 9, 2019

Merged

Release v4.3.0-BETA1 #31435

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.