Skip to content

Commit

Permalink
Introduce Envelope (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
xepozz committed Jan 12, 2024
1 parent 6e7fec2 commit 8498f59
Show file tree
Hide file tree
Showing 23 changed files with 188 additions and 81 deletions.
4 changes: 2 additions & 2 deletions composer.json
Expand Up @@ -40,9 +40,9 @@
"rector/rector": "^0.19.0",
"roave/infection-static-analysis-plugin": "^1.16",
"spatie/phpunit-watcher": "^1.23",
"yiisoft/yii-debug": "dev-master",
"vimeo/psalm": "^4.30|^5.8",
"yiisoft/test-support": "^3.0"
"yiisoft/test-support": "^3.0",
"yiisoft/yii-debug": "dev-master|dev-php80"
},
"suggest": {
"ext-pcntl": "Need for process signals"
Expand Down
4 changes: 2 additions & 2 deletions src/Adapter/AdapterInterface.php
Expand Up @@ -26,12 +26,12 @@ public function runExisting(callable $handlerCallback): void;
*
* @return JobStatus
*/
public function status(string $id): JobStatus;
public function status(string|int $id): JobStatus;

/**
* Pushing a message to the queue. Adapter sets message ID if available.
*/
public function push(MessageInterface $message): void;
public function push(MessageInterface $message): MessageInterface;

/**
* Listen to the queue and pass messages to the given handler as they come.
Expand Down
7 changes: 4 additions & 3 deletions src/Adapter/SynchronousAdapter.php
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;

final class SynchronousAdapter implements AdapterInterface
{
Expand Down Expand Up @@ -42,7 +43,7 @@ public function runExisting(callable $handlerCallback): void
}
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
$id = (int) $id;

Expand All @@ -61,12 +62,12 @@ public function status(string $id): JobStatus
throw new InvalidArgumentException('There is no message with the given ID.');
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$key = count($this->messages) + $this->current;
$this->messages[] = $message;

$message->setId((string) $key);
return new IdEnvelope($message, $key);
}

public function subscribe(callable $handlerCallback): void
Expand Down
2 changes: 1 addition & 1 deletion src/Debug/QueueDecorator.php
Expand Up @@ -18,7 +18,7 @@ public function __construct(
) {
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
$result = $this->queue->status($id);
$this->collector->collectStatus($id, $result);

Check warning on line 24 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ public function status(string|int $id) : JobStatus { $result = $this->queue->status($id); - $this->collector->collectStatus($id, $result); + return $result; } public function push(MessageInterface $message, string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions) : MessageInterface
Expand Down
3 changes: 2 additions & 1 deletion src/Exception/JobFailureException.php
Expand Up @@ -7,13 +7,14 @@
use RuntimeException;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\IdEnvelope;

class JobFailureException extends RuntimeException
{
public function __construct(private MessageInterface $queueMessage, Throwable $previous)
{
$error = $previous->getMessage();
$messageId = $queueMessage->getId() ?? 'null';
$messageId = $queueMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$messageText = "Processing of message #$messageId is stopped because of an exception:\n$error.";

parent::__construct($messageText, 0, $previous);
Expand Down
15 changes: 15 additions & 0 deletions src/Message/EnvelopeInterface.php
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

/**
* Envelope is a message container that adds additional metadata.
*/
interface EnvelopeInterface extends MessageInterface
{
public function getMessage(): MessageInterface;

public function withMessage(MessageInterface $message): self;
}
38 changes: 38 additions & 0 deletions src/Message/EnvelopeTrait.php
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

trait EnvelopeTrait
{
private MessageInterface $message;

public function getMessage(): MessageInterface
{
return $this->message;
}

public function withMessage(MessageInterface $message): self
{
$instance = clone $this;
$instance->message = $message;

return $instance;
}

public function getHandlerName(): string
{
return $this->message->getHandlerName();
}

public function getData(): mixed
{
return $this->message->getData();
}

public function getMetadata(): array
{
return $this->message->getMetadata();
}
}
38 changes: 38 additions & 0 deletions src/Message/IdEnvelope.php
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

/**
* ID envelope allows to identify a message.
*/
final class IdEnvelope implements EnvelopeInterface
{
use EnvelopeTrait;

public const MESSAGE_ID_KEY = 'yii-message-id';

public function __construct(
private MessageInterface $message,
private string|int|null $id = null,
) {
}

public function setId(string|int|null $id): void
{
$this->id = $id;
}

public function getId(): string|int|null
{
return $this->id ?? $this->message->getMetadata()[self::MESSAGE_ID_KEY] ?? null;
}

public function getMetadata(): array
{
return array_merge($this->message->getMetadata(), [
self::MESSAGE_ID_KEY => $this->getId(),
]);
}
}
11 changes: 0 additions & 11 deletions src/Message/Message.php
Expand Up @@ -15,7 +15,6 @@ public function __construct(
private string $handlerName,
private mixed $data,
private array $metadata = [],
private ?string $id = null
) {
}

Expand All @@ -29,16 +28,6 @@ public function getData(): mixed
return $this->data;
}

public function setId(?string $id): void
{
$this->id = $id;
}

public function getId(): ?string
{
return $this->id;
}

public function getMetadata(): array
{
return $this->metadata;
Expand Down
9 changes: 0 additions & 9 deletions src/Message/MessageInterface.php
Expand Up @@ -6,15 +6,6 @@

interface MessageInterface
{
public function setId(?string $id): void;

/**
* Returns unique message ID.
*
* @return string|null
*/
public function getId(): ?string;

/**
* Returns handler name.
*
Expand Down
25 changes: 25 additions & 0 deletions src/Middleware/FailureHandling/FailureEnvelope.php
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Middleware\FailureHandling;

use Yiisoft\Queue\Message\EnvelopeInterface;
use Yiisoft\Queue\Message\EnvelopeTrait;
use Yiisoft\Queue\Message\MessageInterface;

final class FailureEnvelope implements EnvelopeInterface
{
use EnvelopeTrait;

public function __construct(
private MessageInterface $message,
private array $meta,
) {
}

public function getMetadata(): array
{
return array_merge($this->message->getMetadata(), $this->meta);
}
}
Expand Up @@ -12,6 +12,7 @@
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;

/**
* Failure strategy which resends the given message to a queue with an exponentially increasing delay.
Expand Down Expand Up @@ -62,15 +63,12 @@ public function processFailure(
): FailureHandlingRequest {
$message = $request->getMessage();
if ($this->suites($message)) {
$messageNew = new Message(
handlerName: $message->getHandlerName(),
data: $message->getData(),
metadata: $this->formNewMeta($message),
id: $message->getId(),
);
($this->queue ?? $request->getQueue())->push(
$messageNew,
$this->delayMiddleware->withDelay($this->getDelay($message))
$envelope = new FailureEnvelope($message, $this->createNewMeta($message));
$queue = $this->queue ?? $request->getQueue();
$middlewareDefinitions = $this->delayMiddleware->withDelay($this->getDelay($envelope));
$messageNew = $queue->push(
$envelope,
$middlewareDefinitions
);

return $request->withMessage($messageNew);
Expand All @@ -84,7 +82,7 @@ private function suites(MessageInterface $message): bool
return $this->maxAttempts > $this->getAttempts($message);
}

private function formNewMeta(MessageInterface $message): array
private function createNewMeta(MessageInterface $message): array
{
$meta = $message->getMetadata();
$meta[self::META_KEY_DELAY . "-$this->id"] = $this->getDelay($message);
Expand Down
Expand Up @@ -5,8 +5,8 @@
namespace Yiisoft\Queue\Middleware\FailureHandling\Implementation;

use InvalidArgumentException;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
Expand Down Expand Up @@ -40,15 +40,11 @@ public function processFailure(
): FailureHandlingRequest {
$message = $request->getMessage();
if ($this->suites($message)) {
$message = new Message(
handlerName: $message->getHandlerName(),
data: $message->getData(),
metadata: $this->createMeta($message),
id: $message->getId(),
);
$message = $this->queue?->push($message) ?? $request->getQueue()->push($message);
$envelope = new FailureEnvelope($message, $this->createMeta($message));
$envelope = ($this->queue ?? $request->getQueue())->push($envelope);

return $request->withMessage($message)->withQueue($this->queue ?? $request->getQueue());
return $request->withMessage($envelope)
->withQueue($this->queue ?? $request->getQueue());
}

return $handler->handleFailure($request);
Expand All @@ -59,11 +55,6 @@ private function suites(MessageInterface $message): bool
return $this->getAttempts($message) < $this->maxAttempts;
}

private function getMetaKey(): string
{
return self::META_KEY_RESEND . "-$this->id";
}

private function createMeta(MessageInterface $message): array
{
$metadata = $message->getMetadata();
Expand All @@ -81,4 +72,9 @@ private function getAttempts(MessageInterface $message): int

return (int) $result;
}

private function getMetaKey(): string
{
return self::META_KEY_RESEND . "-$this->id";
}
}
6 changes: 2 additions & 4 deletions src/Middleware/Push/AdapterPushHandler.php
Expand Up @@ -13,11 +13,9 @@ final class AdapterPushHandler implements MessageHandlerPushInterface
{
public function handlePush(PushRequest $request): PushRequest
{
if ($request->getAdapter() === null) {
if (($adapter = $request->getAdapter()) === null) {
throw new AdapterNotConfiguredException();
}
$request->getAdapter()->push($request->getMessage());

return $request;
return $request->withMessage($adapter->push($request->getMessage()));
}
}
8 changes: 5 additions & 3 deletions src/Queue.php
Expand Up @@ -16,6 +16,7 @@
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;

final class Queue implements QueueInterface
{
Expand Down Expand Up @@ -57,9 +58,10 @@ public function push(
->dispatch($request, $this->createPushHandler($middlewareDefinitions))
->getMessage();

$messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$this->logger->info(
'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.',
['handlerName' => $message->getHandlerName(), 'id' => $message->getId() ?? 'null']
['handlerName' => $message->getHandlerName(), 'id' => $messageId]
);

return $message;
Expand Down Expand Up @@ -100,7 +102,7 @@ public function listen(): void
$this->logger->info('Finish listening to the queue.');
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
$this->checkAdapter();

Expand Down Expand Up @@ -159,7 +161,7 @@ private function createPushHandler(array $middlewares): MessageHandlerPushInterf
return new class (
$this->adapterPushHandler,
$this->pushMiddlewareDispatcher,
[...array_values($this->middlewareDefinitions), ...array_values($middlewares)]
array_merge($this->middlewareDefinitions, $middlewares)
) implements MessageHandlerPushInterface {
public function __construct(
private AdapterPushHandler $adapterPushHandler,
Expand Down
2 changes: 1 addition & 1 deletion src/QueueInterface.php
Expand Up @@ -45,7 +45,7 @@ public function listen(): void;
*
* @return JobStatus
*/
public function status(string $id): JobStatus;
public function status(string|int $id): JobStatus;

public function withAdapter(AdapterInterface $adapter): self;

Expand Down

0 comments on commit 8498f59

Please sign in to comment.