Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add change stream support for projections #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"php": "^7.1",
"ext-mongodb": "^1.5.2",
"mongodb/mongodb": "^1.4.2",
"prooph/event-store": "^7.3.6"
"prooph/event-store": "^7.4.0"
},
"require-dev": {
"sandrokeil/interop-config": "^2.0.1",
Expand Down
2 changes: 1 addition & 1 deletion env/docker/mongo/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM mongo:4.0
FROM mongo:4.0.6

COPY mongod.conf /etc
COPY init.sh /
Expand Down
6 changes: 5 additions & 1 deletion src/Container/MongoDbProjectionManagerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Interop\Config\ProvidesDefaultOptions;
use Interop\Config\RequiresConfigId;
use Interop\Config\RequiresMandatoryOptions;
use Prooph\Common\Messaging\FQCNMessageFactory;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\MongoDb\Exception\InvalidArgumentException;
use Prooph\EventStore\MongoDb\Projection\MongoDbProjectionManager;
Expand Down Expand Up @@ -70,6 +71,8 @@ public function __invoke(ContainerInterface $container): ProjectionManager
return new MongoDbProjectionManager(
$container->get($config['event_store']),
$container->get($config['client']),
$container->get($config['persistence_strategy']),
$container->get($config['message_factory']),
$config['database'],
$config['event_streams_table'],
$config['projections_table']
Expand All @@ -83,7 +86,7 @@ public function dimensions(): iterable

public function mandatoryOptions(): iterable
{
return ['client', 'database'];
return ['client', 'database', 'persistence_strategy'];
}

public function defaultOptions(): iterable
Expand All @@ -92,6 +95,7 @@ public function defaultOptions(): iterable
'event_store' => EventStore::class,
'event_streams_table' => 'event_streams',
'projections_table' => 'projections',
'message_factory' => FQCNMessageFactory::class,
];
}
}
30 changes: 30 additions & 0 deletions src/Exception/ConcurrencyExceptionFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php
/**
* This file is part of the prooph/pdo-event-store.
* (c) 2016-2018 prooph software GmbH <contact@prooph.de>
* (c) 2016-2018 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStore\MongoDb\Exception;

use Prooph\EventStore\Exception\ConcurrencyException;
use MongoDB\Driver\Exception\Exception as MongoDbException;

class ConcurrencyExceptionFactory
{
public static function fromMongoDbException(MongoDbException $exception): ConcurrencyException
{
return new ConcurrencyException(
\sprintf(
"Some of the aggregate IDs or event IDs have already been used in the same stream. The MongoDB error should contain more information:\nError %s.\nWrite-Errors: %s",
$exception->getMessage(),
var_export($exception->getWriteResult()->getWriteErrors(), true)
)
);
}
}
104 changes: 88 additions & 16 deletions src/MongoDbEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
use Prooph\EventStore\Metadata\FieldType;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Metadata\Operator;
use Prooph\EventStore\MongoDb\Exception\ConcurrencyExceptionFactory;
use Prooph\EventStore\MongoDb\Exception\ExtensionNotLoaded;
use Prooph\EventStore\MongoDb\Exception\RuntimeException;
use Prooph\EventStore\MongoDb\PersistenceStrategy\MongoDbAggregateStreamStrategy;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamIterator\EmptyStreamIterator;
use Prooph\EventStore\StreamName;
use Prooph\EventStore\TransactionalEventStore;
use Prooph\EventStore\Util\Assertion;
Expand Down Expand Up @@ -240,6 +242,13 @@ public function appendTo(StreamName $streamName, Iterator $streamEvents): void
try {
$this->collection($tableName)->insertMany($data, $options);
} catch (MongoDbException $exception) {
$code = isset($exception->getWriteResult()->getWriteErrors()[0]) ?
$exception->getWriteResult()->getWriteErrors()[0]->getCode()
: $exception->getCode();

if (\in_array($code, [11000, 11001, 12582], true)) {
throw ConcurrencyExceptionFactory::fromMongoDbException($exception);
}
throw RuntimeException::fromMongoDbException($exception);
}
}
Expand Down Expand Up @@ -286,9 +295,16 @@ public function load(
'session' => $this->session,
];
}
$counted = $collection->countDocuments(
['$and' => $where],
$options
);

return new MongoDbStreamIterator(
function (array $filter = [], array $innerOptions = []) use ($collection, $options, $where) {
if (0 === (null === $count ? $counted : \min($counted, $count))) {
return new EmptyStreamIterator();
}
$callable = function (string $method) use ($collection, $options, $where): callable {
return function (array $filter = [], array $innerOptions = []) use ($collection, $options, $where, $method) {
$innerOptions = \array_replace_recursive(
$options,
$innerOptions
Expand All @@ -299,11 +315,16 @@ function (array $filter = [], array $innerOptions = []) use ($collection, $optio
}
});

return $collection->find(
return $collection->{$method}(
['$and' => $where],
$innerOptions
);
},
};
};

return new MongoDbStreamIterator(
$callable('find'),
$callable('countDocuments'),
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
Expand Down Expand Up @@ -358,8 +379,22 @@ public function loadReverse(
];
}

return new MongoDbStreamIterator(
function (array $filter = [], array $innerOptions = []) use ($collection, $options, $where) {
$counted = $collection->countDocuments(
['$and' => $where],
$options
);

if (0 === (null === $count ? $counted : \min($counted, $count))) {
return new EmptyStreamIterator();
}

$callable = function (string $method) use ($collection, $options, $where): callable {
return function (array $filter = [], array $innerOptions = []) use (
$collection,
$options,
$where,
$method
) {
$innerOptions = \array_replace_recursive(
$options,
$innerOptions
Expand All @@ -370,11 +405,16 @@ function (array $filter = [], array $innerOptions = []) use ($collection, $optio
}
});

return $collection->find(
return $collection->{$method}(
['$and' => $where],
$innerOptions
);
},
};
};

return new MongoDbStreamIterator(
$callable('find'),
$callable('countDocuments'),
$this->messageFactory,
$this->loadBatchSize,
$fromNumber,
Expand Down Expand Up @@ -422,10 +462,30 @@ public function commit(): void
throw new TransactionNotStarted();
}

$this->session->commitTransaction();
$this->session->endSession();
$this->session = null;
$this->createdCollections = [];
$retries = 0;

do {
$retries++;
try {
$this->session->commitTransaction();

$this->session = null;
$this->createdCollections = [];
break;
} catch (\MongoDB\Driver\Exception\CommandException $exception) {
$resultDoc = $exception->getResultDocument();

if (isset($resultDoc->errorLabels)
&& in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels, true)
) {
continue;
}
// we use concurrency exception here, so the message will be dispatched again via ConcurrencyMessageDispatcher
throw ConcurrencyExceptionFactory::fromMongoDbException($exception);
} catch (\MongoDB\Driver\Exception\Exception $exception) {
throw ConcurrencyExceptionFactory::fromMongoDbException($exception);
}
} while($retries <= 3);
}

public function rollback(): void
Expand All @@ -437,14 +497,18 @@ public function rollback(): void
if (null === $this->session) {
throw new TransactionNotStarted();
}
$this->session->abortTransaction();
$this->session->endSession();

try {
$this->session->abortTransaction();
} catch (\MongoDB\Driver\Exception\RuntimeException $error) {
// it's ok, we can't do anything here
} finally {
$this->session = null;
}

foreach ($this->createdCollections as $collectionName) {
$this->collection($collectionName)->drop();
}

$this->session = null;
$this->createdCollections = [];
}

Expand Down Expand Up @@ -493,6 +557,10 @@ public function fetchStreamNamesRegex(
int $limit = 20,
int $offset = 0
): array {
if (empty($filter) || false === @\preg_match("/$filter/", '')) {
throw new Exception\InvalidArgumentException('Invalid regex pattern given');
}

$where = $this->createWhereClause($metadataMatcher);
$where[]['real_stream_name'] = ['$regex' => $filter];

Expand Down Expand Up @@ -556,6 +624,10 @@ public function fetchCategoryNames(?string $filter, int $limit = 20, int $offset

public function fetchCategoryNamesRegex(string $filter, int $limit = 20, int $offset = 0): array
{
if (empty($filter) || false === @\preg_match("/$filter/", '')) {
throw new Exception\InvalidArgumentException('Invalid regex pattern given');
}

$where['category'] = ['$regex' => $filter];

try {
Expand Down
56 changes: 43 additions & 13 deletions src/MongoDbStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@
use DateTimeImmutable;
use DateTimeZone;
use Generator;
use Iterator;
use MongoDB\Driver\Cursor;
use MongoDB\Exception\Exception as MongoDbException;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\EventStore\MongoDb\Exception\RuntimeException;
use Prooph\EventStore\StreamIterator\StreamIterator;

final class MongoDbStreamIterator implements Iterator
final class MongoDbStreamIterator implements StreamIterator
{
/**
* @var callable
*/
private $query;

/**
* @var callable
*/
private $countQuery;

/**
* @var MessageFactory
*/
Expand Down Expand Up @@ -81,13 +86,15 @@ final class MongoDbStreamIterator implements Iterator

public function __construct(
callable $query,
callable $countQuery,
MessageFactory $messageFactory,
int $batchSize,
int $fromNumber,
?int $count,
bool $forward
) {
$this->query = $query;
$this->countQuery = $countQuery;
$this->messageFactory = $messageFactory;
$this->batchSize = $batchSize;
$this->fromNumber = $fromNumber;
Expand All @@ -98,6 +105,22 @@ public function __construct(
$this->next();
}

public function count()
{
[$filter, $options] = $this->getFilterAndOptions($this->fromNumber);

$callable = $this->countQuery;
try {
$count = (int) $callable($filter, $options);

return null === $this->count ? $count : \min($count, $this->count);
} catch (MongoDbException $exception) {
// ignore
}

return 0;
}

/**
* @return null|Message
*/
Expand Down Expand Up @@ -225,29 +248,36 @@ private function ensureCursor(array $filter = [], array $options = []): void
}

private function buildStatement(int $fromNumber): void
{
[$filter, $options] = $this->getFilterAndOptions($fromNumber);

$this->ensureCursor($filter, $options);
}

private function yieldCursor(Cursor $cursor): Generator
{
foreach ($cursor as $item) {
yield $item;
}
}

private function getFilterAndOptions(int $fromNumber): array
{
if (null === $this->count
|| $this->count < ($this->batchSize * ($this->batchPosition + 1))
) {
$limit = $this->batchSize;
} else {
$limit = $this->count - ($this->batchSize * $this->batchPosition);
$limit = $this->count - ($this->batchSize * ($this->batchPosition + 1));
}

$this->ensureCursor(
return [
[
'_id' => [$this->forward ? '$gte' : '$lte' => $fromNumber],
],
[
'limit' => $limit,
]
);
}

private function yieldCursor(Cursor $cursor): Generator
{
foreach ($cursor as $item) {
yield $item;
}
],
];
}
}