-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Alexander Miertsch
committed
Aug 30, 2015
1 parent
8e3c07f
commit 4660b6b
Showing
6 changed files
with
526 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# Transaction Handling | ||
|
||
## Set Up | ||
To enable transaction handling based on command dispatch you need to set up the [TransactionManager](src/TransactionManager.php). | ||
The only dependency of the transaction manager is an instance of `Prooph\EventStore\EventStore`. | ||
|
||
Then simply add the transaction manger as a plugin to the `command bus`: | ||
```php | ||
/** @var $commandBus Prooph\ServiceBus\CommandBus */ | ||
$commandBus->utilize($transactionManager); | ||
``` | ||
|
||
That's it! | ||
|
||
### Container-Driven Set Up | ||
If you are using the `container-aware factories` shipped with prooph/service-bus you may also | ||
want to auto register the `TransactionManager`. As long as the event store is available as service `prooph.event_store` in the container you can use | ||
the [TransactionManagerFactory](src/Container/TransactionManagerFactory.php) for that. Just map the factory to a service name like `prooph.transaction_manager` and | ||
add the service name to the plugin list of the command bus configuration. Please refer to [prooph/service-bus docs](https://github.com/prooph/service-bus/blob/master/docs/factories.md) | ||
for more details. | ||
|
||
## Features | ||
|
||
1. The transaction manager starts a new event store transaction on every command dispatch. | ||
*Note: Nested transactions are deprecated and will be removed with prooph/event-store v6.0. So it is recommended to dispatch follow up commands only after event store commit!* | ||
2. If the `dispatched command` is an instance of `Prooph\Common\Messaging\Message` the transaction manager will also add `causation metadata` to each recorded event during the transaction. | ||
Two entries are added to the metadata: | ||
- `causation_id` = `$command->uuid()->toString()` | ||
- `causation_name` = `$command->messageName()`. | ||
*Note: Depending on the event store adapter used you may need to alter your event stream schema* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
<?php | ||
/* | ||
* This file is part of the prooph/event-store-bus-bridge. | ||
* (c) 2014-2015 prooph software GmbH <contact@prooph.de> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
* | ||
* Date: 8/30/15 - 2:14 PM | ||
*/ | ||
namespace Prooph\EventStoreBusBridge\Container; | ||
|
||
use Interop\Container\ContainerInterface; | ||
use Prooph\EventStoreBusBridge\TransactionManager; | ||
|
||
/** | ||
* Class TransactionManagerFactory | ||
* | ||
* @package Prooph\EventStoreBusBridge\Container | ||
*/ | ||
final class TransactionManagerFactory | ||
{ | ||
public function __invoke(ContainerInterface $container) | ||
{ | ||
$eventStore = $container->get('prooph.event_store'); | ||
return new TransactionManager($eventStore); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
<?php | ||
/* | ||
* This file is part of prooph/event-store-bus-bridge. | ||
* (c) 2014-2015 prooph software GmbH <contact@prooph.de> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
* | ||
* Date: 4/4/15 - 12:25 AM | ||
*/ | ||
namespace Prooph\EventStoreBusBridge; | ||
|
||
use Prooph\Common\Event\ActionEvent; | ||
use Prooph\Common\Event\ActionEventEmitter; | ||
use Prooph\Common\Event\ActionEventListenerAggregate; | ||
use Prooph\Common\Event\DetachAggregateHandlers; | ||
use Prooph\Common\Messaging\Command; | ||
use Prooph\Common\Messaging\Message; | ||
use Prooph\EventStore\EventStore; | ||
use Prooph\EventStore\Stream\Stream; | ||
use Prooph\ServiceBus\CommandBus; | ||
|
||
/** | ||
* TransactionManager | ||
* | ||
* The transaction manager starts a new transaction when a command is dispatched on the command bus. | ||
* If the command dispatch finishes without an error the transaction manager commits the transaction otherwise it does a rollback. | ||
* Furthermore it attaches a listener to the event store create.pre and appendTo.pre action events with a low priority to | ||
* set causation_id as metadata for all domain events which are going to be persisted. | ||
* | ||
* @package Prooph\EventStoreBusBridge | ||
*/ | ||
final class TransactionManager implements ActionEventListenerAggregate | ||
{ | ||
use DetachAggregateHandlers; | ||
|
||
/** | ||
* @var EventStore | ||
*/ | ||
private $eventStore; | ||
|
||
/** | ||
* @var Command | ||
*/ | ||
private $currentCommand; | ||
|
||
/** | ||
* @param EventStore $eventStore | ||
*/ | ||
public function __construct(EventStore $eventStore) | ||
{ | ||
$this->eventStore = $eventStore; | ||
$this->eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000); | ||
$this->eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000); | ||
} | ||
|
||
/** | ||
* Attaches itself to the command dispatch of the application command bus | ||
* | ||
* @param ActionEventEmitter $emitter | ||
* | ||
* @return void | ||
*/ | ||
public function attach(ActionEventEmitter $emitter) | ||
{ | ||
//Attach with a low priority, so that a potential message translator has done its job already | ||
$this->trackHandler($emitter->attachListener(CommandBus::EVENT_INITIALIZE, [$this, 'onInitialize'], -1000)); | ||
//Attach with a high priority to rollback transaction early in case of an error | ||
$this->trackHandler($emitter->attachListener(CommandBus::EVENT_FINALIZE, [$this, 'onFinalize'], 1000)); | ||
} | ||
|
||
/** | ||
* This method takes domain events as argument which are going to be added to the event stream and | ||
* adds the causation_id (command UUID) and causation_name (name of the command which has caused the events) | ||
* as metadata to each event. | ||
* | ||
* @param Message[] $recordedEvents | ||
* @return Message[] | ||
*/ | ||
private function handleRecordedEvents(array $recordedEvents) | ||
{ | ||
if (is_null($this->currentCommand) || ! $this->currentCommand instanceof Message) { | ||
return $recordedEvents; | ||
} | ||
|
||
$causationId = $this->currentCommand->uuid()->toString(); | ||
$causationName = $this->currentCommand->messageName(); | ||
|
||
$enrichedRecordedEvents = []; | ||
|
||
foreach ($recordedEvents as $recordedEvent) { | ||
$recordedEvent = $recordedEvent->withAddedMetadata('causation_id', $causationId); | ||
$recordedEvent = $recordedEvent->withAddedMetadata('causation_name', $causationName); | ||
|
||
$enrichedRecordedEvents[] = $recordedEvent; | ||
} | ||
|
||
return $enrichedRecordedEvents; | ||
} | ||
|
||
/** | ||
* Begin event store transaction on command dispatch initialize | ||
* | ||
* @param ActionEvent $actionEvent | ||
*/ | ||
public function onInitialize(ActionEvent $actionEvent) | ||
{ | ||
$this->currentCommand = $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE); | ||
|
||
$this->eventStore->beginTransaction(); | ||
} | ||
|
||
/** | ||
* Check if exception an exception was thrown. If so rollback event store transaction | ||
* otherwise commit it. | ||
* | ||
* @param ActionEvent $actionEvent | ||
*/ | ||
public function onFinalize(ActionEvent $actionEvent) | ||
{ | ||
if ($actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)) { | ||
$this->eventStore->rollback(); | ||
} else { | ||
$this->eventStore->commit(); | ||
} | ||
|
||
$this->currentCommand = null; | ||
} | ||
|
||
/** | ||
* Add event metadata on event store createStream | ||
* | ||
* @param ActionEvent $createEvent | ||
*/ | ||
public function onEventStoreCreateStream(ActionEvent $createEvent) | ||
{ | ||
$stream = $createEvent->getParam('stream'); | ||
|
||
if (! $stream instanceof Stream) { | ||
return; | ||
} | ||
|
||
$streamEvents = $stream->streamEvents(); | ||
$streamEvents = $this->handleRecordedEvents($streamEvents); | ||
|
||
$createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents)); | ||
} | ||
|
||
/** | ||
* Add event metadata on event store appendToStream | ||
* | ||
* @param ActionEvent $appendToStreamEvent | ||
*/ | ||
public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent) | ||
{ | ||
$streamEvents = $appendToStreamEvent->getParam('streamEvents'); | ||
$streamEvents = $this->handleRecordedEvents($streamEvents); | ||
|
||
$appendToStreamEvent->setParam('streamEvents', $streamEvents); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
<?php | ||
/* | ||
* This file is part of the prooph/event-store-bus-bridge. | ||
* (c) 2014-2015 prooph software GmbH <contact@prooph.de> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
* | ||
* Date: 8/30/15 - 2:16 PM | ||
*/ | ||
namespace ProophTest\EventStoreBusBridge\Container; | ||
|
||
use Interop\Container\ContainerInterface; | ||
use Prooph\Common\Event\ActionEventEmitter; | ||
use Prooph\EventStore\EventStore; | ||
use Prooph\EventStoreBusBridge\Container\TransactionManagerFactory; | ||
use Prooph\EventStoreBusBridge\TransactionManager; | ||
|
||
/** | ||
* Class TransactionManagerFactoryTest | ||
* | ||
* @package ProophTest\EventStoreBusBridge\Container | ||
*/ | ||
final class TransactionManagerFactoryTest extends \PHPUnit_Framework_TestCase | ||
{ | ||
/** | ||
* @test | ||
*/ | ||
public function it_creates_a_transaction_manager() | ||
{ | ||
$actionEventEmitter = $this->prophesize(ActionEventEmitter::class); | ||
$eventStore = $this->prophesize(EventStore::class); | ||
|
||
$eventStore->getActionEventEmitter()->willReturn($actionEventEmitter->reveal()); | ||
|
||
$container = $this->prophesize(ContainerInterface::class); | ||
|
||
$container->get('prooph.event_store')->willReturn($eventStore->reveal()); | ||
|
||
$factory = new TransactionManagerFactory(); | ||
|
||
$transactionManager = $factory($container->reveal()); | ||
|
||
$this->assertInstanceOf(TransactionManager::class, $transactionManager); | ||
} | ||
} |
Oops, something went wrong.