Skip to content
Permalink
Browse files

feature #30708 [Messenger] ReceiverInterface::handle() to get() & Wor…

…ker with prioritized transports (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30708).

Discussion
----------

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | Helps with #30699
| License       | MIT
| Doc PR        | TODO

Highlights:

* `messenger:consume` can now consume messages from multiple transports with priority ❗️

```
bin/console messenger:consume amqp_high amqp_medium amqp_low
```

* How long you want to sleep before checking more messages is now an option to `messenger:consume`
* `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()`
* Logic for looping & sleeping is moved into `Worker`

Commits
-------

e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
  • Loading branch information...
fabpot committed Mar 30, 2019
2 parents 2389d7c + e800bd5 commit b12351a7eb9ede70d14473e6760d845fde1aa19a
Showing with 917 additions and 804 deletions.
  1. +6 −3 src/Symfony/Component/Messenger/CHANGELOG.md
  2. +50 −27 src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
  3. +2 −10 src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
  4. +2 −4 src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
  5. +0 −48 src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php
  6. +46 −0 src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
  7. +41 −72 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
  8. +10 −21 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
  9. +2 −5 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
  10. +2 −2 src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php
  11. +0 −84 ...ymfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php
  12. +0 −103 ...mfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php
  13. +0 −49 src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php
  14. +71 −0 src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php
  15. +71 −0 src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php
  16. +44 −0 src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php
  17. +209 −46 src/Symfony/Component/Messenger/Tests/WorkerTest.php
  18. +23 −36 src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
  19. +2 −10 src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
  20. +13 −9 src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
  21. +0 −68 src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php
  22. +0 −65 src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php
  23. +0 −66 src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php
  24. +108 −76 src/Symfony/Component/Messenger/Worker.php
  25. +61 −0 src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php
  26. +58 −0 src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php
  27. +59 −0 src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php
  28. +37 −0 src/Symfony/Component/Messenger/WorkerInterface.php
@@ -11,8 +11,9 @@ CHANGELOG
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
`ack()`, `reject()` and `get()`. The methods `receive()`
and `stop()` were removed.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
@@ -24,7 +25,9 @@ CHANGELOG
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
* `Worker` has 4 new option constructor arguments.
* [BC BREAK] The first argument to Worker changed from a single
`ReceiverInterface` to an array of `ReceiverInterface`.
* `Worker` has 3 new optional constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange
@@ -19,12 +19,13 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
@@ -70,10 +71,11 @@ protected function configure(): void
$this
->setDefinition([
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
@@ -82,6 +84,10 @@ protected function configure(): void
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$io->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);
$input->setArgument('receivers', $io->askQuestion($question));
}
if (0 === \count($input->getArgument('receivers'))) {
throw new RuntimeException('Please pass at least one receiver.');
}
}
@@ -135,41 +147,51 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$output->writeln(sprintf('<comment>%s</comment>', $message));
}
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
$receivers = [];
$retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
throw new RuntimeException($message);
}
$receiver = $this->receiverLocator->get($receiverName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}
if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}
if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
if ($stopsWhen) {
$last = array_pop($stopsWhen);
@@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);
}
private function convertToBytes(string $memoryLimit): int
@@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
}
public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}
}
@@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void
class DummyReceiver implements ReceiverInterface
{
public function receive(callable $handler): void
public function get(): iterable
{
for ($i = 0; $i < 3; ++$i) {
$handler(new Envelope(new DummyMessage("Dummy $i")));
}
yield new Envelope(new DummyMessage('Dummy'));
}
public function stop(): void

This file was deleted.

@@ -0,0 +1,46 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\WorkerInterface;
class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;
public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}
if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}
public function stop(): void
{
$this->isStopped = true;
}
public function isStopped(): bool
{
return $this->isStopped;
}
public function countEnvelopesHandled()

This comment has been minimized.

Copy link
@lucchese-pd
{
return $this->envelopesHandled;
}
}

0 comments on commit b12351a

Please sign in to comment.
You can’t perform that action at this time.