diff --git a/CHANGELOG.md b/CHANGELOG.md index a2f94de6..b2bc7875 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ CHANGELOG 4.3.0 ----- + * Added new `messenger:stop-workers` command that sends a signal + to stop all `messenger:consume` workers. * [BC BREAK] The `TransportFactoryInterface::createTransport()` signature changed: a required 3rd `SerializerInterface` argument was added. * Added a new `SyncTransport` along with `ForceCallHandlersStamp` to diff --git a/Command/ConsumeMessagesCommand.php b/Command/ConsumeMessagesCommand.php index bd3a7a17..dea3a204 100644 --- a/Command/ConsumeMessagesCommand.php +++ b/Command/ConsumeMessagesCommand.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Command; +use Psr\Cache\CacheItemPoolInterface; use Psr\Container\ContainerInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Command\Command; @@ -25,6 +26,7 @@ use Symfony\Component\Messenger\Worker; use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; +use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command private $receiverNames; private $retryStrategyLocator; private $eventDispatcher; + /** @var CacheItemPoolInterface|null */ + private $restartSignalCachePool; public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) { @@ -62,6 +66,11 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $ parent::__construct(); } + public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool) + { + $this->restartSignalCachePool = $restartSignalCachePool; + } + /** * {@inheritdoc} */ @@ -190,6 +199,11 @@ protected function execute(InputInterface $input, OutputInterface $output): void $worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger); } + if (null !== $this->restartSignalCachePool) { + $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command'; + $worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger); + } + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames))); diff --git a/Command/StopWorkersCommand.php b/Command/StopWorkersCommand.php new file mode 100644 index 00000000..afb2ce0d --- /dev/null +++ b/Command/StopWorkersCommand.php @@ -0,0 +1,73 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Command; + +use Psr\Cache\CacheItemPoolInterface; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class StopWorkersCommand extends Command +{ + protected static $defaultName = 'messenger:stop-workers'; + + private $restartSignalCachePool; + + public function __construct(CacheItemPoolInterface $restartSignalCachePool) + { + $this->restartSignalCachePool = $restartSignalCachePool; + + parent::__construct(); + } + + /** + * {@inheritdoc} + */ + protected function configure(): void + { + $this + ->setDefinition([]) + ->setDescription('Stops workers after their current message') + ->setHelp(<<<'EOF' +The %command.name% command sends a signal to stop any messenger:consume processes that are running. + + php %command.full_name% + +Each worker command will finish the message they are currently processing +and then exit. Worker commands are *not* automatically restarted: that +should be handled by something like supervisord. +EOF + ) + ; + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output): void + { + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); + + $cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY); + $cacheItem->set(time()); + $this->restartSignalCachePool->save($cacheItem); + + $io->success('Signal successfully sent to stop any running workers.'); + } +} diff --git a/Tests/Command/StopWorkersCommandTest.php b/Tests/Command/StopWorkersCommandTest.php new file mode 100644 index 00000000..fd5ddae2 --- /dev/null +++ b/Tests/Command/StopWorkersCommandTest.php @@ -0,0 +1,35 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Command; + +use PHPUnit\Framework\TestCase; +use Psr\Cache\CacheItemInterface; +use Psr\Cache\CacheItemPoolInterface; +use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\Messenger\Command\StopWorkersCommand; + +class StopWorkersCommandTest extends TestCase +{ + public function testItSetsCacheItem() + { + $cachePool = $this->createMock(CacheItemPoolInterface::class); + $cacheItem = $this->createMock(CacheItemInterface::class); + $cacheItem->expects($this->once())->method('set'); + $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); + $cachePool->expects($this->once())->method('save')->with($cacheItem); + + $command = new StopWorkersCommand($cachePool); + + $tester = new CommandTester($command); + $tester->execute([]); + } +} diff --git a/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php b/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php new file mode 100644 index 00000000..a5a4937f --- /dev/null +++ b/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Cache\CacheItemInterface; +use Psr\Cache\CacheItemPoolInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; + +/** + * @group time-sensitive + */ +class StopWhenRestartSignalIsReceivedTest extends TestCase +{ + /** + * @dataProvider restartTimeProvider + */ + public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop) + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $cachePool = $this->createMock(CacheItemPoolInterface::class); + $cacheItem = $this->createMock(CacheItemInterface::class); + $cacheItem->expects($this->once())->method('isHIt')->willReturn(true); + $cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset); + $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); + + $stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool); + $stopOnSignalWorker->run(); + + $this->assertSame($shouldStop, $decoratedWorker->isStopped()); + } + + public function restartTimeProvider() + { + yield [null, false]; // no cached restart time, do not restart + yield [+10, true]; // 10 seconds after starting, a restart was requested + yield [-10, false]; // a restart was requested, but 10 seconds before we started + } + + public function testWorkerDoesNotStopIfRestartNotInCache() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $cachePool = $this->createMock(CacheItemPoolInterface::class); + $cacheItem = $this->createMock(CacheItemInterface::class); + $cacheItem->expects($this->once())->method('isHIt')->willReturn(false); + $cacheItem->expects($this->never())->method('get'); + $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); + + $stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool); + $stopOnSignalWorker->run(); + + $this->assertFalse($decoratedWorker->isStopped()); + } +} diff --git a/Worker/StopWhenRestartSignalIsReceived.php b/Worker/StopWhenRestartSignalIsReceived.php new file mode 100644 index 00000000..63f6ea04 --- /dev/null +++ b/Worker/StopWhenRestartSignalIsReceived.php @@ -0,0 +1,72 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Worker; + +use Psr\Cache\CacheItemPoolInterface; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class StopWhenRestartSignalIsReceived implements WorkerInterface +{ + public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp'; + + private $decoratedWorker; + private $cachePool; + private $logger; + + public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null) + { + $this->decoratedWorker = $decoratedWorker; + $this->cachePool = $cachePool; + $this->logger = $logger; + } + + public function run(array $options = [], callable $onHandledCallback = null): void + { + $workerStartedTimestamp = time(); + + $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + + if ($this->shouldRestart($workerStartedTimestamp)) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped because a restart was requested.'); + } + } + }); + } + + public function stop(): void + { + $this->decoratedWorker->stop(); + } + + private function shouldRestart(int $workerStartedAt) + { + $cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY); + if (!$cacheItem->isHit()) { + // no restart has ever been scheduled + return false; + } + + return $workerStartedAt < $cacheItem->get(); + } +}