diff --git a/composer.json b/composer.json index 8129f40..7e35986 100644 --- a/composer.json +++ b/composer.json @@ -27,8 +27,8 @@ "prooph/common": "~3.5" }, "require-dev": { - "phpunit/phpunit": "~4.7", - "prooph/event-store" : "dev-master", + "phpunit/phpunit": "~4.8", + "prooph/event-store" : "dev-develop", "fabpot/php-cs-fixer": "1.7.*", "satooshi/php-coveralls": "dev-master" }, @@ -38,6 +38,9 @@ } }, + "suggest" : { + "prooph/event-store" : "^6.0 - Use prooph/event-store to persist recorded events in event streams" + }, "autoload-dev": { "psr-4": { "Prooph\\EventSourcingTest\\": "tests/" diff --git a/src/AggregateRoot.php b/src/AggregateRoot.php index 9850721..22bd210 100644 --- a/src/AggregateRoot.php +++ b/src/AggregateRoot.php @@ -35,10 +35,10 @@ abstract class AggregateRoot protected $recordedEvents = []; /** - * @param AggregateChanged[] $historyEvents + * @param \Iterator $historyEvents * @return static */ - protected static function reconstituteFromHistory(array $historyEvents) + protected static function reconstituteFromHistory(\Iterator $historyEvents) { $instance = new static(); $instance->replay($historyEvents); @@ -83,21 +83,20 @@ protected function recordThat(AggregateChanged $event) $this->version += 1; $this->recordedEvents[] = $event->withVersion($this->version); - - $this->apply($event); } /** * Replay past events * - * @param AggregateChanged[] $historyEvents + * @param \Iterator $historyEvents * * @throws \RuntimeException * @return void */ - protected function replay(array $historyEvents) + protected function replay(\Iterator $historyEvents) { foreach ($historyEvents as $pastEvent) { + /** @var AggregateChanged $pastEvent */ $this->version = $pastEvent->version(); $this->apply($pastEvent); diff --git a/src/EventStoreIntegration/AggregateRootDecorator.php b/src/EventStoreIntegration/AggregateRootDecorator.php index f729c46..674c1e7 100644 --- a/src/EventStoreIntegration/AggregateRootDecorator.php +++ b/src/EventStoreIntegration/AggregateRootDecorator.php @@ -11,6 +11,7 @@ namespace Prooph\EventSourcing\EventStoreIntegration; +use Prooph\EventSourcing\AggregateChanged; use Prooph\EventSourcing\AggregateRoot; /** @@ -49,11 +50,11 @@ public function extractAggregateId(AggregateRoot $anAggregateRoot) /** * @param string $arClass - * @param array $aggregateChangedEvents + * @param \Iterator $aggregateChangedEvents * @return AggregateRoot * @throws \RuntimeException */ - public function fromHistory($arClass, array $aggregateChangedEvents) + public function fromHistory($arClass, \Iterator $aggregateChangedEvents) { if (! class_exists($arClass)) { throw new \RuntimeException( @@ -64,6 +65,17 @@ public function fromHistory($arClass, array $aggregateChangedEvents) return $arClass::reconstituteFromHistory($aggregateChangedEvents); } + /** + * @param AggregateRoot $aggregateRoot + * @param AggregateChanged[] $events + */ + public function applyPendingStreamEvents(AggregateRoot $aggregateRoot, array $events) + { + foreach ($events as $event) { + $aggregateRoot->apply($event); + } + } + /** * @throws \BadMethodCallException * @return string representation of the unique identifier of the aggregate root diff --git a/src/EventStoreIntegration/AggregateTranslator.php b/src/EventStoreIntegration/AggregateTranslator.php index ee1cfd1..d4aeb86 100644 --- a/src/EventStoreIntegration/AggregateTranslator.php +++ b/src/EventStoreIntegration/AggregateTranslator.php @@ -39,21 +39,11 @@ public function extractAggregateId($anEventSourcedAggregateRoot) /** * @param AggregateType $aggregateType - * @param Message[] $historyEvents - * @throws \RuntimeException + * @param \Iterator $historyEvents * @return object reconstructed AggregateRoot */ - public function reconstituteAggregateFromHistory(AggregateType $aggregateType, $historyEvents) + public function reconstituteAggregateFromHistory(AggregateType $aggregateType, \Iterator $historyEvents) { - if (count($historyEvents) === 0) { - throw new \RuntimeException( - sprintf( - "Can not reconstitute Aggregate %s from history. No stream events given", - $aggregateType->toString() - ) - ); - } - return $this->getAggregateRootDecorator() ->fromHistory($aggregateType->toString(), $historyEvents); } @@ -67,6 +57,15 @@ public function extractPendingStreamEvents($anEventSourcedAggregateRoot) return $this->getAggregateRootDecorator()->extractRecordedEvents($anEventSourcedAggregateRoot); } + /** + * @param object $anEventSourcedAggregateRoot + * @param Message[] $events + */ + public function applyPendingStreamEvents($anEventSourcedAggregateRoot, array $events) + { + $this->getAggregateRootDecorator()->applyPendingStreamEvents($anEventSourcedAggregateRoot, $events); + } + /** * @return AggregateRootDecorator */ diff --git a/tests/AggregateRootTest.php b/tests/AggregateRootTest.php index c52b455..c98ba64 100644 --- a/tests/AggregateRootTest.php +++ b/tests/AggregateRootTest.php @@ -11,6 +11,7 @@ namespace Prooph\EventSourcingTest; +use Prooph\EventSourcing\EventStoreIntegration\AggregateRootDecorator; use Prooph\EventSourcingTest\Mock\BrokenUser; use Prooph\EventSourcingTest\Mock\User; @@ -27,24 +28,38 @@ class AggregateRootTest extends TestCase */ public function it_applies_event_by_calling_appropriate_event_handler() { + $decorator = AggregateRootDecorator::newInstance(); + $user = User::nameNew('John'); + $recordedEvents = $decorator->extractRecordedEvents($user); + + //Recording and applying events are two different steps + //In between would be the process of persisting recorded events to an event stream + //Only if this was successful the events can be applied to the aggregate root + //We skip the persistence process here and apply the events directly + $decorator->applyPendingStreamEvents($user, $recordedEvents); + $this->assertEquals('John', $user->name()); $user->changeName('Max'); + $additionalRecordedEvents = $decorator->extractRecordedEvents($user); + + $decorator->applyPendingStreamEvents($user, $additionalRecordedEvents); + $this->assertEquals('Max', $user->name()); - $pendingEvents = $user->accessRecordedEvents(); + $recordedEvents = array_merge($recordedEvents, $additionalRecordedEvents); - $this->assertEquals(2, count($pendingEvents)); + $this->assertEquals(2, count($recordedEvents)); - $userCreatedEvent = $pendingEvents[0]; + $userCreatedEvent = $recordedEvents[0]; $this->assertEquals('John', $userCreatedEvent->name()); $this->assertEquals(1, $userCreatedEvent->version()); - $userNameChangedEvent = $pendingEvents[1]; + $userNameChangedEvent = $recordedEvents[1]; $this->assertEquals('Max', $userNameChangedEvent->newUsername()); $this->assertEquals(2, $userNameChangedEvent->version()); @@ -57,7 +72,12 @@ public function it_applies_event_by_calling_appropriate_event_handler() */ public function it_throws_exception_when_no_handler_on_aggregate() { - BrokenUser::nameNew('John'); + $brokenUser = BrokenUser::nameNew('John'); + + AggregateRootDecorator::newInstance()->applyPendingStreamEvents( + $brokenUser, + $brokenUser->accessRecordedEvents() + ); } /** @@ -67,11 +87,17 @@ public function it_reconstructs_itself_from_history() { $user = User::nameNew('John'); - $this->assertEquals('John', $user->name()); + $recordedEvents = $user->accessRecordedEvents(); + + AggregateRootDecorator::newInstance()->applyPendingStreamEvents($user, $recordedEvents); $user->changeName('Max'); - $historyEvents = $user->accessRecordedEvents(); + $additionalRecordedEvents = $user->accessRecordedEvents(); + + AggregateRootDecorator::newInstance()->applyPendingStreamEvents($user, $additionalRecordedEvents); + + $historyEvents = new \ArrayIterator(array_merge($recordedEvents, $additionalRecordedEvents)); $sameUser = User::fromHistory($historyEvents); diff --git a/tests/EventStoreIntegration/AggregateRootDecoratorTest.php b/tests/EventStoreIntegration/AggregateRootDecoratorTest.php index 920175f..6b8320d 100644 --- a/tests/EventStoreIntegration/AggregateRootDecoratorTest.php +++ b/tests/EventStoreIntegration/AggregateRootDecoratorTest.php @@ -30,7 +30,7 @@ class AggregateRootDecoratorTest extends TestCase public function it_throws_exception_when_reconstitute_from_history_with_invalid_class() { $decorator = AggregateRootDecorator::newInstance(); - $decorator->fromHistory('UnknownClass', []); + $decorator->fromHistory('UnknownClass', new \ArrayIterator([])); } /** diff --git a/tests/EventStoreIntegration/AggregateTranslatorTest.php b/tests/EventStoreIntegration/AggregateTranslatorTest.php index 1017020..f898406 100644 --- a/tests/EventStoreIntegration/AggregateTranslatorTest.php +++ b/tests/EventStoreIntegration/AggregateTranslatorTest.php @@ -48,7 +48,7 @@ protected function setUp() $this->eventStore->beginTransaction(); - $this->eventStore->create(new Stream(new StreamName('event_stream'), [])); + $this->eventStore->create(new Stream(new StreamName('event_stream'), new \ArrayIterator([]))); $this->eventStore->commit(); @@ -64,12 +64,16 @@ public function it_translates_aggregate_back_and_forth() $user = User::nameNew('John Doe'); - $user->changeName('Max Mustermann'); - $this->repository->addAggregateRoot($user); $this->eventStore->commit(); + $this->eventStore->beginTransaction(); + + $user->changeName('Max Mustermann'); + + $this->eventStore->commit(); + $this->resetRepository(); $loadedUser = $this->repository->getAggregateRoot($user->id()); @@ -90,19 +94,6 @@ public function it_can_use_custom_aggregate_root_decorator() $this->assertSame($mock, $translator->getAggregateRootDecorator()); } - /** - * @test - * @expectedException RuntimeException - * @expectedExceptionMessage Can not reconstitute Aggregate Prooph\EventSourcingTest\Mock\User from history. No stream events given - */ - public function it_cannot_reconstitute_from_history_without_stream_events() - { - $aggregateType = AggregateType::fromAggregateRootClass('Prooph\EventSourcingTest\Mock\User'); - - $translator = new AggregateTranslator(); - $translator->reconstituteAggregateFromHistory($aggregateType, []); - } - protected function resetRepository() { $this->repository = new AggregateRepository( diff --git a/tests/Mock/User.php b/tests/Mock/User.php index f99b284..f5e704a 100644 --- a/tests/Mock/User.php +++ b/tests/Mock/User.php @@ -11,7 +11,6 @@ namespace Prooph\EventSourcingTest\Mock; -use Prooph\EventSourcing\AggregateChanged; use Prooph\EventSourcing\AggregateRoot; use Rhumsaa\Uuid\Uuid; @@ -44,10 +43,10 @@ public static function nameNew($name) } /** - * @param AggregateChanged[] $historyEvents + * @param \Iterator $historyEvents * @return User */ - public static function fromHistory(array $historyEvents) + public static function fromHistory(\Iterator $historyEvents) { return self::reconstituteFromHistory($historyEvents); }