Skip to content

Commit

Permalink
bug #51468 [Messenger] Fix forced bus name gone after an error in del…
Browse files Browse the repository at this point in the history
…ayed message handling (valtzu)

This PR was merged into the 6.3 branch.

Discussion
----------

[Messenger] Fix forced bus name gone after an error in delayed message handling

| Q             | A
| ------------- | ---
| Branch?       | 6.3
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| License       | MIT

When consuming messages from an external source which do not have metadata/headers like bus name, you have to force the bus name with `--bus` in the `messenger:consume` command. However, if in the message handler you dispatch another message with `DispatchAfterCurrentBusStamp`, which then fails, end result is that the original message from external source is retried, but that forced bus name (`BusNameStamp`) is lost – so it will be retried on default bus instead. This isn't intended behavior, is it?

---

Not sure if changing `DelayedMessageHandlingException::__construct` signature is considered a BC break, it is not marked as internal so I am a bit worried 🤔

Commits
-------

2f04a64 Fix missing stamps in delayed message handling
  • Loading branch information
nicolas-grekas committed Sep 29, 2023
2 parents 1981bc9 + 2f04a64 commit b9be69f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace Symfony\Component\Messenger\Exception;

use Symfony\Component\Messenger\Envelope;

/**
* When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
* some handlers caused an exception. This exception contains all those handler exceptions.
Expand All @@ -20,9 +22,12 @@
class DelayedMessageHandlingException extends RuntimeException
{
private array $exceptions;
private Envelope $envelope;

public function __construct(array $exceptions)
public function __construct(array $exceptions, Envelope $envelope)
{
$this->envelope = $envelope;

$exceptionMessages = implode(", \n", array_map(
fn (\Throwable $e) => $e::class.': '.$e->getMessage(),
$exceptions
Expand All @@ -43,4 +48,9 @@ public function getExceptions(): array
{
return $this->exceptions;
}

public function getEnvelope(): Envelope
{
return $this->envelope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

$this->isRootDispatchCallRunning = false;
if (\count($exceptions) > 0) {
throw new DelayedMessageHandlingException($exceptions);
throw new DelayedMessageHandlingException($exceptions, $returnedEnvelope);
}

return $returnedEnvelope;
Expand Down
86 changes: 86 additions & 0 deletions src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
Expand Down Expand Up @@ -354,6 +359,87 @@ public function testMultipleFailedTransportsWithoutGlobalFailureTransport()
// After the message fails again, the message is discarded from the "the_failure_transport2"
$this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived());
}

public function testStampsAddedByMiddlewaresDontDisappearWhenDelayedMessageFails()
{
$transport1 = new DummyFailureTestSenderAndReceiver();

$transports = [
'transport1' => $transport1,
];

$locator = $this->createMock(ContainerInterface::class);
$locator->expects($this->any())
->method('has')
->willReturn(true);
$locator->expects($this->any())
->method('get')
->willReturnCallback(fn ($transportName) => $transports[$transportName]);
$senderLocator = new SendersLocator([], $locator);

$retryStrategyLocator = $this->createMock(ContainerInterface::class);
$retryStrategyLocator->expects($this->any())
->method('has')
->willReturn(true);
$retryStrategyLocator->expects($this->any())
->method('get')
->willReturn(new MultiplierRetryStrategy(1));

$syncHandlerThatFails = new DummyTestHandler(true);

$middlewareStack = new \ArrayIterator([
new AddBusNameStampMiddleware('some.bus'),
new DispatchAfterCurrentBusMiddleware(),
new SendMessageMiddleware($senderLocator),
]);

$bus = new MessageBus($middlewareStack);

$transport1Handler = fn () => $bus->dispatch(new \stdClass(), [new DispatchAfterCurrentBusStamp()]);

$handlerLocator = new HandlersLocator([
DummyMessage::class => [new HandlerDescriptor($transport1Handler)],
\stdClass::class => [new HandlerDescriptor($syncHandlerThatFails)],
]);

$middlewareStack->append(new HandleMessageMiddleware($handlerLocator));

$dispatcher = new EventDispatcher();

$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable {
$throwable = null;
$failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) {
$throwable = $event->getThrowable();
};
$dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener);

$worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher);

$worker->run();

$dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener);

return $throwable;
};

// Simulate receive from external source
$transport1->send(new Envelope(new DummyMessage('API')));

// Receive the message from "transport1"
$throwable = $runWorker('transport1');

$this->assertInstanceOf(DelayedMessageHandlingException::class, $throwable, $throwable->getMessage());
$this->assertSame(1, $syncHandlerThatFails->getTimesCalled());

$messagesWaiting = $transport1->getMessagesWaitingToBeReceived();

// Stamps should not be dropped on message that's queued for retry
$this->assertCount(1, $messagesWaiting);
$this->assertSame('some.bus', $messagesWaiting[0]->last(BusNameStamp::class)?->getBusName());
}
}

class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface
Expand Down
3 changes: 2 additions & 1 deletion src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\RuntimeException;
Expand Down Expand Up @@ -186,7 +187,7 @@ private function ack(): bool
$receiver->reject($envelope);
}

if ($e instanceof HandlerFailedException) {
if ($e instanceof HandlerFailedException || $e instanceof DelayedMessageHandlingException) {
$envelope = $e->getEnvelope();
}

Expand Down

0 comments on commit b9be69f

Please sign in to comment.