From c105779ce7e63a73e3cda045daf579aeae2e4393 Mon Sep 17 00:00:00 2001 From: MarshmallowFox Date: Sun, 27 Jul 2025 14:20:58 +0500 Subject: [PATCH 1/7] Compatibility with updated MessageBus --- composer.lock | 220 +++++++++++++----------------------- src/PostgresStorage.php | 73 +++++++----- src/PostgresTransaction.php | 55 ++++----- 3 files changed, 152 insertions(+), 196 deletions(-) diff --git a/composer.lock b/composer.lock index d047307..7d9a346 100644 --- a/composer.lock +++ b/composer.lock @@ -456,86 +456,24 @@ }, "time": "2025-01-25T19:27:39+00:00" }, - { - "name": "thesis/message", - "version": "0.4.0", - "source": { - "type": "git", - "url": "https://github.com/thesis-php/message.git", - "reference": "33c891c54f96c115e7c0f78150095488619630a5" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/thesis-php/message/zipball/33c891c54f96c115e7c0f78150095488619630a5", - "reference": "33c891c54f96c115e7c0f78150095488619630a5", - "shasum": "" - }, - "require": { - "php": "^8.3" - }, - "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.2" - }, - "type": "library", - "extra": { - "bamarni-bin": { - "bin-links": false, - "forward-command": true, - "target-directory": "tools" - } - }, - "autoload": { - "psr-4": { - "Thesis\\Message\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Valentin Udaltsov", - "email": "udaltsov.valentin@gmail.com" - }, - { - "name": "Thesis Team", - "homepage": "https://github.com/orgs/thesisphp/people" - } - ], - "description": "Thesis Message", - "support": { - "issues": "https://github.com/thesis-php/message/issues", - "source": "https://github.com/thesis-php/message/tree/0.4.0" - }, - "funding": [ - { - "url": "https://www.tinkoff.ru/cf/5MqZQas2dk7", - "type": "custom" - } - ], - "time": "2025-07-16T22:53:52+00:00" - }, { "name": "thesis/message-bus", "version": "0.4.x-dev", "source": { "type": "git", "url": "https://github.com/thesis-php/message-bus.git", - "reference": "6e34563a0cb17895a3fd613850943d5858fd14c7" + "reference": "0903768845016ae707621395bb95f3404bfa4a3e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/thesis-php/message-bus/zipball/6e34563a0cb17895a3fd613850943d5858fd14c7", - "reference": "6e34563a0cb17895a3fd613850943d5858fd14c7", + "url": "https://api.github.com/repos/thesis-php/message-bus/zipball/0903768845016ae707621395bb95f3404bfa4a3e", + "reference": "0903768845016ae707621395bb95f3404bfa4a3e", "shasum": "" }, "require": { - "amphp/amp": "^3.1", "php": "^8.4", "psr/clock": "^1.0", - "thesis/message": "^0.4.0", - "typhoon/formatter": "^0.1@dev" + "revolt/event-loop": "^1.0" }, "require-dev": { "amphp/postgres": "^2.1.1", @@ -544,9 +482,9 @@ "ext-pcntl": "*", "ext-pdo": "*", "phpunit/phpunit": "^12.2.7", - "symfony/var-dumper": "^6.4.15 || ^7.3.1", - "thesis/message-bus-persistence-postgres": "^0.1@dev" + "symfony/var-dumper": "^6.4.15 || ^7.3.1" }, + "default-branch": true, "type": "library", "extra": { "bamarni-bin": { @@ -588,79 +526,7 @@ "type": "custom" } ], - "time": "2025-07-18T01:34:14+00:00" - }, - { - "name": "typhoon/formatter", - "version": "0.1.x-dev", - "source": { - "type": "git", - "url": "https://github.com/typhoon-php/formatter.git", - "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/typhoon-php/formatter/zipball/023568a25e0485501cf7119f8d288b7fa8eaf1fa", - "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa", - "shasum": "" - }, - "require": { - "php": "^8.1" - }, - "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.2", - "phpunit/phpunit": "^10.5.46", - "symfony/var-dumper": "^6.4.21 || ^7.2.3" - }, - "default-branch": true, - "type": "library", - "extra": { - "bamarni-bin": { - "bin-links": false, - "forward-command": true, - "target-directory": "tools" - } - }, - "autoload": { - "files": [ - "src/format.fn.php", - "src/formatClass.fn.php", - "src/formatFunction.fn.php", - "src/formatParameter.fn.php", - "src/formatProperty.fn.php", - "src/formatReflectedClass.fn.php", - "src/formatReflectedFunction.fn.php", - "src/formatReflectedParameter.fn.php", - "src/formatReflectedProperty.fn.php", - "src/formatReflectedType.fn.php" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Valentin Udaltsov", - "email": "udaltsov.valentin@gmail.com" - }, - { - "name": "Typhoon Team", - "homepage": "https://github.com/orgs/typhoon-php/people" - } - ], - "description": "Typhoon Formatter", - "support": { - "issues": "https://github.com/typhoon-php/formatter/issues", - "source": "https://github.com/typhoon-php/formatter/tree/0.1.x" - }, - "funding": [ - { - "url": "https://www.tinkoff.ru/cf/5MqZQas2dk7", - "type": "custom" - } - ], - "time": "2025-06-03T10:22:45+00:00" + "time": "2025-07-26T22:07:13+00:00" } ], "packages-dev": [ @@ -2566,6 +2432,78 @@ } ], "time": "2024-03-03T12:36:25+00:00" + }, + { + "name": "typhoon/formatter", + "version": "0.1.x-dev", + "source": { + "type": "git", + "url": "https://github.com/typhoon-php/formatter.git", + "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/typhoon-php/formatter/zipball/023568a25e0485501cf7119f8d288b7fa8eaf1fa", + "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa", + "shasum": "" + }, + "require": { + "php": "^8.1" + }, + "require-dev": { + "bamarni/composer-bin-plugin": "^1.8.2", + "phpunit/phpunit": "^10.5.46", + "symfony/var-dumper": "^6.4.21 || ^7.2.3" + }, + "default-branch": true, + "type": "library", + "extra": { + "bamarni-bin": { + "bin-links": false, + "forward-command": true, + "target-directory": "tools" + } + }, + "autoload": { + "files": [ + "src/format.fn.php", + "src/formatClass.fn.php", + "src/formatFunction.fn.php", + "src/formatParameter.fn.php", + "src/formatProperty.fn.php", + "src/formatReflectedClass.fn.php", + "src/formatReflectedFunction.fn.php", + "src/formatReflectedParameter.fn.php", + "src/formatReflectedProperty.fn.php", + "src/formatReflectedType.fn.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Valentin Udaltsov", + "email": "udaltsov.valentin@gmail.com" + }, + { + "name": "Typhoon Team", + "homepage": "https://github.com/orgs/typhoon-php/people" + } + ], + "description": "Typhoon Formatter", + "support": { + "issues": "https://github.com/typhoon-php/formatter/issues", + "source": "https://github.com/typhoon-php/formatter/tree/0.1.x" + }, + "funding": [ + { + "url": "https://www.tinkoff.ru/cf/5MqZQas2dk7", + "type": "custom" + } + ], + "time": "2025-06-03T10:22:45+00:00" } ], "aliases": [], diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index 8910db2..d66e99d 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -6,12 +6,11 @@ use Amp\Postgres\PostgresLink; use Amp\Postgres\PostgresTransaction as AmphpPostgresTransaction; -use Thesis\Message\Command; -use Thesis\Message\Event; +use Thesis\MessageBus\Endpoint; use Thesis\MessageBus\Envelope; use Thesis\MessageBus\Persistence\Outbox; use Thesis\MessageBus\Persistence\Storage; -use Thesis\MessageBus\Persistence\Transaction; +use Thesis\MessageBus\Persistence\LazyTransaction; /** * @api @@ -45,58 +44,74 @@ public function setup(): void ); } - public function beginTransaction(string $endpoint, string $incomingMessageId): Transaction + /** + * @return LazyTransaction + */ + public function createLazyTransaction(Endpoint $endpoint): LazyTransaction { return new PostgresTransaction( wrappedTransaction: $this->postgres->beginTransaction(), outboxTable: $this->outboxTable, endpoint: $endpoint, - incomingMessageId: $incomingMessageId, ); } - public function findOutbox(string $endpoint, string $incomingMessageId): ?Outbox + public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): array { - /** @var ?array{commands: string, events: string, dispatched: bool} $row */ - $row = $this + $placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?')); + + $query = $this ->postgres ->execute( <<outboxTable} - where endpoint = ? and incoming_message_id = ? + where endpoint = ? and incoming_message_id in ({$placeholders}) SQL, - [$endpoint, $incomingMessageId], - ) - ->fetchRow(); + [$endpoint->toString(), ...$incomingMessageIds], + ); - if ($row === null) { - return null; - } + /** + * @var list $outboxes + */ + $outboxes = []; + do { + /** @var ?array{incoming_message_id: non-empty-string, commands: string, events: string, dispatched: bool} $row */ + $row = $query->fetchRow(); - /** @var list> */ - $commands = unserialize($row['commands']); - /** @var list> */ - $events = unserialize($row['events']); + if ($row !== null) { + /** @var list $commands */ + $commands = unserialize($row['commands']); + /** @var list $events */ + $events = unserialize($row['events']); - return new Outbox( - commands: $commands, - events: $events, - dispatched: $row['dispatched'], - ); + $outboxes[] = new Outbox( + incomingMessageId: $row['incoming_message_id'], + commands: $commands, + events: $events, + dispatched: $row['dispatched'], + ); + + $query = $query->getNextResult(); + } + } while ($row !== null && $query !== null); + + return $outboxes; } - public function markOutboxDispatched(string $endpoint, string $incomingMessageId): void + public function markOutboxesDispatched(Endpoint $endpoint, array $incomingMessageIds): void { + $placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?')); + $this->postgres->execute( <<outboxTable} set dispatched_at = now() - where endpoint = ? and incoming_message_id = ? and dispatched_at is null + where endpoint = ? and incoming_message_id = ({$placeholders}) and dispatched_at is null SQL, [ - $endpoint, - $incomingMessageId, + $endpoint->toString(), + ...$incomingMessageIds, ], ); } diff --git a/src/PostgresTransaction.php b/src/PostgresTransaction.php index 8b49be9..ed4dea6 100644 --- a/src/PostgresTransaction.php +++ b/src/PostgresTransaction.php @@ -7,58 +7,61 @@ use Amp\Postgres\PostgresByteA; use Amp\Postgres\PostgresTransaction as AmphpPostgresTransaction; use Amp\Sql\SqlTransactionError; +use Thesis\MessageBus\Endpoint; use Thesis\MessageBus\Persistence\Outbox; use Thesis\MessageBus\Persistence\OutboxAlreadyExists; -use Thesis\MessageBus\Persistence\Transaction; +use Thesis\MessageBus\Persistence\LazyTransaction; use Thesis\MessageBus\Persistence\TransactionClosed; /** * @internal * @psalm-internal Thesis\MessageBus\Persistence\Postgres - * @implements Transaction + * @implements LazyTransaction */ -final readonly class PostgresTransaction implements Transaction +final readonly class PostgresTransaction implements LazyTransaction { /** * @param non-empty-string $outboxTable - * @param non-empty-string $endpoint - * @param non-empty-string $incomingMessageId */ public function __construct( public AmphpPostgresTransaction $wrappedTransaction, private string $outboxTable, - private string $endpoint, - private string $incomingMessageId, + private Endpoint $endpoint, ) {} - public function recordOutbox(Outbox $outbox): void + public function recordOutboxes(array $outboxes): void { - $dispatched = $outbox->dispatched ? 'now()' : 'null'; + /** + * @var Outbox $outbox + */ + foreach ($outboxes as $outbox) { + $dispatched = $outbox->dispatched ? 'now()' : 'null'; - try { - $result = $this->wrappedTransaction->execute( - <<wrappedTransaction->execute( + <<outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at) values (?, ?, ?, ?, {$dispatched}) on conflict (endpoint, incoming_message_id) do nothing SQL, - [ - $this->endpoint, - $this->incomingMessageId, - new PostgresByteA(serialize($outbox->commands)), - new PostgresByteA(serialize($outbox->events)), - ], - ); - } catch (SqlTransactionError $exception) { - throw new TransactionClosed(previous: $exception); - } + [ + $this->endpoint->toString(), + $outbox->incomingMessageId, + new PostgresByteA(serialize($outbox->commands)), + new PostgresByteA(serialize($outbox->events)), + ], + ); + } catch (SqlTransactionError $exception) { + throw new TransactionClosed(previous: $exception); + } - if ($result->getRowCount() !== 1) { - throw new OutboxAlreadyExists(); + if ($result->getRowCount() !== 1) { + throw new OutboxAlreadyExists(); + } } } - public function commit(): void + public function commitIfBegun(): void { try { $this->wrappedTransaction->commit(); @@ -67,7 +70,7 @@ public function commit(): void } } - public function rollback(): void + public function rollbackIfBegun(): void { try { $this->wrappedTransaction->rollback(); From 394891269563557bf301e4420bf7aed2e90e1f91 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Sun, 27 Jul 2025 12:41:50 +0300 Subject: [PATCH 2/7] Review --- ...action.php => PostgresLazyTransaction.php} | 19 ++++++---- src/PostgresStorage.php | 36 ++++++++----------- 2 files changed, 28 insertions(+), 27 deletions(-) rename src/{PostgresTransaction.php => PostgresLazyTransaction.php} (81%) diff --git a/src/PostgresTransaction.php b/src/PostgresLazyTransaction.php similarity index 81% rename from src/PostgresTransaction.php rename to src/PostgresLazyTransaction.php index ed4dea6..5a32f7a 100644 --- a/src/PostgresTransaction.php +++ b/src/PostgresLazyTransaction.php @@ -5,7 +5,7 @@ namespace Thesis\MessageBus\Persistence\Postgres; use Amp\Postgres\PostgresByteA; -use Amp\Postgres\PostgresTransaction as AmphpPostgresTransaction; +use Amp\Postgres\PostgresTransaction; use Amp\Sql\SqlTransactionError; use Thesis\MessageBus\Endpoint; use Thesis\MessageBus\Persistence\Outbox; @@ -16,15 +16,22 @@ /** * @internal * @psalm-internal Thesis\MessageBus\Persistence\Postgres - * @implements LazyTransaction + * @implements LazyTransaction */ -final readonly class PostgresTransaction implements LazyTransaction +final class PostgresLazyTransaction implements LazyTransaction { + private ?PostgresTransaction $postgresTransaction = null; + + public object $transaction { + get => $this->postgresTransaction ??= ($this->begin)(); + } + /** + * @param \Closure(): PostgresTransaction $begin * @param non-empty-string $outboxTable */ public function __construct( - public AmphpPostgresTransaction $wrappedTransaction, + private \Closure $begin, private string $outboxTable, private Endpoint $endpoint, ) {} @@ -64,7 +71,7 @@ public function recordOutboxes(array $outboxes): void public function commitIfBegun(): void { try { - $this->wrappedTransaction->commit(); + $this->postgresTransaction?->commit(); } catch (SqlTransactionError $exception) { throw new TransactionClosed(previous: $exception); } @@ -73,7 +80,7 @@ public function commitIfBegun(): void public function rollbackIfBegun(): void { try { - $this->wrappedTransaction->rollback(); + $this->postgresTransaction?->rollback(); } catch (SqlTransactionError $exception) { throw new TransactionClosed(previous: $exception); } diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index d66e99d..aaa154e 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -49,8 +49,8 @@ public function setup(): void */ public function createLazyTransaction(Endpoint $endpoint): LazyTransaction { - return new PostgresTransaction( - wrappedTransaction: $this->postgres->beginTransaction(), + return new PostgresLazyTransaction( + begin: $this->postgres->beginTransaction(...), outboxTable: $this->outboxTable, endpoint: $endpoint, ); @@ -60,7 +60,7 @@ public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): arr { $placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?')); - $query = $this + $result = $this ->postgres ->execute( << $outboxes */ $outboxes = []; - do { - /** @var ?array{incoming_message_id: non-empty-string, commands: string, events: string, dispatched: bool} $row */ - $row = $query->fetchRow(); - if ($row !== null) { - /** @var list $commands */ - $commands = unserialize($row['commands']); - /** @var list $events */ - $events = unserialize($row['events']); + while (null !== $row = $result->fetchRow()) { + /** @var list $commands */ + $commands = unserialize($row['commands']); + /** @var list $events */ + $events = unserialize($row['events']); - $outboxes[] = new Outbox( - incomingMessageId: $row['incoming_message_id'], - commands: $commands, - events: $events, - dispatched: $row['dispatched'], - ); - - $query = $query->getNextResult(); - } - } while ($row !== null && $query !== null); + $outboxes[] = new Outbox( + incomingMessageId: $row['incoming_message_id'], + commands: $commands, + events: $events, + dispatched: $row['dispatched'], + ); + } return $outboxes; } From d7242e77a8a047fd80fc9d59ee0a09ca51ab3a23 Mon Sep 17 00:00:00 2001 From: MarshmallowFox Date: Sun, 27 Jul 2025 15:57:48 +0500 Subject: [PATCH 3/7] Improved insert --- src/PostgresLazyTransaction.php | 45 +++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/PostgresLazyTransaction.php b/src/PostgresLazyTransaction.php index 5a32f7a..e52cb7a 100644 --- a/src/PostgresLazyTransaction.php +++ b/src/PostgresLazyTransaction.php @@ -38,33 +38,46 @@ public function __construct( public function recordOutboxes(array $outboxes): void { + /** + * @var list $insertData + */ + $insertData = []; + + /** + * @var list $placeholders + */ + $placeholders = []; /** * @var Outbox $outbox */ foreach ($outboxes as $outbox) { - $dispatched = $outbox->dispatched ? 'now()' : 'null'; + array_push($insertData, + $this->endpoint->toString(), + $outbox->incomingMessageId, + new PostgresByteA(serialize($outbox->commands)), + new PostgresByteA(serialize($outbox->events)), + ); + + $placeholders[] = '(?, ?, ?, ?,' . ($outbox->dispatched ? 'now()' : 'null') . ')'; + } + + try { + $placeholders = implode(',', $placeholders); - try { - $result = $this->wrappedTransaction->execute( - <<transaction->execute( + <<outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at) - values (?, ?, ?, ?, {$dispatched}) + values {$placeholders} on conflict (endpoint, incoming_message_id) do nothing SQL, - [ - $this->endpoint->toString(), - $outbox->incomingMessageId, - new PostgresByteA(serialize($outbox->commands)), - new PostgresByteA(serialize($outbox->events)), - ], - ); - } catch (SqlTransactionError $exception) { - throw new TransactionClosed(previous: $exception); - } + $insertData, + ); - if ($result->getRowCount() !== 1) { + if ($result->getRowCount() !== count($outboxes)) { throw new OutboxAlreadyExists(); } + } catch (SqlTransactionError $exception) { + throw new TransactionClosed(previous: $exception); } } From 59db9200c5e0067cc6b1d364d4d8012e6cbfe2ff Mon Sep 17 00:00:00 2001 From: MarshmallowFox Date: Sun, 27 Jul 2025 15:58:04 +0500 Subject: [PATCH 4/7] Types of row --- src/PostgresStorage.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index aaa154e..a10bac1 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -77,6 +77,10 @@ public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): arr $outboxes = []; while (null !== $row = $result->fetchRow()) { + /** + * @var array{commands: string, events: string, incoming_message_id: non-empty-string, dispatched: bool} $row + */ + /** @var list $commands */ $commands = unserialize($row['commands']); /** @var list $events */ From 6fbbf30350ee6cd274744ec24f7880e14433f71a Mon Sep 17 00:00:00 2001 From: MarshmallowFox Date: Sun, 27 Jul 2025 16:05:18 +0500 Subject: [PATCH 5/7] Transaction class --- src/PostgresStorage.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index a10bac1..a9bcac1 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -16,8 +16,14 @@ * @api * @implements Storage */ -final readonly class PostgresStorage implements Storage +final class PostgresStorage implements Storage { + public array $transactionClasses { + get { + return [AmphpPostgresTransaction::class]; + } + } + /** * @param non-empty-string $outboxTable */ From c064236e22bd86eb63c28c050711832d8415cc0c Mon Sep 17 00:00:00 2001 From: MarshmallowFox Date: Sun, 27 Jul 2025 18:07:20 +0500 Subject: [PATCH 6/7] Removing no needed types --- src/PostgresLazyTransaction.php | 28 +++++++++------------------- src/PostgresStorage.php | 10 ++-------- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/src/PostgresLazyTransaction.php b/src/PostgresLazyTransaction.php index e52cb7a..4696b5a 100644 --- a/src/PostgresLazyTransaction.php +++ b/src/PostgresLazyTransaction.php @@ -38,25 +38,15 @@ public function __construct( public function recordOutboxes(array $outboxes): void { - /** - * @var list $insertData - */ $insertData = []; - /** - * @var list $placeholders - */ $placeholders = []; - /** - * @var Outbox $outbox - */ + foreach ($outboxes as $outbox) { - array_push($insertData, - $this->endpoint->toString(), - $outbox->incomingMessageId, - new PostgresByteA(serialize($outbox->commands)), - new PostgresByteA(serialize($outbox->events)), - ); + $insertData[] = $this->endpoint->toString(); + $insertData[] = $outbox->incomingMessageId; + $insertData[] = new PostgresByteA(serialize($outbox->commands)); + $insertData[] = new PostgresByteA(serialize($outbox->events)); $placeholders[] = '(?, ?, ?, ?,' . ($outbox->dispatched ? 'now()' : 'null') . ')'; } @@ -72,13 +62,13 @@ public function recordOutboxes(array $outboxes): void SQL, $insertData, ); - - if ($result->getRowCount() !== count($outboxes)) { - throw new OutboxAlreadyExists(); - } } catch (SqlTransactionError $exception) { throw new TransactionClosed(previous: $exception); } + + if ($result->getRowCount() !== count($outboxes)) { + throw new OutboxAlreadyExists(); + } } public function commitIfBegun(): void diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index a9bcac1..6499e0e 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -28,8 +28,8 @@ final class PostgresStorage implements Storage * @param non-empty-string $outboxTable */ public function __construct( - private PostgresLink $postgres, - private string $outboxTable = 'outbox', + private readonly PostgresLink $postgres, + private readonly string $outboxTable = 'outbox', ) {} public function setup(): void @@ -50,9 +50,6 @@ public function setup(): void ); } - /** - * @return LazyTransaction - */ public function createLazyTransaction(Endpoint $endpoint): LazyTransaction { return new PostgresLazyTransaction( @@ -77,9 +74,6 @@ public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): arr [$endpoint->toString(), ...$incomingMessageIds], ); - /** - * @var list $outboxes - */ $outboxes = []; while (null !== $row = $result->fetchRow()) { From 8c8b6d32b6d7eff09a9c55e94a065e7a271cb1e8 Mon Sep 17 00:00:00 2001 From: Valentin Udaltsov Date: Mon, 28 Jul 2025 01:36:05 +0300 Subject: [PATCH 7/7] Imrpovements after review --- src/PostgresLazyTransaction.php | 32 +++++++++++++++---------------- src/PostgresStorage.php | 34 ++++++++++++++++++--------------- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/src/PostgresLazyTransaction.php b/src/PostgresLazyTransaction.php index 4696b5a..79b187a 100644 --- a/src/PostgresLazyTransaction.php +++ b/src/PostgresLazyTransaction.php @@ -8,9 +8,8 @@ use Amp\Postgres\PostgresTransaction; use Amp\Sql\SqlTransactionError; use Thesis\MessageBus\Endpoint; -use Thesis\MessageBus\Persistence\Outbox; -use Thesis\MessageBus\Persistence\OutboxAlreadyExists; use Thesis\MessageBus\Persistence\LazyTransaction; +use Thesis\MessageBus\Persistence\OutboxAlreadyExists; use Thesis\MessageBus\Persistence\TransactionClosed; /** @@ -31,42 +30,41 @@ final class PostgresLazyTransaction implements LazyTransaction * @param non-empty-string $outboxTable */ public function __construct( - private \Closure $begin, - private string $outboxTable, - private Endpoint $endpoint, + private readonly \Closure $begin, + private readonly string $outboxTable, + private readonly Endpoint $endpoint, ) {} public function recordOutboxes(array $outboxes): void { - $insertData = []; - - $placeholders = []; + $placeholderGroups = []; + $params = []; foreach ($outboxes as $outbox) { - $insertData[] = $this->endpoint->toString(); - $insertData[] = $outbox->incomingMessageId; - $insertData[] = new PostgresByteA(serialize($outbox->commands)); - $insertData[] = new PostgresByteA(serialize($outbox->events)); + $placeholderGroups[] = '(?, ?, ?, ?, ' . ($outbox->dispatched ? 'now()' : 'null') . ')'; - $placeholders[] = '(?, ?, ?, ?,' . ($outbox->dispatched ? 'now()' : 'null') . ')'; + $params[] = $this->endpoint->toString(); + $params[] = $outbox->incomingMessageId; + $params[] = new PostgresByteA(serialize($outbox->commands)); + $params[] = new PostgresByteA(serialize($outbox->events)); } - try { - $placeholders = implode(',', $placeholders); + $placeholders = implode(',', $placeholderGroups); + try { $result = $this->transaction->execute( <<outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at) values {$placeholders} on conflict (endpoint, incoming_message_id) do nothing SQL, - $insertData, + $params, ); } catch (SqlTransactionError $exception) { throw new TransactionClosed(previous: $exception); } - if ($result->getRowCount() !== count($outboxes)) { + if ($result->getRowCount() !== \count($outboxes)) { throw new OutboxAlreadyExists(); } } diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index 6499e0e..3eb4a7a 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -5,23 +5,21 @@ namespace Thesis\MessageBus\Persistence\Postgres; use Amp\Postgres\PostgresLink; -use Amp\Postgres\PostgresTransaction as AmphpPostgresTransaction; +use Amp\Postgres\PostgresTransaction; use Thesis\MessageBus\Endpoint; use Thesis\MessageBus\Envelope; +use Thesis\MessageBus\Persistence\LazyTransaction; use Thesis\MessageBus\Persistence\Outbox; use Thesis\MessageBus\Persistence\Storage; -use Thesis\MessageBus\Persistence\LazyTransaction; /** * @api - * @implements Storage + * @implements Storage */ final class PostgresStorage implements Storage { public array $transactionClasses { - get { - return [AmphpPostgresTransaction::class]; - } + get => [PostgresTransaction::class]; } /** @@ -61,26 +59,30 @@ public function createLazyTransaction(Endpoint $endpoint): LazyTransaction public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): array { - $placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?')); + $placeholders = str_repeat('?, ', \count($incomingMessageIds) - 1) . '?'; $result = $this ->postgres ->execute( <<outboxTable} where endpoint = ? and incoming_message_id in ({$placeholders}) SQL, - [$endpoint->toString(), ...$incomingMessageIds], + [ + $endpoint->toString(), + ...$incomingMessageIds, + ], ); $outboxes = []; while (null !== $row = $result->fetchRow()) { - /** - * @var array{commands: string, events: string, incoming_message_id: non-empty-string, dispatched: bool} $row - */ - + /** @var array{commands: string, events: string, incoming_message_id: non-empty-string, dispatched: bool} $row */ /** @var list $commands */ $commands = unserialize($row['commands']); /** @var list $events */ @@ -99,13 +101,15 @@ public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): arr public function markOutboxesDispatched(Endpoint $endpoint, array $incomingMessageIds): void { - $placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?')); + $placeholders = str_repeat('?, ', \count($incomingMessageIds) - 1) . '?'; $this->postgres->execute( <<outboxTable} set dispatched_at = now() - where endpoint = ? and incoming_message_id = ({$placeholders}) and dispatched_at is null + where endpoint = ? + and incoming_message_id in ({$placeholders}) + and dispatched_at is null SQL, [ $endpoint->toString(),