diff --git a/composer.lock b/composer.lock index d047307..7d9a346 100644 --- a/composer.lock +++ b/composer.lock @@ -456,86 +456,24 @@ }, "time": "2025-01-25T19:27:39+00:00" }, - { - "name": "thesis/message", - "version": "0.4.0", - "source": { - "type": "git", - "url": "https://github.com/thesis-php/message.git", - "reference": "33c891c54f96c115e7c0f78150095488619630a5" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/thesis-php/message/zipball/33c891c54f96c115e7c0f78150095488619630a5", - "reference": "33c891c54f96c115e7c0f78150095488619630a5", - "shasum": "" - }, - "require": { - "php": "^8.3" - }, - "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.2" - }, - "type": "library", - "extra": { - "bamarni-bin": { - "bin-links": false, - "forward-command": true, - "target-directory": "tools" - } - }, - "autoload": { - "psr-4": { - "Thesis\\Message\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Valentin Udaltsov", - "email": "udaltsov.valentin@gmail.com" - }, - { - "name": "Thesis Team", - "homepage": "https://github.com/orgs/thesisphp/people" - } - ], - "description": "Thesis Message", - "support": { - "issues": "https://github.com/thesis-php/message/issues", - "source": "https://github.com/thesis-php/message/tree/0.4.0" - }, - "funding": [ - { - "url": "https://www.tinkoff.ru/cf/5MqZQas2dk7", - "type": "custom" - } - ], - "time": "2025-07-16T22:53:52+00:00" - }, { "name": "thesis/message-bus", "version": "0.4.x-dev", "source": { "type": "git", "url": "https://github.com/thesis-php/message-bus.git", - "reference": "6e34563a0cb17895a3fd613850943d5858fd14c7" + "reference": "0903768845016ae707621395bb95f3404bfa4a3e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/thesis-php/message-bus/zipball/6e34563a0cb17895a3fd613850943d5858fd14c7", - "reference": "6e34563a0cb17895a3fd613850943d5858fd14c7", + "url": "https://api.github.com/repos/thesis-php/message-bus/zipball/0903768845016ae707621395bb95f3404bfa4a3e", + "reference": "0903768845016ae707621395bb95f3404bfa4a3e", "shasum": "" }, "require": { - "amphp/amp": "^3.1", "php": "^8.4", "psr/clock": "^1.0", - "thesis/message": "^0.4.0", - "typhoon/formatter": "^0.1@dev" + "revolt/event-loop": "^1.0" }, "require-dev": { "amphp/postgres": "^2.1.1", @@ -544,9 +482,9 @@ "ext-pcntl": "*", "ext-pdo": "*", "phpunit/phpunit": "^12.2.7", - "symfony/var-dumper": "^6.4.15 || ^7.3.1", - "thesis/message-bus-persistence-postgres": "^0.1@dev" + "symfony/var-dumper": "^6.4.15 || ^7.3.1" }, + "default-branch": true, "type": "library", "extra": { "bamarni-bin": { @@ -588,79 +526,7 @@ "type": "custom" } ], - "time": "2025-07-18T01:34:14+00:00" - }, - { - "name": "typhoon/formatter", - "version": "0.1.x-dev", - "source": { - "type": "git", - "url": "https://github.com/typhoon-php/formatter.git", - "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/typhoon-php/formatter/zipball/023568a25e0485501cf7119f8d288b7fa8eaf1fa", - "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa", - "shasum": "" - }, - "require": { - "php": "^8.1" - }, - "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.2", - "phpunit/phpunit": "^10.5.46", - "symfony/var-dumper": "^6.4.21 || ^7.2.3" - }, - "default-branch": true, - "type": "library", - "extra": { - "bamarni-bin": { - "bin-links": false, - "forward-command": true, - "target-directory": "tools" - } - }, - "autoload": { - "files": [ - "src/format.fn.php", - "src/formatClass.fn.php", - "src/formatFunction.fn.php", - "src/formatParameter.fn.php", - "src/formatProperty.fn.php", - "src/formatReflectedClass.fn.php", - "src/formatReflectedFunction.fn.php", - "src/formatReflectedParameter.fn.php", - "src/formatReflectedProperty.fn.php", - "src/formatReflectedType.fn.php" - ] - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Valentin Udaltsov", - "email": "udaltsov.valentin@gmail.com" - }, - { - "name": "Typhoon Team", - "homepage": "https://github.com/orgs/typhoon-php/people" - } - ], - "description": "Typhoon Formatter", - "support": { - "issues": "https://github.com/typhoon-php/formatter/issues", - "source": "https://github.com/typhoon-php/formatter/tree/0.1.x" - }, - "funding": [ - { - "url": "https://www.tinkoff.ru/cf/5MqZQas2dk7", - "type": "custom" - } - ], - "time": "2025-06-03T10:22:45+00:00" + "time": "2025-07-26T22:07:13+00:00" } ], "packages-dev": [ @@ -2566,6 +2432,78 @@ } ], "time": "2024-03-03T12:36:25+00:00" + }, + { + "name": "typhoon/formatter", + "version": "0.1.x-dev", + "source": { + "type": "git", + "url": "https://github.com/typhoon-php/formatter.git", + "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/typhoon-php/formatter/zipball/023568a25e0485501cf7119f8d288b7fa8eaf1fa", + "reference": "023568a25e0485501cf7119f8d288b7fa8eaf1fa", + "shasum": "" + }, + "require": { + "php": "^8.1" + }, + "require-dev": { + "bamarni/composer-bin-plugin": "^1.8.2", + "phpunit/phpunit": "^10.5.46", + "symfony/var-dumper": "^6.4.21 || ^7.2.3" + }, + "default-branch": true, + "type": "library", + "extra": { + "bamarni-bin": { + "bin-links": false, + "forward-command": true, + "target-directory": "tools" + } + }, + "autoload": { + "files": [ + "src/format.fn.php", + "src/formatClass.fn.php", + "src/formatFunction.fn.php", + "src/formatParameter.fn.php", + "src/formatProperty.fn.php", + "src/formatReflectedClass.fn.php", + "src/formatReflectedFunction.fn.php", + "src/formatReflectedParameter.fn.php", + "src/formatReflectedProperty.fn.php", + "src/formatReflectedType.fn.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Valentin Udaltsov", + "email": "udaltsov.valentin@gmail.com" + }, + { + "name": "Typhoon Team", + "homepage": "https://github.com/orgs/typhoon-php/people" + } + ], + "description": "Typhoon Formatter", + "support": { + "issues": "https://github.com/typhoon-php/formatter/issues", + "source": "https://github.com/typhoon-php/formatter/tree/0.1.x" + }, + "funding": [ + { + "url": "https://www.tinkoff.ru/cf/5MqZQas2dk7", + "type": "custom" + } + ], + "time": "2025-06-03T10:22:45+00:00" } ], "aliases": [], diff --git a/src/PostgresLazyTransaction.php b/src/PostgresLazyTransaction.php new file mode 100644 index 0000000..79b187a --- /dev/null +++ b/src/PostgresLazyTransaction.php @@ -0,0 +1,89 @@ + + */ +final class PostgresLazyTransaction implements LazyTransaction +{ + private ?PostgresTransaction $postgresTransaction = null; + + public object $transaction { + get => $this->postgresTransaction ??= ($this->begin)(); + } + + /** + * @param \Closure(): PostgresTransaction $begin + * @param non-empty-string $outboxTable + */ + public function __construct( + private readonly \Closure $begin, + private readonly string $outboxTable, + private readonly Endpoint $endpoint, + ) {} + + public function recordOutboxes(array $outboxes): void + { + $placeholderGroups = []; + $params = []; + + foreach ($outboxes as $outbox) { + $placeholderGroups[] = '(?, ?, ?, ?, ' . ($outbox->dispatched ? 'now()' : 'null') . ')'; + + $params[] = $this->endpoint->toString(); + $params[] = $outbox->incomingMessageId; + $params[] = new PostgresByteA(serialize($outbox->commands)); + $params[] = new PostgresByteA(serialize($outbox->events)); + } + + $placeholders = implode(',', $placeholderGroups); + + try { + $result = $this->transaction->execute( + <<outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at) + values {$placeholders} + on conflict (endpoint, incoming_message_id) do nothing + SQL, + $params, + ); + } catch (SqlTransactionError $exception) { + throw new TransactionClosed(previous: $exception); + } + + if ($result->getRowCount() !== \count($outboxes)) { + throw new OutboxAlreadyExists(); + } + } + + public function commitIfBegun(): void + { + try { + $this->postgresTransaction?->commit(); + } catch (SqlTransactionError $exception) { + throw new TransactionClosed(previous: $exception); + } + } + + public function rollbackIfBegun(): void + { + try { + $this->postgresTransaction?->rollback(); + } catch (SqlTransactionError $exception) { + throw new TransactionClosed(previous: $exception); + } + } +} diff --git a/src/PostgresStorage.php b/src/PostgresStorage.php index 8910db2..3eb4a7a 100644 --- a/src/PostgresStorage.php +++ b/src/PostgresStorage.php @@ -5,26 +5,29 @@ namespace Thesis\MessageBus\Persistence\Postgres; use Amp\Postgres\PostgresLink; -use Amp\Postgres\PostgresTransaction as AmphpPostgresTransaction; -use Thesis\Message\Command; -use Thesis\Message\Event; +use Amp\Postgres\PostgresTransaction; +use Thesis\MessageBus\Endpoint; use Thesis\MessageBus\Envelope; +use Thesis\MessageBus\Persistence\LazyTransaction; use Thesis\MessageBus\Persistence\Outbox; use Thesis\MessageBus\Persistence\Storage; -use Thesis\MessageBus\Persistence\Transaction; /** * @api - * @implements Storage + * @implements Storage */ -final readonly class PostgresStorage implements Storage +final class PostgresStorage implements Storage { + public array $transactionClasses { + get => [PostgresTransaction::class]; + } + /** * @param non-empty-string $outboxTable */ public function __construct( - private PostgresLink $postgres, - private string $outboxTable = 'outbox', + private readonly PostgresLink $postgres, + private readonly string $outboxTable = 'outbox', ) {} public function setup(): void @@ -45,58 +48,72 @@ public function setup(): void ); } - public function beginTransaction(string $endpoint, string $incomingMessageId): Transaction + public function createLazyTransaction(Endpoint $endpoint): LazyTransaction { - return new PostgresTransaction( - wrappedTransaction: $this->postgres->beginTransaction(), + return new PostgresLazyTransaction( + begin: $this->postgres->beginTransaction(...), outboxTable: $this->outboxTable, endpoint: $endpoint, - incomingMessageId: $incomingMessageId, ); } - public function findOutbox(string $endpoint, string $incomingMessageId): ?Outbox + public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): array { - /** @var ?array{commands: string, events: string, dispatched: bool} $row */ - $row = $this + $placeholders = str_repeat('?, ', \count($incomingMessageIds) - 1) . '?'; + + $result = $this ->postgres ->execute( <<outboxTable} - where endpoint = ? and incoming_message_id = ? + where endpoint = ? and incoming_message_id in ({$placeholders}) SQL, - [$endpoint, $incomingMessageId], - ) - ->fetchRow(); + [ + $endpoint->toString(), + ...$incomingMessageIds, + ], + ); - if ($row === null) { - return null; - } + $outboxes = []; - /** @var list> */ - $commands = unserialize($row['commands']); - /** @var list> */ - $events = unserialize($row['events']); + while (null !== $row = $result->fetchRow()) { + /** @var array{commands: string, events: string, incoming_message_id: non-empty-string, dispatched: bool} $row */ + /** @var list $commands */ + $commands = unserialize($row['commands']); + /** @var list $events */ + $events = unserialize($row['events']); - return new Outbox( - commands: $commands, - events: $events, - dispatched: $row['dispatched'], - ); + $outboxes[] = new Outbox( + incomingMessageId: $row['incoming_message_id'], + commands: $commands, + events: $events, + dispatched: $row['dispatched'], + ); + } + + return $outboxes; } - public function markOutboxDispatched(string $endpoint, string $incomingMessageId): void + public function markOutboxesDispatched(Endpoint $endpoint, array $incomingMessageIds): void { + $placeholders = str_repeat('?, ', \count($incomingMessageIds) - 1) . '?'; + $this->postgres->execute( <<outboxTable} set dispatched_at = now() - where endpoint = ? and incoming_message_id = ? and dispatched_at is null + where endpoint = ? + and incoming_message_id in ({$placeholders}) + and dispatched_at is null SQL, [ - $endpoint, - $incomingMessageId, + $endpoint->toString(), + ...$incomingMessageIds, ], ); } diff --git a/src/PostgresTransaction.php b/src/PostgresTransaction.php deleted file mode 100644 index 8b49be9..0000000 --- a/src/PostgresTransaction.php +++ /dev/null @@ -1,78 +0,0 @@ - - */ -final readonly class PostgresTransaction implements Transaction -{ - /** - * @param non-empty-string $outboxTable - * @param non-empty-string $endpoint - * @param non-empty-string $incomingMessageId - */ - public function __construct( - public AmphpPostgresTransaction $wrappedTransaction, - private string $outboxTable, - private string $endpoint, - private string $incomingMessageId, - ) {} - - public function recordOutbox(Outbox $outbox): void - { - $dispatched = $outbox->dispatched ? 'now()' : 'null'; - - try { - $result = $this->wrappedTransaction->execute( - <<outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at) - values (?, ?, ?, ?, {$dispatched}) - on conflict (endpoint, incoming_message_id) do nothing - SQL, - [ - $this->endpoint, - $this->incomingMessageId, - new PostgresByteA(serialize($outbox->commands)), - new PostgresByteA(serialize($outbox->events)), - ], - ); - } catch (SqlTransactionError $exception) { - throw new TransactionClosed(previous: $exception); - } - - if ($result->getRowCount() !== 1) { - throw new OutboxAlreadyExists(); - } - } - - public function commit(): void - { - try { - $this->wrappedTransaction->commit(); - } catch (SqlTransactionError $exception) { - throw new TransactionClosed(previous: $exception); - } - } - - public function rollback(): void - { - try { - $this->wrappedTransaction->rollback(); - } catch (SqlTransactionError $exception) { - throw new TransactionClosed(previous: $exception); - } - } -}