Skip to content

Commit

Permalink
Add envelope stack + MessageSerializer (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
xepozz committed Jan 22, 2024
1 parent 5339df8 commit fb5d3c6
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 14 deletions.
3 changes: 3 additions & 0 deletions config/di.php
Expand Up @@ -8,6 +8,8 @@
use Yiisoft\Queue\Cli\SimpleLoop;
use Yiisoft\Queue\Command\ListenAllCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
Expand Down Expand Up @@ -54,6 +56,7 @@
FailureMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-fail']],
],
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']),
Expand Down
4 changes: 4 additions & 0 deletions src/Message/EnvelopeInterface.php
Expand Up @@ -9,6 +9,10 @@
*/
interface EnvelopeInterface extends MessageInterface
{
public const ENVELOPE_STACK_KEY = 'envelopes';

public static function fromMessage(MessageInterface $message): self;

public function getMessage(): MessageInterface;

public function withMessage(MessageInterface $message): self;
Expand Down
21 changes: 20 additions & 1 deletion src/Message/EnvelopeTrait.php
Expand Up @@ -31,8 +31,27 @@ public function getData(): mixed
return $this->message->getData();
}

public static function fromMessage(MessageInterface $message): self
{
return new static($message);
}

public function getMetadata(): array
{
return $this->message->getMetadata();
return array_merge(
$this->message->getMetadata(),
[
self::ENVELOPE_STACK_KEY => array_merge(
$this->message->getMetadata()[self::ENVELOPE_STACK_KEY] ?? [],
[self::class],
),
],
$this->getEnvelopeMetadata(),
);
}

public function getEnvelopeMetadata(): array
{
return [];
}
}
6 changes: 2 additions & 4 deletions src/Message/IdEnvelope.php
Expand Up @@ -29,10 +29,8 @@ public function getId(): string|int|null
return $this->id ?? $this->message->getMetadata()[self::MESSAGE_ID_KEY] ?? null;
}

public function getMetadata(): array
private function getEnvelopeMetadata(): array
{
return array_merge($this->message->getMetadata(), [
self::MESSAGE_ID_KEY => $this->getId(),
]);
return [self::MESSAGE_ID_KEY => $this->getId()];
}
}
59 changes: 59 additions & 0 deletions src/Message/JsonMessageSerializer.php
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

use InvalidArgumentException;
use JsonException;

final class JsonMessageSerializer implements MessageSerializerInterface
{
/**
* @throws JsonException
*/
public function serialize(MessageInterface $message): string
{
$payload = [
'name' => $message->getHandlerName(),
'data' => $message->getData(),
'meta' => $message->getMetadata(),
];

return json_encode($payload, JSON_THROW_ON_ERROR);
}

/**
* @throws JsonException
* @throws InvalidArgumentException
*/
public function unserialize(string $value): MessageInterface
{
$payload = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
if (!is_array($payload)) {
throw new InvalidArgumentException('Payload must be array. Got ' . get_debug_type($payload) . '.');
}

$meta = $payload['meta'] ?? [];
if (!is_array($meta)) {
throw new InvalidArgumentException('Metadata must be array. Got ' . get_debug_type($meta) . '.');
}

// TODO: will be removed later
$message = new Message($payload['name'] ?? '$name', $payload['data'] ?? null, $meta);

if (isset($meta[EnvelopeInterface::ENVELOPE_STACK_KEY]) && is_array($meta[EnvelopeInterface::ENVELOPE_STACK_KEY])) {
$message = $message->withMetadata(
array_merge($message->getMetadata(), [EnvelopeInterface::ENVELOPE_STACK_KEY => []]),
);
foreach ($meta[EnvelopeInterface::ENVELOPE_STACK_KEY] as $envelope) {
if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) {
$message = $envelope::fromMessage($message);
}
}
}


return $message;
}
}
8 changes: 8 additions & 0 deletions src/Message/Message.php
Expand Up @@ -32,4 +32,12 @@ public function getMetadata(): array
{
return $this->metadata;
}

public function withMetadata(array $metadata): self
{
$instance = clone $this;
$instance->metadata = $metadata;

return $instance;
}
}
12 changes: 12 additions & 0 deletions src/Message/MessageSerializerInterface.php
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

interface MessageSerializerInterface
{
public function serialize(MessageInterface $message): string;

public function unserialize(string $value): MessageInterface;
}
2 changes: 1 addition & 1 deletion src/Middleware/FailureHandling/FailureEnvelope.php
Expand Up @@ -14,7 +14,7 @@ final class FailureEnvelope implements EnvelopeInterface

public function __construct(
private MessageInterface $message,
private array $meta,
private array $meta = [],
) {
}

Expand Down
4 changes: 0 additions & 4 deletions src/QueueInterface.php
Expand Up @@ -19,18 +19,14 @@ interface QueueInterface
/**
* Pushes a message into the queue.
*
* @param MessageInterface $message
* @param array|callable|MiddlewarePushInterface|string ...$middlewareDefinitions
*
* @return MessageInterface
*/
public function push(MessageInterface $message, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): MessageInterface;

/**
* Execute all existing jobs and exit
*
* @param int $max
*
* @return int How many messages were processed
*/
public function run(int $max = 0): int;
Expand Down
2 changes: 1 addition & 1 deletion src/Worker/Worker.php
Expand Up @@ -51,7 +51,7 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa
$name = $message->getHandlerName();
$handler = $this->getHandler($name);
if ($handler === null) {
throw new RuntimeException("Queue handler with name $name doesn't exist");
throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name));
}

$request = new ConsumeRequest($message, $queue);
Expand Down
47 changes: 47 additions & 0 deletions tests/Unit/EnvelopeTest.php
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

namespace Unit;

use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\Message\EnvelopeInterface;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\Message;

final class EnvelopeTest extends TestCase
{
public function testEnvelopeStack(): void
{
$message = new Message('handler', 'test');
$message = new IdEnvelope($message, 'test-id');

$this->assertEquals('test', $message->getMessage()->getData());

$stack = $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY];
$this->assertIsArray($stack);

$this->assertEquals([
IdEnvelope::class,
], $stack);
}

public function testEnvelopeDuplicates(): void
{
$message = new Message('handler', 'test');
$message = new IdEnvelope($message, 'test-id');
$message = new IdEnvelope($message, 'test-id');
$message = new IdEnvelope($message, 'test-id');

$this->assertEquals('test', $message->getMessage()->getData());

$stack = $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY];
$this->assertIsArray($stack);

$this->assertEquals([
IdEnvelope::class,
IdEnvelope::class,
IdEnvelope::class,
], $stack);
}
}

0 comments on commit fb5d3c6

Please sign in to comment.