Skip to content

Commit

Permalink
Merge pull request #133 from codeliner/feature/async_switch_event_bus…
Browse files Browse the repository at this point in the history
…_support

Add event bus support to AsyncSwitchMessageRouter
  • Loading branch information
prolic committed Aug 31, 2016
2 parents cbd8c9e + 83183fa commit 4e57e56
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 9 deletions.
14 changes: 13 additions & 1 deletion src/Plugin/Router/AsyncSwitchMessageRouter.php
Expand Up @@ -17,7 +17,10 @@
use Prooph\Common\Event\DetachAggregateHandlers;
use Prooph\ServiceBus\Async\AsyncMessage;
use Prooph\ServiceBus\Async\MessageProducer;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\QueryBus;

/**
* Class AsyncSwitchMessageRouter
Expand Down Expand Up @@ -80,7 +83,16 @@ public function onRouteMessage(ActionEvent $actionEvent)

// update ActionEvent
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE, $message);
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $this->asyncMessageProducer);

if ($actionEvent->getTarget() instanceof CommandBus || $actionEvent->getTarget() instanceof QueryBus) {
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, $this->asyncMessageProducer);
} else {
//Target is an event bus so we set message producer as the only listener of the message
$actionEvent->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$this->asyncMessageProducer]);
}




return;
}
Expand Down
13 changes: 8 additions & 5 deletions tests/Mock/AsyncCommand.php
@@ -1,9 +1,12 @@
<?php
/**
* Created by PhpStorm.
* User: GuyRadford
* Date: 28/08/2016
* Time: 12:06
/*
* This file is part of the prooph/service-bus.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 08/28/16 - 8:35 PM
*/

namespace ProophTest\ServiceBus\Mock;
Expand Down
26 changes: 26 additions & 0 deletions tests/Mock/AsyncEvent.php
@@ -0,0 +1,26 @@
<?php
/*
* This file is part of the prooph/service-bus.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 08/30/16 - 8:35 PM
*/
namespace ProophTest\ServiceBus\Mock;

use Prooph\Common\Messaging\DomainEvent;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;
use Prooph\ServiceBus\Async\AsyncMessage;

final class AsyncEvent extends DomainEvent implements PayloadConstructable, AsyncMessage
{
use PayloadTrait;

public static function createEvent($data)
{
return new self(['data' => $data]);
}
}
36 changes: 33 additions & 3 deletions tests/Plugin/Router/AsyncSwitchMessageRouterTest.php
Expand Up @@ -16,10 +16,13 @@
use Prooph\Common\Event\ListenerHandler;
use Prooph\ServiceBus\Async\MessageProducer;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\Plugin\Router\AsyncSwitchMessageRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\ServiceBus\Plugin\Router\SingleHandlerRouter;
use ProophTest\ServiceBus\Mock\AsyncCommand;
use ProophTest\ServiceBus\Mock\AsyncEvent;
use ProophTest\ServiceBus\Mock\NonAsyncCommand;
use ProophTest\ServiceBus\TestCase;

Expand Down Expand Up @@ -87,7 +90,7 @@ public function unmarked_message_is_passed_to_decorated_router()
$message = NonAsyncCommand::createCommand('test-data');
$actionEvent = new DefaultActionEvent(
AsyncCommand::class,
null,
new CommandBus(),
[
MessageBus::EVENT_PARAM_MESSAGE_NAME => get_class($message),
MessageBus::EVENT_PARAM_MESSAGE => $message
Expand All @@ -114,7 +117,7 @@ public function marked_message_is_passed_to_async_producer()
$message = AsyncCommand::createCommand('test-data');
$actionEvent = new DefaultActionEvent(
AsyncCommand::class,
null,
new CommandBus(),
[
MessageBus::EVENT_PARAM_MESSAGE_NAME => get_class($message),
MessageBus::EVENT_PARAM_MESSAGE => $message
Expand Down Expand Up @@ -144,7 +147,7 @@ public function marked_message_is_passed_to_decorated_router_as_already_handled_

$actionEvent = new DefaultActionEvent(
AsyncCommand::class,
null,
new CommandBus(),
[
MessageBus::EVENT_PARAM_MESSAGE_NAME => get_class($message),
MessageBus::EVENT_PARAM_MESSAGE => $message
Expand All @@ -161,4 +164,31 @@ public function marked_message_is_passed_to_decorated_router_as_already_handled_
$this->assertArrayHasKey('handled-async', $updatedMessage->metadata());
$this->assertTrue($updatedMessage->metadata()['handled-async']);
}

/**
* @test
*/
public function it_sets_message_producer_as_event_listener_if_target_is_an_event_bus()
{
$messageProducer = $this->prophesize(MessageProducer::class);

$message = AsyncEvent::createEvent('test-data');
$actionEvent = new DefaultActionEvent(
MessageBus::EVENT_ROUTE,
new EventBus(),
[
MessageBus::EVENT_PARAM_MESSAGE_NAME => get_class($message),
MessageBus::EVENT_PARAM_MESSAGE => $message
]
);

$router = new AsyncSwitchMessageRouter(new EventRouter(), $messageProducer->reveal());
$router->onRouteMessage($actionEvent);

$this->assertEquals($messageProducer->reveal(), $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS)[0]);

$updatedMessage = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$this->assertArrayHasKey('handled-async', $updatedMessage->metadata());
$this->assertTrue($updatedMessage->metadata()['handled-async']);
}
}

0 comments on commit 4e57e56

Please sign in to comment.