Skip to content

Commit

Permalink
Merge pull request #98 from prolic/event_stream_iterator
Browse files Browse the repository at this point in the history
Implement EventStream as an iterator
  • Loading branch information
codeliner committed Sep 24, 2015
2 parents 92ffe11 + f611c57 commit b9ac9f2
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 149 deletions.
8 changes: 4 additions & 4 deletions src/Adapter/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Prooph\EventStore\Adapter;

use Prooph\Common\Messaging\Message;
use Iterator;
use Prooph\EventStore\Stream\Stream;
use Prooph\EventStore\Stream\StreamName;
use Prooph\EventStore\Exception\StreamNotFoundException;
Expand All @@ -30,11 +30,11 @@ public function create(Stream $stream);

/**
* @param StreamName $streamName
* @param Message[] $domainEvents
* @param Iterator $domainEvents
* @throws StreamNotFoundException If stream does not exist
* @return void
*/
public function appendTo(StreamName $streamName, array $domainEvents);
public function appendTo(StreamName $streamName, Iterator $domainEvents);

/**
* @param StreamName $streamName
Expand All @@ -47,7 +47,7 @@ public function load(StreamName $streamName, $minVersion = null);
* @param StreamName $streamName
* @param array $metadata If empty array is provided, then all events should be returned
* @param null|int $minVersion Minimum version an event should have
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null);
}
35 changes: 23 additions & 12 deletions src/Adapter/InMemoryAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

namespace Prooph\EventStore\Adapter;

use AppendIterator;
use ArrayIterator;
use Iterator;
use Prooph\Common\Messaging\Message;
use Prooph\EventStore\Exception\StreamNotFoundException;
use Prooph\EventStore\Stream\Stream;
Expand All @@ -25,9 +28,9 @@
class InMemoryAdapter implements Adapter
{
/**
* @var array
* @var Iterator[]
*/
protected $streams = [];
protected $streams;

/**
* @param Stream $stream
Expand All @@ -40,11 +43,11 @@ public function create(Stream $stream)

/**
* @param StreamName $streamName
* @param Message[] $domainEvents
* @param Iterator $domainEvents
* @throws StreamNotFoundException
* @return void
*/
public function appendTo(StreamName $streamName, array $domainEvents)
public function appendTo(StreamName $streamName, Iterator $domainEvents)
{
if (! isset($this->streams[$streamName->toString()])) {
throw new StreamNotFoundException(
Expand All @@ -55,7 +58,11 @@ public function appendTo(StreamName $streamName, array $domainEvents)
);
}

$this->streams[$streamName->toString()] = array_merge($this->streams[$streamName->toString()], $domainEvents);
$appendIterator = new AppendIterator();
$appendIterator->append($this->streams[$streamName->toString()]);
$appendIterator->append($domainEvents);

$this->streams[$streamName->toString()] = $appendIterator;
}

/**
Expand All @@ -69,7 +76,6 @@ public function load(StreamName $streamName, $minVersion = null)
return;
}

/** @var $streamEvents Message[] */
$streamEvents = $this->streams[$streamName->toString()];

if (!is_null($minVersion)) {
Expand All @@ -81,7 +87,7 @@ public function load(StreamName $streamName, $minVersion = null)
}
}

return new Stream($streamName, $filteredEvents);
return new Stream($streamName, new \ArrayIterator($filteredEvents));
}

return new Stream($streamName, $streamEvents);
Expand All @@ -92,16 +98,16 @@ public function load(StreamName $streamName, $minVersion = null)
* @param array $metadata
* @param null|int $minVersion
* @throws StreamNotFoundException
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null)
{
$streamEvents = [];

if (! isset($this->streams[$streamName->toString()])) {
return [];
return new ArrayIterator();
}

$streamEvents = [];

foreach ($this->streams[$streamName->toString()] as $index => $streamEvent) {
if ($this->matchMetadataWith($streamEvent, $metadata)) {
if (is_null($minVersion) || $streamEvent->version() >= $minVersion) {
Expand All @@ -110,9 +116,14 @@ public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata
}
}

return $streamEvents;
return new ArrayIterator($streamEvents);
}

/**
* @param Message $streamEvent
* @param array $metadata
* @return bool
*/
protected function matchMetadataWith(Message $streamEvent, array $metadata)
{
if (empty($metadata)) {
Expand Down
7 changes: 5 additions & 2 deletions src/Aggregate/AggregateRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Prooph\EventStore\Aggregate;

use ArrayIterator;
use Assert\Assertion;
use Prooph\EventStore\Aggregate\Exception\AggregateTypeException;
use Prooph\EventStore\EventStore;
Expand Down Expand Up @@ -96,7 +97,7 @@ public function __invoke()
$this->streamStrategy->appendEvents(
$this->aggregateType,
$this->aggregateTranslator->extractAggregateId($eventSourcedAggregateRoot),
$this->pendingStreamEvents[$index],
new ArrayIterator($this->pendingStreamEvents[$index]),
$eventSourcedAggregateRoot
);
}
Expand Down Expand Up @@ -139,7 +140,9 @@ public function addAggregateRoot($eventSourcedAggregateRoot)

$aggregateId = $this->aggregateTranslator->extractAggregateId($eventSourcedAggregateRoot);

$domainEvents = $this->aggregateTranslator->extractPendingStreamEvents($eventSourcedAggregateRoot);
$domainEvents = new ArrayIterator(
$this->aggregateTranslator->extractPendingStreamEvents($eventSourcedAggregateRoot)
);

$this->streamStrategy->addEventsForNewAggregateRoot($this->aggregateType, $aggregateId, $domainEvents, $eventSourcedAggregateRoot);

Expand Down
36 changes: 21 additions & 15 deletions src/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

namespace Prooph\EventStore;

use Assert\Assertion;
use AppendIterator;
use ArrayIterator;
use Iterator;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Messaging\Message;
use Prooph\EventStore\Adapter\Adapter;
use Prooph\EventStore\Adapter\Feature\CanHandleTransaction;
use Prooph\EventStore\Exception\StreamNotFoundException;
Expand Down Expand Up @@ -39,9 +40,9 @@ class EventStore
protected $actionEventEmitter;

/**
* @var Message[]
* @var Iterator
*/
protected $recordedEvents = [];
protected $recordedEvents;

/**
* @var int
Expand All @@ -58,6 +59,7 @@ public function __construct(Adapter $adapter, ActionEventEmitter $actionEventEmi
{
$this->adapter = $adapter;
$this->actionEventEmitter = $actionEventEmitter;
$this->recordedEvents = new ArrayIterator();
}

/**
Expand All @@ -71,7 +73,7 @@ public function getAdapter()
}

/**
* @return Message[]
* @return Iterator
*/
public function getRecordedEvents()
{
Expand Down Expand Up @@ -103,7 +105,11 @@ public function create(Stream $stream)

$this->adapter->create($stream);

$this->recordedEvents = array_merge($this->recordedEvents, $stream->streamEvents());
$appendIterator = new AppendIterator();
$appendIterator->append($this->recordedEvents);
$appendIterator->append($stream->streamEvents());

$this->recordedEvents = $appendIterator;

$event->setName(__FUNCTION__ . '.post');

Expand All @@ -112,16 +118,12 @@ public function create(Stream $stream)

/**
* @param StreamName $streamName
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @throws Exception\RuntimeException
* @return void
*/
public function appendTo(StreamName $streamName, array $streamEvents)
public function appendTo(StreamName $streamName, Iterator $streamEvents)
{
foreach ($streamEvents as $streamEvent) {
Assertion::isInstanceOf($streamEvent, Message::class);
}

$argv = ['streamName' => $streamName, 'streamEvents' => $streamEvents];

$event = $this->actionEventEmitter->getNewActionEvent(__FUNCTION__ . '.pre', $this, $argv);
Expand All @@ -141,7 +143,11 @@ public function appendTo(StreamName $streamName, array $streamEvents)

$this->adapter->appendTo($streamName, $streamEvents);

$this->recordedEvents = array_merge($this->recordedEvents, $streamEvents);
$appendIterator = new AppendIterator();
$appendIterator->append($this->recordedEvents);
$appendIterator->append($streamEvents);

$this->recordedEvents = $appendIterator;

$event->setName(__FUNCTION__, '.post');

Expand Down Expand Up @@ -214,7 +220,7 @@ public function load(StreamName $streamName, $minVersion = null)
* @param StreamName $streamName
* @param array $metadata
* @param null|int $minVersion
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null)
{
Expand Down Expand Up @@ -311,7 +317,7 @@ public function commit()

$event = $this->getActionEventEmitter()->getNewActionEvent(__FUNCTION__ . '.post', $this, ['recordedEvents' => $this->recordedEvents]);

$this->recordedEvents = [];
$this->recordedEvents = new ArrayIterator();

$this->getActionEventEmitter()->dispatch($event);
}
Expand Down
39 changes: 27 additions & 12 deletions src/Stream/AggregateStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace Prooph\EventStore\Stream;

use Assert\Assertion;
use Prooph\Common\Messaging\Message;
use Iterator;
use Prooph\EventStore\Aggregate\AggregateType;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception;
Expand Down Expand Up @@ -51,13 +51,17 @@ public function __construct(EventStore $eventStore, array $aggregateTypeStreamMa
/**
* @param AggregateType $repositoryAggregateType
* @param string $aggregateId
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @param object $aggregateRoot
* @throws Exception\InvalidArgumentException
* @return void
*/
public function addEventsForNewAggregateRoot(AggregateType $repositoryAggregateType, $aggregateId, array $streamEvents, $aggregateRoot)
{
public function addEventsForNewAggregateRoot(
AggregateType $repositoryAggregateType,
$aggregateId,
Iterator $streamEvents,
$aggregateRoot
) {
$arType = AggregateType::fromAggregateRoot($aggregateRoot);

if (! $repositoryAggregateType->equals($arType)) {
Expand All @@ -68,23 +72,34 @@ public function addEventsForNewAggregateRoot(AggregateType $repositoryAggregateT
));
}

$this->eventStore->create(new Stream($this->buildStreamName($repositoryAggregateType, $aggregateId), $streamEvents));
$this->eventStore->create(new Stream(
$this->buildStreamName($repositoryAggregateType, $aggregateId),
$streamEvents
));
}

/**
* @param AggregateType $repositoryAggregateType
* @param string $aggregateId
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @param object $aggregateRoot
* @throws Exception\InvalidArgumentException
* @return void
*/
public function appendEvents(AggregateType $repositoryAggregateType, $aggregateId, array $streamEvents, $aggregateRoot)
{
public function appendEvents(
AggregateType $repositoryAggregateType,
$aggregateId,
Iterator $streamEvents,
$aggregateRoot
) {
$arType = AggregateType::fromAggregateRoot($aggregateRoot);

if (! $repositoryAggregateType->equals($arType)) {
throw new Exception\InvalidArgumentException(sprintf('aggregate root mismatch between repository type %s and object type %s', $repositoryAggregateType->toString(), $arType->toString()));
throw new Exception\InvalidArgumentException(sprintf(
'aggregate root mismatch between repository type %s and object type %s',
$repositoryAggregateType->toString(),
$arType->toString()
));
}

$this->eventStore->appendTo($this->buildStreamName($repositoryAggregateType, $aggregateId), $streamEvents);
Expand All @@ -94,7 +109,7 @@ public function appendEvents(AggregateType $repositoryAggregateType, $aggregateI
* @param AggregateType $aggregateType
* @param string $aggregateId
* @param null|int $minVersion
* @return Message[]
* @return Iterator
*/
public function read(AggregateType $aggregateType, $aggregateId, $minVersion = null)
{
Expand Down Expand Up @@ -122,10 +137,10 @@ protected function buildStreamName(AggregateType $aggregateType, $aggregateId)
* No aggregate type information stored as metadata. The repository aggregate type needs to be used.
*
* @param AggregateType $repositoryAggregateType
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @return AggregateType
*/
public function getAggregateRootType(AggregateType $repositoryAggregateType, array &$streamEvents)
public function getAggregateRootType(AggregateType $repositoryAggregateType, Iterator $streamEvents)
{
return $repositoryAggregateType;
}
Expand Down
Loading

0 comments on commit b9ac9f2

Please sign in to comment.