From 4660b6b59e94a4af8d64bae84462ced4c08d6b87 Mon Sep 17 00:00:00 2001 From: Alexander Miertsch Date: Sun, 30 Aug 2015 19:31:26 +0200 Subject: [PATCH] Add transaction manager --- composer.json | 4 +- docs/transaction_manager.md | 30 ++ src/Container/TransactionManagerFactory.php | 28 ++ src/TransactionManager.php | 161 +++++++++++ .../TransactionManagerFactoryTest.php | 46 ++++ tests/TransactionManagerTest.php | 259 ++++++++++++++++++ 6 files changed, 526 insertions(+), 2 deletions(-) create mode 100644 src/Container/TransactionManagerFactory.php create mode 100644 src/TransactionManager.php create mode 100644 tests/Container/TransactionManagerFactoryTest.php create mode 100644 tests/TransactionManagerTest.php diff --git a/composer.json b/composer.json index 9c46727..daad26d 100644 --- a/composer.json +++ b/composer.json @@ -32,12 +32,12 @@ }, "autoload": { "psr-4": { - "Prooph\\Transaction\\": "src/" + "Prooph\\EventStoreBusBridge\\": "src/" } }, "autoload-dev": { "psr-4": { - "ProophTest\\Transaction\\": "tests/" + "ProophTest\\EventStoreBusBridge\\": "tests/" } } } diff --git a/docs/transaction_manager.md b/docs/transaction_manager.md index e69de29..deab7e0 100644 --- a/docs/transaction_manager.md +++ b/docs/transaction_manager.md @@ -0,0 +1,30 @@ +# Transaction Handling + +## Set Up +To enable transaction handling based on command dispatch you need to set up the [TransactionManager](src/TransactionManager.php). +The only dependency of the transaction manager is an instance of `Prooph\EventStore\EventStore`. + +Then simply add the transaction manger as a plugin to the `command bus`: +```php +/** @var $commandBus Prooph\ServiceBus\CommandBus */ +$commandBus->utilize($transactionManager); +``` + +That's it! + +### Container-Driven Set Up +If you are using the `container-aware factories` shipped with prooph/service-bus you may also +want to auto register the `TransactionManager`. As long as the event store is available as service `prooph.event_store` in the container you can use +the [TransactionManagerFactory](src/Container/TransactionManagerFactory.php) for that. Just map the factory to a service name like `prooph.transaction_manager` and +add the service name to the plugin list of the command bus configuration. Please refer to [prooph/service-bus docs](https://github.com/prooph/service-bus/blob/master/docs/factories.md) +for more details. + +## Features + +1. The transaction manager starts a new event store transaction on every command dispatch. + *Note: Nested transactions are deprecated and will be removed with prooph/event-store v6.0. So it is recommended to dispatch follow up commands only after event store commit!* +2. If the `dispatched command` is an instance of `Prooph\Common\Messaging\Message` the transaction manager will also add `causation metadata` to each recorded event during the transaction. + Two entries are added to the metadata: + - `causation_id` = `$command->uuid()->toString()` + - `causation_name` = `$command->messageName()`. + *Note: Depending on the event store adapter used you may need to alter your event stream schema* \ No newline at end of file diff --git a/src/Container/TransactionManagerFactory.php b/src/Container/TransactionManagerFactory.php new file mode 100644 index 0000000..6932ce6 --- /dev/null +++ b/src/Container/TransactionManagerFactory.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 8/30/15 - 2:14 PM + */ +namespace Prooph\EventStoreBusBridge\Container; + +use Interop\Container\ContainerInterface; +use Prooph\EventStoreBusBridge\TransactionManager; + +/** + * Class TransactionManagerFactory + * + * @package Prooph\EventStoreBusBridge\Container + */ +final class TransactionManagerFactory +{ + public function __invoke(ContainerInterface $container) + { + $eventStore = $container->get('prooph.event_store'); + return new TransactionManager($eventStore); + } +} diff --git a/src/TransactionManager.php b/src/TransactionManager.php new file mode 100644 index 0000000..3abe7dd --- /dev/null +++ b/src/TransactionManager.php @@ -0,0 +1,161 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 4/4/15 - 12:25 AM + */ +namespace Prooph\EventStoreBusBridge; + +use Prooph\Common\Event\ActionEvent; +use Prooph\Common\Event\ActionEventEmitter; +use Prooph\Common\Event\ActionEventListenerAggregate; +use Prooph\Common\Event\DetachAggregateHandlers; +use Prooph\Common\Messaging\Command; +use Prooph\Common\Messaging\Message; +use Prooph\EventStore\EventStore; +use Prooph\EventStore\Stream\Stream; +use Prooph\ServiceBus\CommandBus; + +/** + * TransactionManager + * + * The transaction manager starts a new transaction when a command is dispatched on the command bus. + * If the command dispatch finishes without an error the transaction manager commits the transaction otherwise it does a rollback. + * Furthermore it attaches a listener to the event store create.pre and appendTo.pre action events with a low priority to + * set causation_id as metadata for all domain events which are going to be persisted. + * + * @package Prooph\EventStoreBusBridge + */ +final class TransactionManager implements ActionEventListenerAggregate +{ + use DetachAggregateHandlers; + + /** + * @var EventStore + */ + private $eventStore; + + /** + * @var Command + */ + private $currentCommand; + + /** + * @param EventStore $eventStore + */ + public function __construct(EventStore $eventStore) + { + $this->eventStore = $eventStore; + $this->eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000); + $this->eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000); + } + + /** + * Attaches itself to the command dispatch of the application command bus + * + * @param ActionEventEmitter $emitter + * + * @return void + */ + public function attach(ActionEventEmitter $emitter) + { + //Attach with a low priority, so that a potential message translator has done its job already + $this->trackHandler($emitter->attachListener(CommandBus::EVENT_INITIALIZE, [$this, 'onInitialize'], -1000)); + //Attach with a high priority to rollback transaction early in case of an error + $this->trackHandler($emitter->attachListener(CommandBus::EVENT_FINALIZE, [$this, 'onFinalize'], 1000)); + } + + /** + * This method takes domain events as argument which are going to be added to the event stream and + * adds the causation_id (command UUID) and causation_name (name of the command which has caused the events) + * as metadata to each event. + * + * @param Message[] $recordedEvents + * @return Message[] + */ + private function handleRecordedEvents(array $recordedEvents) + { + if (is_null($this->currentCommand) || ! $this->currentCommand instanceof Message) { + return $recordedEvents; + } + + $causationId = $this->currentCommand->uuid()->toString(); + $causationName = $this->currentCommand->messageName(); + + $enrichedRecordedEvents = []; + + foreach ($recordedEvents as $recordedEvent) { + $recordedEvent = $recordedEvent->withAddedMetadata('causation_id', $causationId); + $recordedEvent = $recordedEvent->withAddedMetadata('causation_name', $causationName); + + $enrichedRecordedEvents[] = $recordedEvent; + } + + return $enrichedRecordedEvents; + } + + /** + * Begin event store transaction on command dispatch initialize + * + * @param ActionEvent $actionEvent + */ + public function onInitialize(ActionEvent $actionEvent) + { + $this->currentCommand = $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE); + + $this->eventStore->beginTransaction(); + } + + /** + * Check if exception an exception was thrown. If so rollback event store transaction + * otherwise commit it. + * + * @param ActionEvent $actionEvent + */ + public function onFinalize(ActionEvent $actionEvent) + { + if ($actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)) { + $this->eventStore->rollback(); + } else { + $this->eventStore->commit(); + } + + $this->currentCommand = null; + } + + /** + * Add event metadata on event store createStream + * + * @param ActionEvent $createEvent + */ + public function onEventStoreCreateStream(ActionEvent $createEvent) + { + $stream = $createEvent->getParam('stream'); + + if (! $stream instanceof Stream) { + return; + } + + $streamEvents = $stream->streamEvents(); + $streamEvents = $this->handleRecordedEvents($streamEvents); + + $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents)); + } + + /** + * Add event metadata on event store appendToStream + * + * @param ActionEvent $appendToStreamEvent + */ + public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent) + { + $streamEvents = $appendToStreamEvent->getParam('streamEvents'); + $streamEvents = $this->handleRecordedEvents($streamEvents); + + $appendToStreamEvent->setParam('streamEvents', $streamEvents); + } +} diff --git a/tests/Container/TransactionManagerFactoryTest.php b/tests/Container/TransactionManagerFactoryTest.php new file mode 100644 index 0000000..a14ebe1 --- /dev/null +++ b/tests/Container/TransactionManagerFactoryTest.php @@ -0,0 +1,46 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 8/30/15 - 2:16 PM + */ +namespace ProophTest\EventStoreBusBridge\Container; + +use Interop\Container\ContainerInterface; +use Prooph\Common\Event\ActionEventEmitter; +use Prooph\EventStore\EventStore; +use Prooph\EventStoreBusBridge\Container\TransactionManagerFactory; +use Prooph\EventStoreBusBridge\TransactionManager; + +/** + * Class TransactionManagerFactoryTest + * + * @package ProophTest\EventStoreBusBridge\Container + */ +final class TransactionManagerFactoryTest extends \PHPUnit_Framework_TestCase +{ + /** + * @test + */ + public function it_creates_a_transaction_manager() + { + $actionEventEmitter = $this->prophesize(ActionEventEmitter::class); + $eventStore = $this->prophesize(EventStore::class); + + $eventStore->getActionEventEmitter()->willReturn($actionEventEmitter->reveal()); + + $container = $this->prophesize(ContainerInterface::class); + + $container->get('prooph.event_store')->willReturn($eventStore->reveal()); + + $factory = new TransactionManagerFactory(); + + $transactionManager = $factory($container->reveal()); + + $this->assertInstanceOf(TransactionManager::class, $transactionManager); + } +} diff --git a/tests/TransactionManagerTest.php b/tests/TransactionManagerTest.php new file mode 100644 index 0000000..c00843b --- /dev/null +++ b/tests/TransactionManagerTest.php @@ -0,0 +1,259 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Date: 8/30/15 - 12:25 PM + */ +namespace ProophTest\EventStoreBusBridge; + +use Prooph\Common\Event\ActionEvent; +use Prooph\Common\Event\ActionEventEmitter; +use Prooph\Common\Event\ListenerHandler; +use Prooph\Common\Messaging\Message; +use Prooph\EventStore\EventStore; +use Prooph\EventStore\Stream\Stream; +use Prooph\EventStore\Stream\StreamName; +use Prooph\EventStoreBusBridge\TransactionManager; +use Prooph\ServiceBus\CommandBus; +use Prophecy\Argument; +use Rhumsaa\Uuid\Uuid; + +/** + * Class TransactionManagerTest + * + * @package ProophTest\EventStoreBusBridge + */ +final class TransactionManagerTest extends \PHPUnit_Framework_TestCase +{ + /** + * @test + */ + public function it_attaches_itself_to_event_store_events() + { + $eventStoreMock = $this->prophesize(EventStore::class); + + $emitter = $this->prophesize(ActionEventEmitter::class); + + $createStreamListener = null; + $appendToStreamListener = null; + + $emitter->attachListener('create.pre', Argument::any(), -1000)->will( + function ($args) use (&$createStreamListener) { + $createStreamListener = $args[1]; + } + ); + $emitter->attachListener('appendTo.pre', Argument::any(), -1000)->will( + function ($args) use (&$appendToStreamListener) { + $appendToStreamListener = $args[1]; + } + ); + + $eventStoreMock->getActionEventEmitter()->willReturn($emitter->reveal()); + + $transactionManager = new TransactionManager($eventStoreMock->reveal()); + + $this->assertEquals([$transactionManager, 'onEventStoreCreateStream'], $createStreamListener); + $this->assertEquals([$transactionManager, 'onEventStoreAppendToStream'], $appendToStreamListener); + } + + /** + * @test + */ + public function it_attaches_itself_to_command_bus_initialize_and_finalize_events() + { + $transactionManager = new TransactionManager($this->getEventStoreObjectProphecy()->reveal()); + + $commandBusEmitter = $this->prophesize(ActionEventEmitter::class); + + $commandBusEmitter->attachListener(CommandBus::EVENT_INITIALIZE, [$transactionManager, 'onInitialize'], -1000) + ->willReturn($this->prophesize(ListenerHandler::class)->reveal()); + $commandBusEmitter->attachListener(CommandBus::EVENT_FINALIZE, [$transactionManager, 'onFinalize'], 1000) + ->willReturn($this->prophesize(ListenerHandler::class)->reveal()); + + $transactionManager->attach($commandBusEmitter->reveal()); + } + + /** + * @test + */ + public function it_begins_a_transaction_on_command_dispatch_initialize() + { + $eventStoreMock = $this->getEventStoreObjectProphecy(); + + $eventStoreMock->beginTransaction()->shouldBeCalled(); + + $transactionManager = new TransactionManager($eventStoreMock->reveal()); + + $actionEvent = $this->prophesize(ActionEvent::class); + + $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE)->willReturn("a message"); + + $transactionManager->onInitialize($actionEvent->reveal()); + } + + /** + * @test + */ + public function it_commits_a_transaction_on_command_dispatch_finalize_if_no_exception_was_thrown() + { + $eventStoreMock = $this->getEventStoreObjectProphecy(); + + $eventStoreMock->commit()->shouldBeCalled(); + + $transactionManager = new TransactionManager($eventStoreMock->reveal()); + + $actionEvent = $this->prophesize(ActionEvent::class); + + $actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)->willReturn(null); + + $transactionManager->onFinalize($actionEvent->reveal()); + } + + /** + * @test + */ + public function it_rollback_a_transaction_on_command_dispatch_finalize_if_exception_was_thrown() + { + $eventStoreMock = $this->getEventStoreObjectProphecy(); + + $eventStoreMock->rollback()->shouldBeCalled(); + + $transactionManager = new TransactionManager($eventStoreMock->reveal()); + + $actionEvent = $this->prophesize(ActionEvent::class); + + $exception = $this->prophesize(\Exception::class); + + $actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)->willReturn($exception->reveal()); + + $transactionManager->onFinalize($actionEvent->reveal()); + } + + /** + * @test + */ + public function it_adds_causation_id_and_causation_name_on_event_store_create_stream() + { + //Step 1: Track the command which will cause events + $command = $this->prophesize(Message::class); + + $causationId = Uuid::uuid4(); + + $command->uuid()->willReturn($causationId); + $command->messageName()->willReturn('causation-message-name'); + + $initializeActionEvent = $this->prophesize(ActionEvent::class); + + $initializeActionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE)->willReturn($command->reveal()); + + $eventStoreMock = $this->getEventStoreObjectProphecy(); + + $eventStoreMock->beginTransaction()->shouldBeCalled(); + + $transactionManager = new TransactionManager($eventStoreMock->reveal()); + + //Now the command is set as currentCommand internally and later used when new stream is going to be created + $transactionManager->onInitialize($initializeActionEvent->reveal()); + + //Step 2: Prepare a new stream which is going to be created. + // The TransactionManager should respect immutability + // so we test that too. + $recordedEvent = $this->prophesize(Message::class); + $recordedEventCopy1 = $this->prophesize(Message::class); + $recordedEventCopy2 = $this->prophesize(Message::class); + + $recordedEventCopy1->withAddedMetadata('causation_name', 'causation-message-name')->willReturn($recordedEventCopy2->reveal()); + $recordedEvent->withAddedMetadata('causation_id', $causationId->toString())->willReturn($recordedEventCopy1->reveal()); + + $stream = new Stream(new StreamName('event_stream'), [$recordedEvent->reveal()]); + + $createStreamActionEvent = $this->prophesize(ActionEvent::class); + + $createStreamActionEvent->getParam('stream')->willReturn($stream); + + $enrichedStream = null; + $createStreamActionEvent->setParam('stream', Argument::type(Stream::class)) + ->will(function ($args) use (&$enrichedStream) { + $enrichedStream = $args[1]; + }); + + $transactionManager->onEventStoreCreateStream($createStreamActionEvent->reveal()); + + $this->assertNotNull($enrichedStream); + $this->assertEquals('event_stream', $enrichedStream->streamName()->toString()); + $this->assertEquals(1, count($enrichedStream->streamEvents())); + $this->assertSame($recordedEventCopy2->reveal(), $enrichedStream->streamEvents()[0]); + } + + /** + * @test + */ + public function it_adds_causation_id_and_causation_name_on_event_store_append_to_stream() + { + //Step 1: Track the command which will cause events + $command = $this->prophesize(Message::class); + + $causationId = Uuid::uuid4(); + + $command->uuid()->willReturn($causationId); + $command->messageName()->willReturn('causation-message-name'); + + $initializeActionEvent = $this->prophesize(ActionEvent::class); + + $initializeActionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE)->willReturn($command->reveal()); + + $eventStoreMock = $this->getEventStoreObjectProphecy(); + + $eventStoreMock->beginTransaction()->shouldBeCalled(); + + $transactionManager = new TransactionManager($eventStoreMock->reveal()); + + //Now the command is set as currentCommand internally and later used when new stream is going to be created + $transactionManager->onInitialize($initializeActionEvent->reveal()); + + //Step 2: Prepare a new stream which is going to be created. + // The TransactionManager should respect immutability + // so we test that too. + $recordedEvent = $this->prophesize(Message::class); + $recordedEventCopy1 = $this->prophesize(Message::class); + $recordedEventCopy2 = $this->prophesize(Message::class); + + $recordedEventCopy1->withAddedMetadata('causation_name', 'causation-message-name')->willReturn($recordedEventCopy2->reveal()); + $recordedEvent->withAddedMetadata('causation_id', $causationId->toString())->willReturn($recordedEventCopy1->reveal()); + + $appendToStreamActionEvent = $this->prophesize(ActionEvent::class); + + $appendToStreamActionEvent->getParam('streamEvents')->willReturn([$recordedEvent->reveal()]); + + $enrichedEvents = null; + $appendToStreamActionEvent->setParam('streamEvents', Argument::any()) + ->will(function ($args) use (&$enrichedEvents) { + $enrichedEvents = $args[1]; + }); + + $transactionManager->onEventStoreAppendToStream($appendToStreamActionEvent->reveal()); + + $this->assertNotNull($enrichedEvents); + $this->assertTrue(is_array($enrichedEvents)); + $this->assertEquals(1, count($enrichedEvents)); + $this->assertSame($recordedEventCopy2->reveal(), $enrichedEvents[0]); + } + + /** + * @return \Prophecy\Prophecy\ObjectProphecy + */ + private function getEventStoreObjectProphecy() + { + $actionEventEmitter = $this->prophesize(ActionEventEmitter::class); + + $eventStoreMock = $this->prophesize(EventStore::class); + + $eventStoreMock->getActionEventEmitter()->willReturn($actionEventEmitter->reveal()); + + return $eventStoreMock; + } +}