Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports #30708

Merged
merged 1 commit into from Mar 30, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -11,8 +11,9 @@ CHANGELOG
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
`ack()`, `reject()` and `get()`. The methods `receive()`
and `stop()` were removed.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
@@ -24,7 +25,9 @@ CHANGELOG
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
* `Worker` has 4 new option constructor arguments.
* [BC BREAK] The first argument to Worker changed from a single
`ReceiverInterface` to an array of `ReceiverInterface`.
* `Worker` has 3 new optional constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange
@@ -19,12 +19,13 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
@@ -70,10 +71,11 @@ protected function configure(): void
$this
->setDefinition([
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
@@ -82,6 +84,10 @@ protected function configure(): void
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$io->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);
$input->setArgument('receivers', $io->askQuestion($question));
}
if (0 === \count($input->getArgument('receivers'))) {
throw new RuntimeException('Please pass at least one receiver.');
}
}
@@ -135,41 +147,51 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$output->writeln(sprintf('<comment>%s</comment>', $message));
}
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
$receivers = [];
$retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
throw new RuntimeException($message);
}
$receiver = $this->receiverLocator->get($receiverName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}
if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}
if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
if ($stopsWhen) {
$last = array_pop($stopsWhen);
@@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);
}
private function convertToBytes(string $memoryLimit): int
@@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
}
public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}
}
@@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void
class DummyReceiver implements ReceiverInterface
{
public function receive(callable $handler): void
public function get(): iterable
{
for ($i = 0; $i < 3; ++$i) {
$handler(new Envelope(new DummyMessage("Dummy $i")));
}
yield new Envelope(new DummyMessage('Dummy'));
}
public function stop(): void

This file was deleted.

@@ -0,0 +1,46 @@
<?php
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\WorkerInterface;
class DummyWorker implements WorkerInterface
{
private $isStopped = false;
private $envelopesToReceive;
private $envelopesHandled = 0;
public function __construct(array $envelopesToReceive)
{
$this->envelopesToReceive = $envelopesToReceive;
}
public function run(array $options = [], callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
break;
}
if ($onHandledCallback) {
$onHandledCallback($envelope);
++$this->envelopesHandled;
}
}
}
public function stop(): void
{
$this->isStopped = true;
}
public function isStopped(): bool
{
return $this->isStopped;
}
public function countEnvelopesHandled()
{
return $this->envelopesHandled;
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.