Skip to content

Commit

Permalink
Improve run command (#168)
Browse files Browse the repository at this point in the history
* Allow to process messages from different channels

* Add "maximum" option to the "run" console command

* Improve messages

* Fix test

* Apply fixes from StyleCI

* Fix typecasting bug

---------

Co-authored-by: StyleCI Bot <bot@styleci.io>
  • Loading branch information
viktorprogger and StyleCIBot committed Jan 18, 2024
1 parent 8498f59 commit 9ce1388
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 19 deletions.
6 changes: 6 additions & 0 deletions config/di.php
Expand Up @@ -6,6 +6,7 @@
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Cli\SimpleLoop;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
Expand Down Expand Up @@ -52,4 +53,9 @@
FailureMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-fail']],
],
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']),
],
],
];
5 changes: 4 additions & 1 deletion config/params.php
Expand Up @@ -2,6 +2,7 @@

declare(strict_types=1);

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Command\ListenCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Debug\QueueCollector;
Expand All @@ -19,7 +20,9 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [],
'channel-definitions' => [
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
'middlewares-fail' => [],
Expand Down
35 changes: 25 additions & 10 deletions src/Command/RunCommand.php
Expand Up @@ -7,16 +7,17 @@
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;

final class RunCommand extends Command
{
protected static $defaultName = 'queue:run';
protected static $defaultDescription = 'Runs all the existing messages in the queue. Exits once messages are over.';
protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' .
'Exits once messages are over.';

public function __construct(private QueueFactoryInterface $queueFactory)
public function __construct(private QueueFactoryInterface $queueFactory, private array $channels)
{
parent::__construct();
}
Expand All @@ -25,17 +26,31 @@ public function configure(): void
{
$this->addArgument(
'channel',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueFactory::DEFAULT_CHANNEL_NAME
);
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to.',
$this->channels,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
0,

Check warning on line 38 in src/Command/RunCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ } public function configure() : void { - $this->addArgument('channel', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'Queue channel name list to connect to.', $this->channels)->addOption('maximum', 'm', InputOption::VALUE_REQUIRED, 'Maximum number of messages to process in each channel. Default is 0 (no limits).', 0)->addUsage('[channel1 [channel2 [...]]] --maximum 100'); + $this->addArgument('channel', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'Queue channel name list to connect to.', $this->channels)->addOption('maximum', 'm', InputOption::VALUE_REQUIRED, 'Maximum number of messages to process in each channel. Default is 0 (no limits).', -1)->addUsage('[channel1 [channel2 [...]]] --maximum 100'); } protected function execute(InputInterface $input, OutputInterface $output) : int {

Check warning on line 38 in src/Command/RunCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "IncrementInteger": --- Original +++ New @@ @@ } public function configure() : void { - $this->addArgument('channel', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'Queue channel name list to connect to.', $this->channels)->addOption('maximum', 'm', InputOption::VALUE_REQUIRED, 'Maximum number of messages to process in each channel. Default is 0 (no limits).', 0)->addUsage('[channel1 [channel2 [...]]] --maximum 100'); + $this->addArgument('channel', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'Queue channel name list to connect to.', $this->channels)->addOption('maximum', 'm', InputOption::VALUE_REQUIRED, 'Maximum number of messages to process in each channel. Default is 0 (no limits).', 1)->addUsage('[channel1 [channel2 [...]]] --maximum 100'); } protected function execute(InputInterface $input, OutputInterface $output) : int {
)
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueFactory
->get($input->getArgument('channel'))
->run();
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");

Check warning on line 47 in src/Command/RunCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ { /** @var string $channel */ foreach ($input->getArgument('channel') as $channel) { - $output->write("Processing channel {$channel}... "); + $count = $this->queueFactory->get($channel)->run((int) $input->getOption('maximum')); $output->writeln("Messages processed: {$count}."); }
$count = $this->queueFactory
->get($channel)
->run((int)$input->getOption('maximum'));

$output->writeln("Messages processed: $count.");

Check warning on line 52 in src/Command/RunCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ foreach ($input->getArgument('channel') as $channel) { $output->write("Processing channel {$channel}... "); $count = $this->queueFactory->get($channel)->run((int) $input->getOption('maximum')); - $output->writeln("Messages processed: {$count}."); + } return 0; } }
}

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Debug/QueueDecorator.php
Expand Up @@ -35,9 +35,9 @@ public function push(
return $message;
}

public function run(int $max = 0): void
public function run(int $max = 0): int

Check warning on line 38 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ $this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions); return $message; } - public function run(int $max = 0) : int + public function run(int $max = -1) : int { return $this->queue->run($max); }

Check warning on line 38 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "IncrementInteger": --- Original +++ New @@ @@ $this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions); return $message; } - public function run(int $max = 0) : int + public function run(int $max = 1) : int { return $this->queue->run($max); }
{
$this->queue->run($max);
return $this->queue->run($max);
}

public function listen(): void
Expand Down
Expand Up @@ -101,7 +101,11 @@ private function getDelay(MessageInterface $message): float
$meta = $message->getMetadata();
$key = self::META_KEY_DELAY . "-$this->id";

$delayOriginal = (float) ($meta[$key] ?? 0 ?: $this->delayInitial);
$delayOriginal = (float) ($meta[$key] ?? 0);
if ($delayOriginal <= 0) {
$delayOriginal = $this->delayInitial;
}

$result = $delayOriginal * $this->exponent;

return min($result, $this->delayMaximum);
Expand Down
4 changes: 3 additions & 1 deletion src/Queue.php
Expand Up @@ -67,7 +67,7 @@ public function push(
return $message;
}

public function run(int $max = 0): void
public function run(int $max = 0): int
{
$this->checkAdapter();

Expand All @@ -90,6 +90,8 @@ public function run(int $max = 0): void
'Processed {count} queue messages.',
['count' => $count]
);

return $count;
}

public function listen(): void
Expand Down
4 changes: 3 additions & 1 deletion src/QueueInterface.php
Expand Up @@ -30,8 +30,10 @@ public function push(MessageInterface $message, MiddlewarePushInterface|callable
* Execute all existing jobs and exit
*
* @param int $max
*
* @return int How many messages were processed
*/
public function run(int $max = 0): void;
public function run(int $max = 0): int;

/**
* Listen to the queue and execute jobs as they come
Expand Down
3 changes: 2 additions & 1 deletion tests/App/DummyQueue.php
Expand Up @@ -24,8 +24,9 @@ public function push(
return $message;
}

public function run(int $max = 0): void
public function run(int $max = 0): int
{
throw new Exception('`run()` method is not implemented yet.');
}

public function listen(): void
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/Command/RunCommandTest.php
Expand Up @@ -15,7 +15,7 @@ final class RunCommandTest extends TestCase
{
public function testConfigure(): void
{
$command = new RunCommand($this->createMock(QueueFactoryInterface::class));
$command = new RunCommand($this->createMock(QueueFactoryInterface::class), []);
$channelArgument = $command->getNativeDefinition()->getArgument('channel');
$this->assertEquals('channel', $channelArgument->getName());
}
Expand All @@ -28,7 +28,7 @@ public function testExecute(): void
$queueFactory->method('get')->willReturn($queue);
$input = new StringInput('channel');

$command = new RunCommand($queueFactory);
$command = new RunCommand($queueFactory, []);
$exitCode = $command->run($input, $this->createMock(OutputInterface::class));

$this->assertEquals(0, $exitCode);
Expand Down

0 comments on commit 9ce1388

Please sign in to comment.