Skip to content

Commit

Permalink
Adding the ability to consume multiple transports in one Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
weaverryan committed Mar 28, 2019
1 parent bf89cd6 commit 9c9c4e0
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 102 deletions.
4 changes: 3 additions & 1 deletion src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -23,7 +23,9 @@ CHANGELOG
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
* `Worker` has 4 new option constructor arguments.
* [BC BREAK] The first argument to Worker changed from a single
`ReceiverInterface` to an array of `ReceiverInterface`.
* `Worker` has 3 new optional constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange
Expand Down
62 changes: 41 additions & 21 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Expand Up @@ -19,6 +19,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
Expand Down Expand Up @@ -70,7 +71,7 @@ protected function configure(): void

$this
->setDefinition([
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
Expand All @@ -83,6 +84,10 @@ protected function configure(): void
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
Expand Down Expand Up @@ -112,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$io->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);

$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (\count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}

$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);

$input->setArgument('receivers', $io->askQuestion($question));
}

if (0 === \count($input->getArgument('receivers'))) {
throw new RuntimeException('Please pass at least one receiver.');
}
}

Expand All @@ -136,24 +147,33 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$output->writeln(sprintf('<comment>%s</comment>', $message));
}

if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
$receivers = [];
$retryStrategies = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}

if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
throw new RuntimeException($message);
}

$receiver = $this->receiverLocator->get($receiverName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}

$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}

if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}

$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
Expand All @@ -171,7 +191,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
}

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));

if ($stopsWhen) {
$last = array_pop($stopsWhen);
Expand All @@ -186,7 +206,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
}

$worker->run([
'sleep' => $input->getOption('sleep') * 1000000
'sleep' => $input->getOption('sleep') * 1000000,
]);
}

Expand Down
Expand Up @@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
}

public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}
}
Expand Up @@ -15,7 +15,7 @@ public function __construct(array $envelopesToReceive)
$this->envelopesToReceive = $envelopesToReceive;
}

public function run(callable $onHandledCallback = null): void
public function run(array $options = [], callable $onHandledCallback = null): void
{
foreach ($this->envelopesToReceive as $envelope) {
if (true === $this->isStopped) {
Expand Down
Expand Up @@ -57,14 +57,20 @@ public function testItSendsAndReceivesMessages()
$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));

$envelope = $receiver->get();
$envelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
$this->assertEquals($first->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));

$envelope = $receiver->get();
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());
$envelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];
$this->assertEquals($second->getMessage(), $envelope->getMessage());

$this->assertNull($receiver->get());
$this->assertEmpty(iterator_to_array($receiver->get()));
}

public function testRetryAndDelay()
Expand All @@ -80,20 +86,26 @@ public function testRetryAndDelay()

$sender->send($first = new Envelope(new DummyMessage('First')));

$envelope = $receiver->get();
$envelopes = iterator_to_array($receiver->get());
/** @var Envelope $envelope */
$envelope = $envelopes[0];
$newEnvelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($newEnvelope);
$receiver->ack($envelope);

$envelope = null;
$envelopes = [];
$startTime = time();
// wait for next message, but only for max 3 seconds
while (null === $envelope && $startTime + 3 > time()) {
$envelope = $receiver->get();
while (0 === \count($envelopes) && $startTime + 3 > time()) {
$envelopes = iterator_to_array($receiver->get());
}

$this->assertCount(1, $envelopes);
/** @var Envelope $envelope */
$envelope = $envelopes[0];

// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
Expand Down
Expand Up @@ -39,8 +39,9 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$connection->method('get')->willReturn($amqpEnvelope);

$receiver = new AmqpReceiver($connection, $serializer);
$actualEnvelope = $receiver->get();
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}

/**
Expand Down
Expand Up @@ -47,8 +47,8 @@ public function testReceivesMessages()
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);

$envelope = $transport->get();
$this->assertSame($decodedMessage, $envelope->getMessage());
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}

private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
Expand Down
Expand Up @@ -32,7 +32,7 @@
$receiver = new AmqpReceiver($connection, $serializer);
$retryStrategy = new MultiplierRetryStrategy(3, 0);

$worker = new Worker($receiver, new class() implements MessageBusInterface {
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
public function dispatch($envelope): Envelope
{
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
Expand All @@ -43,7 +43,7 @@ public function dispatch($envelope): Envelope

return $envelope;
}
}, 'the_receiver', $retryStrategy);
});

echo "Receiving messages...\n";
$worker->run();
Expand Up @@ -37,7 +37,7 @@ public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $me
};

$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
$memoryLimitWorker->run($handledCallback);
$memoryLimitWorker->run([], $handledCallback);

// handler should be called exactly 1 time
$this->assertSame($handlerCalledTimes, 1);
Expand Down
Expand Up @@ -39,7 +39,7 @@ public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop)
]);

$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
$maximumCountWorker->run($handledCallback);
$maximumCountWorker->run([], $handledCallback);

$this->assertSame($shouldStop, $decoratedWorker->isStopped());
}
Expand Down
Expand Up @@ -34,7 +34,7 @@ public function testWorkerStopsWhenTimeLimitIsReached()
->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);

$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
$timeoutWorker->run(function () {
$timeoutWorker->run([], function () {
sleep(2);
});

Expand Down

0 comments on commit 9c9c4e0

Please sign in to comment.