-
-
Notifications
You must be signed in to change notification settings - Fork 84
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Messenger] New messenger:stop-workers Command
- Loading branch information
1 parent
2cb5b5b
commit 9686401
Showing
6 changed files
with
267 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Command; | ||
|
||
use Psr\Cache\CacheItemPoolInterface; | ||
use Symfony\Component\Console\Command\Command; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
use Symfony\Component\Console\Style\SymfonyStyle; | ||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; | ||
|
||
/** | ||
* @author Ryan Weaver <ryan@symfonycasts.com> | ||
* | ||
* @experimental in 4.3 | ||
*/ | ||
class StopWorkersCommand extends Command | ||
{ | ||
protected static $defaultName = 'messenger:stop-workers'; | ||
|
||
private $restartSignalCachePool; | ||
|
||
public function __construct(CacheItemPoolInterface $restartSignalCachePool) | ||
{ | ||
$this->restartSignalCachePool = $restartSignalCachePool; | ||
|
||
parent::__construct(); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
protected function configure(): void | ||
{ | ||
$this | ||
->setDefinition([]) | ||
->setDescription('Stops workers after their current message') | ||
->setHelp(<<<'EOF' | ||
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running. | ||
<info>php %command.full_name%</info> | ||
Each worker command will finish the message they are currently processing | ||
and then exit. Worker commands are *not* automatically restarted: that | ||
should be handled by something like supervisord. | ||
EOF | ||
) | ||
; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
protected function execute(InputInterface $input, OutputInterface $output): void | ||
{ | ||
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); | ||
|
||
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY); | ||
$cacheItem->set(time()); | ||
$this->restartSignalCachePool->save($cacheItem); | ||
|
||
$io->success('Signal successfully sent to stop any running workers.'); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Tests\Command; | ||
|
||
use PHPUnit\Framework\TestCase; | ||
use Psr\Cache\CacheItemInterface; | ||
use Psr\Cache\CacheItemPoolInterface; | ||
use Symfony\Component\Console\Tester\CommandTester; | ||
use Symfony\Component\Messenger\Command\StopWorkersCommand; | ||
|
||
class StopWorkersCommandTest extends TestCase | ||
{ | ||
public function testItSetsCacheItem() | ||
{ | ||
$cachePool = $this->createMock(CacheItemPoolInterface::class); | ||
$cacheItem = $this->createMock(CacheItemInterface::class); | ||
$cacheItem->expects($this->once())->method('set'); | ||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); | ||
$cachePool->expects($this->once())->method('save')->with($cacheItem); | ||
|
||
$command = new StopWorkersCommand($cachePool); | ||
|
||
$tester = new CommandTester($command); | ||
$tester->execute([]); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Tests\Worker; | ||
|
||
use PHPUnit\Framework\TestCase; | ||
use Psr\Cache\CacheItemInterface; | ||
use Psr\Cache\CacheItemPoolInterface; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; | ||
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived; | ||
|
||
/** | ||
* @group time-sensitive | ||
*/ | ||
class StopWhenRestartSignalIsReceivedTest extends TestCase | ||
{ | ||
/** | ||
* @dataProvider restartTimeProvider | ||
*/ | ||
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop) | ||
{ | ||
$decoratedWorker = new DummyWorker([ | ||
new Envelope(new \stdClass()), | ||
]); | ||
|
||
$cachePool = $this->createMock(CacheItemPoolInterface::class); | ||
$cacheItem = $this->createMock(CacheItemInterface::class); | ||
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true); | ||
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset); | ||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); | ||
|
||
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool); | ||
$stopOnSignalWorker->run(); | ||
|
||
$this->assertSame($shouldStop, $decoratedWorker->isStopped()); | ||
} | ||
|
||
public function restartTimeProvider() | ||
{ | ||
yield [null, false]; // no cached restart time, do not restart | ||
yield [+10, true]; // 10 seconds after starting, a restart was requested | ||
yield [-10, false]; // a restart was requested, but 10 seconds before we started | ||
} | ||
|
||
public function testWorkerDoesNotStopIfRestartNotInCache() | ||
{ | ||
$decoratedWorker = new DummyWorker([ | ||
new Envelope(new \stdClass()), | ||
]); | ||
|
||
$cachePool = $this->createMock(CacheItemPoolInterface::class); | ||
$cacheItem = $this->createMock(CacheItemInterface::class); | ||
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false); | ||
$cacheItem->expects($this->never())->method('get'); | ||
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem); | ||
|
||
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool); | ||
$stopOnSignalWorker->run(); | ||
|
||
$this->assertFalse($decoratedWorker->isStopped()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Worker; | ||
|
||
use Psr\Cache\CacheItemPoolInterface; | ||
use Psr\Log\LoggerInterface; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\WorkerInterface; | ||
|
||
/** | ||
* @author Ryan Weaver <ryan@symfonycasts.com> | ||
* | ||
* @experimental in 4.3 | ||
*/ | ||
class StopWhenRestartSignalIsReceived implements WorkerInterface | ||
{ | ||
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp'; | ||
|
||
private $decoratedWorker; | ||
private $cachePool; | ||
private $logger; | ||
|
||
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null) | ||
{ | ||
$this->decoratedWorker = $decoratedWorker; | ||
$this->cachePool = $cachePool; | ||
$this->logger = $logger; | ||
} | ||
|
||
public function run(array $options = [], callable $onHandledCallback = null): void | ||
{ | ||
$workerStartedTimestamp = time(); | ||
|
||
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) { | ||
if (null !== $onHandledCallback) { | ||
$onHandledCallback($envelope); | ||
} | ||
|
||
if ($this->shouldRestart($workerStartedTimestamp)) { | ||
$this->stop(); | ||
if (null !== $this->logger) { | ||
$this->logger->info('Worker stopped because a restart was requested.'); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
public function stop(): void | ||
{ | ||
$this->decoratedWorker->stop(); | ||
} | ||
|
||
private function shouldRestart(int $workerStartedAt) | ||
{ | ||
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY); | ||
if (!$cacheItem->isHit()) { | ||
// no restart has ever been scheduled | ||
return false; | ||
} | ||
|
||
return $workerStartedAt < $cacheItem->get(); | ||
} | ||
} |