Skip to content

Commit

Permalink
improve default event store
Browse files Browse the repository at this point in the history
  • Loading branch information
juliangut committed Sep 23, 2019
1 parent 3981615 commit 248eed5
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 152 deletions.
8 changes: 8 additions & 0 deletions src/Aggregate/AggregateVersion.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ final class AggregateVersion
* Version constructor.
*
* @param int $value
*
* @throws AggregateException
*/
public function __construct(int $value)
{
Expand Down Expand Up @@ -74,10 +76,16 @@ public function getNext(): self
/**
* Get previous version.
*
* @throws AggregateException
*
* @return AggregateVersion
*/
public function getPrevious(): self
{
if ($this->value === 0) {
throw new AggregateException('Version value cannot be lowered below 0');
}

$clone = clone $this;
$clone->value = $this->value - 1;

Expand Down
77 changes: 34 additions & 43 deletions src/Store/Event/AbstractEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,61 +91,41 @@ public function loadTo(
/**
* {@inheritdoc}
*/
public function store(
StoreStream $stream,
AggregateEventStream $eventStream,
AggregateVersion $expectedVersion
): void {
public function store(StoreStream $stream, AggregateEventStream $eventStream): void
{
if ($eventStream->count() === 0) {
return;
}
$eventStream->rewind();

if (!$this->streamExists($stream)) {
$this->createStream($stream);
}

$startVersion = $this->getStreamVersion($stream);
if (!$startVersion->isEqualTo($expectedVersion)) {
$eventStream->rewind();
$expectedVersion = $eventStream->current()->getAggregateVersion()->getPrevious();

$currentVersion = $this->getStreamVersion($stream);
if (!$currentVersion->isEqualTo($expectedVersion)) {
throw new ConcurrencyException(\sprintf(
'Expected stream version "%s" does not match current version "%s"',
$expectedVersion->getValue(),
$startVersion->getValue()
$currentVersion->getValue()
));
}

$this->storeEvents($stream, $eventStream, $expectedVersion);
foreach ($eventStream as $aggregateEvent) {
$aggregateVersion = $aggregateEvent->getAggregateVersion();

$eventStream->rewind();
$events = \iterator_to_array($eventStream);
/** @var AggregateVersion $finalVersion */
$finalVersion = \end($events)->getAggregateVersion();
if (!$aggregateVersion->getPrevious()->isEqualTo($currentVersion)) {
throw new ConcurrencyException('Event stream cannot be stored due to versions mismatch');
}

$endVersion = $this->getStreamVersion($stream);
if (!$endVersion->isEqualTo($finalVersion)) {
throw new ConcurrencyException(\sprintf(
'Expected final stream version "%s" does not match current version "%s"',
$finalVersion->getValue(),
$endVersion->getValue()
));
$currentVersion = $currentVersion->getNext();
}
}

/**
* Check stream existence.
*
* @param StoreStream $stream
*
* @return bool
*/
abstract protected function streamExists(StoreStream $stream): bool;

/**
* Create stream.
*
* @param StoreStream $stream
*/
abstract protected function createStream(StoreStream $stream): void;
$eventStream->rewind();
$this->storeEvents($stream, $eventStream);
}

/**
* Load aggregate events from a version.
Expand Down Expand Up @@ -182,13 +162,24 @@ abstract protected function loadEventsTo(
*
* @param StoreStream $stream
* @param AggregateEventStream $eventStream
* @param AggregateVersion $expectedVersion
*/
abstract protected function storeEvents(
StoreStream $stream,
AggregateEventStream $eventStream,
AggregateVersion $expectedVersion
): void;
abstract protected function storeEvents(StoreStream $stream, AggregateEventStream $eventStream): void;

/**
* Check stream existence.
*
* @param StoreStream $stream
*
* @return bool
*/
abstract protected function streamExists(StoreStream $stream): bool;

/**
* Create stream.
*
* @param StoreStream $stream
*/
abstract protected function createStream(StoreStream $stream): void;

/**
* Get current stream version.
Expand Down
7 changes: 1 addition & 6 deletions src/Store/Event/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,9 @@ public function loadTo(
*
* @param StoreStream $stream
* @param AggregateEventStream $eventStream
* @param AggregateVersion $expectedVersion
*
* @throws \Gears\EventSourcing\Store\Event\Exception\ConcurrencyException
* @throws \Gears\EventSourcing\Store\Event\Exception\EventStoreException
*/
public function store(
StoreStream $stream,
AggregateEventStream $eventStream,
AggregateVersion $expectedVersion
): void;
public function store(StoreStream $stream, AggregateEventStream $eventStream): void;
}
51 changes: 18 additions & 33 deletions src/Store/Event/InMemoryEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Gears\EventSourcing\Event\AggregateEvent;
use Gears\EventSourcing\Event\AggregateEventIteratorStream;
use Gears\EventSourcing\Event\AggregateEventStream;
use Gears\EventSourcing\Store\Event\Exception\EventStoreException;
use Gears\EventSourcing\Store\StoreStream;

/**
Expand Down Expand Up @@ -95,28 +94,14 @@ protected function loadEventsTo(
/**
* {@inheritdoc}
*/
protected function storeEvents(
StoreStream $stream,
AggregateEventStream $eventStream,
AggregateVersion $expectedVersion
): void {
$lastVersion = $this->getStreamVersion($stream);

protected function storeEvents(StoreStream $stream, AggregateEventStream $eventStream): void
{
$streamId = $this->getStreamId($stream);

foreach ($eventStream as $aggregateEvent) {
$aggregateVersion = $aggregateEvent->getAggregateVersion();

if (!$aggregateVersion->getPrevious()->isEqualTo($lastVersion)) {
throw new EventStoreException(\sprintf(
'Aggregate event for version "%s" cannot be stored',
$aggregateVersion->getValue()
));
}

$this->streams[$streamId][$aggregateVersion->getValue()] = $aggregateEvent;

$lastVersion = $lastVersion->getNext();
}

\ksort($this->streams[$streamId]);
Expand All @@ -125,35 +110,35 @@ protected function storeEvents(
/**
* {@inheritdoc}
*/
protected function getStreamVersion(StoreStream $stream): AggregateVersion
protected function streamExists(StoreStream $stream): bool
{
$streamId = $this->getStreamId($stream);

if (\count($this->streams[$streamId]) === 0) {
return new AggregateVersion(0);
}

$versions = \array_keys($this->streams[$streamId]);
/** @var int $version */
$version = \end($versions);

return new AggregateVersion($version);
return isset($this->streams[$this->getStreamId($stream)]);
}

/**
* {@inheritdoc}
*/
protected function streamExists(StoreStream $stream): bool
protected function createStream(StoreStream $stream): void
{
return isset($this->streams[$this->getStreamId($stream)]);
$this->streams[$this->getStreamId($stream)] = [];
}

/**
* {@inheritdoc}
*/
protected function createStream(StoreStream $stream): void
protected function getStreamVersion(StoreStream $stream): AggregateVersion
{
$this->streams[$this->getStreamId($stream)] = [];
$streamId = $this->getStreamId($stream);

if (\count($this->streams[$streamId]) === 0) {
return new AggregateVersion(0);
}

$versions = \array_keys($this->streams[$streamId]);
/** @var int $version */
$version = \end($versions);

return new AggregateVersion($version);
}

/**
Expand Down
4 changes: 1 addition & 3 deletions src/Store/Repository/AbstractAggregateRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ final public function saveAggregateRoot(AggregateRoot $aggregateRoot): void
return;
}

$originalVersion = $eventStream->current()->getAggregateVersion()->getPrevious();

$this->eventStore->store($storeStream, $eventStream, $originalVersion);
$this->eventStore->store($storeStream, $eventStream);
}

/**
Expand Down
10 changes: 10 additions & 0 deletions tests/EventSourcing/Aggregate/AggregateVersionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public function testGetNext(): void
static::assertNotSame($version, $next);
}

public function testInvalidPrevious(): void
{
$this->expectException(AggregateException::class);
$this->expectExceptionMessage('Version value cannot be lowered below 0');

$version = new AggregateVersion(0);

$version->getPrevious();
}

public function testGetPrevious(): void
{
$version = new AggregateVersion(10);
Expand Down
48 changes: 25 additions & 23 deletions tests/EventSourcing/Store/Event/AbstractEventStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public function testLoadToEmptyEventStream(): void
public function testStoreInvalidVersion(): void
{
$this->expectException(ConcurrencyException::class);
$this->expectExceptionMessage('Expected stream version "10" does not match current version "0"');
$this->expectExceptionMessage('Expected stream version "9" does not match current version "0"');

$identity = $this->getMockBuilder(Identity::class)->disableOriginalConstructor()->getMock();
$identity->expects(static::any())
Expand All @@ -156,44 +156,48 @@ public function testStoreInvalidVersion(): void
$event = AbstractAggregateEventStub::instance($identity);
$eventStream = new AggregateEventArrayStream([$event->withAggregateVersion(new AggregateVersion(10))]);

(new AbstractEventStoreStub())->store($stream, $eventStream, new AggregateVersion(10));
(new AbstractEventStoreStub())->store($stream, $eventStream);
}

public function testStoreEmpty(): void
public function testStoreInvalidStreamVersion(): void
{
$this->expectException(ConcurrencyException::class);
$this->expectExceptionMessage('Event stream cannot be stored due to versions mismatch');

$identity = $this->getMockBuilder(Identity::class)->disableOriginalConstructor()->getMock();
$identity->expects(static::any())
->method('getValue')
->will(static::returnValue('aaa'));
/** @var Identity $identity */
$stream = GenericStoreStream::fromAggregateData(AbstractAggregateRootStub::class, $identity);

$eventStream = new AggregateEventEmptyStream();

$eventStore = (new AbstractEventStoreStub());
$eventStore->store($stream, $eventStream, new AggregateVersion(1));
$event = AbstractAggregateEventStub::instance($identity);
$eventStream = new AggregateEventArrayStream([
$event->withAggregateVersion(new AggregateVersion(1)),
$event->withAggregateVersion(new AggregateVersion(2)),
$event->withAggregateVersion(new AggregateVersion(5)),
]);

$loadedEvents = $eventStore->loadFrom($stream, new AggregateVersion(2));
static::assertCount(0, $loadedEvents);
$eventStore = new AbstractEventStoreStub(null, true);
$eventStore->store($stream, $eventStream);
}

public function testStoreError(): void
public function testStoreEmpty(): void
{
$this->expectException(ConcurrencyException::class);
$this->expectExceptionMessage('Expected final stream version "1" does not match');

$identity = $this->getMockBuilder(Identity::class)->disableOriginalConstructor()->getMock();
$identity->expects(static::any())
->method('getValue')
->will(static::returnValue('aaa'));
/** @var Identity $identity */
$stream = GenericStoreStream::fromAggregateData(AbstractAggregateRootStub::class, $identity);

$event = AbstractAggregateEventStub::instance($identity);
$eventStream = new AggregateEventArrayStream([$event->withAggregateVersion(new AggregateVersion(1))]);
$eventStream = new AggregateEventEmptyStream();

$eventStore = new AbstractEventStoreStub(null, true);
$eventStore->store($stream, $eventStream, new AggregateVersion(0));
$eventStore = (new AbstractEventStoreStub());
$eventStore->store($stream, $eventStream);

$loadedEvents = $eventStore->loadFrom($stream, new AggregateVersion(2));
static::assertCount(0, $loadedEvents);
}

public function testStore(): void
Expand All @@ -209,7 +213,7 @@ public function testStore(): void
$eventStream = new AggregateEventArrayStream([$event->withAggregateVersion(new AggregateVersion(1))]);

$eventStore = new AbstractEventStoreStub();
$eventStore->store($stream, $eventStream, new AggregateVersion(0));
$eventStore->store($stream, $eventStream);

$loadedFromEvents = $eventStore->loadTo($stream, new AggregateVersion(1));

Expand All @@ -234,16 +238,14 @@ public function testLoadFrom(): void
$event->withAggregateVersion(new AggregateVersion(1)),
$event->withAggregateVersion(new AggregateVersion(2)),
$event->withAggregateVersion(new AggregateVersion(3)),
]),
new AggregateVersion(0)
])
);
$eventStore->store(
$stream,
new AggregateEventArrayStream([
$event->withAggregateVersion(new AggregateVersion(4)),
$event->withAggregateVersion(new AggregateVersion(5)),
]),
new AggregateVersion(3)
])
);

$loadedEvents = $eventStore->loadFrom($stream, new AggregateVersion(2), 2);
Expand Down Expand Up @@ -282,7 +284,7 @@ public function testLoadTo(): void
]);

$eventStore = new AbstractEventStoreStub();
$eventStore->store($stream, $eventStream, new AggregateVersion(0));
$eventStore->store($stream, $eventStream);

$loadedEvents = $eventStore->loadTo($stream, new AggregateVersion(4), new AggregateVersion(2));

Expand Down
Loading

0 comments on commit 248eed5

Please sign in to comment.