From f5f13b0c5fb5cead34984a1b01ff61064a17023d Mon Sep 17 00:00:00 2001 From: David Badura Date: Mon, 21 Apr 2025 09:18:59 +0200 Subject: [PATCH] add retry outdated aggregate command bus --- docs/pages/command_bus.md | 38 +++++ docs/pages/repository.md | 8 +- phpstan-baseline.neon | 18 +++ src/Attribute/RetryAggregateOutdated.php | 16 ++ .../RetryOutdatedAggregateCommandBus.php | 49 ++++++ .../RetryOutdatedAggregateCommandBusTest.php | 143 ++++++++++++++++++ 6 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 src/Attribute/RetryAggregateOutdated.php create mode 100644 src/CommandBus/RetryOutdatedAggregateCommandBus.php create mode 100644 tests/Unit/CommandBus/RetryOutdatedAggregateCommandBusTest.php diff --git a/docs/pages/command_bus.md b/docs/pages/command_bus.md index f5452ba8..7ac15e6c 100644 --- a/docs/pages/command_bus.md +++ b/docs/pages/command_bus.md @@ -231,6 +231,44 @@ $commandBus = new SyncCommandBus($handlerProvider); $commandBus->dispatch(new CreateProfile($profileId, 'name')); $commandBus->dispatch(new ChangeProfileName($profileId, 'new name')); ``` +### Retry Outdated Aggregate + +If you want to retry the command when an `AggregateOutdated` exception occurs, +you can use the `RetryOutdatedAggregateCommandBus` decorator. + +```php +use Patchlevel\EventSourcing\CommandBus; +use Patchlevel\EventSourcing\CommandBus\RetryOutdatedAggregateCommandBus; + +/** + * @var HandlerProvider $handlerProvider + * @var CommandBus $store + */ +$commandBus = new RetryOutdatedAggregateCommandBus( + $commandBus, +); +``` +And you need to mark the command class with the `#[RetryAggregateOutdated]` attribute, +if you want to retry the command when an `AggregateOutdated` exception occurs. + +```php +use Patchlevel\EventSourcing\Attribute\RetryAggregateOutdated; + +#[RetryAggregateOutdated] +final class CreateProfile +{ + public function __construct( + public readonly ProfileId $id, + public readonly string $name, + ) { + } +} +``` +!!! tip + + You can specify the maximum number of retries in the `#[RetryAggregateOutdated]` attribute. + The default value is 3. + ## Provider There are different types of providers that you can use to register handlers. diff --git a/docs/pages/repository.md b/docs/pages/repository.md index a313f5ab..cd0cd0a4 100644 --- a/docs/pages/repository.md +++ b/docs/pages/repository.md @@ -163,7 +163,7 @@ $profile = Profile::create($id, 'david.badura@patchlevel.de'); /** @var Repository $repository */ $repository->save($profile); ``` -!!! Warning +!!! warning All events are written to the database with one transaction in order to ensure data consistency. If an exception occurs during the save process, @@ -174,6 +174,12 @@ $repository->save($profile); Due to the nature of the aggregate having a playhead, we have a unique constraint that ensures that no race condition happens here. + An `AggregateOutdated` exception is thrown if a conflict occurs. + +!!! tip + + If you use the Command Bus, you can use the [RetryOutdatedAggregateCommandBus](command_bus.md#retry-outdated-aggregate-command-bus) + to retry the command when an `AggregateOutdated` exception occurs automatically. ### Load an aggregate diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index eeaad944..49202891 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,5 +1,23 @@ parameters: ignoreErrors: + - + message: '#^Dead catch \- Patchlevel\\EventSourcing\\Repository\\AggregateOutdated is never thrown in the try block\.$#' + identifier: catch.neverThrown + count: 1 + path: src/CommandBus/RetryOutdatedAggregateCommandBus.php + + - + message: '#^Method Patchlevel\\EventSourcing\\CommandBus\\RetryOutdatedAggregateCommandBus\:\:maxRetries\(\) is unused\.$#' + identifier: method.unused + count: 1 + path: src/CommandBus/RetryOutdatedAggregateCommandBus.php + + - + message: '#^Method Patchlevel\\EventSourcing\\CommandBus\\RetryOutdatedAggregateCommandBus\:\:maxRetries\(\) never returns null so it can be removed from the return type\.$#' + identifier: return.unusedType + count: 1 + path: src/CommandBus/RetryOutdatedAggregateCommandBus.php + - message: '#^Cannot unset offset ''url'' on array\{application_name\?\: string, charset\?\: string, dbname\?\: string, defaultTableOptions\?\: array\, driver\?\: ''ibm_db2''\|''mysqli''\|''oci8''\|''pdo_mysql''\|''pdo_oci''\|''pdo_pgsql''\|''pdo_sqlite''\|''pdo_sqlsrv''\|''pgsql''\|''sqlite3''\|''sqlsrv'', driverClass\?\: class\-string\, driverOptions\?\: array\, host\?\: string, \.\.\.\}\.$#' identifier: unset.offset diff --git a/src/Attribute/RetryAggregateOutdated.php b/src/Attribute/RetryAggregateOutdated.php new file mode 100644 index 00000000..0bd68749 --- /dev/null +++ b/src/Attribute/RetryAggregateOutdated.php @@ -0,0 +1,16 @@ +doDispatch($command, 0); + } + + private function doDispatch(object $command, int $retry, int|null $maxRetries = null): void + { + try { + $this->commandBus->dispatch($command); + } catch (AggregateOutdated $exception) { + $maxRetries ??= $this->maxRetries($command); + + if ($retry >= $maxRetries) { + throw $exception; + } + + $this->doDispatch($command, $retry + 1, $maxRetries); + } + } + + private function maxRetries(object $command): int|null + { + $reflectionClass = new ReflectionClass($command); + $attributes = $reflectionClass->getAttributes(RetryAggregateOutdated::class); + + if ($attributes === []) { + return 0; + } + + return $attributes[0]->newInstance()->maxRetries; + } +} diff --git a/tests/Unit/CommandBus/RetryOutdatedAggregateCommandBusTest.php b/tests/Unit/CommandBus/RetryOutdatedAggregateCommandBusTest.php new file mode 100644 index 00000000..81b84cc4 --- /dev/null +++ b/tests/Unit/CommandBus/RetryOutdatedAggregateCommandBusTest.php @@ -0,0 +1,143 @@ +createMock(CommandBus::class); + $innerCommandBus + ->expects($this->once()) + ->method('dispatch') + ->with($command); + + $retryCommandBus = new RetryOutdatedAggregateCommandBus($innerCommandBus); + + $retryCommandBus->dispatch($command); + + $this->assertTrue(true); // If no exception is thrown, the test passes + } + + public function testDispatchRetriesUntilSuccess(): void + { + $command = new #[RetryAggregateOutdated(maxRetries: 3)] + class { + public function __construct() + { + } + }; + + $innerCommandBus = $this->createMock(CommandBus::class); + $innerCommandBus + ->expects($this->exactly(3)) + ->method('dispatch') + ->with($command) + ->willReturnOnConsecutiveCalls( + $this->throwException(new AggregateOutdated('profile', ProfileId::fromString('profile'))), + $this->throwException(new AggregateOutdated('profile', ProfileId::fromString('profile'))), + null, // Success on the third attempt + ); + + $retryCommandBus = new RetryOutdatedAggregateCommandBus($innerCommandBus); + + $retryCommandBus->dispatch($command); + + $this->assertTrue(true); // If no exception is thrown, the test passes + } + + public function testDispatchThrowsAfterMaxRetries(): void + { + $command = new #[RetryAggregateOutdated(maxRetries: 2)] + class { + public function __construct() + { + } + }; + + $innerCommandBus = $this->createMock(CommandBus::class); + $innerCommandBus + ->expects($this->exactly(3)) + ->method('dispatch') + ->with($command) + ->willThrowException( + new AggregateOutdated( + 'profile', + ProfileId::fromString('profile'), + ), + ); + + $retryCommandBus = new RetryOutdatedAggregateCommandBus($innerCommandBus); + + $this->expectException(AggregateOutdated::class); + + $retryCommandBus->dispatch($command); + } + + public function testDispatchNotRetry(): void + { + $command = new class { + public function __construct() + { + } + }; + + $innerCommandBus = $this->createMock(CommandBus::class); + $innerCommandBus + ->expects($this->once()) + ->method('dispatch') + ->with($command) + ->willThrowException( + new AggregateOutdated( + 'profile', + ProfileId::fromString('profile'), + ), + ); + + $retryCommandBus = new RetryOutdatedAggregateCommandBus($innerCommandBus); + + $this->expectException(AggregateOutdated::class); + + $retryCommandBus->dispatch($command); + } + + public function testSkipOtherExceptions(): void + { + $command = new #[RetryAggregateOutdated(maxRetries: 2)] + class { + public function __construct() + { + } + }; + + $innerCommandBus = $this->createMock(CommandBus::class); + $innerCommandBus + ->expects($this->once()) + ->method('dispatch') + ->with($command) + ->willThrowException(new RuntimeException('Some other exception')); + + $retryCommandBus = new RetryOutdatedAggregateCommandBus($innerCommandBus); + + $this->expectException(RuntimeException::class); + + $retryCommandBus->dispatch($command); + } +}