Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bus factory async option #135

Merged
merged 3 commits into from Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions config/prooph_service_bus.config.php
Expand Up @@ -34,6 +34,12 @@
//Router defaults to Prooph\ServiceBus\Plugin\Router\CommandRouter
//Comment out the next line to use the RegexRouter instead
//'type' => \Prooph\ServiceBus\Plugin\Router\RegexRouter::class,

//[optional] Enable the AsyncSwitchMessageRouter, see docs/plugins.md AsyncSwitchMessageRouter section for details
//If "async_switch" key is present and references an Async\MessageProducer available in the container
//the factory will pull the producer from the container and set up an AsyncSwitchMessageRouter
//using the producer AND decorating the actual configured router
//'async_switch' => 'container_id_of_async_message_producer',
]
],
//This section will be used by Prooph\ServiceBus\Container\EventBusFactory
Expand All @@ -49,6 +55,12 @@
//Router defaults to Prooph\ServiceBus\Plugin\Router\EventRouter
//Comment out the next line to use the RegexRouter instead
//'type' => \Prooph\ServiceBus\Plugin\Router\RegexRouter::class,

//[optional] Enable the AsyncSwitchMessageRouter, see docs/plugins.md AsyncSwitchMessageRouter section for details
//If "async_switch" key is present and references an Async\MessageProducer available in the container
//the factory will pull the producer from the container and set up an AsyncSwitchMessageRouter
//using the producer AND decorating the actual configured router
//'async_switch' => 'container_id_of_async_message_producer',
]
],
//This section will be used by Prooph\ServiceBus\Container\QueryBusFactory
Expand All @@ -64,6 +76,12 @@
//Router defaults to Prooph\ServiceBus\Plugin\Router\QueryRouter
//Comment out the next line to use the RegexRouter instead
//'type' => \Prooph\ServiceBus\Plugin\Router\RegexRouter::class,

//[optional] Enable the AsyncSwitchMessageRouter, see docs/plugins.md AsyncSwitchMessageRouter section for details
//If "async_switch" key is present and references an Async\MessageProducer available in the container
//the factory will pull the producer from the container and set up an AsyncSwitchMessageRouter
//using the producer AND decorating the actual configured router
//'async_switch' => 'container_id_of_async_message_producer',
]
],
], //EO service_bus
Expand Down
12 changes: 10 additions & 2 deletions src/Container/AbstractBusFactory.php
Expand Up @@ -21,6 +21,7 @@
use Prooph\ServiceBus\Exception\RuntimeException;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\Plugin\MessageFactoryPlugin;
use Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter;
use Prooph\ServiceBus\Plugin\ServiceLocatorPlugin;

/**
Expand Down Expand Up @@ -134,7 +135,7 @@ public function __invoke(ContainerInterface $container)
}

if (isset($busConfig['router'])) {
$this->attachRouter($bus, $busConfig['router']);
$this->attachRouter($bus, $busConfig['router'], $container);
}

if ((bool) $busConfig['enable_handler_location']) {
Expand Down Expand Up @@ -171,15 +172,22 @@ private function attachPlugins(MessageBus $bus, array $utils, ContainerInterface
/**
* @param MessageBus $bus
* @param array $routerConfig
* @param ContainerInterface $container
*/
private function attachRouter(MessageBus $bus, array $routerConfig)
private function attachRouter(MessageBus $bus, array $routerConfig, ContainerInterface $container)
{
$routerClass = isset($routerConfig['type']) ? (string)$routerConfig['type'] : $this->getDefaultRouterClass();

$routes = isset($routerConfig['routes']) ? $routerConfig['routes'] : [];

$router = new $routerClass($routes);

if (isset($routerConfig['async_switch'])) {
$asyncMessageProducer = $container->get($routerConfig['async_switch']);

$router = new AsyncSwitchMessageRouter($router, $asyncMessageProducer);
}

$bus->utilize($router);
}
}
51 changes: 51 additions & 0 deletions tests/Container/BusFactoriesTest.php
Expand Up @@ -17,6 +17,7 @@
use Prooph\Common\Event\ActionEventListenerAggregate;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\ServiceBus\Async\AsyncMessage;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Container\CommandBusFactory;
Expand All @@ -26,6 +27,7 @@
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\Plugin\Router\RegexRouter;
use Prooph\ServiceBus\QueryBus;
use ProophTest\ServiceBus\Mock\NoopMessageProducer;
use ProophTest\ServiceBus\TestCase;
use Prophecy\Argument;

Expand Down Expand Up @@ -255,6 +257,55 @@ public function it_creates_a_bus_and_attaches_the_message_factory_defined_via_co
$this->assertTrue($handlerWasCalled);
}

/**
* @test
* @dataProvider provideBuses
*/
public function it_decorates_router_with_async_switch_and_pulls_async_message_producer_from_container($busClass, $busConfigKey, $busFactory)
{
$container = $this->prophesize(ContainerInterface::class);
$message = $this->prophesize(Message::class);
$message->willImplement(AsyncMessage::class);
$messageFactory = $this->prophesize(MessageFactory::class);
$messageProducer = new NoopMessageProducer();
$container->get('noop_message_producer')->willReturn($messageProducer);

$message->messageName()->willReturn('test_message');
$message->metadata()->willReturn([]);
$message->withAddedMetadata('handled-async', true)->willReturn($message->reveal());
$handlerWasCalled = false;

$container->has('config')->willReturn(true);
$container->get('config')->willReturn([
'prooph' => [
'service_bus' => [
$busConfigKey => [
'router' => [
'async_switch' => 'noop_message_producer',
'type' => RegexRouter::class,
'routes' => [
'/^test_./' => function (Message $message) use (&$handlerWasCalled) {
$handlerWasCalled = true;
}
]
],
'message_factory' => 'custom_message_factory'
]
]
]
]);

$container->has('custom_message_factory')->willReturn(true);
$container->get('custom_message_factory')->willReturn($messageFactory);

$bus = $busFactory($container->reveal());

$bus->dispatch($message->reveal());

$this->assertFalse($handlerWasCalled);
$this->assertTrue($messageProducer->isInvoked());
}

/**
* @test
* @dataProvider provideBuses
Expand Down
31 changes: 31 additions & 0 deletions tests/Mock/NoopMessageProducer.php
@@ -0,0 +1,31 @@
<?php
/*
* This file is part of the prooph/service-bus.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 08/30/16 - 8:35 PM
*/

namespace ProophTest\ServiceBus\Mock;

use Prooph\Common\Messaging\Message;
use Prooph\ServiceBus\Async\MessageProducer;
use React\Promise\Deferred;

class NoopMessageProducer implements MessageProducer
{
private $invoked = false;

public function __invoke(Message $message, Deferred $deferred = null)
{
$this->invoked = true;
}

public function isInvoked()
{
return $this->invoked;
}
}