From f986fe32c8192563c8e4b2763ddf97dc2df2687a Mon Sep 17 00:00:00 2001 From: Guuzen Date: Mon, 8 Sep 2025 18:24:56 +0400 Subject: [PATCH] Rabbitmq doesn't support immediate flag (which is isHighestPriority flag in the lib) --- src/Delivery/SchedulerDeliveryOptions.php | 29 ++++++- src/Emitter/RabbitMQEmitter.php | 16 ++-- src/Module/SchedulerModule.php | 8 +- tests/Context.php | 4 + tests/Emitter/RabbitmqEmitterTest.php | 48 ++++++++++++ .../SchedulerModuleIsHighestPriorityTest.php | 75 +++++++++++++++++++ 6 files changed, 171 insertions(+), 9 deletions(-) create mode 100644 tests/Emitter/RabbitmqEmitterTest.php create mode 100644 tests/Module/SchedulerModuleIsHighestPriorityTest.php diff --git a/src/Delivery/SchedulerDeliveryOptions.php b/src/Delivery/SchedulerDeliveryOptions.php index 69d8bec..012cc65 100644 --- a/src/Delivery/SchedulerDeliveryOptions.php +++ b/src/Delivery/SchedulerDeliveryOptions.php @@ -14,6 +14,9 @@ use ServiceBus\Common\Endpoint\DeliveryOptions; +/** + * @psalm-immutable + */ final class SchedulerDeliveryOptions implements DeliveryOptions { /** @@ -23,6 +26,15 @@ final class SchedulerDeliveryOptions implements DeliveryOptions */ private $headers; + /** + * This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this + * flag is set, the server will return an undeliverable message with a Return method. If this flag is false, the + * server will queue the message, but with no guarantee that it will ever be consumed. + * + * @var bool + */ + private $isImmediate; + /** * @psalm-param positive-int $delay */ @@ -31,6 +43,14 @@ public static function scheduledMessage(int $delay): self return new self(['x-delay' => $delay]); } + public function withIsHighestPriority(bool $isHighestPriority): self + { + return new self( + headers: $this->headers, + isImmediate: $isHighestPriority, + ); + } + public static function create(): self { return new self([]); @@ -56,7 +76,7 @@ public function isPersistent(): bool public function isHighestPriority(): bool { - return true; + return $this->isImmediate; } public function expirationAfter(): ?int @@ -67,8 +87,11 @@ public function expirationAfter(): ?int /** * @psalm-param array $headers */ - private function __construct(array $headers) - { + private function __construct( + array $headers, + bool $isImmediate = true, + ) { $this->headers = $headers; + $this->isImmediate = $isImmediate; } } diff --git a/src/Emitter/RabbitMQEmitter.php b/src/Emitter/RabbitMQEmitter.php index 955d08e..6916231 100644 --- a/src/Emitter/RabbitMQEmitter.php +++ b/src/Emitter/RabbitMQEmitter.php @@ -33,9 +33,15 @@ final class RabbitMQEmitter implements SchedulerEmitter */ private $store; - public function __construct(SchedulerStore $store) + /** + * @var bool + */ + private $deliverWithHighestPriority; + + public function __construct(SchedulerStore $store, ?bool $deliverWithHighestPriority = null) { $this->store = $store; + $this->deliverWithHighestPriority = $deliverWithHighestPriority ?? true; } public function emit(ScheduledOperationId $id, ServiceBusContext $context): Promise @@ -79,11 +85,11 @@ function () use ($nextOperation, $context): \Generator $delay = $this->calculateExecutionDelay($nextOperation); + $deliveryOptions = SchedulerDeliveryOptions::scheduledMessage($delay) + ->withIsHighestPriority($this->deliverWithHighestPriority); + /** Message will return after a specified time interval */ - yield $context->delivery( - new EmitSchedulerOperation($nextOperation->id), - SchedulerDeliveryOptions::scheduledMessage($delay) - ); + yield $context->delivery(new EmitSchedulerOperation($nextOperation->id), $deliveryOptions); $context->logger()->debug( 'Scheduled operation with identifier "{scheduledOperationId}" will be executed after "{scheduledOperationDelay}" seconds', diff --git a/src/Module/SchedulerModule.php b/src/Module/SchedulerModule.php index 9fccfda..4e0e524 100644 --- a/src/Module/SchedulerModule.php +++ b/src/Module/SchedulerModule.php @@ -22,6 +22,7 @@ use ServiceBus\Scheduler\Store\SqlSchedulerStore; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Definition; +use Symfony\Component\DependencyInjection\Parameter; use Symfony\Component\DependencyInjection\Reference; final class SchedulerModule implements ServiceBusModule @@ -111,9 +112,14 @@ private function registerEmitter(ContainerBuilder $containerBuilder): void { if (self::TYPE === $this->adapterType) { + $containerBuilder->setParameter('service_bus.scheduler.deliver_with_highest_priority', true); + $containerBuilder->addDefinitions([ SchedulerEmitter::class => (new Definition(RabbitMQEmitter::class)) - ->setArguments([new Reference(SchedulerStore::class)]), + ->setArguments([ + new Reference(SchedulerStore::class), + '%service_bus.scheduler.deliver_with_highest_priority%', + ]), ]); return; diff --git a/tests/Context.php b/tests/Context.php index d15b492..a377c36 100644 --- a/tests/Context.php +++ b/tests/Context.php @@ -41,6 +41,8 @@ final class Context implements ServiceBusContext */ public $logHandler; + public ?DeliveryOptions $deliveryOptions = null; + public function violations(): ?ValidationViolations { return null; @@ -57,6 +59,7 @@ public function delivery( ?OutcomeMessageMetadata $withMetadata = null ): Promise { $this->messages[] = $message; + $this->deliveryOptions = $deliveryOptions; return new Success(); } @@ -66,6 +69,7 @@ public function deliveryBulk( ?DeliveryOptions $deliveryOptions = null, ?OutcomeMessageMetadata $withMetadata = null ): Promise { + $this->deliveryOptions = $deliveryOptions; $this->messages = \array_merge($this->messages, $messages); return new Success(); diff --git a/tests/Emitter/RabbitmqEmitterTest.php b/tests/Emitter/RabbitmqEmitterTest.php new file mode 100644 index 0000000..d78018c --- /dev/null +++ b/tests/Emitter/RabbitmqEmitterTest.php @@ -0,0 +1,48 @@ +emitNextOperation( + NextScheduledOperation::create(ScheduledOperationId::new(), new \DateTimeImmutable()), + $context, + ); + + $this->assertSame($expectedFlag, $context->deliveryOptions->isHighestPriority()); + } + + public static function isHighestPriorityOptionProvider(): \Generator + { + yield 'true option results in true flag' => ['expectedFlag' => true, 'optionFlag' => true]; + yield 'false option results in false flag' => ['expectedFlag' => false, 'optionFlag' => false]; + yield 'null option results in default true flag' => ['expectedFlag' => true, 'optionFlag' => null]; + } +} diff --git a/tests/Module/SchedulerModuleIsHighestPriorityTest.php b/tests/Module/SchedulerModuleIsHighestPriorityTest.php new file mode 100644 index 0000000..4b0f11c --- /dev/null +++ b/tests/Module/SchedulerModuleIsHighestPriorityTest.php @@ -0,0 +1,75 @@ +addDefinitions([ + DatabaseAdapter::class => new Definition( + DoctrineDBALAdapter::class, + [ + new StorageConfiguration('sqlite:///:memory:'), + ], + ), + ]); + + $this->containerBuilder = $containerBuilder; + } + + /** + * To keep thing backward compatible for the clients who use default highestPriority = true + * + * @test + */ + public function highestPriorityIsDefault(): void + { + $module = SchedulerModule::rabbitMqWithSqlStorage(DatabaseAdapter::class); + $module->boot($this->containerBuilder); + + $this->assertIsHighestPriority(true); + } + + /** + * @test + */ + public function highestPriorityCanBeOverriden(): void + { + $module = SchedulerModule::rabbitMqWithSqlStorage(DatabaseAdapter::class); + $module->boot($this->containerBuilder); + + $this->containerBuilder->setParameter('service_bus.scheduler.deliver_with_highest_priority', false); + + $this->assertIsHighestPriority(false); + } + + private function assertIsHighestPriority(?bool $isHighestPriority): void + { + self::assertEquals( + new RabbitMQEmitter( + new SqlSchedulerStore( + new DoctrineDBALAdapter(new StorageConfiguration('sqlite:///:memory:')), + ), + $isHighestPriority, + ), + $this->containerBuilder->get(SchedulerEmitter::class), + ); + } +}