Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjustments for prooph/event-store v6 #10

Merged
merged 2 commits into from
Sep 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
"prooph/common": "~3.5"
},
"require-dev": {
"phpunit/phpunit": "~4.7",
"prooph/event-store" : "dev-master",
"phpunit/phpunit": "~4.8",
"prooph/event-store" : "dev-develop",
"fabpot/php-cs-fixer": "1.7.*",
"satooshi/php-coveralls": "dev-master"
},
Expand All @@ -38,6 +38,9 @@

}
},
"suggest" : {
"prooph/event-store" : "^6.0 - Use prooph/event-store to persist recorded events in event streams"
},
"autoload-dev": {
"psr-4": {
"Prooph\\EventSourcingTest\\": "tests/"
Expand Down
11 changes: 5 additions & 6 deletions src/AggregateRoot.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ abstract class AggregateRoot
protected $recordedEvents = [];

/**
* @param AggregateChanged[] $historyEvents
* @param \Iterator $historyEvents
* @return static
*/
protected static function reconstituteFromHistory(array $historyEvents)
protected static function reconstituteFromHistory(\Iterator $historyEvents)
{
$instance = new static();
$instance->replay($historyEvents);
Expand Down Expand Up @@ -83,21 +83,20 @@ protected function recordThat(AggregateChanged $event)
$this->version += 1;

$this->recordedEvents[] = $event->withVersion($this->version);

$this->apply($event);
}

/**
* Replay past events
*
* @param AggregateChanged[] $historyEvents
* @param \Iterator $historyEvents
*
* @throws \RuntimeException
* @return void
*/
protected function replay(array $historyEvents)
protected function replay(\Iterator $historyEvents)
{
foreach ($historyEvents as $pastEvent) {
/** @var AggregateChanged $pastEvent */
$this->version = $pastEvent->version();

$this->apply($pastEvent);
Expand Down
16 changes: 14 additions & 2 deletions src/EventStoreIntegration/AggregateRootDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Prooph\EventSourcing\EventStoreIntegration;

use Prooph\EventSourcing\AggregateChanged;
use Prooph\EventSourcing\AggregateRoot;

/**
Expand Down Expand Up @@ -49,11 +50,11 @@ public function extractAggregateId(AggregateRoot $anAggregateRoot)

/**
* @param string $arClass
* @param array $aggregateChangedEvents
* @param \Iterator $aggregateChangedEvents
* @return AggregateRoot
* @throws \RuntimeException
*/
public function fromHistory($arClass, array $aggregateChangedEvents)
public function fromHistory($arClass, \Iterator $aggregateChangedEvents)
{
if (! class_exists($arClass)) {
throw new \RuntimeException(
Expand All @@ -64,6 +65,17 @@ public function fromHistory($arClass, array $aggregateChangedEvents)
return $arClass::reconstituteFromHistory($aggregateChangedEvents);
}

/**
* @param AggregateRoot $aggregateRoot
* @param AggregateChanged[] $events
*/
public function applyPendingStreamEvents(AggregateRoot $aggregateRoot, array $events)
{
foreach ($events as $event) {
$aggregateRoot->apply($event);
}
}

/**
* @throws \BadMethodCallException
* @return string representation of the unique identifier of the aggregate root
Expand Down
23 changes: 11 additions & 12 deletions src/EventStoreIntegration/AggregateTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,11 @@ public function extractAggregateId($anEventSourcedAggregateRoot)

/**
* @param AggregateType $aggregateType
* @param Message[] $historyEvents
* @throws \RuntimeException
* @param \Iterator $historyEvents
* @return object reconstructed AggregateRoot
*/
public function reconstituteAggregateFromHistory(AggregateType $aggregateType, $historyEvents)
public function reconstituteAggregateFromHistory(AggregateType $aggregateType, \Iterator $historyEvents)
{
if (count($historyEvents) === 0) {
throw new \RuntimeException(
sprintf(
"Can not reconstitute Aggregate %s from history. No stream events given",
$aggregateType->toString()
)
);
}

return $this->getAggregateRootDecorator()
->fromHistory($aggregateType->toString(), $historyEvents);
}
Expand All @@ -67,6 +57,15 @@ public function extractPendingStreamEvents($anEventSourcedAggregateRoot)
return $this->getAggregateRootDecorator()->extractRecordedEvents($anEventSourcedAggregateRoot);
}

/**
* @param object $anEventSourcedAggregateRoot
* @param Message[] $events
*/
public function applyPendingStreamEvents($anEventSourcedAggregateRoot, array $events)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only method that wants an array instead of an iterator. What would you think of making this consistent with the other methods?

To achieve this, we would do in the AggregateRootDecorator this:

public function extractRecordedEvents(AggregateRoot $anAggregateRoot)
{
    return new \ArrayIterator($anAggregateRoot->popRecordedEvents());
}

Also update the ConfigurableAggregateTranslator and the AggregateRepository classes.

On the other hand, it' not really needed, but the methods here would have consistent method headers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like discussed yesterday in the chat I think we can keep it as-is. The \Iterator type hint represents the event stream and the array type hint says that the given events are not the stream but only the current pending events.
@prolic What do you think?

{
$this->getAggregateRootDecorator()->applyPendingStreamEvents($anEventSourcedAggregateRoot, $events);
}

/**
* @return AggregateRootDecorator
*/
Expand Down
40 changes: 33 additions & 7 deletions tests/AggregateRootTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Prooph\EventSourcingTest;

use Prooph\EventSourcing\EventStoreIntegration\AggregateRootDecorator;
use Prooph\EventSourcingTest\Mock\BrokenUser;
use Prooph\EventSourcingTest\Mock\User;

Expand All @@ -27,24 +28,38 @@ class AggregateRootTest extends TestCase
*/
public function it_applies_event_by_calling_appropriate_event_handler()
{
$decorator = AggregateRootDecorator::newInstance();

$user = User::nameNew('John');

$recordedEvents = $decorator->extractRecordedEvents($user);

//Recording and applying events are two different steps
//In between would be the process of persisting recorded events to an event stream
//Only if this was successful the events can be applied to the aggregate root
//We skip the persistence process here and apply the events directly
$decorator->applyPendingStreamEvents($user, $recordedEvents);

$this->assertEquals('John', $user->name());

$user->changeName('Max');

$additionalRecordedEvents = $decorator->extractRecordedEvents($user);

$decorator->applyPendingStreamEvents($user, $additionalRecordedEvents);

$this->assertEquals('Max', $user->name());

$pendingEvents = $user->accessRecordedEvents();
$recordedEvents = array_merge($recordedEvents, $additionalRecordedEvents);

$this->assertEquals(2, count($pendingEvents));
$this->assertEquals(2, count($recordedEvents));

$userCreatedEvent = $pendingEvents[0];
$userCreatedEvent = $recordedEvents[0];

$this->assertEquals('John', $userCreatedEvent->name());
$this->assertEquals(1, $userCreatedEvent->version());

$userNameChangedEvent = $pendingEvents[1];
$userNameChangedEvent = $recordedEvents[1];

$this->assertEquals('Max', $userNameChangedEvent->newUsername());
$this->assertEquals(2, $userNameChangedEvent->version());
Expand All @@ -57,7 +72,12 @@ public function it_applies_event_by_calling_appropriate_event_handler()
*/
public function it_throws_exception_when_no_handler_on_aggregate()
{
BrokenUser::nameNew('John');
$brokenUser = BrokenUser::nameNew('John');

AggregateRootDecorator::newInstance()->applyPendingStreamEvents(
$brokenUser,
$brokenUser->accessRecordedEvents()
);
}

/**
Expand All @@ -67,11 +87,17 @@ public function it_reconstructs_itself_from_history()
{
$user = User::nameNew('John');

$this->assertEquals('John', $user->name());
$recordedEvents = $user->accessRecordedEvents();

AggregateRootDecorator::newInstance()->applyPendingStreamEvents($user, $recordedEvents);

$user->changeName('Max');

$historyEvents = $user->accessRecordedEvents();
$additionalRecordedEvents = $user->accessRecordedEvents();

AggregateRootDecorator::newInstance()->applyPendingStreamEvents($user, $additionalRecordedEvents);

$historyEvents = new \ArrayIterator(array_merge($recordedEvents, $additionalRecordedEvents));

$sameUser = User::fromHistory($historyEvents);

Expand Down
2 changes: 1 addition & 1 deletion tests/EventStoreIntegration/AggregateRootDecoratorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class AggregateRootDecoratorTest extends TestCase
public function it_throws_exception_when_reconstitute_from_history_with_invalid_class()
{
$decorator = AggregateRootDecorator::newInstance();
$decorator->fromHistory('UnknownClass', []);
$decorator->fromHistory('UnknownClass', new \ArrayIterator([]));
}

/**
Expand Down
23 changes: 7 additions & 16 deletions tests/EventStoreIntegration/AggregateTranslatorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected function setUp()

$this->eventStore->beginTransaction();

$this->eventStore->create(new Stream(new StreamName('event_stream'), []));
$this->eventStore->create(new Stream(new StreamName('event_stream'), new \ArrayIterator([])));

$this->eventStore->commit();

Expand All @@ -64,12 +64,16 @@ public function it_translates_aggregate_back_and_forth()

$user = User::nameNew('John Doe');

$user->changeName('Max Mustermann');

$this->repository->addAggregateRoot($user);

$this->eventStore->commit();

$this->eventStore->beginTransaction();

$user->changeName('Max Mustermann');

$this->eventStore->commit();

$this->resetRepository();

$loadedUser = $this->repository->getAggregateRoot($user->id());
Expand All @@ -90,19 +94,6 @@ public function it_can_use_custom_aggregate_root_decorator()
$this->assertSame($mock, $translator->getAggregateRootDecorator());
}

/**
* @test
* @expectedException RuntimeException
* @expectedExceptionMessage Can not reconstitute Aggregate Prooph\EventSourcingTest\Mock\User from history. No stream events given
*/
public function it_cannot_reconstitute_from_history_without_stream_events()
{
$aggregateType = AggregateType::fromAggregateRootClass('Prooph\EventSourcingTest\Mock\User');

$translator = new AggregateTranslator();
$translator->reconstituteAggregateFromHistory($aggregateType, []);
}

protected function resetRepository()
{
$this->repository = new AggregateRepository(
Expand Down
5 changes: 2 additions & 3 deletions tests/Mock/User.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

namespace Prooph\EventSourcingTest\Mock;

use Prooph\EventSourcing\AggregateChanged;
use Prooph\EventSourcing\AggregateRoot;
use Rhumsaa\Uuid\Uuid;

Expand Down Expand Up @@ -44,10 +43,10 @@ public static function nameNew($name)
}

/**
* @param AggregateChanged[] $historyEvents
* @param \Iterator $historyEvents
* @return User
*/
public static function fromHistory(array $historyEvents)
public static function fromHistory(\Iterator $historyEvents)
{
return self::reconstituteFromHistory($historyEvents);
}
Expand Down