Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement EventStream as an iterator #98

Merged
merged 3 commits into from
Sep 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Adapter/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Prooph\EventStore\Adapter;

use Prooph\Common\Messaging\Message;
use Iterator;
use Prooph\EventStore\Stream\Stream;
use Prooph\EventStore\Stream\StreamName;
use Prooph\EventStore\Exception\StreamNotFoundException;
Expand All @@ -30,11 +30,11 @@ public function create(Stream $stream);

/**
* @param StreamName $streamName
* @param Message[] $domainEvents
* @param Iterator $domainEvents
* @throws StreamNotFoundException If stream does not exist
* @return void
*/
public function appendTo(StreamName $streamName, array $domainEvents);
public function appendTo(StreamName $streamName, Iterator $domainEvents);

/**
* @param StreamName $streamName
Expand All @@ -47,7 +47,7 @@ public function load(StreamName $streamName, $minVersion = null);
* @param StreamName $streamName
* @param array $metadata If empty array is provided, then all events should be returned
* @param null|int $minVersion Minimum version an event should have
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null);
}
35 changes: 23 additions & 12 deletions src/Adapter/InMemoryAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

namespace Prooph\EventStore\Adapter;

use AppendIterator;
use ArrayIterator;
use Iterator;
use Prooph\Common\Messaging\Message;
use Prooph\EventStore\Exception\StreamNotFoundException;
use Prooph\EventStore\Stream\Stream;
Expand All @@ -25,9 +28,9 @@
class InMemoryAdapter implements Adapter
{
/**
* @var array
* @var Iterator[]
*/
protected $streams = [];
protected $streams;

/**
* @param Stream $stream
Expand All @@ -40,11 +43,11 @@ public function create(Stream $stream)

/**
* @param StreamName $streamName
* @param Message[] $domainEvents
* @param Iterator $domainEvents
* @throws StreamNotFoundException
* @return void
*/
public function appendTo(StreamName $streamName, array $domainEvents)
public function appendTo(StreamName $streamName, Iterator $domainEvents)
{
if (! isset($this->streams[$streamName->toString()])) {
throw new StreamNotFoundException(
Expand All @@ -55,7 +58,11 @@ public function appendTo(StreamName $streamName, array $domainEvents)
);
}

$this->streams[$streamName->toString()] = array_merge($this->streams[$streamName->toString()], $domainEvents);
$appendIterator = new AppendIterator();
$appendIterator->append($this->streams[$streamName->toString()]);
$appendIterator->append($domainEvents);

$this->streams[$streamName->toString()] = $appendIterator;
}

/**
Expand All @@ -69,7 +76,6 @@ public function load(StreamName $streamName, $minVersion = null)
return;
}

/** @var $streamEvents Message[] */
$streamEvents = $this->streams[$streamName->toString()];

if (!is_null($minVersion)) {
Expand All @@ -81,7 +87,7 @@ public function load(StreamName $streamName, $minVersion = null)
}
}

return new Stream($streamName, $filteredEvents);
return new Stream($streamName, new \ArrayIterator($filteredEvents));
}

return new Stream($streamName, $streamEvents);
Expand All @@ -92,16 +98,16 @@ public function load(StreamName $streamName, $minVersion = null)
* @param array $metadata
* @param null|int $minVersion
* @throws StreamNotFoundException
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null)
{
$streamEvents = [];

if (! isset($this->streams[$streamName->toString()])) {
return [];
return new ArrayIterator();
}

$streamEvents = [];

foreach ($this->streams[$streamName->toString()] as $index => $streamEvent) {
if ($this->matchMetadataWith($streamEvent, $metadata)) {
if (is_null($minVersion) || $streamEvent->version() >= $minVersion) {
Expand All @@ -110,9 +116,14 @@ public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata
}
}

return $streamEvents;
return new ArrayIterator($streamEvents);
}

/**
* @param Message $streamEvent
* @param array $metadata
* @return bool
*/
protected function matchMetadataWith(Message $streamEvent, array $metadata)
{
if (empty($metadata)) {
Expand Down
7 changes: 5 additions & 2 deletions src/Aggregate/AggregateRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Prooph\EventStore\Aggregate;

use ArrayIterator;
use Assert\Assertion;
use Prooph\EventStore\Aggregate\Exception\AggregateTypeException;
use Prooph\EventStore\EventStore;
Expand Down Expand Up @@ -96,7 +97,7 @@ public function __invoke()
$this->streamStrategy->appendEvents(
$this->aggregateType,
$this->aggregateTranslator->extractAggregateId($eventSourcedAggregateRoot),
$this->pendingStreamEvents[$index],
new ArrayIterator($this->pendingStreamEvents[$index]),
$eventSourcedAggregateRoot
);
}
Expand Down Expand Up @@ -139,7 +140,9 @@ public function addAggregateRoot($eventSourcedAggregateRoot)

$aggregateId = $this->aggregateTranslator->extractAggregateId($eventSourcedAggregateRoot);

$domainEvents = $this->aggregateTranslator->extractPendingStreamEvents($eventSourcedAggregateRoot);
$domainEvents = new ArrayIterator(
$this->aggregateTranslator->extractPendingStreamEvents($eventSourcedAggregateRoot)
);

$this->streamStrategy->addEventsForNewAggregateRoot($this->aggregateType, $aggregateId, $domainEvents, $eventSourcedAggregateRoot);

Expand Down
36 changes: 21 additions & 15 deletions src/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

namespace Prooph\EventStore;

use Assert\Assertion;
use AppendIterator;
use ArrayIterator;
use Iterator;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Messaging\Message;
use Prooph\EventStore\Adapter\Adapter;
use Prooph\EventStore\Adapter\Feature\CanHandleTransaction;
use Prooph\EventStore\Exception\StreamNotFoundException;
Expand Down Expand Up @@ -39,9 +40,9 @@ class EventStore
protected $actionEventEmitter;

/**
* @var Message[]
* @var Iterator
*/
protected $recordedEvents = [];
protected $recordedEvents;

/**
* @var int
Expand All @@ -58,6 +59,7 @@ public function __construct(Adapter $adapter, ActionEventEmitter $actionEventEmi
{
$this->adapter = $adapter;
$this->actionEventEmitter = $actionEventEmitter;
$this->recordedEvents = new ArrayIterator();
}

/**
Expand All @@ -71,7 +73,7 @@ public function getAdapter()
}

/**
* @return Message[]
* @return Iterator
*/
public function getRecordedEvents()
{
Expand Down Expand Up @@ -103,7 +105,11 @@ public function create(Stream $stream)

$this->adapter->create($stream);

$this->recordedEvents = array_merge($this->recordedEvents, $stream->streamEvents());
$appendIterator = new AppendIterator();
$appendIterator->append($this->recordedEvents);
$appendIterator->append($stream->streamEvents());

$this->recordedEvents = $appendIterator;

$event->setName(__FUNCTION__ . '.post');

Expand All @@ -112,16 +118,12 @@ public function create(Stream $stream)

/**
* @param StreamName $streamName
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @throws Exception\RuntimeException
* @return void
*/
public function appendTo(StreamName $streamName, array $streamEvents)
public function appendTo(StreamName $streamName, Iterator $streamEvents)
{
foreach ($streamEvents as $streamEvent) {
Assertion::isInstanceOf($streamEvent, Message::class);
}

$argv = ['streamName' => $streamName, 'streamEvents' => $streamEvents];

$event = $this->actionEventEmitter->getNewActionEvent(__FUNCTION__ . '.pre', $this, $argv);
Expand All @@ -141,7 +143,11 @@ public function appendTo(StreamName $streamName, array $streamEvents)

$this->adapter->appendTo($streamName, $streamEvents);

$this->recordedEvents = array_merge($this->recordedEvents, $streamEvents);
$appendIterator = new AppendIterator();
$appendIterator->append($this->recordedEvents);
$appendIterator->append($streamEvents);

$this->recordedEvents = $appendIterator;

$event->setName(__FUNCTION__, '.post');

Expand Down Expand Up @@ -214,7 +220,7 @@ public function load(StreamName $streamName, $minVersion = null)
* @param StreamName $streamName
* @param array $metadata
* @param null|int $minVersion
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null)
{
Expand Down Expand Up @@ -311,7 +317,7 @@ public function commit()

$event = $this->getActionEventEmitter()->getNewActionEvent(__FUNCTION__ . '.post', $this, ['recordedEvents' => $this->recordedEvents]);

$this->recordedEvents = [];
$this->recordedEvents = new ArrayIterator();

$this->getActionEventEmitter()->dispatch($event);
}
Expand Down
39 changes: 27 additions & 12 deletions src/Stream/AggregateStreamStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace Prooph\EventStore\Stream;

use Assert\Assertion;
use Prooph\Common\Messaging\Message;
use Iterator;
use Prooph\EventStore\Aggregate\AggregateType;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception;
Expand Down Expand Up @@ -51,13 +51,17 @@ public function __construct(EventStore $eventStore, array $aggregateTypeStreamMa
/**
* @param AggregateType $repositoryAggregateType
* @param string $aggregateId
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @param object $aggregateRoot
* @throws Exception\InvalidArgumentException
* @return void
*/
public function addEventsForNewAggregateRoot(AggregateType $repositoryAggregateType, $aggregateId, array $streamEvents, $aggregateRoot)
{
public function addEventsForNewAggregateRoot(
AggregateType $repositoryAggregateType,
$aggregateId,
Iterator $streamEvents,
$aggregateRoot
) {
$arType = AggregateType::fromAggregateRoot($aggregateRoot);

if (! $repositoryAggregateType->equals($arType)) {
Expand All @@ -68,23 +72,34 @@ public function addEventsForNewAggregateRoot(AggregateType $repositoryAggregateT
));
}

$this->eventStore->create(new Stream($this->buildStreamName($repositoryAggregateType, $aggregateId), $streamEvents));
$this->eventStore->create(new Stream(
$this->buildStreamName($repositoryAggregateType, $aggregateId),
$streamEvents
));
}

/**
* @param AggregateType $repositoryAggregateType
* @param string $aggregateId
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @param object $aggregateRoot
* @throws Exception\InvalidArgumentException
* @return void
*/
public function appendEvents(AggregateType $repositoryAggregateType, $aggregateId, array $streamEvents, $aggregateRoot)
{
public function appendEvents(
AggregateType $repositoryAggregateType,
$aggregateId,
Iterator $streamEvents,
$aggregateRoot
) {
$arType = AggregateType::fromAggregateRoot($aggregateRoot);

if (! $repositoryAggregateType->equals($arType)) {
throw new Exception\InvalidArgumentException(sprintf('aggregate root mismatch between repository type %s and object type %s', $repositoryAggregateType->toString(), $arType->toString()));
throw new Exception\InvalidArgumentException(sprintf(
'aggregate root mismatch between repository type %s and object type %s',
$repositoryAggregateType->toString(),
$arType->toString()
));
}

$this->eventStore->appendTo($this->buildStreamName($repositoryAggregateType, $aggregateId), $streamEvents);
Expand All @@ -94,7 +109,7 @@ public function appendEvents(AggregateType $repositoryAggregateType, $aggregateI
* @param AggregateType $aggregateType
* @param string $aggregateId
* @param null|int $minVersion
* @return Message[]
* @return Iterator
*/
public function read(AggregateType $aggregateType, $aggregateId, $minVersion = null)
{
Expand Down Expand Up @@ -122,10 +137,10 @@ protected function buildStreamName(AggregateType $aggregateType, $aggregateId)
* No aggregate type information stored as metadata. The repository aggregate type needs to be used.
*
* @param AggregateType $repositoryAggregateType
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @return AggregateType
*/
public function getAggregateRootType(AggregateType $repositoryAggregateType, array &$streamEvents)
public function getAggregateRootType(AggregateType $repositoryAggregateType, Iterator $streamEvents)
{
return $repositoryAggregateType;
}
Expand Down
Loading