Skip to content

Commit

Permalink
feature #30754 [Messenger] New messenger:stop-workers Command (weaver…
Browse files Browse the repository at this point in the history
…ryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30754).

Discussion
----------

[Messenger] New messenger:stop-workers Command

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Kinda of #29451
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

o/ me again.

This requires and is built on top of #30708

When you deploy, all workers need to be stopped and restarted. That's not currently possible, unless you manually track the pids and send a SIGTERM signal. We can make that much easier :).

Now run:

```
bin/console messenger:stop-workers
```

And it will signal to all workers (even if they're distributed on other servers) that they should stop, once they finish processing their current message. This is done via a key in `cache.app`.

Cheers!

Commits
-------

58971627f5 [Messenger] New messenger:stop-workers Command
  • Loading branch information
fabpot committed Mar 31, 2019
2 parents 8c68e43 + 9686401 commit 1fc6038
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ CHANGELOG
second argument.
* [BC BREAK] The `MessageBusInterface::dispatch()` signature changed:
a second argument `array $stamps = []` was added.
* Added new `messenger:stop-workers` command that sends a signal
to stop all `messenger:consume` workers.
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
Expand Down
14 changes: 14 additions & 0 deletions Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Command;

use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
Expand All @@ -25,6 +26,7 @@
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

Expand All @@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command
private $receiverNames;
private $retryStrategyLocator;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;

public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
Expand All @@ -62,6 +66,11 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
parent::__construct();
}

public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -190,6 +199,11 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}

if (null !== $this->restartSignalCachePool) {
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
}

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));

Expand Down
73 changes: 73 additions & 0 deletions Command/StopWorkersCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Command;

use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class StopWorkersCommand extends Command
{
protected static $defaultName = 'messenger:stop-workers';

private $restartSignalCachePool;

public function __construct(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;

parent::__construct();
}

/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([])
->setDescription('Stops workers after their current message')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
<info>php %command.full_name%</info>
Each worker command will finish the message they are currently processing
and then exit. Worker commands are *not* automatically restarted: that
should be handled by something like supervisord.
EOF
)
;
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): void
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(time());
$this->restartSignalCachePool->save($cacheItem);

$io->success('Signal successfully sent to stop any running workers.');
}
}
35 changes: 35 additions & 0 deletions Tests/Command/StopWorkersCommandTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Command;

use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\StopWorkersCommand;

class StopWorkersCommandTest extends TestCase
{
public function testItSetsCacheItem()
{
$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('set');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
$cachePool->expects($this->once())->method('save')->with($cacheItem);

$command = new StopWorkersCommand($cachePool);

$tester = new CommandTester($command);
$tester->execute([]);
}
}
71 changes: 71 additions & 0 deletions Tests/Worker/StopWhenRestartSignalIsReceivedTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Worker;

use PHPUnit\Framework\TestCase;
use Psr\Cache\CacheItemInterface;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;

/**
* @group time-sensitive
*/
class StopWhenRestartSignalIsReceivedTest extends TestCase
{
/**
* @dataProvider restartTimeProvider
*/
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);

$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);

$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();

$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}

public function restartTimeProvider()
{
yield [null, false]; // no cached restart time, do not restart
yield [+10, true]; // 10 seconds after starting, a restart was requested
yield [-10, false]; // a restart was requested, but 10 seconds before we started
}

public function testWorkerDoesNotStopIfRestartNotInCache()
{
$decoratedWorker = new DummyWorker([
new Envelope(new \stdClass()),
]);

$cachePool = $this->createMock(CacheItemPoolInterface::class);
$cacheItem = $this->createMock(CacheItemInterface::class);
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
$cacheItem->expects($this->never())->method('get');
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);

$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
$stopOnSignalWorker->run();

$this->assertFalse($decoratedWorker->isStopped());
}
}
72 changes: 72 additions & 0 deletions Worker/StopWhenRestartSignalIsReceived.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Worker;

use Psr\Cache\CacheItemPoolInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\WorkerInterface;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class StopWhenRestartSignalIsReceived implements WorkerInterface
{
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';

private $decoratedWorker;
private $cachePool;
private $logger;

public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
{
$this->decoratedWorker = $decoratedWorker;
$this->cachePool = $cachePool;
$this->logger = $logger;
}

public function run(array $options = [], callable $onHandledCallback = null): void
{
$workerStartedTimestamp = time();

$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) {
if (null !== $onHandledCallback) {
$onHandledCallback($envelope);
}

if ($this->shouldRestart($workerStartedTimestamp)) {
$this->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
}
});
}

public function stop(): void
{
$this->decoratedWorker->stop();
}

private function shouldRestart(int $workerStartedAt)
{
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
if (!$cacheItem->isHit()) {
// no restart has ever been scheduled
return false;
}

return $workerStartedAt < $cacheItem->get();
}
}

0 comments on commit 1fc6038

Please sign in to comment.