Skip to content

Commit

Permalink
PHPLIB-651: Support $merge and $out executing on secondaries (#861)
Browse files Browse the repository at this point in the history
Synced CRUD spec tests with mongodb/specifications@5f8f668
  • Loading branch information
jmikola committed Oct 18, 2021
1 parent f4a6e8d commit 7ed839a
Show file tree
Hide file tree
Showing 5 changed files with 975 additions and 41 deletions.
19 changes: 16 additions & 3 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class Collection
/** @var integer */
private static $wireVersionForReadConcernWithWriteStage = 8;

/** @var integer */
private static $wireVersionForSecondarySupportsWriteStage = 13;

/** @var string */
private $collectionName;

Expand Down Expand Up @@ -225,11 +228,21 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = $this->readPreference;
}

if ($hasWriteStage) {
$server = select_server($this->manager, $options);

/* If a write stage is being used with a read preference (explicit or
* inherited), check that the wire version supports it. If not, force a
* primary read preference and select a new server if necessary. */
if (
$hasWriteStage && isset($options['readPreference']) &&
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = select_server($this->manager, $options);
if ($server->isSecondary()) {
$server = select_server($this->manager, $options);
}
}

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down
19 changes: 16 additions & 3 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class Database
/** @var integer */
private static $wireVersionForReadConcernWithWriteStage = 8;

/** @var integer */
private static $wireVersionForSecondarySupportsWriteStage = 13;

/** @var string */
private $databaseName;

Expand Down Expand Up @@ -206,11 +209,21 @@ public function aggregate(array $pipeline, array $options = [])
$options['readPreference'] = $this->readPreference;
}

if ($hasWriteStage) {
$server = select_server($this->manager, $options);

/* If a write stage is being used with a read preference (explicit or
* inherited), check that the wire version supports it. If not, force a
* primary read preference and select a new server if necessary. */
if (
$hasWriteStage && isset($options['readPreference']) &&
! server_supports_feature($server, self::$wireVersionForSecondarySupportsWriteStage)
) {
$options['readPreference'] = new ReadPreference(ReadPreference::RP_PRIMARY);
}

$server = select_server($this->manager, $options);
if ($server->isSecondary()) {
$server = select_server($this->manager, $options);
}
}

/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
Expand Down
78 changes: 43 additions & 35 deletions src/Operation/Aggregate.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use ArrayIterator;
use MongoDB\Driver\Command;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference;
Expand Down Expand Up @@ -75,6 +76,12 @@ class Aggregate implements Executable, Explainable
/** @var array */
private $options;

/** @var bool */
private $isExplain;

/** @var bool */
private $isWrite;

/**
* Constructs an aggregate command.
*
Expand Down Expand Up @@ -253,8 +260,19 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
unset($options['writeConcern']);
}

if (! empty($options['explain'])) {
$this->isExplain = ! empty($options['explain']);
$this->isWrite = is_last_pipeline_operator_write($pipeline) && ! $this->isExplain;

// Explain does not use a cursor
if ($this->isExplain) {
$options['useCursor'] = false;
unset($options['batchSize']);
}

/* Ignore batchSize for writes, since no documents are returned and a
* batchSize of zero could prevent the pipeline from executing. */
if ($this->isWrite) {
unset($options['batchSize']);
}

$this->databaseName = (string) $databaseName;
Expand Down Expand Up @@ -298,20 +316,14 @@ public function execute(Server $server)
}
}

$hasExplain = ! empty($this->options['explain']);
$hasWriteStage = $this->hasWriteStage();

$command = new Command(
$this->createCommandDocument($server, $hasWriteStage),
$this->createCommandDocument($server),
$this->createCommandOptions()
);
$options = $this->createOptions($hasWriteStage, $hasExplain);

$cursor = $hasWriteStage && ! $hasExplain
? $server->executeReadWriteCommand($this->databaseName, $command, $options)
: $server->executeReadCommand($this->databaseName, $command, $options);
$cursor = $this->executeCommand($server, $command);

if ($this->options['useCursor'] || $hasExplain) {
if ($this->options['useCursor'] || $this->isExplain) {
if (isset($this->options['typeMap'])) {
$cursor->setTypeMap($this->options['typeMap']);
}
Expand Down Expand Up @@ -341,10 +353,10 @@ public function execute(Server $server)
*/
public function getCommandDocument(Server $server)
{
return $this->createCommandDocument($server, $this->hasWriteStage());
return $this->createCommandDocument($server);
}

private function createCommandDocument(Server $server, bool $hasWriteStage): array
private function createCommandDocument(Server $server): array
{
$cmd = [
'aggregate' => $this->collectionName ?? 1,
Expand Down Expand Up @@ -377,10 +389,7 @@ private function createCommandDocument(Server $server, bool $hasWriteStage): arr
}

if ($this->options['useCursor']) {
/* Ignore batchSize if pipeline includes an $out or $merge stage, as
* no documents will be returned and sending a batchSize of zero
* could prevent the pipeline from executing at all. */
$cmd['cursor'] = isset($this->options["batchSize"]) && ! $hasWriteStage
$cmd['cursor'] = isset($this->options["batchSize"])
? ['batchSize' => $this->options["batchSize"]]
: new stdClass();
}
Expand All @@ -400,39 +409,38 @@ private function createCommandOptions(): array
}

/**
* Create options for executing the command.
* Execute the aggregate command using the appropriate Server method.
*
* @see http://php.net/manual/en/mongodb-driver-server.executecommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
* @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
* @param boolean $hasWriteStage
* @param boolean $hasExplain
* @return array
*/
private function createOptions($hasWriteStage, $hasExplain)
private function executeCommand(Server $server, Command $command): Cursor
{
$options = [];

if (isset($this->options['readConcern'])) {
$options['readConcern'] = $this->options['readConcern'];
foreach (['readConcern', 'readPreference', 'session'] as $option) {
if (isset($this->options[$option])) {
$options[$option] = $this->options[$option];
}
}

if (! $hasWriteStage && isset($this->options['readPreference'])) {
$options['readPreference'] = $this->options['readPreference'];
if ($this->isWrite && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern'];
}

if (isset($this->options['session'])) {
$options['session'] = $this->options['session'];
if (! $this->isWrite) {
return $server->executeReadCommand($this->databaseName, $command, $options);
}

if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
$options['writeConcern'] = $this->options['writeConcern'];
/* Server::executeReadWriteCommand() does not support a "readPreference"
* option, so fall back to executeCommand(). This means that libmongoc
* will not apply any client-level options (e.g. writeConcern), but that
* should not be an issue as PHPLIB handles inheritance on its own. */
if (isset($options['readPreference'])) {
return $server->executeCommand($this->databaseName, $command, $options);
}

return $options;
}

private function hasWriteStage(): bool
{
return is_last_pipeline_operator_write($this->pipeline);
return $server->executeReadWriteCommand($this->databaseName, $command, $options);
}
}

0 comments on commit 7ed839a

Please sign in to comment.