From fb5d3c6a9b160d2acdd7d9d851c5264dbfa3de63 Mon Sep 17 00:00:00 2001 From: Dmitriy Derepko Date: Mon, 22 Jan 2024 14:36:45 +0700 Subject: [PATCH] Add envelope stack + MessageSerializer (#188) --- config/di.php | 3 + src/Message/EnvelopeInterface.php | 4 + src/Message/EnvelopeTrait.php | 21 ++- src/Message/IdEnvelope.php | 6 +- src/Message/JsonMessageSerializer.php | 59 +++++++ src/Message/Message.php | 8 + src/Message/MessageSerializerInterface.php | 12 ++ .../FailureHandling/FailureEnvelope.php | 2 +- src/QueueInterface.php | 4 - src/Worker/Worker.php | 2 +- tests/Unit/EnvelopeTest.php | 47 +++++ .../Message/JsonMessageSerializerTest.php | 160 ++++++++++++++++++ tests/Unit/WorkerTest.php | 6 +- 13 files changed, 320 insertions(+), 14 deletions(-) create mode 100644 src/Message/JsonMessageSerializer.php create mode 100644 src/Message/MessageSerializerInterface.php create mode 100644 tests/Unit/EnvelopeTest.php create mode 100644 tests/Unit/Message/JsonMessageSerializerTest.php diff --git a/config/di.php b/config/di.php index ea0f54b3..dcb37cbf 100644 --- a/config/di.php +++ b/config/di.php @@ -8,6 +8,8 @@ use Yiisoft\Queue\Cli\SimpleLoop; use Yiisoft\Queue\Command\ListenAllCommand; use Yiisoft\Queue\Command\RunCommand; +use Yiisoft\Queue\Message\JsonMessageSerializer; +use Yiisoft\Queue\Message\MessageSerializerInterface; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume; use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface; @@ -54,6 +56,7 @@ FailureMiddlewareDispatcher::class => [ '__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-fail']], ], + MessageSerializerInterface::class => JsonMessageSerializer::class, RunCommand::class => [ '__construct()' => [ 'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']), diff --git a/src/Message/EnvelopeInterface.php b/src/Message/EnvelopeInterface.php index bf336112..b0f8d89f 100644 --- a/src/Message/EnvelopeInterface.php +++ b/src/Message/EnvelopeInterface.php @@ -9,6 +9,10 @@ */ interface EnvelopeInterface extends MessageInterface { + public const ENVELOPE_STACK_KEY = 'envelopes'; + + public static function fromMessage(MessageInterface $message): self; + public function getMessage(): MessageInterface; public function withMessage(MessageInterface $message): self; diff --git a/src/Message/EnvelopeTrait.php b/src/Message/EnvelopeTrait.php index 13e593ab..7e58a97b 100644 --- a/src/Message/EnvelopeTrait.php +++ b/src/Message/EnvelopeTrait.php @@ -31,8 +31,27 @@ public function getData(): mixed return $this->message->getData(); } + public static function fromMessage(MessageInterface $message): self + { + return new static($message); + } + public function getMetadata(): array { - return $this->message->getMetadata(); + return array_merge( + $this->message->getMetadata(), + [ + self::ENVELOPE_STACK_KEY => array_merge( + $this->message->getMetadata()[self::ENVELOPE_STACK_KEY] ?? [], + [self::class], + ), + ], + $this->getEnvelopeMetadata(), + ); + } + + public function getEnvelopeMetadata(): array + { + return []; } } diff --git a/src/Message/IdEnvelope.php b/src/Message/IdEnvelope.php index a1ffccad..dec2d679 100644 --- a/src/Message/IdEnvelope.php +++ b/src/Message/IdEnvelope.php @@ -29,10 +29,8 @@ public function getId(): string|int|null return $this->id ?? $this->message->getMetadata()[self::MESSAGE_ID_KEY] ?? null; } - public function getMetadata(): array + private function getEnvelopeMetadata(): array { - return array_merge($this->message->getMetadata(), [ - self::MESSAGE_ID_KEY => $this->getId(), - ]); + return [self::MESSAGE_ID_KEY => $this->getId()]; } } diff --git a/src/Message/JsonMessageSerializer.php b/src/Message/JsonMessageSerializer.php new file mode 100644 index 00000000..81a6220c --- /dev/null +++ b/src/Message/JsonMessageSerializer.php @@ -0,0 +1,59 @@ + $message->getHandlerName(), + 'data' => $message->getData(), + 'meta' => $message->getMetadata(), + ]; + + return json_encode($payload, JSON_THROW_ON_ERROR); + } + + /** + * @throws JsonException + * @throws InvalidArgumentException + */ + public function unserialize(string $value): MessageInterface + { + $payload = json_decode($value, true, 512, JSON_THROW_ON_ERROR); + if (!is_array($payload)) { + throw new InvalidArgumentException('Payload must be array. Got ' . get_debug_type($payload) . '.'); + } + + $meta = $payload['meta'] ?? []; + if (!is_array($meta)) { + throw new InvalidArgumentException('Metadata must be array. Got ' . get_debug_type($meta) . '.'); + } + + // TODO: will be removed later + $message = new Message($payload['name'] ?? '$name', $payload['data'] ?? null, $meta); + + if (isset($meta[EnvelopeInterface::ENVELOPE_STACK_KEY]) && is_array($meta[EnvelopeInterface::ENVELOPE_STACK_KEY])) { + $message = $message->withMetadata( + array_merge($message->getMetadata(), [EnvelopeInterface::ENVELOPE_STACK_KEY => []]), + ); + foreach ($meta[EnvelopeInterface::ENVELOPE_STACK_KEY] as $envelope) { + if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) { + $message = $envelope::fromMessage($message); + } + } + } + + + return $message; + } +} diff --git a/src/Message/Message.php b/src/Message/Message.php index 07278d1f..a414ffb0 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -32,4 +32,12 @@ public function getMetadata(): array { return $this->metadata; } + + public function withMetadata(array $metadata): self + { + $instance = clone $this; + $instance->metadata = $metadata; + + return $instance; + } } diff --git a/src/Message/MessageSerializerInterface.php b/src/Message/MessageSerializerInterface.php new file mode 100644 index 00000000..b034590c --- /dev/null +++ b/src/Message/MessageSerializerInterface.php @@ -0,0 +1,12 @@ +getHandlerName(); $handler = $this->getHandler($name); if ($handler === null) { - throw new RuntimeException("Queue handler with name $name doesn't exist"); + throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name)); } $request = new ConsumeRequest($message, $queue); diff --git a/tests/Unit/EnvelopeTest.php b/tests/Unit/EnvelopeTest.php new file mode 100644 index 00000000..af412125 --- /dev/null +++ b/tests/Unit/EnvelopeTest.php @@ -0,0 +1,47 @@ +assertEquals('test', $message->getMessage()->getData()); + + $stack = $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY]; + $this->assertIsArray($stack); + + $this->assertEquals([ + IdEnvelope::class, + ], $stack); + } + + public function testEnvelopeDuplicates(): void + { + $message = new Message('handler', 'test'); + $message = new IdEnvelope($message, 'test-id'); + $message = new IdEnvelope($message, 'test-id'); + $message = new IdEnvelope($message, 'test-id'); + + $this->assertEquals('test', $message->getMessage()->getData()); + + $stack = $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY]; + $this->assertIsArray($stack); + + $this->assertEquals([ + IdEnvelope::class, + IdEnvelope::class, + IdEnvelope::class, + ], $stack); + } +} diff --git a/tests/Unit/Message/JsonMessageSerializerTest.php b/tests/Unit/Message/JsonMessageSerializerTest.php new file mode 100644 index 00000000..776a9835 --- /dev/null +++ b/tests/Unit/Message/JsonMessageSerializerTest.php @@ -0,0 +1,160 @@ +createSerializer(); + + $this->expectExceptionMessage(sprintf('Payload must be array. Got %s.', get_debug_type($payload))); + $this->expectException(InvalidArgumentException::class); + $serializer->unserialize(json_encode($payload)); + } + + public static function dataUnsupportedPayloadFormat(): iterable + { + yield 'string' => ['']; + yield 'number' => [1]; + yield 'boolean' => [true]; + yield 'null' => [null]; + } + + /** + * @dataProvider dataUnsupportedMetadataFormat + */ + public function testMetadataFormat(mixed $meta): void + { + $payload = ['data' => 'test', 'meta' => $meta]; + $serializer = $this->createSerializer(); + + $this->expectExceptionMessage(sprintf('Metadata must be array. Got %s.', get_debug_type($meta))); + $this->expectException(InvalidArgumentException::class); + $serializer->unserialize(json_encode($payload)); + } + + public static function dataUnsupportedMetadataFormat(): iterable + { + yield 'string' => ['']; + yield 'number' => [1]; + yield 'boolean' => [true]; + } + + public function testUnserializeFromData(): void + { + $payload = ['data' => 'test']; + $serializer = $this->createSerializer(); + + $message = $serializer->unserialize(json_encode($payload)); + + $this->assertInstanceOf(MessageInterface::class, $message); + $this->assertEquals($payload['data'], $message->getData()); + $this->assertEquals([], $message->getMetadata()); + } + + public function testUnserializeWithMetadata(): void + { + $payload = ['data' => 'test', 'meta' => ['int' => 1, 'str' => 'string', 'bool' => true]]; + $serializer = $this->createSerializer(); + + $message = $serializer->unserialize(json_encode($payload)); + + $this->assertInstanceOf(MessageInterface::class, $message); + $this->assertEquals($payload['data'], $message->getData()); + $this->assertEquals(['int' => 1, 'str' => 'string', 'bool' => true], $message->getMetadata()); + } + + public function testUnserializeEnvelopeStack(): void + { + $payload = [ + 'data' => 'test', + 'meta' => [ + EnvelopeInterface::ENVELOPE_STACK_KEY => [ + IdEnvelope::class, + ], + ], + ]; + $serializer = $this->createSerializer(); + + $message = $serializer->unserialize(json_encode($payload)); + + $this->assertInstanceOf(MessageInterface::class, $message); + $this->assertEquals($payload['data'], $message->getData()); + $this->assertEquals([IdEnvelope::class], $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY]); + + $this->assertInstanceOf(IdEnvelope::class, $message); + $this->assertNull($message->getId()); + $this->assertInstanceOf(Message::class, $message->getMessage()); + } + + public function testSerialize(): void + { + $message = new Message('handler', 'test'); + + $serializer = $this->createSerializer(); + + $json = $serializer->serialize($message); + + $this->assertEquals( + '{"name":"handler","data":"test","meta":[]}', + $json, + ); + } + + public function testSerializeEnvelopeStack(): void + { + $message = new Message('handler', 'test'); + $message = new IdEnvelope($message, 'test-id'); + + $serializer = $this->createSerializer(); + + $json = $serializer->serialize($message); + + $this->assertEquals( + sprintf( + '{"name":"handler","data":"test","meta":{"envelopes":["%s"],"%s":"test-id"}}', + str_replace('\\', '\\\\', IdEnvelope::class), + IdEnvelope::MESSAGE_ID_KEY, + ), + $json, + ); + + $message = $serializer->unserialize($json); + + $this->assertInstanceOf(IdEnvelope::class, $message); + $this->assertEquals('test-id', $message->getId()); + $this->assertEquals([ + EnvelopeInterface::ENVELOPE_STACK_KEY => [ + IdEnvelope::class, + ], + IdEnvelope::MESSAGE_ID_KEY => 'test-id', + ], $message->getMetadata()); + + $this->assertEquals([ + EnvelopeInterface::ENVELOPE_STACK_KEY => [], + IdEnvelope::MESSAGE_ID_KEY => 'test-id', + ], $message->getMessage()->getMetadata()); + } + + private function createSerializer(): JsonMessageSerializer + { + return new JsonMessageSerializer(); + } +} diff --git a/tests/Unit/WorkerTest.php b/tests/Unit/WorkerTest.php index c9ae1d6b..d0fc9733 100644 --- a/tests/Unit/WorkerTest.php +++ b/tests/Unit/WorkerTest.php @@ -108,7 +108,7 @@ public function testJobExecutedWithStaticDefinitionHandler(): void public function testJobFailWithDefinitionUndefinedMethodHandler(): void { - $this->expectExceptionMessage("Queue handler with name simple doesn't exist"); + $this->expectExceptionMessage('Queue handler with name "simple" does not exist'); $message = new Message('simple', ['test-data']); $logger = new SimpleLogger(); @@ -124,7 +124,7 @@ public function testJobFailWithDefinitionUndefinedMethodHandler(): void public function testJobFailWithDefinitionUndefinedClassHandler(): void { - $this->expectExceptionMessage("Queue handler with name simple doesn't exist"); + $this->expectExceptionMessage('Queue handler with name "simple" does not exist'); $message = new Message('simple', ['test-data']); $logger = new SimpleLogger(); @@ -146,7 +146,7 @@ public function testJobFailWithDefinitionUndefinedClassHandler(): void public function testJobFailWithDefinitionClassNotFoundInContainerHandler(): void { - $this->expectExceptionMessage("Queue handler with name simple doesn't exist"); + $this->expectExceptionMessage('Queue handler with name "simple" does not exist'); $message = new Message('simple', ['test-data']); $logger = new SimpleLogger(); $container = new SimpleContainer();