Skip to content
Permalink
Browse files

[Messenger] New messenger:stop-workers Command

  • Loading branch information...
weaverryan authored and fabpot committed Mar 28, 2019
1 parent 88042a3 commit 58971627f5e03a667d682bb2cf5f9c53f4d3eded
@@ -36,6 +36,10 @@
<tag name="cache.pool" />
</service>

<service id="cache.messenger.restart_workers_signal" parent="cache.app" public="false">
<tag name="cache.pool" />
</service>

<service id="cache.adapter.system" class="Symfony\Component\Cache\Adapter\AdapterInterface" abstract="true">
<factory class="Symfony\Component\Cache\Adapter\AbstractAdapter" method="createSystemCache" />
<tag name="cache.pool" clearer="cache.system_clearer" />
@@ -83,6 +83,9 @@
<argument type="collection" /> <!-- Receiver names -->
<argument type="service" id="messenger.retry_strategy_locator" />
<argument type="service" id="event_dispatcher" />
<call method="setCachePoolForRestartSignal">
<argument type="service" id="cache.messenger.restart_workers_signal" />
</call>

<tag name="console.command" command="messenger:consume" />
<tag name="console.command" command="messenger:consume-messages" />
@@ -101,6 +104,11 @@
<tag name="console.command" command="debug:messenger" />
</service>

<service id="console.command.messenger_stop_workers" class="Symfony\Component\Messenger\Command\StopWorkersCommand">
<argument type="service" id="cache.messenger.restart_workers_signal" />
<tag name="console.command" command="messenger:stop-workers" />
</service>

<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<tag name="console.command" command="debug:router" />
@@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----

* Added new `messenger:stop-workers` command that sends a signal
to stop all `messenger:consume` workers.
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Command;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
@@ -25,6 +26,7 @@
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command
private $receiverNames;
private $retryStrategyLocator;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
@@ -62,6 +66,11 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
parent::__construct();
}
public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
}
/**
* {@inheritdoc}
*/
@@ -190,6 +199,11 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
if (null !== $this->restartSignalCachePool) {
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
@@ -0,0 +1,73 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Command;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class StopWorkersCommand extends Command
{
protected static $defaultName = 'messenger:stop-workers';
private $restartSignalCachePool;
public function __construct(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([])
->setDescription('Stops workers after their current message')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
<info>php %command.full_name%</info>
Each worker command will finish the message they are currently processing
and then exit. Worker commands are *not* automatically restarted: that
should be handled by something like supervisord.
EOF
)
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): void
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(time());
$this->restartSignalCachePool->save($cacheItem);
$io->success('Signal successfully sent to stop any running workers.');
}
}
@@ -0,0 +1,35 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\StopWorkersCommand;
class StopWorkersCommandTest extends TestCase
{
public function testItSetsCacheItem()
{
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('set');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$cachePool->expects($this->once())->method('save')->with($cacheItem);
$command = new StopWorkersCommand($cachePool);
$tester = new CommandTester($command);
$tester->execute([]);
}
}
@@ -0,0 +1,71 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Worker;
use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
/**
* @group time-sensitive
*/
class StopWhenRestartSignalIsReceivedTest extends TestCase
{
/**
* @dataProvider restartTimeProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
public function restartTimeProvider()
{
yield [null, false]; // no cached restart time, do not restart
yield [+10, true]; // 10 seconds after starting, a restart was requested
yield [-10, false]; // a restart was requested, but 10 seconds before we started
}
public function testWorkerDoesNotStopIfRestartNotInCache()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
$cacheItem->expects($this->never())->method('get');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();
$this->assertFalse($decoratedWorker->isStopped());
}
}
@@ -0,0 +1,72 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Worker;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class StopWhenRestartSignalIsReceived implements WorkerInterface
{
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
private $decoratedWorker;
private $cachePool;
private $logger;
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->cachePool = $cachePool;
$this->logger = $logger;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
$workerStartedTimestamp = time();
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}
if ($this->shouldRestart($workerStartedTimestamp)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
}
});
}
public function stop(): void
{
$this->decoratedWorker->stop();
}
private function shouldRestart(int $workerStartedAt)
{
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
if (!$cacheItem->isHit()) {
// no restart has ever been scheduled
return false;
}
return $workerStartedAt < $cacheItem->get();
}
}

0 comments on commit 5897162

Please sign in to comment.
You can’t perform that action at this time.