Skip to content

Commit

Permalink
Fixing a bug where a transport could receive a message and dispatch i…
Browse files Browse the repository at this point in the history
…t to a different bus
  • Loading branch information
weaverryan committed Mar 22, 2019
1 parent 522594a commit 41069af
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 33 deletions.
Expand Up @@ -1616,8 +1616,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}

$defaultMiddleware = [
'before' => [['id' => 'dispatch_after_current_bus']],
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'dispatch_after_current_bus'],
],
'after' => [
['id' => 'send_message'],
['id' => 'handle_message'],
],
];
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
Expand All @@ -1628,6 +1634,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
unset($defaultMiddleware['after'][1]['arguments']);
}

// argument to add_bus_name_stamp_middleware
$defaultMiddleware['before'][0]['arguments'] = [$busId];

$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
}

Expand Down
Expand Up @@ -39,6 +39,8 @@
</call>
</service>

<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />

<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />

<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
Expand Down
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -4,6 +4,11 @@ CHANGELOG
4.3.0
-----

* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
and `BusNameStamp` were added, which allow you to add a bus identifier
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* An optional `ConsumeMessagesCommand` constructor argument was removed.
* Added `PhpSerializer` which uses PHP's native `serialize()` and
`unserialize()` to serialize messages to a transport
* [BC BREAK] If no serializer were passed, the default serializer
Expand Down
45 changes: 16 additions & 29 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Expand Up @@ -20,6 +20,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
Expand All @@ -38,15 +39,13 @@ class ConsumeMessagesCommand extends Command
private $receiverLocator;
private $logger;
private $receiverNames;
private $busNames;

public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [])
{
$this->busLocator = $busLocator;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
$this->busNames = $busNames;

parent::__construct();
}
Expand All @@ -57,15 +56,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
protected function configure(): void
{
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;

$this
->setDefinition([
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
Expand All @@ -84,6 +82,12 @@ protected function configure(): void
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
Use the --bus option to specify the message bus to dispatch received messages
to instead of trying to determine it automatically. This is required if the
messages didn't originate from Messenger:
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
EOF
)
;
Expand All @@ -107,24 +111,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
}
}
}

$busName = $input->getOption('bus');
if ($this->busNames && !$this->busLocator->has($busName)) {
if (null === $busName) {
$io->block('Missing bus argument.', null, 'error', ' ', true);
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);

if (1 === \count($alternatives)) {
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
$input->setOption('bus', $alternatives[0]);
}
} else {
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
}
}
}
}

/**
Expand All @@ -136,12 +122,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}

if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
}

$receiver = $this->receiverLocator->get($receiverName);
$bus = $this->busLocator->get($busName);

if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}

$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
Expand All @@ -160,7 +147,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
}

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));

if ($stopsWhen) {
$last = array_pop($stopsWhen);
Expand Down
Expand Up @@ -248,8 +248,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)

$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(3, array_values($receiverNames))
->replaceArgument(4, $busIds);
->replaceArgument(3, array_values($receiverNames));
}

$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
Expand Down
@@ -0,0 +1,39 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\BusNameStamp;

/**
* Adds the BusNameStamp to the bus.
*
* @experimental in Symfony 4.2
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class AddBusNameStampMiddleware implements MiddlewareInterface
{
private $busName;

public function __construct(string $busName)
{
$this->busName = $busName;
}

public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$envelope = $envelope->with(new BusNameStamp($this->busName));

return $stack->next()->handle($envelope, $stack);
}
}
58 changes: 58 additions & 0 deletions src/Symfony/Component/Messenger/RoutableMessageBus.php
@@ -0,0 +1,58 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\BusNameStamp;

/**
* Bus of buses that is routable using a BusNameStamp.
*
* This is useful when passed to Worker: messages received
* from the transport can be sent to the correct bus.
*
* @experimental in Symfony 4.2
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class RoutableMessageBus implements MessageBusInterface
{
private $busLocator;

/**
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
*/
public function __construct(ContainerInterface $busLocator)
{
$this->busLocator = $busLocator;
}

public function dispatch($envelope): Envelope
{
if (!$envelope instanceof Envelope) {
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
}

/** @var BusNameStamp $busNameStamp */
$busNameStamp = $envelope->last(BusNameStamp::class);
if (null === $busNameStamp) {
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
}

if (!$this->busLocator->has($busNameStamp->getBusName())) {
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
}

return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
}
}
34 changes: 34 additions & 0 deletions src/Symfony/Component/Messenger/Stamp/BusNameStamp.php
@@ -0,0 +1,34 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Stamp;

/**
* Stamp used to identify which bus it was passed to.
*
* @experimental in Symfony 4.2
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class BusNameStamp implements StampInterface
{
private $busName;

public function __construct(string $busName)
{
$this->busName = $busName;
}

public function getBusName(): string
{
return $this->busName;
}
}
@@ -0,0 +1,33 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;

class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
{
public function testItSendsTheMessageToAssignedSender()
{
$middleware = new AddBusNameStampMiddleware('the_bus_name');
$envelope = new Envelope(new DummyMessage('the message'));

$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
/** @var BusNameStamp $busNameStamp */
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
$this->assertNotNull($busNameStamp);
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
}
}

0 comments on commit 41069af

Please sign in to comment.