Skip to content

Commit

Permalink
Merge c20162a into 9ab667c
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Oct 7, 2016
2 parents 9ab667c + c20162a commit 935c74b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
24 changes: 23 additions & 1 deletion src/DoctrineStreamIterator.php
Expand Up @@ -24,6 +24,8 @@
*/
final class DoctrineStreamIterator implements Iterator
{
const BATCH_SIZE = 100;

/**
* @var QueryBuilder
*/
Expand Down Expand Up @@ -64,6 +66,11 @@ final class DoctrineStreamIterator implements Iterator
*/
private $currentKey;

/**
* @var int
*/
private $batchPosition = 0;

/**
* @param QueryBuilder $queryBuilder
* @param MessageFactory $messageFactory
Expand Down Expand Up @@ -129,7 +136,18 @@ public function next()
if (false !== $this->currentItem) {
$this->currentKey++;
} else {
$this->currentKey = -1;
$this->batchPosition++;
$this->queryBuilder->setFirstResult(self::BATCH_SIZE * $this->batchPosition);
$this->queryBuilder->setMaxResults(self::BATCH_SIZE);
/* @var $stmt \Doctrine\DBAL\Statement */
$this->statement = $this->queryBuilder->execute();
$this->statement->setFetchMode(\PDO::FETCH_ASSOC);

$this->currentItem = $this->statement->fetch();

if (false === $this->currentItem) {
$this->currentKey = -1;
}
}
}

Expand Down Expand Up @@ -160,6 +178,10 @@ public function rewind()
{
//Only perform rewind if current item is not the first element
if ($this->currentKey !== 0) {
$this->batchPosition = 0;
$this->queryBuilder->setFirstResult(0);
$this->queryBuilder->setMaxResults(self::BATCH_SIZE);

/* @var $stmt \Doctrine\DBAL\Statement */
$stmt = $this->queryBuilder->execute();
$stmt->setFetchMode(\PDO::FETCH_ASSOC);
Expand Down
31 changes: 31 additions & 0 deletions tests/DoctrineEventStoreAdapterTest.php
Expand Up @@ -433,6 +433,37 @@ public function it_fails_to_write_with_duplicate_aggregate_id_and_version()
$this->adapter->appendTo(new StreamName('Prooph\Model\User'), new \ArrayIterator([$streamEvent]));
}

/**
* @test
*/
public function it_replays_larger_streams_in_chunks()
{
$streamName = new StreamName('Prooph\Model\User');

$streamEvents = [];

for ($i = 1; $i <= 150; $i++) {
$streamEvents[] = UserCreated::with(
['name' => 'Max Mustermann ' . $i, 'email' => 'contact_' . $i . '@prooph.de'],
$i
);
}

$this->adapter->create(new Stream($streamName, new \ArrayIterator($streamEvents)));

$replay = $this->adapter->replay($streamName);

$count = 0;
foreach ($replay as $event) {
$count++;
$this->assertEquals('Max Mustermann ' . $count, $event->payload()['name']);
$this->assertEquals('contact_' . $count . '@prooph.de', $event->payload()['email']);
$this->assertEquals($count, $event->version());
}

$this->assertEquals(150, $count);
}

/**
* @return Stream
*/
Expand Down

0 comments on commit 935c74b

Please sign in to comment.