Skip to content

Commit

Permalink
Fix #143: Add new console command to listen to multiple queues (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorprogger committed Jan 19, 2024
1 parent 9ce1388 commit 4c918a1
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config/di.php
Expand Up @@ -6,6 +6,7 @@
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Cli\SimpleLoop;
use Yiisoft\Queue\Command\ListenAllCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume;
Expand Down Expand Up @@ -58,4 +59,9 @@
'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']),
],
],
];
2 changes: 2 additions & 0 deletions config/params.php
Expand Up @@ -3,6 +3,7 @@
declare(strict_types=1);

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Command\ListenAllCommand;
use Yiisoft\Queue\Command\ListenCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Debug\QueueCollector;
Expand All @@ -16,6 +17,7 @@
'commands' => [
'queue:run' => RunCommand::class,
'queue:listen' => ListenCommand::class,
'queue:listen:all' => ListenAllCommand::class,
],
],
'yiisoft/queue' => [
Expand Down
82 changes: 82 additions & 0 deletions src/Command/ListenAllCommand.php
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Yii\Queue\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\QueueFactoryInterface;

final class ListenAllCommand extends Command
{
protected static $defaultName = 'queue:listen-all';
protected static $defaultDescription = 'Listens the all the given queues and executes messages as they come. ' .
'Meant to be used in development environment only. ' .
'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
'Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels)
{
parent::__construct();
}

public function configure(): void
{
$this->addArgument(
'channel',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to',
$this->channels,
)
->addOption(
'pause',
'p',
InputOption::VALUE_REQUIRED,
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
1,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. ' .
'Default is 0 (no limits).',
0,
);

$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueFactory->get($channel);
}

while ($this->loop->canContinue()) {
$hasMessages = false;
foreach ($queues as $queue) {
$hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages;
}

if (!$hasMessages) {
$pauseSeconds = (int)$input->getOption('pause');
if ($pauseSeconds < 0) {
$pauseSeconds = 1;
}

/** @psalm-var 0|positive-int $pauseSeconds */
sleep($pauseSeconds);
}
}

return 0;
}
}

0 comments on commit 4c918a1

Please sign in to comment.