Skip to content

Commit

Permalink
change EventStore::load method
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Jan 29, 2017
1 parent 7bbcc44 commit 0ca708f
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 150 deletions.
52 changes: 20 additions & 32 deletions src/MySqlEventStore.php
Expand Up @@ -258,7 +258,7 @@ public function load(
int $fromNumber = 1,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream {
): Iterator {
if (null === $count) {
$count = PHP_INT_MAX;
}
Expand Down Expand Up @@ -315,23 +315,17 @@ public function load(
}

if (0 === $statement->rowCount()) {
return new Stream(
$streamName,
new EmptyIterator()
);
return new EmptyIterator();
}

return new Stream(
$streamName,
new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
true
)
return new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
true
);
}

Expand All @@ -340,7 +334,7 @@ public function loadReverse(
int $fromNumber = PHP_INT_MAX,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream {
): Iterator {
if (null === $count) {
$count = PHP_INT_MAX;
}
Expand Down Expand Up @@ -397,23 +391,17 @@ public function loadReverse(
}

if (0 === $statement->rowCount()) {
return new Stream(
$streamName,
new EmptyIterator()
);
return new EmptyIterator();
}

return new Stream(
$streamName,
new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
false
)
return new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
false
);
}

Expand Down
52 changes: 20 additions & 32 deletions src/PostgresEventStore.php
Expand Up @@ -227,7 +227,7 @@ public function load(
int $fromNumber = 1,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream {
): Iterator {
if (null === $count) {
$count = PHP_INT_MAX;
}
Expand Down Expand Up @@ -284,23 +284,17 @@ public function load(
}

if (0 === $statement->rowCount()) {
return new Stream(
$streamName,
new EmptyIterator()
);
return new EmptyIterator();
}

return new Stream(
$streamName,
new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
true
)
return new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
true
);
}

Expand All @@ -309,7 +303,7 @@ public function loadReverse(
int $fromNumber = PHP_INT_MAX,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream {
): Iterator {
if (null === $count) {
$count = PHP_INT_MAX;
}
Expand Down Expand Up @@ -366,23 +360,17 @@ public function loadReverse(
}

if (0 === $statement->rowCount()) {
return new Stream(
$streamName,
new EmptyIterator()
);
return new EmptyIterator();
}

return new Stream(
$streamName,
new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
false
)
return new PdoStreamIterator(
$this->connection,
$statement,
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
$count,
false
);
}

Expand Down
6 changes: 3 additions & 3 deletions src/Projection/PdoEventStoreProjection.php
Expand Up @@ -405,16 +405,16 @@ public function run(bool $keepRunning = true): void

foreach ($this->streamPositions as $streamName => $position) {
try {
$stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
$streamEvents = $this->eventStore->load(new StreamName($streamName), $position + 1);
} catch (Exception\StreamNotFound $e) {
// no newer events found
continue;
}

if ($singleHandler) {
$this->handleStreamWithSingleHandler($streamName, $stream->streamEvents());
$this->handleStreamWithSingleHandler($streamName, $streamEvents);
} else {
$this->handleStreamWithHandlers($streamName, $stream->streamEvents());
$this->handleStreamWithHandlers($streamName, $streamEvents);
}

if ($this->isStopped) {
Expand Down
6 changes: 3 additions & 3 deletions src/Projection/PdoEventStoreQuery.php
Expand Up @@ -292,12 +292,12 @@ public function run(): void
$singleHandler = null !== $this->handler;

foreach ($this->streamPositions as $streamName => $position) {
$stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
$streamEvents = $this->eventStore->load(new StreamName($streamName), $position + 1);

if ($singleHandler) {
$this->handleStreamWithSingleHandler($streamName, $stream->streamEvents());
$this->handleStreamWithSingleHandler($streamName, $streamEvents);
} else {
$this->handleStreamWithHandlers($streamName, $stream->streamEvents());
$this->handleStreamWithHandlers($streamName, $streamEvents);
}

if ($this->isStopped) {
Expand Down
6 changes: 3 additions & 3 deletions src/Projection/PdoEventStoreReadModelProjection.php
Expand Up @@ -383,16 +383,16 @@ public function run(bool $keepRunning = true): void

foreach ($this->streamPositions as $streamName => $position) {
try {
$stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
$streamEvents = $this->eventStore->load(new StreamName($streamName), $position + 1);
} catch (Exception\StreamNotFound $e) {
// no newer events found
continue;
}

if ($singleHandler) {
$this->handleStreamWithSingleHandler($streamName, $stream->streamEvents());
$this->handleStreamWithSingleHandler($streamName, $streamEvents);
} else {
$this->handleStreamWithHandlers($streamName, $stream->streamEvents());
$this->handleStreamWithHandlers($streamName, $streamEvents);
}

if ($this->isStopped) {
Expand Down

0 comments on commit 0ca708f

Please sign in to comment.