diff --git a/config/di.php b/config/di.php index 235061aa..ea0f54b3 100644 --- a/config/di.php +++ b/config/di.php @@ -6,6 +6,7 @@ use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Cli\SignalLoop; use Yiisoft\Queue\Cli\SimpleLoop; +use Yiisoft\Queue\Command\ListenAllCommand; use Yiisoft\Queue\Command\RunCommand; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume; @@ -58,4 +59,9 @@ 'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']), ], ], + ListenAllCommand::class => [ + '__construct()' => [ + 'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']), + ], + ], ]; diff --git a/config/params.php b/config/params.php index 9264388a..1251c408 100644 --- a/config/params.php +++ b/config/params.php @@ -3,6 +3,7 @@ declare(strict_types=1); use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\Command\ListenAllCommand; use Yiisoft\Queue\Command\ListenCommand; use Yiisoft\Queue\Command\RunCommand; use Yiisoft\Queue\Debug\QueueCollector; @@ -16,6 +17,7 @@ 'commands' => [ 'queue:run' => RunCommand::class, 'queue:listen' => ListenCommand::class, + 'queue:listen:all' => ListenAllCommand::class, ], ], 'yiisoft/queue' => [ diff --git a/src/Command/ListenAllCommand.php b/src/Command/ListenAllCommand.php new file mode 100644 index 00000000..f3d522d8 --- /dev/null +++ b/src/Command/ListenAllCommand.php @@ -0,0 +1,82 @@ +addArgument( + 'channel', + InputArgument::OPTIONAL | InputArgument::IS_ARRAY, + 'Queue channel name list to connect to', + $this->channels, + ) + ->addOption( + 'pause', + 'p', + InputOption::VALUE_REQUIRED, + 'Pause between queue channel iterations in seconds. May save some CPU. Default: 1', + 1, + ) + ->addOption( + 'maximum', + 'm', + InputOption::VALUE_REQUIRED, + 'Maximum number of messages to process in each channel before switching to another channel. ' . + 'Default is 0 (no limits).', + 0, + ); + + $this->addUsage('[channel1 [channel2 [...]]] [--timeout=] [--maximum=]'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $queues = []; + /** @var string $channel */ + foreach ($input->getArgument('channel') as $channel) { + $queues[] = $this->queueFactory->get($channel); + } + + while ($this->loop->canContinue()) { + $hasMessages = false; + foreach ($queues as $queue) { + $hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages; + } + + if (!$hasMessages) { + $pauseSeconds = (int)$input->getOption('pause'); + if ($pauseSeconds < 0) { + $pauseSeconds = 1; + } + + /** @psalm-var 0|positive-int $pauseSeconds */ + sleep($pauseSeconds); + } + } + + return 0; + } +}