Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/Delivery/SchedulerDeliveryOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

use ServiceBus\Common\Endpoint\DeliveryOptions;

/**
* @psalm-immutable
*/
final class SchedulerDeliveryOptions implements DeliveryOptions
{
/**
Expand All @@ -23,6 +26,15 @@ final class SchedulerDeliveryOptions implements DeliveryOptions
*/
private $headers;

/**
* This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this
* flag is set, the server will return an undeliverable message with a Return method. If this flag is false, the
* server will queue the message, but with no guarantee that it will ever be consumed.
*
* @var bool
*/
private $isImmediate;

/**
* @psalm-param positive-int $delay
*/
Expand All @@ -31,6 +43,14 @@ public static function scheduledMessage(int $delay): self
return new self(['x-delay' => $delay]);
}

public function withIsHighestPriority(bool $isHighestPriority): self
{
return new self(
headers: $this->headers,
isImmediate: $isHighestPriority,
);
}

public static function create(): self
{
return new self([]);
Expand All @@ -56,7 +76,7 @@ public function isPersistent(): bool

public function isHighestPriority(): bool
{
return true;
return $this->isImmediate;
}

public function expirationAfter(): ?int
Expand All @@ -67,8 +87,11 @@ public function expirationAfter(): ?int
/**
* @psalm-param array<string, int|float|string|null> $headers
*/
private function __construct(array $headers)
{
private function __construct(
array $headers,
bool $isImmediate = true,
) {
$this->headers = $headers;
$this->isImmediate = $isImmediate;
}
}
16 changes: 11 additions & 5 deletions src/Emitter/RabbitMQEmitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ final class RabbitMQEmitter implements SchedulerEmitter
*/
private $store;

public function __construct(SchedulerStore $store)
/**
* @var bool
*/
private $deliverWithHighestPriority;

public function __construct(SchedulerStore $store, ?bool $deliverWithHighestPriority = null)
{
$this->store = $store;
$this->deliverWithHighestPriority = $deliverWithHighestPriority ?? true;
}

public function emit(ScheduledOperationId $id, ServiceBusContext $context): Promise
Expand Down Expand Up @@ -79,11 +85,11 @@ function () use ($nextOperation, $context): \Generator

$delay = $this->calculateExecutionDelay($nextOperation);

$deliveryOptions = SchedulerDeliveryOptions::scheduledMessage($delay)
->withIsHighestPriority($this->deliverWithHighestPriority);

/** Message will return after a specified time interval */
yield $context->delivery(
new EmitSchedulerOperation($nextOperation->id),
SchedulerDeliveryOptions::scheduledMessage($delay)
);
yield $context->delivery(new EmitSchedulerOperation($nextOperation->id), $deliveryOptions);

$context->logger()->debug(
'Scheduled operation with identifier "{scheduledOperationId}" will be executed after "{scheduledOperationDelay}" seconds',
Expand Down
8 changes: 7 additions & 1 deletion src/Module/SchedulerModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use ServiceBus\Scheduler\Store\SqlSchedulerStore;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Parameter;
use Symfony\Component\DependencyInjection\Reference;

final class SchedulerModule implements ServiceBusModule
Expand Down Expand Up @@ -111,9 +112,14 @@ private function registerEmitter(ContainerBuilder $containerBuilder): void
{
if (self::TYPE === $this->adapterType)
{
$containerBuilder->setParameter('service_bus.scheduler.deliver_with_highest_priority', true);

$containerBuilder->addDefinitions([
SchedulerEmitter::class => (new Definition(RabbitMQEmitter::class))
->setArguments([new Reference(SchedulerStore::class)]),
->setArguments([
new Reference(SchedulerStore::class),
'%service_bus.scheduler.deliver_with_highest_priority%',
]),
]);

return;
Expand Down
4 changes: 4 additions & 0 deletions tests/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ final class Context implements ServiceBusContext
*/
public $logHandler;

public ?DeliveryOptions $deliveryOptions = null;

public function violations(): ?ValidationViolations
{
return null;
Expand All @@ -57,6 +59,7 @@ public function delivery(
?OutcomeMessageMetadata $withMetadata = null
): Promise {
$this->messages[] = $message;
$this->deliveryOptions = $deliveryOptions;

return new Success();
}
Expand All @@ -66,6 +69,7 @@ public function deliveryBulk(
?DeliveryOptions $deliveryOptions = null,
?OutcomeMessageMetadata $withMetadata = null
): Promise {
$this->deliveryOptions = $deliveryOptions;
$this->messages = \array_merge($this->messages, $messages);

return new Success();
Expand Down
48 changes: 48 additions & 0 deletions tests/Emitter/RabbitmqEmitterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace Emitter;

use PHPUnit\Framework\TestCase;
use ServiceBus\Scheduler\Data\NextScheduledOperation;
use ServiceBus\Scheduler\Emitter\RabbitMQEmitter;
use ServiceBus\Scheduler\ScheduledOperationId;
use ServiceBus\Scheduler\Store\SqlSchedulerStore;
use ServiceBus\Scheduler\Tests\Context;
use ServiceBus\Storage\Common\StorageConfiguration;
use ServiceBus\Storage\Sql\DoctrineDBAL\DoctrineDBALAdapter;

final class RabbitmqEmitterTest extends TestCase
{
/**
* @test
*
* @dataProvider isHighestPriorityOptionProvider
*/
public function isHighestPriorityOption(bool $expectedFlag, ?bool $optionFlag): void
{
$emitter = new RabbitMQEmitter(
new SqlSchedulerStore(
new DoctrineDBALAdapter(new StorageConfiguration('sqlite:///:memory:')),
),
$optionFlag,
);

$context = new Context();

$emitter->emitNextOperation(
NextScheduledOperation::create(ScheduledOperationId::new(), new \DateTimeImmutable()),
$context,
);

$this->assertSame($expectedFlag, $context->deliveryOptions->isHighestPriority());
}

public static function isHighestPriorityOptionProvider(): \Generator
{
yield 'true option results in true flag' => ['expectedFlag' => true, 'optionFlag' => true];
yield 'false option results in false flag' => ['expectedFlag' => false, 'optionFlag' => false];
yield 'null option results in default true flag' => ['expectedFlag' => true, 'optionFlag' => null];
}
}
75 changes: 75 additions & 0 deletions tests/Module/SchedulerModuleIsHighestPriorityTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);

namespace Module;

use PHPUnit\Framework\TestCase;
use ServiceBus\Scheduler\Emitter\RabbitMQEmitter;
use ServiceBus\Scheduler\Emitter\SchedulerEmitter;
use ServiceBus\Scheduler\Module\SchedulerModule;
use ServiceBus\Scheduler\Store\SqlSchedulerStore;
use ServiceBus\Storage\Common\DatabaseAdapter;
use ServiceBus\Storage\Common\StorageConfiguration;
use ServiceBus\Storage\Sql\DoctrineDBAL\DoctrineDBALAdapter;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;

final class SchedulerModuleIsHighestPriorityTest extends TestCase
{
private ContainerBuilder $containerBuilder;

protected function setUp(): void
{
$containerBuilder = new ContainerBuilder();
$containerBuilder->addDefinitions([
DatabaseAdapter::class => new Definition(
DoctrineDBALAdapter::class,
[
new StorageConfiguration('sqlite:///:memory:'),
],
),
]);

$this->containerBuilder = $containerBuilder;
}

/**
* To keep thing backward compatible for the clients who use default highestPriority = true
*
* @test
*/
public function highestPriorityIsDefault(): void
{
$module = SchedulerModule::rabbitMqWithSqlStorage(DatabaseAdapter::class);
$module->boot($this->containerBuilder);

$this->assertIsHighestPriority(true);
}

/**
* @test
*/
public function highestPriorityCanBeOverriden(): void
{
$module = SchedulerModule::rabbitMqWithSqlStorage(DatabaseAdapter::class);
$module->boot($this->containerBuilder);

$this->containerBuilder->setParameter('service_bus.scheduler.deliver_with_highest_priority', false);

$this->assertIsHighestPriority(false);
}

private function assertIsHighestPriority(?bool $isHighestPriority): void
{
self::assertEquals(
new RabbitMQEmitter(
new SqlSchedulerStore(
new DoctrineDBALAdapter(new StorageConfiguration('sqlite:///:memory:')),
),
$isHighestPriority,
),
$this->containerBuilder->get(SchedulerEmitter::class),
);
}
}
Loading