From 8498f5940fe3beb7baab2b5f5a2ec3926da03d87 Mon Sep 17 00:00:00 2001 From: Dmitriy Derepko Date: Fri, 12 Jan 2024 13:02:46 +0700 Subject: [PATCH] Introduce Envelope (#185) --- composer.json | 4 +- src/Adapter/AdapterInterface.php | 4 +- src/Adapter/SynchronousAdapter.php | 7 ++-- src/Debug/QueueDecorator.php | 2 +- src/Exception/JobFailureException.php | 3 +- src/Message/EnvelopeInterface.php | 15 ++++++++ src/Message/EnvelopeTrait.php | 38 +++++++++++++++++++ src/Message/IdEnvelope.php | 38 +++++++++++++++++++ src/Message/Message.php | 11 ------ src/Message/MessageInterface.php | 9 ----- .../FailureHandling/FailureEnvelope.php | 25 ++++++++++++ .../ExponentialDelayMiddleware.php | 18 ++++----- .../Implementation/SendAgainMiddleware.php | 24 +++++------- src/Middleware/Push/AdapterPushHandler.php | 6 +-- src/Queue.php | 8 ++-- src/QueueInterface.php | 2 +- src/Worker/Worker.php | 3 +- tests/App/DummyQueue.php | 4 +- tests/App/FakeAdapter.php | 6 ++- .../ExponentialDelayMiddlewareTest.php | 6 ++- tests/Unit/QueueTest.php | 14 +++++-- tests/Unit/SynchronousAdapterTest.php | 20 ++++++---- tests/Unit/WorkerTest.php | 2 +- 23 files changed, 188 insertions(+), 81 deletions(-) create mode 100644 src/Message/EnvelopeInterface.php create mode 100644 src/Message/EnvelopeTrait.php create mode 100644 src/Message/IdEnvelope.php create mode 100644 src/Middleware/FailureHandling/FailureEnvelope.php diff --git a/composer.json b/composer.json index f57717e3..b4f1e0fe 100644 --- a/composer.json +++ b/composer.json @@ -40,9 +40,9 @@ "rector/rector": "^0.19.0", "roave/infection-static-analysis-plugin": "^1.16", "spatie/phpunit-watcher": "^1.23", - "yiisoft/yii-debug": "dev-master", "vimeo/psalm": "^4.30|^5.8", - "yiisoft/test-support": "^3.0" + "yiisoft/test-support": "^3.0", + "yiisoft/yii-debug": "dev-master|dev-php80" }, "suggest": { "ext-pcntl": "Need for process signals" diff --git a/src/Adapter/AdapterInterface.php b/src/Adapter/AdapterInterface.php index 86e01e07..856355bc 100644 --- a/src/Adapter/AdapterInterface.php +++ b/src/Adapter/AdapterInterface.php @@ -26,12 +26,12 @@ public function runExisting(callable $handlerCallback): void; * * @return JobStatus */ - public function status(string $id): JobStatus; + public function status(string|int $id): JobStatus; /** * Pushing a message to the queue. Adapter sets message ID if available. */ - public function push(MessageInterface $message): void; + public function push(MessageInterface $message): MessageInterface; /** * Listen to the queue and pass messages to the given handler as they come. diff --git a/src/Adapter/SynchronousAdapter.php b/src/Adapter/SynchronousAdapter.php index 889efc97..e0430733 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/src/Adapter/SynchronousAdapter.php @@ -10,6 +10,7 @@ use Yiisoft\Queue\QueueFactory; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\WorkerInterface; +use Yiisoft\Queue\Message\IdEnvelope; final class SynchronousAdapter implements AdapterInterface { @@ -42,7 +43,7 @@ public function runExisting(callable $handlerCallback): void } } - public function status(string $id): JobStatus + public function status(string|int $id): JobStatus { $id = (int) $id; @@ -61,12 +62,12 @@ public function status(string $id): JobStatus throw new InvalidArgumentException('There is no message with the given ID.'); } - public function push(MessageInterface $message): void + public function push(MessageInterface $message): MessageInterface { $key = count($this->messages) + $this->current; $this->messages[] = $message; - $message->setId((string) $key); + return new IdEnvelope($message, $key); } public function subscribe(callable $handlerCallback): void diff --git a/src/Debug/QueueDecorator.php b/src/Debug/QueueDecorator.php index f3403c93..4ebee030 100644 --- a/src/Debug/QueueDecorator.php +++ b/src/Debug/QueueDecorator.php @@ -18,7 +18,7 @@ public function __construct( ) { } - public function status(string $id): JobStatus + public function status(string|int $id): JobStatus { $result = $this->queue->status($id); $this->collector->collectStatus($id, $result); diff --git a/src/Exception/JobFailureException.php b/src/Exception/JobFailureException.php index 0983f517..acfeee3b 100644 --- a/src/Exception/JobFailureException.php +++ b/src/Exception/JobFailureException.php @@ -7,13 +7,14 @@ use RuntimeException; use Throwable; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\IdEnvelope; class JobFailureException extends RuntimeException { public function __construct(private MessageInterface $queueMessage, Throwable $previous) { $error = $previous->getMessage(); - $messageId = $queueMessage->getId() ?? 'null'; + $messageId = $queueMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; $messageText = "Processing of message #$messageId is stopped because of an exception:\n$error."; parent::__construct($messageText, 0, $previous); diff --git a/src/Message/EnvelopeInterface.php b/src/Message/EnvelopeInterface.php new file mode 100644 index 00000000..bf336112 --- /dev/null +++ b/src/Message/EnvelopeInterface.php @@ -0,0 +1,15 @@ +message; + } + + public function withMessage(MessageInterface $message): self + { + $instance = clone $this; + $instance->message = $message; + + return $instance; + } + + public function getHandlerName(): string + { + return $this->message->getHandlerName(); + } + + public function getData(): mixed + { + return $this->message->getData(); + } + + public function getMetadata(): array + { + return $this->message->getMetadata(); + } +} diff --git a/src/Message/IdEnvelope.php b/src/Message/IdEnvelope.php new file mode 100644 index 00000000..a1ffccad --- /dev/null +++ b/src/Message/IdEnvelope.php @@ -0,0 +1,38 @@ +id = $id; + } + + public function getId(): string|int|null + { + return $this->id ?? $this->message->getMetadata()[self::MESSAGE_ID_KEY] ?? null; + } + + public function getMetadata(): array + { + return array_merge($this->message->getMetadata(), [ + self::MESSAGE_ID_KEY => $this->getId(), + ]); + } +} diff --git a/src/Message/Message.php b/src/Message/Message.php index 4604b0bd..07278d1f 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -15,7 +15,6 @@ public function __construct( private string $handlerName, private mixed $data, private array $metadata = [], - private ?string $id = null ) { } @@ -29,16 +28,6 @@ public function getData(): mixed return $this->data; } - public function setId(?string $id): void - { - $this->id = $id; - } - - public function getId(): ?string - { - return $this->id; - } - public function getMetadata(): array { return $this->metadata; diff --git a/src/Message/MessageInterface.php b/src/Message/MessageInterface.php index 85c2bf17..c65f32fd 100644 --- a/src/Message/MessageInterface.php +++ b/src/Message/MessageInterface.php @@ -6,15 +6,6 @@ interface MessageInterface { - public function setId(?string $id): void; - - /** - * Returns unique message ID. - * - * @return string|null - */ - public function getId(): ?string; - /** * Returns handler name. * diff --git a/src/Middleware/FailureHandling/FailureEnvelope.php b/src/Middleware/FailureHandling/FailureEnvelope.php new file mode 100644 index 00000000..3af8d24c --- /dev/null +++ b/src/Middleware/FailureHandling/FailureEnvelope.php @@ -0,0 +1,25 @@ +message->getMetadata(), $this->meta); + } +} diff --git a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php index 4e1b08ab..1997367f 100644 --- a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php @@ -12,6 +12,7 @@ use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface; use Yiisoft\Queue\QueueInterface; +use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; /** * Failure strategy which resends the given message to a queue with an exponentially increasing delay. @@ -62,15 +63,12 @@ public function processFailure( ): FailureHandlingRequest { $message = $request->getMessage(); if ($this->suites($message)) { - $messageNew = new Message( - handlerName: $message->getHandlerName(), - data: $message->getData(), - metadata: $this->formNewMeta($message), - id: $message->getId(), - ); - ($this->queue ?? $request->getQueue())->push( - $messageNew, - $this->delayMiddleware->withDelay($this->getDelay($message)) + $envelope = new FailureEnvelope($message, $this->createNewMeta($message)); + $queue = $this->queue ?? $request->getQueue(); + $middlewareDefinitions = $this->delayMiddleware->withDelay($this->getDelay($envelope)); + $messageNew = $queue->push( + $envelope, + $middlewareDefinitions ); return $request->withMessage($messageNew); @@ -84,7 +82,7 @@ private function suites(MessageInterface $message): bool return $this->maxAttempts > $this->getAttempts($message); } - private function formNewMeta(MessageInterface $message): array + private function createNewMeta(MessageInterface $message): array { $meta = $message->getMetadata(); $meta[self::META_KEY_DELAY . "-$this->id"] = $this->getDelay($message); diff --git a/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php b/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php index 44814455..43ac16f4 100644 --- a/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php @@ -5,8 +5,8 @@ namespace Yiisoft\Queue\Middleware\FailureHandling\Implementation; use InvalidArgumentException; -use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; @@ -40,15 +40,11 @@ public function processFailure( ): FailureHandlingRequest { $message = $request->getMessage(); if ($this->suites($message)) { - $message = new Message( - handlerName: $message->getHandlerName(), - data: $message->getData(), - metadata: $this->createMeta($message), - id: $message->getId(), - ); - $message = $this->queue?->push($message) ?? $request->getQueue()->push($message); + $envelope = new FailureEnvelope($message, $this->createMeta($message)); + $envelope = ($this->queue ?? $request->getQueue())->push($envelope); - return $request->withMessage($message)->withQueue($this->queue ?? $request->getQueue()); + return $request->withMessage($envelope) + ->withQueue($this->queue ?? $request->getQueue()); } return $handler->handleFailure($request); @@ -59,11 +55,6 @@ private function suites(MessageInterface $message): bool return $this->getAttempts($message) < $this->maxAttempts; } - private function getMetaKey(): string - { - return self::META_KEY_RESEND . "-$this->id"; - } - private function createMeta(MessageInterface $message): array { $metadata = $message->getMetadata(); @@ -81,4 +72,9 @@ private function getAttempts(MessageInterface $message): int return (int) $result; } + + private function getMetaKey(): string + { + return self::META_KEY_RESEND . "-$this->id"; + } } diff --git a/src/Middleware/Push/AdapterPushHandler.php b/src/Middleware/Push/AdapterPushHandler.php index 8d052d92..be4b1e67 100644 --- a/src/Middleware/Push/AdapterPushHandler.php +++ b/src/Middleware/Push/AdapterPushHandler.php @@ -13,11 +13,9 @@ final class AdapterPushHandler implements MessageHandlerPushInterface { public function handlePush(PushRequest $request): PushRequest { - if ($request->getAdapter() === null) { + if (($adapter = $request->getAdapter()) === null) { throw new AdapterNotConfiguredException(); } - $request->getAdapter()->push($request->getMessage()); - - return $request; + return $request->withMessage($adapter->push($request->getMessage())); } } diff --git a/src/Queue.php b/src/Queue.php index 6f13e487..796704a9 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -16,6 +16,7 @@ use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Push\PushRequest; use Yiisoft\Queue\Worker\WorkerInterface; +use Yiisoft\Queue\Message\IdEnvelope; final class Queue implements QueueInterface { @@ -57,9 +58,10 @@ public function push( ->dispatch($request, $this->createPushHandler($middlewareDefinitions)) ->getMessage(); + $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; $this->logger->info( 'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.', - ['handlerName' => $message->getHandlerName(), 'id' => $message->getId() ?? 'null'] + ['handlerName' => $message->getHandlerName(), 'id' => $messageId] ); return $message; @@ -100,7 +102,7 @@ public function listen(): void $this->logger->info('Finish listening to the queue.'); } - public function status(string $id): JobStatus + public function status(string|int $id): JobStatus { $this->checkAdapter(); @@ -159,7 +161,7 @@ private function createPushHandler(array $middlewares): MessageHandlerPushInterf return new class ( $this->adapterPushHandler, $this->pushMiddlewareDispatcher, - [...array_values($this->middlewareDefinitions), ...array_values($middlewares)] + array_merge($this->middlewareDefinitions, $middlewares) ) implements MessageHandlerPushInterface { public function __construct( private AdapterPushHandler $adapterPushHandler, diff --git a/src/QueueInterface.php b/src/QueueInterface.php index aaa0c14f..2fa93fb3 100644 --- a/src/QueueInterface.php +++ b/src/QueueInterface.php @@ -45,7 +45,7 @@ public function listen(): void; * * @return JobStatus */ - public function status(string $id): JobStatus; + public function status(string|int $id): JobStatus; public function withAdapter(AdapterInterface $adapter): self; diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 6e22bf9a..a42f0c75 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -25,6 +25,7 @@ use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\QueueInterface; +use Yiisoft\Queue\Message\IdEnvelope; final class Worker implements WorkerInterface { @@ -45,7 +46,7 @@ public function __construct( */ public function process(MessageInterface $message, QueueInterface $queue): MessageInterface { - $this->logger->info('Processing message #{message}.', ['message' => $message->getId()]); + $this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']); $name = $message->getHandlerName(); $handler = $this->getHandler($name); diff --git a/tests/App/DummyQueue.php b/tests/App/DummyQueue.php index f7798007..dc8eceb4 100644 --- a/tests/App/DummyQueue.php +++ b/tests/App/DummyQueue.php @@ -13,7 +13,7 @@ final class DummyQueue implements QueueInterface { - public function __construct(private $channelName) + public function __construct(private string $channelName) { } @@ -32,7 +32,7 @@ public function listen(): void { } - public function status(string $id): JobStatus + public function status(string|int $id): JobStatus { throw new Exception('`status()` method is not implemented yet.'); } diff --git a/tests/App/FakeAdapter.php b/tests/App/FakeAdapter.php index fea8ce2b..8caa67c9 100644 --- a/tests/App/FakeAdapter.php +++ b/tests/App/FakeAdapter.php @@ -13,9 +13,11 @@ final class FakeAdapter implements AdapterInterface public array $pushMessages = []; public string $channel = 'default'; - public function push(MessageInterface $message): void + public function push(MessageInterface $message): MessageInterface { $this->pushMessages[] = $message; + + return $message; } public function runExisting(callable $handlerCallback): void @@ -23,7 +25,7 @@ public function runExisting(callable $handlerCallback): void //skip } - public function status(string $id): JobStatus + public function status(string|int $id): JobStatus { //skip } diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php index fb52f4c7..7629cb8f 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php @@ -150,6 +150,7 @@ public function testPipelineSuccess(): void { $message = new Message('test', null); $queue = $this->createMock(QueueInterface::class); + $queue->method('push')->willReturnArgument(0); $middleware = new ExponentialDelayMiddleware( 'test', 1, @@ -165,8 +166,9 @@ public function testPipelineSuccess(): void $result = $middleware->processFailure($request, $nextHandler); self::assertNotEquals($request, $result); - self::assertArrayHasKey(ExponentialDelayMiddleware::META_KEY_ATTEMPTS . '-test', $result->getMessage()->getMetadata()); - self::assertArrayHasKey(ExponentialDelayMiddleware::META_KEY_DELAY . '-test', $result->getMessage()->getMetadata()); + $message = $result->getMessage(); + self::assertArrayHasKey(ExponentialDelayMiddleware::META_KEY_ATTEMPTS . '-test', $message->getMetadata()); + self::assertArrayHasKey(ExponentialDelayMiddleware::META_KEY_DELAY . '-test', $message->getMetadata()); } public function testPipelineFailure(): void diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 95a156ff..7a477e66 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -9,6 +9,7 @@ use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Tests\App\FakeAdapter; use Yiisoft\Queue\Tests\TestCase; +use Yiisoft\Queue\Message\IdEnvelope; final class QueueTest extends TestCase { @@ -86,8 +87,13 @@ public function testStatus(): void ->getQueue() ->withAdapter($this->getAdapter()); $message = new Message('simple', null); - $queue->push($message); - $id = $message->getId(); + $envelope = $queue->push($message); + + self::assertArrayHasKey(IdEnvelope::MESSAGE_ID_KEY, $envelope->getMetadata()); + /** + * @var int|string $id + */ + $id = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; $status = $queue->status($id); self::assertTrue($status->isWaiting()); @@ -102,8 +108,8 @@ public function testAdapterNotConfiguredException(): void try { $queue = $this->getQueue(); $message = new Message('simple', null); - $queue->push($message); - $queue->status($message->getId()); + $envelope = $queue->push($message); + $queue->status($envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]); } catch (AdapterNotConfiguredException $exception) { self::assertSame($exception::class, AdapterNotConfiguredException::class); self::assertSame($exception->getName(), 'Adapter is not configured'); diff --git a/tests/Unit/SynchronousAdapterTest.php b/tests/Unit/SynchronousAdapterTest.php index 0a741951..31a43f50 100644 --- a/tests/Unit/SynchronousAdapterTest.php +++ b/tests/Unit/SynchronousAdapterTest.php @@ -8,6 +8,7 @@ use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\QueueFactory; use Yiisoft\Queue\Tests\TestCase; +use Yiisoft\Queue\Message\IdEnvelope; final class SynchronousAdapterTest extends TestCase { @@ -22,8 +23,11 @@ public function testNonIntegerId(): void ->getQueue() ->withAdapter($this->getAdapter()); $message = new Message('simple', null); - $queue->push($message); - $id = $message->getId(); + $envelope = $queue->push($message); + + self::assertArrayHasKey(IdEnvelope::MESSAGE_ID_KEY, $envelope->getMetadata()); + $id = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; + $wrongId = "$id "; self::assertEquals(JobStatus::waiting(), $queue->status($wrongId)); } @@ -34,12 +38,12 @@ public function testIdSetting(): void $adapter = $this->getAdapter(); $ids = []; - $adapter->push($message); - $ids[] = $message->getId(); - $adapter->push($message); - $ids[] = $message->getId(); - $adapter->push($message); - $ids[] = $message->getId(); + $envelope = $adapter->push($message); + $ids[] = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; + $envelope = $adapter->push($message); + $ids[] = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; + $envelope = $adapter->push($message); + $ids[] = $envelope->getMetadata()[IdEnvelope::MESSAGE_ID_KEY]; self::assertCount(3, array_unique($ids)); } diff --git a/tests/Unit/WorkerTest.php b/tests/Unit/WorkerTest.php index 6b81483a..4187358f 100644 --- a/tests/Unit/WorkerTest.php +++ b/tests/Unit/WorkerTest.php @@ -43,7 +43,7 @@ public function testJobExecutedWithCallableHandler(): void $messages = $logger->getMessages(); $this->assertNotEmpty($messages); - $this->assertStringContainsString('Processing message #{message}.', $messages[0]['message']); + $this->assertStringContainsString('Processing message #null.', $messages[0]['message']); } public function testJobExecutedWithDefinitionHandler(): void