From 423132b635de039ad258b90452a4677c7232d74e Mon Sep 17 00:00:00 2001 From: prolic Date: Sat, 8 Oct 2016 06:39:56 +0800 Subject: [PATCH 1/2] replay in 100-chunks resolves: https://github.com/prooph/event-store-doctrine-adapter/issues/35 --- src/DoctrineStreamIterator.php | 24 ++++++++++++++++++- tests/DoctrineEventStoreAdapterTest.php | 31 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/DoctrineStreamIterator.php b/src/DoctrineStreamIterator.php index 31a4b1c..3415346 100644 --- a/src/DoctrineStreamIterator.php +++ b/src/DoctrineStreamIterator.php @@ -24,6 +24,8 @@ */ final class DoctrineStreamIterator implements Iterator { + private const BATCH_SIZE = 100; + /** * @var QueryBuilder */ @@ -64,6 +66,11 @@ final class DoctrineStreamIterator implements Iterator */ private $currentKey; + /** + * @var int + */ + private $batchPosition = 0; + /** * @param QueryBuilder $queryBuilder * @param MessageFactory $messageFactory @@ -129,7 +136,18 @@ public function next() if (false !== $this->currentItem) { $this->currentKey++; } else { - $this->currentKey = -1; + $this->batchPosition++; + $this->queryBuilder->setFirstResult(self::BATCH_SIZE * $this->batchPosition); + $this->queryBuilder->setMaxResults(self::BATCH_SIZE); + /* @var $stmt \Doctrine\DBAL\Statement */ + $this->statement = $this->queryBuilder->execute(); + $this->statement->setFetchMode(\PDO::FETCH_ASSOC); + + $this->currentItem = $this->statement->fetch(); + + if (false === $this->currentItem) { + $this->currentKey = -1; + } } } @@ -160,6 +178,10 @@ public function rewind() { //Only perform rewind if current item is not the first element if ($this->currentKey !== 0) { + $this->batchPosition = 0; + $this->queryBuilder->setFirstResult(0); + $this->queryBuilder->setMaxResults(self::BATCH_SIZE); + /* @var $stmt \Doctrine\DBAL\Statement */ $stmt = $this->queryBuilder->execute(); $stmt->setFetchMode(\PDO::FETCH_ASSOC); diff --git a/tests/DoctrineEventStoreAdapterTest.php b/tests/DoctrineEventStoreAdapterTest.php index 8c191a1..fa6d2a4 100644 --- a/tests/DoctrineEventStoreAdapterTest.php +++ b/tests/DoctrineEventStoreAdapterTest.php @@ -433,6 +433,37 @@ public function it_fails_to_write_with_duplicate_aggregate_id_and_version() $this->adapter->appendTo(new StreamName('Prooph\Model\User'), new \ArrayIterator([$streamEvent])); } + /** + * @test + */ + public function it_replays_larger_streams_in_chunks() + { + $streamName = new StreamName('Prooph\Model\User'); + + $streamEvents = []; + + for ($i = 1; $i <= 150; $i++) { + $streamEvents[] = UserCreated::with( + ['name' => 'Max Mustermann ' . $i, 'email' => 'contact_' . $i . '@prooph.de'], + $i + ); + } + + $this->adapter->create(new Stream($streamName, new \ArrayIterator($streamEvents))); + + $replay = $this->adapter->replay($streamName); + + $count = 0; + foreach ($replay as $event) { + $count++; + $this->assertEquals('Max Mustermann ' . $count, $event->payload()['name']); + $this->assertEquals('contact_' . $count . '@prooph.de', $event->payload()['email']); + $this->assertEquals($count, $event->version()); + } + + $this->assertEquals(150, $count); + } + /** * @return Stream */ From c20162a62e2b4187a7159bdc0ba1ee7db609ae5c Mon Sep 17 00:00:00 2001 From: prolic Date: Sat, 8 Oct 2016 06:46:59 +0800 Subject: [PATCH 2/2] fix DoctrineStreamIterator --- src/DoctrineStreamIterator.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DoctrineStreamIterator.php b/src/DoctrineStreamIterator.php index 3415346..7d15e7a 100644 --- a/src/DoctrineStreamIterator.php +++ b/src/DoctrineStreamIterator.php @@ -24,7 +24,7 @@ */ final class DoctrineStreamIterator implements Iterator { - private const BATCH_SIZE = 100; + const BATCH_SIZE = 100; /** * @var QueryBuilder