Skip to content

Commit

Permalink
Add transaction manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Miertsch committed Aug 30, 2015
1 parent 855eedb commit af056bb
Show file tree
Hide file tree
Showing 7 changed files with 549 additions and 2 deletions.
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
},
"autoload": {
"psr-4": {
"Prooph\\Transaction\\": "src/"
"Prooph\\EventStoreBusBridge\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"ProophTest\\Transaction\\": "tests/"
"ProophTest\\EventStoreBusBridge\\": "tests/"
}
}
}
30 changes: 30 additions & 0 deletions docs/transaction_manager.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Transaction Handling

## Set Up
To enable transaction handling based on command dispatch you need to set up the [TransactionManager](src/TransactionManager.php).
The only dependency of the transaction manager is an instance of `Prooph\EventStore\EventStore`.

Then simply add the transaction manger as a plugin to the `command bus`:
```php
/** @var $commandBus Prooph\ServiceBus\CommandBus */
$commandBus->utilize($transactionManager);
```

That's it!

### Container-Driven Set Up
If you are using the `container-aware factories` shipped with prooph/service-bus you may also
want to auto register the `TransactionManager`. As long as the event store is available as service `prooph.event_store` in the container you can use
the [TransactionManagerFactory](src/Container/TransactionManagerFactory.php) for that. Just map the factory to a service name like `prooph.transaction_manager` and
add the service name to the plugin list of the command bus configuration. Please refer to [prooph/service-bus docs](https://github.com/prooph/service-bus/blob/master/docs/factories.md)
for more details.

## Features

1. The transaction manager starts a new event store transaction on every command dispatch.
*Note: Nested transactions are deprecated and will be removed with prooph/event-store v6.0. So it is recommended to dispatch follow up commands only after event store commit!*
2. If the `dispatched command` is an instance of `Prooph\Common\Messaging\Message` the transaction manager will also add `causation metadata` to each recorded event during the transaction.
Two entries are added to the metadata:
- `causation_id` = `$command->uuid()->toString()`
- `causation_name` = `$command->messageName()`.
*Note: Depending on the event store adapter used you may need to alter your event stream schema*
23 changes: 23 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>

<phpunit backupGlobals="false"
backupStaticAttributes="false"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
syntaxCheck="false"
bootstrap="vendor/autoload.php"
>
<testsuite name="Prooph EventStore ServiceBus Bridge Test Suite">
<directory>./tests</directory>
</testsuite>

<filter>
<whitelist>
<directory>./src/</directory>
</whitelist>
</filter>
</phpunit>
28 changes: 28 additions & 0 deletions src/Container/TransactionManagerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php
/*
* This file is part of the prooph/event-store-bus-bridge.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 8/30/15 - 2:14 PM
*/
namespace Prooph\EventStoreBusBridge\Container;

use Interop\Container\ContainerInterface;
use Prooph\EventStoreBusBridge\TransactionManager;

/**
* Class TransactionManagerFactory
*
* @package Prooph\EventStoreBusBridge\Container
*/
final class TransactionManagerFactory
{
public function __invoke(ContainerInterface $container)
{
$eventStore = $container->get('prooph.event_store');
return new TransactionManager($eventStore);
}
}
161 changes: 161 additions & 0 deletions src/TransactionManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
<?php
/*
* This file is part of prooph/event-store-bus-bridge.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 4/4/15 - 12:25 AM
*/
namespace Prooph\EventStoreBusBridge;

use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ActionEventListenerAggregate;
use Prooph\Common\Event\DetachAggregateHandlers;
use Prooph\Common\Messaging\Command;
use Prooph\Common\Messaging\Message;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Stream\Stream;
use Prooph\ServiceBus\CommandBus;

/**
* TransactionManager
*
* The transaction manager starts a new transaction when a command is dispatched on the command bus.
* If the command dispatch finishes without an error the transaction manager commits the transaction otherwise it does a rollback.
* Furthermore it attaches a listener to the event store create.pre and appendTo.pre action events with a low priority to
* set causation_id as metadata for all domain events which are going to be persisted.
*
* @package Prooph\EventStoreBusBridge
*/
final class TransactionManager implements ActionEventListenerAggregate
{
use DetachAggregateHandlers;

/**
* @var EventStore
*/
private $eventStore;

/**
* @var Command
*/
private $currentCommand;

/**
* @param EventStore $eventStore
*/
public function __construct(EventStore $eventStore)
{
$this->eventStore = $eventStore;
$this->eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
$this->eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
}

/**
* Attaches itself to the command dispatch of the application command bus
*
* @param ActionEventEmitter $emitter
*
* @return void
*/
public function attach(ActionEventEmitter $emitter)
{
//Attach with a low priority, so that a potential message translator has done its job already
$this->trackHandler($emitter->attachListener(CommandBus::EVENT_INITIALIZE, [$this, 'onInitialize'], -1000));
//Attach with a high priority to rollback transaction early in case of an error
$this->trackHandler($emitter->attachListener(CommandBus::EVENT_FINALIZE, [$this, 'onFinalize'], 1000));
}

/**
* This method takes domain events as argument which are going to be added to the event stream and
* adds the causation_id (command UUID) and causation_name (name of the command which has caused the events)
* as metadata to each event.
*
* @param Message[] $recordedEvents
* @return Message[]
*/
private function handleRecordedEvents(array $recordedEvents)
{
if (is_null($this->currentCommand) || ! $this->currentCommand instanceof Message) {
return $recordedEvents;
}

$causationId = $this->currentCommand->uuid()->toString();
$causationName = $this->currentCommand->messageName();

$enrichedRecordedEvents = [];

foreach ($recordedEvents as $recordedEvent) {
$recordedEvent = $recordedEvent->withAddedMetadata('causation_id', $causationId);
$recordedEvent = $recordedEvent->withAddedMetadata('causation_name', $causationName);

$enrichedRecordedEvents[] = $recordedEvent;
}

return $enrichedRecordedEvents;
}

/**
* Begin event store transaction on command dispatch initialize
*
* @param ActionEvent $actionEvent
*/
public function onInitialize(ActionEvent $actionEvent)
{
$this->currentCommand = $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE);

$this->eventStore->beginTransaction();
}

/**
* Check if exception an exception was thrown. If so rollback event store transaction
* otherwise commit it.
*
* @param ActionEvent $actionEvent
*/
public function onFinalize(ActionEvent $actionEvent)
{
if ($actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)) {
$this->eventStore->rollback();
} else {
$this->eventStore->commit();
}

$this->currentCommand = null;
}

/**
* Add event metadata on event store createStream
*
* @param ActionEvent $createEvent
*/
public function onEventStoreCreateStream(ActionEvent $createEvent)
{
$stream = $createEvent->getParam('stream');

if (! $stream instanceof Stream) {
return;
}

$streamEvents = $stream->streamEvents();
$streamEvents = $this->handleRecordedEvents($streamEvents);

$createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
}

/**
* Add event metadata on event store appendToStream
*
* @param ActionEvent $appendToStreamEvent
*/
public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
{
$streamEvents = $appendToStreamEvent->getParam('streamEvents');
$streamEvents = $this->handleRecordedEvents($streamEvents);

$appendToStreamEvent->setParam('streamEvents', $streamEvents);
}
}
46 changes: 46 additions & 0 deletions tests/Container/TransactionManagerFactoryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php
/*
* This file is part of the prooph/event-store-bus-bridge.
* (c) 2014-2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 8/30/15 - 2:16 PM
*/
namespace ProophTest\EventStoreBusBridge\Container;

use Interop\Container\ContainerInterface;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\EventStore\EventStore;
use Prooph\EventStoreBusBridge\Container\TransactionManagerFactory;
use Prooph\EventStoreBusBridge\TransactionManager;

/**
* Class TransactionManagerFactoryTest
*
* @package ProophTest\EventStoreBusBridge\Container
*/
final class TransactionManagerFactoryTest extends \PHPUnit_Framework_TestCase
{
/**
* @test
*/
public function it_creates_a_transaction_manager()
{
$actionEventEmitter = $this->prophesize(ActionEventEmitter::class);
$eventStore = $this->prophesize(EventStore::class);

$eventStore->getActionEventEmitter()->willReturn($actionEventEmitter->reveal());

$container = $this->prophesize(ContainerInterface::class);

$container->get('prooph.event_store')->willReturn($eventStore->reveal());

$factory = new TransactionManagerFactory();

$transactionManager = $factory($container->reveal());

$this->assertInstanceOf(TransactionManager::class, $transactionManager);
}
}

0 comments on commit af056bb

Please sign in to comment.