Skip to content

Commit

Permalink
feature #36152 [Messenger] dispatch event when a message is retried (…
Browse files Browse the repository at this point in the history
…nikophil)

This PR was squashed before being merged into the 5.2-dev branch.

Discussion
----------

[Messenger] dispatch event when a message is retried

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| License       | MIT

Hello,

i'm working on a bundle which helps to monitor messenger queues (add some stats for queues/transports + ability to manage failed messages from the browser)
https://github.com/SymfonyCasts/messenger-monitor-bundle/

and we're missing some hooks in the messaging system:
1. a way to know when a message has been retried (fixed by dispatching a new `WorkerMessageRetriedEvent` in `SendFailedMessageForRetryListener::onMessageFailed()`)
2. a way to update the enveloppe in worker message events (fixed by adding `AbstractWorkerMessageEvent::setEnvelope()`)

if needed i can provide some precise use cases.

thanks.

Commits
-------

55bddcb [Messenger] dispatch event when a message is retried
  • Loading branch information
fabpot committed Oct 2, 2020
2 parents 1b88b8b + 55bddcb commit 14942db
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
abstract_arg('senders service locator'),
service('messenger.retry_strategy_locator'),
service('logger')->ignoreOnInvalid(),
service('event_dispatcher'),
])
->tag('kernel.event_subscriber')
->tag('monolog.logger', ['channel' => 'messenger'])
Expand Down
2 changes: 2 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ CHANGELOG
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
* Added factory methods to `DelayStamp`.
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
* Added `WorkerMessageRetriedEvent`
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable

5.1.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;

abstract class AbstractWorkerMessageEvent
{
Expand All @@ -36,4 +37,9 @@ public function getReceiverName(): string
{
return $this->receiverName;
}

public function addStamps(StampInterface ...$stamps): void
{
$this->envelope = $this->envelope->with(...$stamps);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

/**
* Dispatched after a message has been sent for retry.
*
* The event name is the class name.
*/
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
Expand All @@ -24,6 +25,7 @@
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Tobias Schultze <http://tobion.de>
Expand All @@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private $sendersLocator;
private $retryStrategyLocator;
private $logger;
private $eventDispatcher;
private $historySize;

public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->historySize = $historySize;
}

Expand Down Expand Up @@ -74,6 +78,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)

// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);

if (null !== $this->eventDispatcher) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

class SendFailedMessageForRetryListenerTest extends TestCase
{
Expand Down Expand Up @@ -107,7 +108,10 @@ public function testEnvelopeIsSentToTransportOnRetry()
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
$eventDispatcher->expects($this->once())->method('dispatch');

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);

$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

Expand Down
39 changes: 39 additions & 0 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
Expand Down Expand Up @@ -243,14 +244,45 @@ public function testWorkerWithMultipleReceivers()
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}

public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new DummyReceiver([[$envelope]]);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturnArgument(0);

$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$stamp = new class() implements StampInterface {
};
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
$event->addStamps($stamp);
};

$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);

$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run();

$envelope = current($receiver->getAcknowledgedEnvelopes());
$this->assertCount(1, $envelope->all(\get_class($stamp)));
}
}

class DummyReceiver implements ReceiverInterface
{
private $deliveriesOfEnvelopes;
private $acknowledgedEnvelopes;
private $rejectedEnvelopes;
private $acknowledgeCount = 0;
private $rejectCount = 0;

/**
* @param Envelope[][] $deliveriesOfEnvelopes
*/
public function __construct(array $deliveriesOfEnvelopes)
{
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
Expand All @@ -266,11 +298,13 @@ public function get(): iterable
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
$this->acknowledgedEnvelopes[] = $envelope;
}

public function reject(Envelope $envelope): void
{
++$this->rejectCount;
$this->rejectedEnvelopes[] = $envelope;
}

public function getAcknowledgeCount(): int
Expand All @@ -282,4 +316,9 @@ public function getRejectCount(): int
{
return $this->rejectCount;
}

public function getAcknowledgedEnvelopes(): array
{
return $this->acknowledgedEnvelopes;
}
}
9 changes: 7 additions & 2 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
{
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event);
$envelope = $event->getEnvelope();

if (!$event->shouldHandle()) {
return;
Expand All @@ -123,7 +124,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
$envelope = $throwable->getEnvelope();
}

$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
$this->dispatchEvent($failedEvent);
$envelope = $failedEvent->getEnvelope();

if (!$rejectFirst) {
$receiver->reject($envelope);
Expand All @@ -132,7 +135,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
return;
}

$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
$this->dispatchEvent($handledEvent);
$envelope = $handledEvent->getEnvelope();

if (null !== $this->logger) {
$message = $envelope->getMessage();
Expand Down

0 comments on commit 14942db

Please sign in to comment.