diff --git a/composer.json b/composer.json index 0d5ef08..c946ca6 100644 --- a/composer.json +++ b/composer.json @@ -24,7 +24,7 @@ "php": "^7.1", "ext-mongodb": "^1.5.2", "mongodb/mongodb": "^1.4.2", - "prooph/event-store": "^7.3.6" + "prooph/event-store": "^7.4.0" }, "require-dev": { "sandrokeil/interop-config": "^2.0.1", diff --git a/env/docker/mongo/Dockerfile b/env/docker/mongo/Dockerfile index 35958af..dd60222 100644 --- a/env/docker/mongo/Dockerfile +++ b/env/docker/mongo/Dockerfile @@ -1,4 +1,4 @@ -FROM mongo:4.0 +FROM mongo:4.0.6 COPY mongod.conf /etc COPY init.sh / diff --git a/src/Container/MongoDbProjectionManagerFactory.php b/src/Container/MongoDbProjectionManagerFactory.php index 37a3e96..f954fb6 100644 --- a/src/Container/MongoDbProjectionManagerFactory.php +++ b/src/Container/MongoDbProjectionManagerFactory.php @@ -16,6 +16,7 @@ use Interop\Config\ProvidesDefaultOptions; use Interop\Config\RequiresConfigId; use Interop\Config\RequiresMandatoryOptions; +use Prooph\Common\Messaging\FQCNMessageFactory; use Prooph\EventStore\EventStore; use Prooph\EventStore\MongoDb\Exception\InvalidArgumentException; use Prooph\EventStore\MongoDb\Projection\MongoDbProjectionManager; @@ -70,6 +71,8 @@ public function __invoke(ContainerInterface $container): ProjectionManager return new MongoDbProjectionManager( $container->get($config['event_store']), $container->get($config['client']), + $container->get($config['persistence_strategy']), + $container->get($config['message_factory']), $config['database'], $config['event_streams_table'], $config['projections_table'] @@ -83,7 +86,7 @@ public function dimensions(): iterable public function mandatoryOptions(): iterable { - return ['client', 'database']; + return ['client', 'database', 'persistence_strategy']; } public function defaultOptions(): iterable @@ -92,6 +95,7 @@ public function defaultOptions(): iterable 'event_store' => EventStore::class, 'event_streams_table' => 'event_streams', 'projections_table' => 'projections', + 'message_factory' => FQCNMessageFactory::class, ]; } } diff --git a/src/Exception/ConcurrencyExceptionFactory.php b/src/Exception/ConcurrencyExceptionFactory.php new file mode 100644 index 0000000..47be545 --- /dev/null +++ b/src/Exception/ConcurrencyExceptionFactory.php @@ -0,0 +1,30 @@ + + * (c) 2016-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStore\MongoDb\Exception; + +use Prooph\EventStore\Exception\ConcurrencyException; +use MongoDB\Driver\Exception\Exception as MongoDbException; + +class ConcurrencyExceptionFactory +{ + public static function fromMongoDbException(MongoDbException $exception): ConcurrencyException + { + return new ConcurrencyException( + \sprintf( + "Some of the aggregate IDs or event IDs have already been used in the same stream. The MongoDB error should contain more information:\nError %s.\nWrite-Errors: %s", + $exception->getMessage(), + var_export($exception->getWriteResult()->getWriteErrors(), true) + ) + ); + } +} \ No newline at end of file diff --git a/src/MongoDbEventStore.php b/src/MongoDbEventStore.php index 61d309c..fae4bc6 100644 --- a/src/MongoDbEventStore.php +++ b/src/MongoDbEventStore.php @@ -25,10 +25,12 @@ use Prooph\EventStore\Metadata\FieldType; use Prooph\EventStore\Metadata\MetadataMatcher; use Prooph\EventStore\Metadata\Operator; +use Prooph\EventStore\MongoDb\Exception\ConcurrencyExceptionFactory; use Prooph\EventStore\MongoDb\Exception\ExtensionNotLoaded; use Prooph\EventStore\MongoDb\Exception\RuntimeException; use Prooph\EventStore\MongoDb\PersistenceStrategy\MongoDbAggregateStreamStrategy; use Prooph\EventStore\Stream; +use Prooph\EventStore\StreamIterator\EmptyStreamIterator; use Prooph\EventStore\StreamName; use Prooph\EventStore\TransactionalEventStore; use Prooph\EventStore\Util\Assertion; @@ -240,6 +242,13 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void try { $this->collection($tableName)->insertMany($data, $options); } catch (MongoDbException $exception) { + $code = isset($exception->getWriteResult()->getWriteErrors()[0]) ? + $exception->getWriteResult()->getWriteErrors()[0]->getCode() + : $exception->getCode(); + + if (\in_array($code, [11000, 11001, 12582], true)) { + throw ConcurrencyExceptionFactory::fromMongoDbException($exception); + } throw RuntimeException::fromMongoDbException($exception); } } @@ -286,9 +295,16 @@ public function load( 'session' => $this->session, ]; } + $counted = $collection->countDocuments( + ['$and' => $where], + $options + ); - return new MongoDbStreamIterator( - function (array $filter = [], array $innerOptions = []) use ($collection, $options, $where) { + if (0 === (null === $count ? $counted : \min($counted, $count))) { + return new EmptyStreamIterator(); + } + $callable = function (string $method) use ($collection, $options, $where): callable { + return function (array $filter = [], array $innerOptions = []) use ($collection, $options, $where, $method) { $innerOptions = \array_replace_recursive( $options, $innerOptions @@ -299,11 +315,16 @@ function (array $filter = [], array $innerOptions = []) use ($collection, $optio } }); - return $collection->find( + return $collection->{$method}( ['$and' => $where], $innerOptions ); - }, + }; + }; + + return new MongoDbStreamIterator( + $callable('find'), + $callable('countDocuments'), $this->messageFactory, $this->loadBatchSize, $fromNumber, @@ -358,8 +379,22 @@ public function loadReverse( ]; } - return new MongoDbStreamIterator( - function (array $filter = [], array $innerOptions = []) use ($collection, $options, $where) { + $counted = $collection->countDocuments( + ['$and' => $where], + $options + ); + + if (0 === (null === $count ? $counted : \min($counted, $count))) { + return new EmptyStreamIterator(); + } + + $callable = function (string $method) use ($collection, $options, $where): callable { + return function (array $filter = [], array $innerOptions = []) use ( + $collection, + $options, + $where, + $method + ) { $innerOptions = \array_replace_recursive( $options, $innerOptions @@ -370,11 +405,16 @@ function (array $filter = [], array $innerOptions = []) use ($collection, $optio } }); - return $collection->find( + return $collection->{$method}( ['$and' => $where], $innerOptions ); - }, + }; + }; + + return new MongoDbStreamIterator( + $callable('find'), + $callable('countDocuments'), $this->messageFactory, $this->loadBatchSize, $fromNumber, @@ -422,10 +462,30 @@ public function commit(): void throw new TransactionNotStarted(); } - $this->session->commitTransaction(); - $this->session->endSession(); - $this->session = null; - $this->createdCollections = []; + $retries = 0; + + do { + $retries++; + try { + $this->session->commitTransaction(); + + $this->session = null; + $this->createdCollections = []; + break; + } catch (\MongoDB\Driver\Exception\CommandException $exception) { + $resultDoc = $exception->getResultDocument(); + + if (isset($resultDoc->errorLabels) + && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels, true) + ) { + continue; + } + // we use concurrency exception here, so the message will be dispatched again via ConcurrencyMessageDispatcher + throw ConcurrencyExceptionFactory::fromMongoDbException($exception); + } catch (\MongoDB\Driver\Exception\Exception $exception) { + throw ConcurrencyExceptionFactory::fromMongoDbException($exception); + } + } while($retries <= 3); } public function rollback(): void @@ -437,14 +497,18 @@ public function rollback(): void if (null === $this->session) { throw new TransactionNotStarted(); } - $this->session->abortTransaction(); - $this->session->endSession(); + + try { + $this->session->abortTransaction(); + } catch (\MongoDB\Driver\Exception\RuntimeException $error) { + // it's ok, we can't do anything here + } finally { + $this->session = null; + } foreach ($this->createdCollections as $collectionName) { $this->collection($collectionName)->drop(); } - - $this->session = null; $this->createdCollections = []; } @@ -493,6 +557,10 @@ public function fetchStreamNamesRegex( int $limit = 20, int $offset = 0 ): array { + if (empty($filter) || false === @\preg_match("/$filter/", '')) { + throw new Exception\InvalidArgumentException('Invalid regex pattern given'); + } + $where = $this->createWhereClause($metadataMatcher); $where[]['real_stream_name'] = ['$regex' => $filter]; @@ -556,6 +624,10 @@ public function fetchCategoryNames(?string $filter, int $limit = 20, int $offset public function fetchCategoryNamesRegex(string $filter, int $limit = 20, int $offset = 0): array { + if (empty($filter) || false === @\preg_match("/$filter/", '')) { + throw new Exception\InvalidArgumentException('Invalid regex pattern given'); + } + $where['category'] = ['$regex' => $filter]; try { diff --git a/src/MongoDbStreamIterator.php b/src/MongoDbStreamIterator.php index 37b4621..9f76a8e 100644 --- a/src/MongoDbStreamIterator.php +++ b/src/MongoDbStreamIterator.php @@ -15,20 +15,25 @@ use DateTimeImmutable; use DateTimeZone; use Generator; -use Iterator; use MongoDB\Driver\Cursor; use MongoDB\Exception\Exception as MongoDbException; use Prooph\Common\Messaging\Message; use Prooph\Common\Messaging\MessageFactory; use Prooph\EventStore\MongoDb\Exception\RuntimeException; +use Prooph\EventStore\StreamIterator\StreamIterator; -final class MongoDbStreamIterator implements Iterator +final class MongoDbStreamIterator implements StreamIterator { /** * @var callable */ private $query; + /** + * @var callable + */ + private $countQuery; + /** * @var MessageFactory */ @@ -81,6 +86,7 @@ final class MongoDbStreamIterator implements Iterator public function __construct( callable $query, + callable $countQuery, MessageFactory $messageFactory, int $batchSize, int $fromNumber, @@ -88,6 +94,7 @@ public function __construct( bool $forward ) { $this->query = $query; + $this->countQuery = $countQuery; $this->messageFactory = $messageFactory; $this->batchSize = $batchSize; $this->fromNumber = $fromNumber; @@ -98,6 +105,22 @@ public function __construct( $this->next(); } + public function count() + { + [$filter, $options] = $this->getFilterAndOptions($this->fromNumber); + + $callable = $this->countQuery; + try { + $count = (int) $callable($filter, $options); + + return null === $this->count ? $count : \min($count, $this->count); + } catch (MongoDbException $exception) { + // ignore + } + + return 0; + } + /** * @return null|Message */ @@ -225,29 +248,36 @@ private function ensureCursor(array $filter = [], array $options = []): void } private function buildStatement(int $fromNumber): void + { + [$filter, $options] = $this->getFilterAndOptions($fromNumber); + + $this->ensureCursor($filter, $options); + } + + private function yieldCursor(Cursor $cursor): Generator + { + foreach ($cursor as $item) { + yield $item; + } + } + + private function getFilterAndOptions(int $fromNumber): array { if (null === $this->count || $this->count < ($this->batchSize * ($this->batchPosition + 1)) ) { $limit = $this->batchSize; } else { - $limit = $this->count - ($this->batchSize * $this->batchPosition); + $limit = $this->count - ($this->batchSize * ($this->batchPosition + 1)); } - $this->ensureCursor( + return [ [ '_id' => [$this->forward ? '$gte' : '$lte' => $fromNumber], ], [ 'limit' => $limit, - ] - ); - } - - private function yieldCursor(Cursor $cursor): Generator - { - foreach ($cursor as $item) { - yield $item; - } + ], + ]; } } diff --git a/src/Projection/MongoDbEventStoreProjector.php b/src/Projection/MongoDbEventStoreProjector.php index 4a29370..33fc90b 100644 --- a/src/Projection/MongoDbEventStoreProjector.php +++ b/src/Projection/MongoDbEventStoreProjector.php @@ -17,10 +17,10 @@ use DateTimeImmutable; use DateTimeZone; use EmptyIterator; -use Iterator; use MongoDB\Client; use MongoDB\Driver\Exception\Exception as MongoDbException; use Prooph\Common\Messaging\Message; +use Prooph\Common\Messaging\MessageFactory; use Prooph\EventStore\EventStore; use Prooph\EventStore\EventStoreDecorator; use Prooph\EventStore\Exception; @@ -29,6 +29,7 @@ use Prooph\EventStore\MongoDb\Exception\RuntimeException; use Prooph\EventStore\MongoDb\MongoDbHelper; use Prooph\EventStore\MongoDb\MongoEventStore; +use Prooph\EventStore\MongoDb\PersistenceStrategy; use Prooph\EventStore\Projection\ProjectionStatus; use Prooph\EventStore\Projection\Projector; use Prooph\EventStore\Stream; @@ -38,6 +39,7 @@ final class MongoDbEventStoreProjector implements Projector { use MongoDbHelper; + use ProcessEvents; /** * @var EventStore @@ -159,9 +161,21 @@ final class MongoDbEventStoreProjector implements Projector */ private $lastLockUpdate; + /** + * @var MessageFactory + */ + private $messageFactory; + + /** + * @var PersistenceStrategy + */ + private $persistenceStrategy; + public function __construct( EventStore $eventStore, Client $client, + PersistenceStrategy $persistenceStrategy, + MessageFactory $messageFactory, string $database, string $name, string $eventStreamsTable, @@ -179,6 +193,8 @@ public function __construct( $this->eventStore = $eventStore; $this->client = $client; + $this->persistenceStrategy = $persistenceStrategy; + $this->messageFactory = $messageFactory; $this->database = $database; $this->name = $name; $this->eventStreamsTable = $eventStreamsTable; @@ -490,58 +506,7 @@ public function run(bool $keepRunning = true): void $this->isStopped = false; try { - do { - foreach ($this->streamPositions as $streamName => $position) { - try { - $streamEvents = $this->eventStore->load(new StreamName($streamName), $position + 1); - } catch (Exception\StreamNotFound $e) { - // ignore - continue; - } - - if ($singleHandler) { - $this->handleStreamWithSingleHandler($streamName, $streamEvents); - } else { - $this->handleStreamWithHandlers($streamName, $streamEvents); - } - - if ($this->isStopped) { - break; - } - } - - if (0 === $this->eventCounter) { - \usleep($this->sleep); - $this->updateLock(); - } else { - $this->persist(); - } - - $this->eventCounter = 0; - - if ($this->triggerPcntlSignalDispatch) { - \pcntl_signal_dispatch(); - } - - switch ($this->fetchRemoteStatus()) { - case ProjectionStatus::STOPPING(): - $this->stop(); - break; - case ProjectionStatus::DELETING(): - $this->delete(false); - break; - case ProjectionStatus::DELETING_INCL_EMITTED_EVENTS(): - $this->delete(true); - break; - case ProjectionStatus::RESETTING(): - $this->reset(); - break; - default: - break; - } - - $this->prepareStreamPositions(); - } while ($keepRunning && ! $this->isStopped); + $this->processEvents($keepRunning, $singleHandler); } finally { $this->releaseLock(); } @@ -571,71 +536,6 @@ private function fetchRemoteStatus(): ProjectionStatus return ProjectionStatus::byValue($result['status']); } - private function handleStreamWithSingleHandler(string $streamName, Iterator $events): void - { - $this->currentStreamName = $streamName; - $handler = $this->handler; - - foreach ($events as $key => $event) { - if ($this->triggerPcntlSignalDispatch) { - \pcntl_signal_dispatch(); - } - /* @var Message $event */ - $this->streamPositions[$streamName] = $key; - $this->eventCounter++; - - $result = $handler($this->state, $event); - - if (\is_array($result)) { - $this->state = $result; - } - - if ($this->eventCounter === $this->persistBlockSize) { - $this->persist(); - $this->eventCounter = 0; - } - - if ($this->isStopped) { - break; - } - } - } - - private function handleStreamWithHandlers(string $streamName, Iterator $events): void - { - $this->currentStreamName = $streamName; - - foreach ($events as $key => $event) { - if ($this->triggerPcntlSignalDispatch) { - \pcntl_signal_dispatch(); - } - /* @var Message $event */ - $this->streamPositions[$streamName] = $key; - - if (! isset($this->handlers[$event->messageName()])) { - continue; - } - - $this->eventCounter++; - - $handler = $this->handlers[$event->messageName()]; - $result = $handler($this->state, $event); - - if (\is_array($result)) { - $this->state = $result; - } - - if ($this->eventCounter === $this->persistBlockSize) { - $this->persist(); - $this->eventCounter = 0; - } - - if ($this->isStopped) { - break; - } - } - } - private function createHandlerContext(?string &$streamName) { return new class($this, $streamName) { diff --git a/src/Projection/MongoDbEventStoreReadModelProjector.php b/src/Projection/MongoDbEventStoreReadModelProjector.php index dd5f9af..e233158 100644 --- a/src/Projection/MongoDbEventStoreReadModelProjector.php +++ b/src/Projection/MongoDbEventStoreReadModelProjector.php @@ -15,10 +15,9 @@ use Closure; use DateTimeImmutable; use DateTimeZone; -use Iterator; use MongoDB\Client; use MongoDB\Driver\Exception\Exception as MongoDbException; -use Prooph\Common\Messaging\Message; +use Prooph\Common\Messaging\MessageFactory; use Prooph\EventStore\EventStore; use Prooph\EventStore\EventStoreDecorator; use Prooph\EventStore\Exception; @@ -27,6 +26,7 @@ use Prooph\EventStore\MongoDb\Exception\RuntimeException; use Prooph\EventStore\MongoDb\MongoDbHelper; use Prooph\EventStore\MongoDb\MongoEventStore; +use Prooph\EventStore\MongoDb\PersistenceStrategy; use Prooph\EventStore\Projection\ProjectionStatus; use Prooph\EventStore\Projection\ReadModel; use Prooph\EventStore\Projection\ReadModelProjector; @@ -35,6 +35,7 @@ final class MongoDbEventStoreReadModelProjector implements ReadModelProjector { use MongoDbHelper; + use ProcessEvents; /** * @var EventStore @@ -151,9 +152,21 @@ final class MongoDbEventStoreReadModelProjector implements ReadModelProjector */ private $lastLockUpdate; + /** + * @var MessageFactory + */ + private $messageFactory; + + /** + * @var PersistenceStrategy + */ + private $persistenceStrategy; + public function __construct( EventStore $eventStore, Client $client, + PersistenceStrategy $persistenceStrategy, + MessageFactory $messageFactory, string $database, string $name, ReadModel $readModel, @@ -171,6 +184,8 @@ public function __construct( $this->eventStore = $eventStore; $this->client = $client; + $this->persistenceStrategy = $persistenceStrategy; + $this->messageFactory = $messageFactory; $this->database = $database; $this->name = $name; $this->readModel = $readModel; @@ -455,58 +470,7 @@ public function run(bool $keepRunning = true): void $this->isStopped = false; try { - do { - foreach ($this->streamPositions as $streamName => $position) { - try { - $streamEvents = $this->eventStore->load(new StreamName($streamName), $position + 1); - } catch (Exception\StreamNotFound $e) { - // ignore - continue; - } - - if ($singleHandler) { - $this->handleStreamWithSingleHandler($streamName, $streamEvents); - } else { - $this->handleStreamWithHandlers($streamName, $streamEvents); - } - - if ($this->isStopped) { - break; - } - } - - if (0 === $this->eventCounter) { - \usleep($this->sleep); - $this->updateLock(); - } else { - $this->persist(); - } - - $this->eventCounter = 0; - - if ($this->triggerPcntlSignalDispatch) { - \pcntl_signal_dispatch(); - } - - switch ($this->fetchRemoteStatus()) { - case ProjectionStatus::STOPPING(): - $this->stop(); - break; - case ProjectionStatus::DELETING(): - $this->delete(false); - break; - case ProjectionStatus::DELETING_INCL_EMITTED_EVENTS(): - $this->delete(true); - break; - case ProjectionStatus::RESETTING(): - $this->reset(); - break; - default: - break; - } - - $this->prepareStreamPositions(); - } while ($keepRunning && ! $this->isStopped); + $this->processEvents($keepRunning, $singleHandler); } finally { $this->releaseLock(); } @@ -536,71 +500,6 @@ private function fetchRemoteStatus(): ProjectionStatus return ProjectionStatus::byValue($result['status']); } - private function handleStreamWithSingleHandler(string $streamName, Iterator $events): void - { - $this->currentStreamName = $streamName; - $handler = $this->handler; - - foreach ($events as $key => $event) { - if ($this->triggerPcntlSignalDispatch) { - \pcntl_signal_dispatch(); - } - /* @var Message $event */ - $this->streamPositions[$streamName] = $key; - $this->eventCounter++; - - $result = $handler($this->state, $event); - - if (\is_array($result)) { - $this->state = $result; - } - - if ($this->eventCounter === $this->persistBlockSize) { - $this->persist(); - $this->eventCounter = 0; - } - - if ($this->isStopped) { - break; - } - } - } - - private function handleStreamWithHandlers(string $streamName, Iterator $events): void - { - $this->currentStreamName = $streamName; - - foreach ($events as $key => $event) { - if ($this->triggerPcntlSignalDispatch) { - \pcntl_signal_dispatch(); - } - /* @var Message $event */ - $this->streamPositions[$streamName] = $key; - - if (! isset($this->handlers[$event->messageName()])) { - continue; - } - - $this->eventCounter++; - - $handler = $this->handlers[$event->messageName()]; - $result = $handler($this->state, $event); - - if (\is_array($result)) { - $this->state = $result; - } - - if ($this->eventCounter === $this->persistBlockSize) { - $this->persist(); - $this->eventCounter = 0; - } - - if ($this->isStopped) { - break; - } - } - } - private function createHandlerContext(?string &$streamName) { return new class($this, $streamName) { diff --git a/src/Projection/MongoDbProjectionManager.php b/src/Projection/MongoDbProjectionManager.php index 5b580e4..440b012 100644 --- a/src/Projection/MongoDbProjectionManager.php +++ b/src/Projection/MongoDbProjectionManager.php @@ -14,6 +14,7 @@ use MongoDB\Client; use MongoDB\Driver\Exception\Exception as MongoDbException; +use Prooph\Common\Messaging\MessageFactory; use Prooph\EventStore\EventStore; use Prooph\EventStore\EventStoreDecorator; use Prooph\EventStore\Exception\OutOfRangeException; @@ -21,6 +22,7 @@ use Prooph\EventStore\MongoDb\Exception; use Prooph\EventStore\MongoDb\MongoDbHelper; use Prooph\EventStore\MongoDb\MongoEventStore; +use Prooph\EventStore\MongoDb\PersistenceStrategy; use Prooph\EventStore\Projection\ProjectionManager; use Prooph\EventStore\Projection\ProjectionStatus; use Prooph\EventStore\Projection\Projector; @@ -57,15 +59,29 @@ final class MongoDbProjectionManager implements ProjectionManager */ private $projectionsTable; + /** + * @var MessageFactory + */ + private $messageFactory; + + /** + * @var PersistenceStrategy + */ + private $persistenceStrategy; + public function __construct( EventStore $eventStore, Client $client, + PersistenceStrategy $persistenceStrategy, + MessageFactory $messageFactory, string $database, string $eventStreamsTable = 'event_streams', string $projectionsTable = 'projections' ) { $this->eventStore = $eventStore; $this->client = $client; + $this->persistenceStrategy = $persistenceStrategy; + $this->messageFactory = $messageFactory; $this->database = $database; $this->eventStreamsTable = $eventStreamsTable; $this->projectionsTable = $projectionsTable; @@ -97,6 +113,8 @@ public function createProjection( return new MongoDbEventStoreProjector( $this->eventStore, $this->client, + $this->persistenceStrategy, + $this->messageFactory, $this->database, $name, $this->eventStreamsTable, @@ -118,6 +136,8 @@ public function createReadModelProjection( return new MongoDbEventStoreReadModelProjector( $this->eventStore, $this->client, + $this->persistenceStrategy, + $this->messageFactory, $this->database, $name, $readModel, @@ -262,6 +282,10 @@ public function fetchProjectionNamesRegex(string $filter, int $limit = 20, int $ ); } + if (empty($filter) || false === @\preg_match("/$filter/", '')) { + throw new Exception\InvalidArgumentException('Invalid regex pattern given'); + } + try { if (! $this->checkCollectionExists($this->projectionsTable)) { throw Exception\CollectionNotSetupException::with($this->projectionsTable); diff --git a/src/Projection/ProcessEvents.php b/src/Projection/ProcessEvents.php new file mode 100644 index 0000000..27aa5d5 --- /dev/null +++ b/src/Projection/ProcessEvents.php @@ -0,0 +1,321 @@ + + * (c) 2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStore\MongoDb\Projection; + +use Iterator; +use Prooph\Common\Messaging\Message; +use Prooph\EventStore\Exception; +use Prooph\EventStore\Projection\ProjectionStatus; +use Prooph\EventStore\StreamName; + +trait ProcessEvents +{ + /** + * Gap history until processed + * + * @var array + */ + private $gaps = []; + + private function processEvents(bool $keepRunning, bool $singleHandler): void + { + do { + $collectionNames = []; + $streamTimestampsStart = []; + + foreach ($this->streamPositions as $streamName => $position) { + $collectionNames[$this->persistenceStrategy->generateCollectionName(new StreamName($streamName))] = $streamName; + } + // initialize stream to get new events during processing current events + $changeStream = $this->client->selectDatabase($this->database)->watch( + [ + [ + '$match' => [ + 'ns.coll' => ['$in' => \array_keys($collectionNames)], + 'operationType' => 'insert', + ], + ], + ], + [ + 'batchSize' => 1000, + ] + ); + + foreach ($this->streamPositions as $streamName => $position) { + try { + $streamTimestampsStart[$streamName] = \time(); + $streamEvents = $this->eventStore->load(new StreamName($streamName), $position + 1); + } catch (Exception\StreamNotFound $e) { + // ignore + continue; + } + + if ($singleHandler) { + $this->handleStreamWithSingleHandler($streamName, $streamEvents); + } else { + $this->handleStreamWithHandlers($streamName, $streamEvents); + } + + if ($this->isStopped) { + break; + } + } + + if (0 === $this->eventCounter) { + \usleep($this->sleep); + $this->updateLock(); + } else { + $this->persist(); + } + + $this->eventCounter = 0; + + if ($this->triggerPcntlSignalDispatch) { + \pcntl_signal_dispatch(); + } + + switch ($this->fetchRemoteStatus()) { + case ProjectionStatus::STOPPING(): + $this->stop(); + break; + case ProjectionStatus::DELETING(): + $this->delete(false); + break; + case ProjectionStatus::DELETING_INCL_EMITTED_EVENTS(): + $this->delete(true); + break; + case ProjectionStatus::RESETTING(): + $this->reset(); + break; + default: + break; + } + + if (! $this->isStopped && $this->status === ProjectionStatus::RUNNING()) { + for ($changeStream->rewind(); true; $changeStream->next()) { + if (! $changeStream->valid()) { + \usleep($this->sleep); + $this->updateLock(); + if ($this->triggerPcntlSignalDispatch) { + \pcntl_signal_dispatch(); + } + switch ($this->fetchRemoteStatus()) { + case ProjectionStatus::STOPPING(): + $this->stop(); + break; + case ProjectionStatus::DELETING(): + $this->delete(false); + break; + case ProjectionStatus::DELETING_INCL_EMITTED_EVENTS(): + $this->delete(true); + break; + case ProjectionStatus::RESETTING(): + $this->reset(); + break; + default: + break; + } + + if ($this->isStopped || ! $keepRunning) { + break; + } + continue; + } + + $event = $changeStream->current(); + + if ($event['operationType'] === 'invalidate') { + break; + } + + $streamName = $collectionNames[$event['ns']['coll']]; + $eventTimestamp = $event['clusterTime']->getTimestamp(); + + // event already processed + if ($eventTimestamp <= $streamTimestampsStart[$streamName] + && $event['fullDocument']['_id'] <= $this->streamPositions[$streamName] + && ! isset($this->gaps[$event['fullDocument']['_id']]) + ) { + $this->updateLock(); + continue; + } + + $streamEvents = new \ArrayIterator([$event['fullDocument']['_id'] => $this->createMessage($event['fullDocument'])]); + + if ($singleHandler) { + $this->handleStreamWithSingleHandler($streamName, $streamEvents); + } else { + $this->handleStreamWithHandlers($streamName, $streamEvents); + } + $this->persist(); + + if ($this->triggerPcntlSignalDispatch) { + \pcntl_signal_dispatch(); + } + + switch ($this->fetchRemoteStatus()) { + case ProjectionStatus::STOPPING(): + $this->stop(); + break; + case ProjectionStatus::DELETING(): + $this->delete(false); + break; + case ProjectionStatus::DELETING_INCL_EMITTED_EVENTS(): + $this->delete(true); + break; + case ProjectionStatus::RESETTING(): + $this->reset(); + break; + default: + break; + } + + if ($this->isStopped) { + break; + } + } + } + + $this->prepareStreamPositions(); + } while ($keepRunning && ! $this->isStopped); + } + + private function addGap(int $from, int $to): void + { + if ($from >= $to) { + return; + } + + for ($i = $from; $i < $to; $i++) { + $this->gaps[$i] = true; + } + } + + private function handleStreamWithSingleHandler(string $streamName, Iterator $events): void + { + $this->currentStreamName = $streamName; + $handler = $this->handler; + + foreach ($events as $key => $event) { + if ($this->triggerPcntlSignalDispatch) { + \pcntl_signal_dispatch(); + } + if ($this->streamPositions[$streamName] + 1 !== $key) { + $this->addGap($this->streamPositions[$streamName] + 1, $key); + } + unset($this->gaps[$key]); + // stream position should not be in the past + if ($key > $this->streamPositions[$streamName]) { + $this->streamPositions[$streamName] = $key; + } + $this->eventCounter++; + + /* @var Message $event */ + $result = $handler($this->state, $event); + + if (\is_array($result)) { + $this->state = $result; + } + + if ($this->eventCounter === $this->persistBlockSize) { + $this->persist(); + $this->eventCounter = 0; + + $this->status = $this->fetchRemoteStatus(); + if (! $this->status->is(ProjectionStatus::RUNNING()) && ! $this->status->is(ProjectionStatus::IDLE())) { + $this->isStopped = true; + } + } + + if ($this->isStopped) { + break; + } + } + } + + private function handleStreamWithHandlers(string $streamName, Iterator $events): void + { + $this->currentStreamName = $streamName; + + foreach ($events as $key => $event) { + if ($this->triggerPcntlSignalDispatch) { + \pcntl_signal_dispatch(); + } + if ($this->streamPositions[$streamName] + 1 !== $key) { + $this->addGap($this->streamPositions[$streamName] + 1, $key); + } + unset($this->gaps[$key]); + // stream position is in the past + if ($key > $this->streamPositions[$streamName]) { + $this->streamPositions[$streamName] = $key; + } + /* @var Message $event */ + if (! isset($this->handlers[$event->messageName()])) { + continue; + } + + $this->eventCounter++; + + $handler = $this->handlers[$event->messageName()]; + $result = $handler($this->state, $event); + + if (\is_array($result)) { + $this->state = $result; + } + + if ($this->eventCounter === $this->persistBlockSize) { + $this->persist(); + $this->eventCounter = 0; + + $this->status = $this->fetchRemoteStatus(); + if (! $this->status->is(ProjectionStatus::RUNNING()) && ! $this->status->is(ProjectionStatus::IDLE())) { + $this->isStopped = true; + } + } + + if ($this->isStopped) { + break; + } + } + } + + private function createMessage(array $document): ?Message + { + $createdAt = $document['created_at']; + + if (\strlen($createdAt) === 19) { + $createdAt .= '.000'; + } + + $createdAt = \DateTimeImmutable::createFromFormat( + 'Y-m-d\TH:i:s.u', + $createdAt, + new \DateTimeZone('UTC') + ); + + $metadata = $document['metadata']; + + if (! \array_key_exists('_position', $metadata)) { + $metadata['_position'] = $document['_id']; + } + + $payload = $document['payload']; + + return $this->messageFactory->createMessageFromArray($document['event_name'], [ + 'uuid' => $document['event_id'], + 'created_at' => $createdAt, + 'payload' => $payload, + 'metadata' => $metadata, + ]); + } +} diff --git a/tests/Container/MongoDbProjectionManagerFactoryTest.php b/tests/Container/MongoDbProjectionManagerFactoryTest.php index 664744a..9507a04 100644 --- a/tests/Container/MongoDbProjectionManagerFactoryTest.php +++ b/tests/Container/MongoDbProjectionManagerFactoryTest.php @@ -13,6 +13,7 @@ namespace ProophTest\EventStore\MongoDb\Container; use PHPUnit\Framework\TestCase; +use Prooph\Common\Messaging\FQCNMessageFactory; use Prooph\Common\Messaging\MessageFactory; use Prooph\EventStore\EventStore; use Prooph\EventStore\MongoDb\Container\MongoDbProjectionManagerFactory; @@ -36,6 +37,7 @@ public function it_creates_service(): void $config['prooph']['projection_manager']['default'] = [ 'client' => 'my client', 'database' => 'test_db', + 'persistence_strategy' => PersistenceStrategy\MongoDbAggregateStreamStrategy::class, ]; $client = TestUtil::getClient(); @@ -51,6 +53,8 @@ public function it_creates_service(): void $container->get('my client')->willReturn($client)->shouldBeCalled(); $container->get(EventStore::class)->willReturn($eventStore)->shouldBeCalled(); $container->get('config')->willReturn($config)->shouldBeCalled(); + $container->get(FQCNMessageFactory::class)->willReturn(new FQCNMessageFactory())->shouldBeCalled(); + $container->get(PersistenceStrategy\MongoDbAggregateStreamStrategy::class)->willReturn($this->prophesize(PersistenceStrategy::class))->shouldBeCalled(); $factory = new MongoDbProjectionManagerFactory(); $projectionManager = $factory($container->reveal()); @@ -66,6 +70,7 @@ public function it_creates_service_via_callstatic(): void $config['prooph']['projection_manager']['default'] = [ 'client' => 'my client', 'database' => 'test_db', + 'persistence_strategy' => PersistenceStrategy\MongoDbAggregateStreamStrategy::class, ]; $client = TestUtil::getClient(); @@ -81,6 +86,8 @@ public function it_creates_service_via_callstatic(): void $container->get('my client')->willReturn($client)->shouldBeCalled(); $container->get(EventStore::class)->willReturn($eventStore)->shouldBeCalled(); $container->get('config')->willReturn($config)->shouldBeCalled(); + $container->get(FQCNMessageFactory::class)->willReturn(new FQCNMessageFactory())->shouldBeCalled(); + $container->get(PersistenceStrategy\MongoDbAggregateStreamStrategy::class)->willReturn($this->prophesize(PersistenceStrategy::class))->shouldBeCalled(); $name = 'default'; $pdo = MongoDbProjectionManagerFactory::$name($container->reveal()); diff --git a/tests/Projection/AbstractMongoDbEventStoreProjectorTest.php b/tests/Projection/AbstractMongoDbEventStoreProjectorTest.php index 8afebac..ea9ee25 100644 --- a/tests/Projection/AbstractMongoDbEventStoreProjectorTest.php +++ b/tests/Projection/AbstractMongoDbEventStoreProjectorTest.php @@ -59,7 +59,13 @@ protected function setUp(): void $this->database, $this->getPersistenceStrategy() ); - $this->projectionManager = new MongoDbProjectionManager($this->eventStore, $this->client, $this->database); + $this->projectionManager = new MongoDbProjectionManager( + $this->eventStore, + $this->client, + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), + $this->database + ); } protected function tearDown(): void @@ -208,6 +214,8 @@ public function it_throws_exception_when_invalid_wrapped_event_store_instance_pa new MongoDbEventStoreProjector( $wrappedEventStore->reveal(), $this->client, + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), $this->database, 'test_projection', 'event_streams', @@ -233,6 +241,8 @@ public function it_throws_exception_when_unknown_event_store_instance_passed(): new MongoDbEventStoreProjector( $eventStore->reveal(), $client->reveal(), + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), $this->database, 'test_projection', 'event_streams', @@ -328,7 +338,7 @@ public function it_respects_update_lock_threshold(): void \posix_kill($processDetails['pid'], SIGQUIT); - \sleep(1); + \usleep(1500000); $processDetails = \proc_get_status($projectionProcess); $this->assertFalse( diff --git a/tests/Projection/AbstractMongoDbEventStoreQueryTest.php b/tests/Projection/AbstractMongoDbEventStoreQueryTest.php index 4ddb00c..7a886d0 100644 --- a/tests/Projection/AbstractMongoDbEventStoreQueryTest.php +++ b/tests/Projection/AbstractMongoDbEventStoreQueryTest.php @@ -64,7 +64,13 @@ protected function setUp(): void $this->database, $this->getPersistenceStrategy() ); - $this->projectionManager = new MongoDbProjectionManager($this->eventStore, $this->client, $this->database); + $this->projectionManager = new MongoDbProjectionManager( + $this->eventStore, + $this->client, + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), + $this->database + ); } protected function tearDown(): void diff --git a/tests/Projection/AbstractMongoDbEventStoreReadModelProjectorTest.php b/tests/Projection/AbstractMongoDbEventStoreReadModelProjectorTest.php index 1b6a0d2..78b6759 100644 --- a/tests/Projection/AbstractMongoDbEventStoreReadModelProjectorTest.php +++ b/tests/Projection/AbstractMongoDbEventStoreReadModelProjectorTest.php @@ -74,7 +74,13 @@ protected function setUp(): void $this->database, $this->getPersistenceStrategy() ); - $this->projectionManager = new MongoDbProjectionManager($this->eventStore, $this->client, $this->database); + $this->projectionManager = new MongoDbProjectionManager( + $this->eventStore, + $this->client, + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), + $this->database + ); } /** @@ -236,6 +242,8 @@ public function it_throws_exception_when_invalid_wrapped_event_store_instance_pa new MongoDbEventStoreReadModelProjector( $wrappedEventStore->reveal(), $this->client, + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), $this->database, 'test_projection', new ReadModelMock(), @@ -262,6 +270,8 @@ public function it_throws_exception_when_unknown_event_store_instance_passed(): new MongoDbEventStoreReadModelProjector( $eventStore->reveal(), $client->reveal(), + $this->getPersistenceStrategy(), + new FQCNMessageFactory(), $this->database, 'test_projection', $readModel->reveal(), @@ -355,7 +365,7 @@ public function it_respects_update_lock_threshold(): void \posix_kill($processDetails['pid'], SIGQUIT); - \sleep(1); + \usleep(1500000); $processDetails = \proc_get_status($projectionProcess); $this->assertEquals( @@ -367,7 +377,7 @@ public function it_respects_update_lock_threshold(): void /** * @test */ - public function it_should_update_lock_if_projection_is_not_locked() + public function it_should_update_lock_if_projection_is_not_locked(): void { $projectorRef = new \ReflectionClass(MongoDbEventStoreReadModelProjector::class); @@ -383,7 +393,7 @@ public function it_should_update_lock_if_projection_is_not_locked() /** * @test */ - public function it_should_update_lock_if_update_lock_threshold_is_set_to_0() + public function it_should_update_lock_if_update_lock_threshold_is_set_to_0(): void { $projectorRef = new \ReflectionClass(MongoDbEventStoreReadModelProjector::class); @@ -409,7 +419,7 @@ public function it_should_update_lock_if_update_lock_threshold_is_set_to_0() /** * @test */ - public function it_should_update_lock_if_now_is_greater_than_last_lock_update_plus_threshold() + public function it_should_update_lock_if_now_is_greater_than_last_lock_update_plus_threshold(): void { $projectorRef = new \ReflectionClass(MongoDbEventStoreReadModelProjector::class); @@ -435,7 +445,7 @@ public function it_should_update_lock_if_now_is_greater_than_last_lock_update_pl /** * @test */ - public function it_should_not_update_lock_if_now_is_lower_than_last_lock_update_plus_threshold() + public function it_should_not_update_lock_if_now_is_lower_than_last_lock_update_plus_threshold(): void { $projectorRef = new \ReflectionClass(MongoDbEventStoreReadModelProjector::class); diff --git a/tests/Projection/MongoDbEventStoreProjectorSimpleStreamStrategyTest.php b/tests/Projection/MongoDbEventStoreProjectorSimpleStreamStrategyTest.php index 53025a6..c91330b 100644 --- a/tests/Projection/MongoDbEventStoreProjectorSimpleStreamStrategyTest.php +++ b/tests/Projection/MongoDbEventStoreProjectorSimpleStreamStrategyTest.php @@ -13,6 +13,9 @@ namespace ProophTest\EventStore\MongoDb\Projection; use Prooph\EventStore\MongoDb\PersistenceStrategy; +use Prooph\EventStore\Stream; +use Prooph\EventStore\StreamName; +use ProophTest\EventStore\Mock\UserCreated; /** * @group Projector @@ -25,4 +28,82 @@ protected function getPersistenceStrategy(): PersistenceStrategy { return new PersistenceStrategy\MongoDbSimpleStreamStrategy(); } + + /** + * @test + */ + public function it_changes_to_mongodb_change_stream(): void + { + if (! \extension_loaded('pcntl')) { + $this->markTestSkipped('The PCNTL extension is not available.'); + + return; + } + + $events = []; + + for ($i = 1; $i < 21; $i++) { + $events[] = UserCreated::with([ + 'id' => $i, + 'time' => \microtime(true), + ], 1); + } + + $this->eventStore->create(new Stream(new StreamName('user-123'), new \ArrayIterator($events))); + + $command = 'exec php ' . \realpath(__DIR__) . '/isolated-change-stream-projection.php'; + $descriptorSpec = [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ]; + /** + * Created process inherits env variables from this process. + * Script returns with non-standard code SIGUSR1 from the handler and -1 else + */ + $projectionProcess = \proc_open($command, $descriptorSpec, $pipes); + + $result = null; + while ($result === null) { + \usleep(1000000); + $result = $this->client->selectCollection($this->database, 'projections')->findOne(); + } + $this->assertTrue($result['position']['user-123'] < 10); + + $this->eventStore->appendTo( + new StreamName('user-123'), + new \ArrayIterator([ + UserCreated::with([ + 'id' => 21, + 'time' => \microtime(true), + ], 1), + UserCreated::with([ + 'id' => 22, + 'time' => \microtime(true), + ], 1), + UserCreated::with([ + 'id' => 23, + 'time' => \microtime(true), + ], 1), + ]) + ); + $result = $this->client->selectCollection($this->database, 'projections')->findOne(); + $this->assertTrue($result['position']['user-123'] < 10); + + \sleep(3); + $result = $this->client->selectCollection($this->database, 'projections')->findOne(); + $this->assertSame(23, $result['position']['user-123']); + $this->assertCount(23, $result['state']['aggregate_versions']); + $this->assertSame(20, $result['state']['aggregate_versions'][19]); + $this->assertSame(21, $result['state']['aggregate_versions'][20]); + $this->assertSame(22, $result['state']['aggregate_versions'][21]); + $this->assertSame(23, $result['state']['aggregate_versions'][22]); + + \sleep(1); + $processDetails = \proc_get_status($projectionProcess); + $this->assertEquals( + SIG_DFL, + $processDetails['exitcode'] + ); + } } diff --git a/tests/Projection/MongoDbEventStoreReadModelProjectorSimpleStreamStrategyTest.php b/tests/Projection/MongoDbEventStoreReadModelProjectorSimpleStreamStrategyTest.php new file mode 100644 index 0000000..e6ec871 --- /dev/null +++ b/tests/Projection/MongoDbEventStoreReadModelProjectorSimpleStreamStrategyTest.php @@ -0,0 +1,93 @@ + + * (c) 2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStore\MongoDb\Projection; + +use Prooph\EventStore\MongoDb\PersistenceStrategy; +use Prooph\EventStore\StreamName; +use ProophTest\EventStore\Mock\TestDomainEvent; + +/** + * @group ReadModel + * @group Projection + * @group SinpleStream + */ +class MongoDbEventStoreReadModelProjectorSimpleStreamStrategyTest extends AbstractMongoDbEventStoreReadModelProjectorTest +{ + protected function getPersistenceStrategy(): PersistenceStrategy + { + return new PersistenceStrategy\MongoDbSimpleStreamStrategy(); + } + + /** + * @test + */ + public function it_changes_to_mongodb_change_stream(): void + { + if (! \extension_loaded('pcntl')) { + $this->markTestSkipped('The PCNTL extension is not available.'); + + return; + } + + $command = 'exec php ' . \realpath(__DIR__) . '/isolated-change-stream-read-model-projection.php'; + $descriptorSpec = [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ]; + /** + * Created process inherits env variables from this process. + * Script returns with non-standard code SIGUSR1 from the handler and -1 else + */ + $projectionProcess = \proc_open($command, $descriptorSpec, $pipes); + + \usleep(200000); + $this->eventStore->beginTransaction(); + + $this->eventStore->appendTo( + new StreamName('user-123'), + new \ArrayIterator([ + TestDomainEvent::with(['test' => 22], 22), + TestDomainEvent::with(['test' => 23], 23), + ]) + ); + + $result = null; + while ($result === null) { + \usleep(500000); + $result = $this->client->selectCollection($this->database, 'projections')->findOne(); + } + + $this->assertTrue($result['position']['user-123'] < 10); + $this->assertTrue($result['position']['user-123'] > 1); + + \usleep(500000); + $this->eventStore->commit(); + + \sleep(3); + $result = $this->client->selectCollection($this->database, 'projections')->findOne(); + $this->assertSame(22, $result['position']['user-123']); + $this->assertCount(22, $result['state']['aggregate_positions']); + $this->assertSame(22, $result['state']['aggregate_positions'][19]); + // events from above are saved/processed as last but have first position + $this->assertSame(1, $result['state']['aggregate_positions'][20]); + $this->assertSame(2, $result['state']['aggregate_positions'][21]); + + \sleep(1); + $processDetails = \proc_get_status($projectionProcess); + $this->assertEquals( + SIG_DFL, + $processDetails['exitcode'] + ); + } +} diff --git a/tests/Projection/MongoDbEventStoreReadModelProjectorSingleStreamStrategyTest.php b/tests/Projection/MongoDbEventStoreReadModelProjectorSingleStreamStrategyTest.php new file mode 100644 index 0000000..ac3f94a --- /dev/null +++ b/tests/Projection/MongoDbEventStoreReadModelProjectorSingleStreamStrategyTest.php @@ -0,0 +1,28 @@ + + * (c) 2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStore\MongoDb\Projection; + +use Prooph\EventStore\MongoDb\PersistenceStrategy; + +/** + * @group ReadModel + * @group Projection + * @group SingleStream + */ +class MongoDbEventStoreReadModelProjectorSingleStreamStrategyTest extends AbstractMongoDbEventStoreReadModelProjectorTest +{ + protected function getPersistenceStrategy(): PersistenceStrategy + { + return new PersistenceStrategy\MongoDbSingleStreamStrategy(); + } +} diff --git a/tests/Projection/MongoDbProjectionManagerTest.php b/tests/Projection/MongoDbProjectionManagerTest.php index a8fae45..731c403 100644 --- a/tests/Projection/MongoDbProjectionManagerTest.php +++ b/tests/Projection/MongoDbProjectionManagerTest.php @@ -70,7 +70,13 @@ protected function setUp(): void $this->database, $persistenceStrategy ); - $this->projectionManager = new MongoDbProjectionManager($this->eventStore, $this->client, $this->database); + $this->projectionManager = new MongoDbProjectionManager( + $this->eventStore, + $this->client, + $persistenceStrategy, + new FQCNMessageFactory(), + $this->database + ); } protected function tearDown(): void @@ -100,8 +106,15 @@ public function it_throws_exception_when_invalid_wrapped_event_store_instance_pa $eventStore = $this->prophesize(EventStore::class); $wrappedEventStore = $this->prophesize(EventStoreDecorator::class); $wrappedEventStore->getInnerEventStore()->willReturn($eventStore->reveal())->shouldBeCalled(); + $persistenceStrategy = $this->prophesize(PersistenceStrategy::class)->reveal(); - new MongoDbProjectionManager($wrappedEventStore->reveal(), $this->client, TestUtil::getDatabaseName()); + new MongoDbProjectionManager( + $wrappedEventStore->reveal(), + $this->client, + $persistenceStrategy, + new FQCNMessageFactory(), + TestUtil::getDatabaseName() + ); } /** diff --git a/tests/Projection/isolated-change-stream-projection.php b/tests/Projection/isolated-change-stream-projection.php new file mode 100755 index 0000000..363785c --- /dev/null +++ b/tests/Projection/isolated-change-stream-projection.php @@ -0,0 +1,67 @@ + + * (c) 2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +use Prooph\Common\Messaging\FQCNMessageFactory; +use Prooph\Common\Messaging\NoOpMessageConverter; +use Prooph\EventStore\MongoDb\MongoDbEventStore; +use Prooph\EventStore\MongoDb\PersistenceStrategy\MongoDbSimpleStreamStrategy; +use Prooph\EventStore\MongoDb\Projection\MongoDbEventStoreProjector; +use Prooph\EventStore\MongoDb\Projection\MongoDbProjectionManager; +use ProophTest\EventStore\Mock\UserCreated; +use ProophTest\EventStore\MongoDb\TestUtil; + +require __DIR__ . '/../../vendor/autoload.php'; + +$client = TestUtil::getClient(); +$database = TestUtil::getDatabaseName(); + +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); + +$eventStore = new MongoDbEventStore( + new FQCNMessageFactory(), + $client, + $database, + $persistenceStrategy +); + +$projectionManager = new MongoDbProjectionManager( + $eventStore, + $client, + $persistenceStrategy, + new FQCNMessageFactory(), + $database +); +$projection = $projectionManager->createProjection( + 'test_projection', + [ + MongoDbEventStoreProjector::OPTION_PCNTL_DISPATCH => true, + MongoDbEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE => 2, + ] +); +\pcntl_signal(SIGQUIT, function () use ($projection) { + $projection->stop(); + exit(SIGUSR1); +}); +$projection + ->init(function (): array { + return ['aggregate_versions' => []]; + }) + ->fromStream('user-123') + ->when([ + UserCreated::class => function (array $state, UserCreated $event): array { + \usleep(100000); + $state['aggregate_versions'][] = $event->payload()['id']; + + return $state; + }, + ]) + ->run(false); diff --git a/tests/Projection/isolated-change-stream-read-model-projection.php b/tests/Projection/isolated-change-stream-read-model-projection.php new file mode 100755 index 0000000..b43fc87 --- /dev/null +++ b/tests/Projection/isolated-change-stream-read-model-projection.php @@ -0,0 +1,113 @@ + + * (c) 2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +use Prooph\Common\Messaging\FQCNMessageFactory; +use Prooph\Common\Messaging\NoOpMessageConverter; +use Prooph\EventStore\MongoDb\MongoDbEventStore; +use Prooph\EventStore\MongoDb\PersistenceStrategy\MongoDbSimpleStreamStrategy; +use Prooph\EventStore\MongoDb\Projection\MongoDbProjectionManager; +use Prooph\EventStore\Projection\ReadModel; +use Prooph\EventStore\Projection\ReadModelProjector; +use Prooph\EventStore\Stream; +use Prooph\EventStore\StreamName; +use ProophTest\EventStore\Mock\TestDomainEvent; +use ProophTest\EventStore\MongoDb\TestUtil; + +require __DIR__ . '/../../vendor/autoload.php'; + +$readModel = new class() implements ReadModel { + public function init(): void + { + } + + public function isInitialized(): bool + { + return true; + } + + public function reset(): void + { + } + + public function delete(): void + { + } + + public function stack(string $operation, ...$args): void + { + } + + public function persist(): void + { + } +}; + +$delayedIterator = new class() extends ArrayIterator { + public function current() + { + \usleep(1000); + + return parent::current(); + } +}; + +$client = TestUtil::getClient(); +$database = TestUtil::getDatabaseName(); + +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); + +$eventStore = new MongoDbEventStore( + new FQCNMessageFactory(), + $client, + $database, + $persistenceStrategy +); +$events = []; + +for ($i = 1; $i < 21; $i++) { + $events[] = TestDomainEvent::with(['test' => $i], $i); +} + +$eventStore->create(new Stream(new StreamName('user-123'), new $delayedIterator($events))); + +$projectionManager = new MongoDbProjectionManager( + $eventStore, + $client, + $persistenceStrategy, + new FQCNMessageFactory(), + $database +); + +$projection = $projectionManager->createReadModelProjection( + 'test_projection', + $readModel, + [ + ReadModelProjector::OPTION_PCNTL_DISPATCH => true, + ReadModelProjector::OPTION_PERSIST_BLOCK_SIZE => 2, + ] +); +\pcntl_signal(SIGQUIT, function () use ($projection) { + $projection->stop(); + exit(SIGUSR1); +}); +$projection + ->init(function (): array { + return ['aggregate_positions' => []]; + }) + ->fromStream('user-123') + ->whenAny(function (array $state, \Prooph\Common\Messaging\Message $event) { + \usleep(100000); + $state['aggregate_positions'][] = $event->metadata()['_position']; + + return $state; + }) + ->run(false); diff --git a/tests/Projection/isolated-long-running-projection.php b/tests/Projection/isolated-long-running-projection.php index 4fa982b..e713128 100755 --- a/tests/Projection/isolated-long-running-projection.php +++ b/tests/Projection/isolated-long-running-projection.php @@ -26,17 +26,18 @@ $client = TestUtil::getClient(); $database = TestUtil::getDatabaseName(); +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); + $eventStore = new MongoDbEventStore( new FQCNMessageFactory(), $client, $database, - new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()) + $persistenceStrategy ); $events = []; for ($i = 0; $i < 100; $i++) { $events[] = TestDomainEvent::with(['test' => 1], $i); - $i++; } $eventStore->create(new Stream(new StreamName('user-123'), new ArrayIterator($events))); @@ -44,6 +45,8 @@ $projectionManager = new MongoDbProjectionManager( $eventStore, $client, + $persistenceStrategy, + new FQCNMessageFactory(), $database ); diff --git a/tests/Projection/isolated-long-running-query.php b/tests/Projection/isolated-long-running-query.php index f104b5b..50a39ba 100755 --- a/tests/Projection/isolated-long-running-query.php +++ b/tests/Projection/isolated-long-running-query.php @@ -26,20 +26,18 @@ $client = TestUtil::getClient(); $database = TestUtil::getDatabaseName(); -//\Prooph\EventStore\MongoDb\MongoDbHelper::createEventStreamsCollection($client, TestUtil::getDatabaseName(), 'event_streams'); -//\Prooph\EventStore\MongoDb\MongoDbHelper::createProjectionCollection($client, TestUtil::getDatabaseName(), 'projections'); +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); $eventStore = new MongoDbEventStore( new FQCNMessageFactory(), $client, $database, - new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()) + $persistenceStrategy ); $events = []; for ($i = 0; $i < 100; $i++) { $events[] = TestDomainEvent::with(['test' => 1], $i); - $i++; } $eventStore->create(new Stream(new StreamName('user-123'), new ArrayIterator($events))); @@ -47,6 +45,8 @@ $projectionManager = new MongoDbProjectionManager( $eventStore, $client, + $persistenceStrategy, + new FQCNMessageFactory(), $database ); diff --git a/tests/Projection/isolated-long-running-read-model-projection.php b/tests/Projection/isolated-long-running-read-model-projection.php index d1f9578..8205bf3 100755 --- a/tests/Projection/isolated-long-running-read-model-projection.php +++ b/tests/Projection/isolated-long-running-read-model-projection.php @@ -54,17 +54,18 @@ public function persist(): void $client = TestUtil::getClient(); $database = TestUtil::getDatabaseName(); +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); + $eventStore = new MongoDbEventStore( new FQCNMessageFactory(), $client, $database, - new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()) + $persistenceStrategy ); $events = []; for ($i = 0; $i < 100; $i++) { $events[] = TestDomainEvent::with(['test' => 1], $i); - $i++; } $eventStore->create(new Stream(new StreamName('user-123'), new ArrayIterator($events))); @@ -72,6 +73,8 @@ public function persist(): void $projectionManager = new MongoDbProjectionManager( $eventStore, $client, + $persistenceStrategy, + new FQCNMessageFactory(), $database ); diff --git a/tests/Projection/isolated-projection.php b/tests/Projection/isolated-projection.php index de61e94..55fe27d 100755 --- a/tests/Projection/isolated-projection.php +++ b/tests/Projection/isolated-projection.php @@ -24,16 +24,20 @@ $client = TestUtil::getClient(); $database = TestUtil::getDatabaseName(); +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); + $eventStore = new MongoDbEventStore( new FQCNMessageFactory(), $client, $database, - new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()) + $persistenceStrategy ); $projectionManager = new MongoDbProjectionManager( $eventStore, $client, + $persistenceStrategy, + new FQCNMessageFactory(), $database ); $projection = $projectionManager->createProjection( diff --git a/tests/Projection/isolated-read-model-projection.php b/tests/Projection/isolated-read-model-projection.php index 20c6269..9d001a1 100755 --- a/tests/Projection/isolated-read-model-projection.php +++ b/tests/Projection/isolated-read-model-projection.php @@ -52,16 +52,20 @@ public function persist(): void $client = TestUtil::getClient(); $database = TestUtil::getDatabaseName(); +$persistenceStrategy = new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()); + $eventStore = new MongoDbEventStore( new FQCNMessageFactory(), $client, $database, - new MongoDbSimpleStreamStrategy(new NoOpMessageConverter()) + $persistenceStrategy ); $projectionManager = new MongoDbProjectionManager( $eventStore, $client, + $persistenceStrategy, + new FQCNMessageFactory(), $database ); $projection = $projectionManager->createReadModelProjection( diff --git a/tests/TestUtil.php b/tests/TestUtil.php index 1488c74..f3cc4a4 100644 --- a/tests/TestUtil.php +++ b/tests/TestUtil.php @@ -32,7 +32,8 @@ public static function getClient(): Client try { self::$client = new Client( $clientParams['uri'], - $clientParams['uriOptions'] + $clientParams['uriOptions'], + ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']] ); } catch (RuntimeException $e) { $retries--;