Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,26 @@ public function next()
*/
public function rewind()
{
$this->csIt->rewind();
$resumable = false;
try {
$this->csIt->rewind();
if ($this->valid()) {
$this->extractResumeToken($this->csIt->current());
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
$resumable = true;
}
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
$resumable = true;
}
if ($e instanceof ConnectionTimeoutException) {
$resumable = true;
}
}
if ($resumable) {
$this->resume();
}
}

/**
Expand Down
119 changes: 113 additions & 6 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ public function testNextResumesAfterCursorNotFound()
$this->assertSameDocument($expectedResult, $changeStream->current());
}

/**
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
*/
public function testNextResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
Expand Down Expand Up @@ -129,6 +126,56 @@ function(stdClass $command) use (&$commands) {
$this->assertSame($expectedCommands, $commands);
}

public function testRewindResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
* a socket timeout that is less than the change stream's maxAwaitTimeMS
* option. */
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));

$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($primaryServer);

$commands = [];

try {
(new CommandObserver)->observe(
function() use ($changeStream) {
$changeStream->rewind();
},
function(stdClass $command) use (&$commands) {
$commands[] = key((array) $command);
}
);
$this->fail('ConnectionTimeoutException was not thrown');
} catch (ConnectionTimeoutException $e) {}

$expectedCommands = [
/* The initial aggregate command for change streams returns a cursor
* envelope with an empty initial batch, since there are no changes
* to report at the moment the change stream is created. Therefore,
* we expect a getMore to be issued when we first advance the change
* stream (with either rewind() or next()). */
'getMore',
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
* getMore command encounters a client socket timeout and leaves the
* cursor open on the server. ChangeStream should catch this error
* and resume by issuing a new aggregate command. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore',
];

$this->assertSame($expectedCommands, $commands);
}

public function testNoChangeAfterResumeBeforeInsert()
{
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
Expand Down Expand Up @@ -260,7 +307,6 @@ public function testInitialCursorIsNotClosed()

/**
* @expectedException MongoDB\Exception\ResumeTokenException
* @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented
*/
public function testNextCannotExtractResumeToken()
{
Expand All @@ -269,13 +315,28 @@ public function testNextCannotExtractResumeToken()
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this was removed because rewind() now throws its own exception and would prevent us from testing next()

I'd suggest replacing with:

/* Note: we intentionally do not start iteration with rewind() to ensure
 * that we test extraction functionality within next(). */

Borrowed from the socket exception test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side thought: does this actually throw or does it only introduce a delay with its own getMore? Since we've yet to insert a document, I'd expect rewind() to not find any result to attempt extraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, the rewind() doesn't throw because it doesn't attempt extraction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now makes for very amusing diff output.


/* Note: we intentionally do not start iteration with rewind() to ensure
* that we test extraction functionality within next(). */
$this->insertDocument(['x' => 1]);

$changeStream->next();
}

/**
* @expectedException MongoDB\Exception\ResumeTokenException
*/
public function testRewindCannotExtractResumeToken()
{
$pipeline = [['$project' => ['_id' => 0 ]]];

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->insertDocument(['x' => 1]);

$changeStream->rewind();
}

public function testMaxAwaitTimeMS()
{
/* On average, an acknowledged write takes about 20 ms to appear in a
Expand Down Expand Up @@ -320,6 +381,52 @@ public function testMaxAwaitTimeMS()
$this->assertTrue($changeStream->valid());
}

public function testRewindResumesAfterCursorNotFound()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->killChangeStreamCursor($changeStream);

$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->current());
}

public function testRewindExtractsResumeTokenAndNextResumes()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->insertDocument(['_id' => 1, 'x' => 'foo']);
$this->insertDocument(['_id' => 2, 'x' => 'bar']);

$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];
$this->assertSameDocument($expectedResult, $changeStream->current());

$this->killChangeStreamCursor($changeStream);

$changeStream->next();
$this->assertTrue($changeStream->valid());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];
$this->assertSameDocument($expectedResult, $changeStream->current());
}

private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
Expand Down