Skip to content

Commit

Permalink
Ensure message is handled only once per handler
Browse files Browse the repository at this point in the history
Add check to ensure that a message is only handled once per handler
Add try...catch to run all handlers before throwing exception
  • Loading branch information
keulinho committed Mar 26, 2019
1 parent 0034a0f commit 92065aa
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 16 deletions.
@@ -0,0 +1,49 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Symfony\Component\Messenger\Exception;

use Symfony\Component\Messenger\Envelope;

class ChainedHandlerFailedException extends \RuntimeException implements ExceptionInterface
{
/**
* @var \Throwable[]
*/
private $nested;

/**
* @var Envelope
*/
private $envelope;

public function __construct(Envelope $envelope, \Throwable ...$nested)
{
$this->envelope = $envelope;
$this->nested = $nested;
parent::__construct();
}

public function getEnvelope(): Envelope
{
return $this->envelope;
}

/**
* @return \Throwable[]
*/
public function getNestedExceptions(): array
{
return $this->nested;
}
}
Expand Up @@ -14,6 +14,7 @@
use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
Expand Down Expand Up @@ -52,10 +53,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
'class' => \get_class($message),
];

$exceptions = [];
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), \is_string($alias) ? $alias : null);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
$alias = \is_string($alias) ? $alias : null;

if ($this->hasMessageSeen($envelope, $handler, $alias)) {
continue;
}

try {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
} catch (\Throwable $e) {
$exceptions[] = $e;
}
}

if (null === $handler) {
Expand All @@ -66,6 +78,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
$this->logger->info('No handler for message "{class}"', $context);
}

if (\count($exceptions)) {
throw new ChainedHandlerFailedException($envelope, ...$exceptions);
}

return $stack->next()->handle($envelope, $stack);
}

private function hasMessageSeen(Envelope $envelope, callable $handler, ?string $alias): bool
{
$some = array_filter($envelope
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
$stamp->getHandlerAlias() === $alias;
});

return \count($some) > 0;
}
}
21 changes: 13 additions & 8 deletions src/Symfony/Component/Messenger/Stamp/HandledStamp.php
Expand Up @@ -40,33 +40,38 @@ public function __construct($result, string $callableName, string $handlerAlias
/**
* @param mixed $result The returned value of the message handler
*/
public static function fromCallable(callable $handler, $result, string $handlerAlias = null): self
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
{
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
}

public static function getNameFromCallable(callable $handler): string
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return new self($result, \get_class($handler[0]).'::'.$handler[1], $handlerAlias);
return \get_class($handler[0]).'::'.$handler[1];
}

return new self($result, $handler[0].'::'.$handler[1], $handlerAlias);
return $handler[0].'::'.$handler[1];
}

if (\is_string($handler)) {
return new self($result, $handler, $handlerAlias);
return $handler;
}

if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (false !== strpos($r->name, '{closure}')) {
return new self($result, 'Closure', $handlerAlias);
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return new self($result, $class->name.'::'.$r->name, $handlerAlias);
return $class->name.'::'.$r->name;
}

return new self($result, $r->name, $handlerAlias);
return $r->name;
}

return new self($result, \get_class($handler).'::__invoke', $handlerAlias);
return \get_class($handler).'::__invoke';
}

/**
Expand Down
Expand Up @@ -16,6 +16,11 @@ public function __construct(callable $callable)
$this->callable = $callable;
}

public function setCallable(callable $callable): void
{
$this->callable = $callable;
}

public function receive(callable $handler): void
{
$callable = $this->callable;
Expand Down
@@ -0,0 +1,45 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

class DummyMessageHandler
{
/**
* @var int
*/
private $remainingExceptionCount;

/**
* @var int
*/
private $called = 0;

public function __construct(int $throwExceptionOnFirstTries = 0)
{
$this->remainingExceptionCount = $throwExceptionOnFirstTries;
}

public function __invoke(DummyMessage $message)
{
if ($this->remainingExceptionCount > 0) {
--$this->remainingExceptionCount;
throw new \Exception('Handler should throw Exception.');
}

++$this->called;
}

public function getTimesCalledWithoutThrowing(): int
{
return $this->called;
}
}
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\StackMiddleware;
Expand Down Expand Up @@ -40,7 +41,7 @@ public function testItCallsTheHandlerAndNextMiddleware()
/**
* @dataProvider itAddsHandledStampsProvider
*/
public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
public function testItAddsHandledStamps(array $handlers, array $expectedStamps, bool $nextIsCalled)
{
$message = new DummyMessage('Hey');
$envelope = new Envelope($message);
Expand All @@ -49,7 +50,11 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
DummyMessage::class => $handlers,
]));

$envelope = $middleware->handle($envelope, $this->getStackMock());
try {
$envelope = $middleware->handle($envelope, $this->getStackMock($nextIsCalled));
} catch (ChainedHandlerFailedException $e) {
$envelope = $e->getEnvelope();
}

$this->assertEquals($expectedStamps, $envelope->all(HandledStamp::class));
}
Expand All @@ -64,17 +69,22 @@ public function itAddsHandledStampsProvider()
$second->method('__invoke')->willReturn(null);
$secondClass = \get_class($second);

$failing = $this->createPartialMock(\stdClass::class, ['__invoke']);
$failing->method('__invoke')->will($this->throwException(new \Exception('handler failed.')));

yield 'A stamp is added' => [
[$first],
[new HandledStamp('first result', $firstClass.'::__invoke')],
true,
];

yield 'A stamp is added per handler' => [
[$first, $second],
['first' => $first, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke'),
new HandledStamp(null, $secondClass.'::__invoke'),
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
],
true,
];

yield 'Yielded locator alias is used' => [
Expand All @@ -83,6 +93,24 @@ public function itAddsHandledStampsProvider()
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
new HandledStamp(null, $secondClass.'::__invoke'),
],
true,
];

yield 'It tries all handlers' => [
['first' => $first, 'failing' => $failing, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
],
false,
];

yield 'It ignores duplicated handler' => [
[$first, $first],
[
new HandledStamp('first result', $firstClass.'::__invoke'),
],
true,
];
}

Expand Down
59 changes: 59 additions & 0 deletions src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandler;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;

class RetryIntegrationTest extends TestCase
{
public function testRetryMechanism()
{
$apiMessage = new DummyMessage('API');

$receiver = new CallbackReceiver(function ($handler) use ($apiMessage) {
$handler(new Envelope($apiMessage, new SentStamp('Some\Sender', 'sender_alias')));
});

$senderLocator = new SendersLocator([], ['*' => true]);

$handler = new DummyMessageHandler();
$throwingHandler = new DummyMessageHandler(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
'handler' => $handler,
'throwing' => $throwingHandler
],
]);

$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);

$worker = new Worker($receiver, $bus, 'receiver name', new MultiplierRetryStrategy());
$worker->run();

$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
}
}
4 changes: 4 additions & 0 deletions src/Symfony/Component/Messenger/Worker.php
Expand Up @@ -17,6 +17,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
Expand Down Expand Up @@ -88,6 +89,9 @@ public function run()
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
} catch (\Throwable $throwable) {
if ($throwable instanceof ChainedHandlerFailedException) {
$envelope = $throwable->getEnvelope();
}
$shouldRetry = $this->shouldRetry($throwable, $envelope);

$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
Expand Down

0 comments on commit 92065aa

Please sign in to comment.