diff --git a/config/prooph_service_bus.config.php b/config/prooph_service_bus.config.php index e0c260e..e96e2c4 100644 --- a/config/prooph_service_bus.config.php +++ b/config/prooph_service_bus.config.php @@ -34,6 +34,12 @@ //Router defaults to Prooph\ServiceBus\Plugin\Router\CommandRouter //Comment out the next line to use the RegexRouter instead //'type' => \Prooph\ServiceBus\Plugin\Router\RegexRouter::class, + + //[optional] Enable the AsyncSwitchMessageRouter, see docs/plugins.md AsyncSwitchMessageRouter section for details + //If "async_switch" key is present and references an Async\MessageProducer available in the container + //the factory will pull the producer from the container and set up an AsyncSwitchMessageRouter + //using the producer AND decorating the actual configured router + //'async_switch' => 'container_id_of_async_message_producer', ] ], //This section will be used by Prooph\ServiceBus\Container\EventBusFactory @@ -49,6 +55,12 @@ //Router defaults to Prooph\ServiceBus\Plugin\Router\EventRouter //Comment out the next line to use the RegexRouter instead //'type' => \Prooph\ServiceBus\Plugin\Router\RegexRouter::class, + + //[optional] Enable the AsyncSwitchMessageRouter, see docs/plugins.md AsyncSwitchMessageRouter section for details + //If "async_switch" key is present and references an Async\MessageProducer available in the container + //the factory will pull the producer from the container and set up an AsyncSwitchMessageRouter + //using the producer AND decorating the actual configured router + //'async_switch' => 'container_id_of_async_message_producer', ] ], //This section will be used by Prooph\ServiceBus\Container\QueryBusFactory @@ -64,6 +76,12 @@ //Router defaults to Prooph\ServiceBus\Plugin\Router\QueryRouter //Comment out the next line to use the RegexRouter instead //'type' => \Prooph\ServiceBus\Plugin\Router\RegexRouter::class, + + //[optional] Enable the AsyncSwitchMessageRouter, see docs/plugins.md AsyncSwitchMessageRouter section for details + //If "async_switch" key is present and references an Async\MessageProducer available in the container + //the factory will pull the producer from the container and set up an AsyncSwitchMessageRouter + //using the producer AND decorating the actual configured router + //'async_switch' => 'container_id_of_async_message_producer', ] ], ], //EO service_bus diff --git a/src/Container/AbstractBusFactory.php b/src/Container/AbstractBusFactory.php index 206d54c..392b3ff 100644 --- a/src/Container/AbstractBusFactory.php +++ b/src/Container/AbstractBusFactory.php @@ -21,6 +21,7 @@ use Prooph\ServiceBus\Exception\RuntimeException; use Prooph\ServiceBus\MessageBus; use Prooph\ServiceBus\Plugin\MessageFactoryPlugin; +use Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter; use Prooph\ServiceBus\Plugin\ServiceLocatorPlugin; /** @@ -134,7 +135,7 @@ public function __invoke(ContainerInterface $container) } if (isset($busConfig['router'])) { - $this->attachRouter($bus, $busConfig['router']); + $this->attachRouter($bus, $busConfig['router'], $container); } if ((bool) $busConfig['enable_handler_location']) { @@ -171,8 +172,9 @@ private function attachPlugins(MessageBus $bus, array $utils, ContainerInterface /** * @param MessageBus $bus * @param array $routerConfig + * @param ContainerInterface $container */ - private function attachRouter(MessageBus $bus, array $routerConfig) + private function attachRouter(MessageBus $bus, array $routerConfig, ContainerInterface $container) { $routerClass = isset($routerConfig['type']) ? (string)$routerConfig['type'] : $this->getDefaultRouterClass(); @@ -180,6 +182,12 @@ private function attachRouter(MessageBus $bus, array $routerConfig) $router = new $routerClass($routes); + if (isset($routerConfig['async_switch'])) { + $asyncMessageProducer = $container->get($routerConfig['async_switch']); + + $router = new AsyncSwitchMessageRouter($router, $asyncMessageProducer); + } + $bus->utilize($router); } } diff --git a/tests/Container/BusFactoriesTest.php b/tests/Container/BusFactoriesTest.php index 4a2b291..7c11766 100644 --- a/tests/Container/BusFactoriesTest.php +++ b/tests/Container/BusFactoriesTest.php @@ -17,6 +17,7 @@ use Prooph\Common\Event\ActionEventListenerAggregate; use Prooph\Common\Messaging\Message; use Prooph\Common\Messaging\MessageFactory; +use Prooph\ServiceBus\Async\AsyncMessage; use Prooph\ServiceBus\CommandBus; use Prooph\ServiceBus\EventBus; use Prooph\ServiceBus\Container\CommandBusFactory; @@ -26,6 +27,7 @@ use Prooph\ServiceBus\MessageBus; use Prooph\ServiceBus\Plugin\Router\RegexRouter; use Prooph\ServiceBus\QueryBus; +use ProophTest\ServiceBus\Mock\NoopMessageProducer; use ProophTest\ServiceBus\TestCase; use Prophecy\Argument; @@ -255,6 +257,55 @@ public function it_creates_a_bus_and_attaches_the_message_factory_defined_via_co $this->assertTrue($handlerWasCalled); } + /** + * @test + * @dataProvider provideBuses + */ + public function it_decorates_router_with_async_switch_and_pulls_async_message_producer_from_container($busClass, $busConfigKey, $busFactory) + { + $container = $this->prophesize(ContainerInterface::class); + $message = $this->prophesize(Message::class); + $message->willImplement(AsyncMessage::class); + $messageFactory = $this->prophesize(MessageFactory::class); + $messageProducer = new NoopMessageProducer(); + $container->get('noop_message_producer')->willReturn($messageProducer); + + $message->messageName()->willReturn('test_message'); + $message->metadata()->willReturn([]); + $message->withAddedMetadata('handled-async', true)->willReturn($message->reveal()); + $handlerWasCalled = false; + + $container->has('config')->willReturn(true); + $container->get('config')->willReturn([ + 'prooph' => [ + 'service_bus' => [ + $busConfigKey => [ + 'router' => [ + 'async_switch' => 'noop_message_producer', + 'type' => RegexRouter::class, + 'routes' => [ + '/^test_./' => function (Message $message) use (&$handlerWasCalled) { + $handlerWasCalled = true; + } + ] + ], + 'message_factory' => 'custom_message_factory' + ] + ] + ] + ]); + + $container->has('custom_message_factory')->willReturn(true); + $container->get('custom_message_factory')->willReturn($messageFactory); + + $bus = $busFactory($container->reveal()); + + $bus->dispatch($message->reveal()); + + $this->assertFalse($handlerWasCalled); + $this->assertTrue($messageProducer->isInvoked()); + } + /** * @test * @dataProvider provideBuses diff --git a/tests/Mock/NoopMessageProducer.php b/tests/Mock/NoopMessageProducer.php new file mode 100644 index 0000000..6ef1c64 --- /dev/null +++ b/tests/Mock/NoopMessageProducer.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 08/30/16 - 8:35 PM + */ + +namespace ProophTest\ServiceBus\Mock; + +use Prooph\Common\Messaging\Message; +use Prooph\ServiceBus\Async\MessageProducer; +use React\Promise\Deferred; + +class NoopMessageProducer implements MessageProducer +{ + private $invoked = false; + + public function __invoke(Message $message, Deferred $deferred = null) + { + $this->invoked = true; + } + + public function isInvoked() + { + return $this->invoked; + } +}