From eb2dee4bbc7c43ed85cf25945480dd4d375b28fa Mon Sep 17 00:00:00 2001 From: Katherine Walker Date: Mon, 5 Feb 2018 11:06:18 -0500 Subject: [PATCH 1/3] PHPLIB-322: Add resume logic to ChangeStream::rewind() --- src/ChangeStream.php | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/ChangeStream.php b/src/ChangeStream.php index 6ddb45be5..799cc9ce1 100644 --- a/src/ChangeStream.php +++ b/src/ChangeStream.php @@ -117,7 +117,26 @@ public function next() */ public function rewind() { - $this->csIt->rewind(); + $resumable = false; + try { + $this->csIt->rewind(); + if ($this->valid()) { + $this->extractResumeToken($this->csIt->current()); + } + } catch (RuntimeException $e) { + if (strpos($e->getMessage(), "not master") !== false) { + $resumable = true; + } + if ($e->getCode() === self::CURSOR_NOT_FOUND) { + $resumable = true; + } + if ($e instanceof ConnectionTimeoutException) { + $resumable = true; + } + } + if ($resumable) { + $this->resume(); + } } /** From af3ec661faca08a89989b133c9b06ac46dcef22c Mon Sep 17 00:00:00 2001 From: Katherine Walker Date: Mon, 5 Feb 2018 11:07:14 -0500 Subject: [PATCH 2/3] Add rewind tests to WatchFunctionalTest --- tests/Operation/WatchFunctionalTest.php | 119 ++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 6 deletions(-) diff --git a/tests/Operation/WatchFunctionalTest.php b/tests/Operation/WatchFunctionalTest.php index e0edbe65c..b64bfa95c 100644 --- a/tests/Operation/WatchFunctionalTest.php +++ b/tests/Operation/WatchFunctionalTest.php @@ -73,9 +73,6 @@ public function testNextResumesAfterCursorNotFound() $this->assertSameDocument($expectedResult, $changeStream->current()); } - /** - * @todo test that rewind() also resumes once PHPLIB-322 is implemented - */ public function testNextResumesAfterConnectionException() { /* In order to trigger a dropped connection, we'll use a new client with @@ -129,6 +126,56 @@ function(stdClass $command) use (&$commands) { $this->assertSame($expectedCommands, $commands); } + public function testRewindResumesAfterConnectionException() + { + /* In order to trigger a dropped connection, we'll use a new client with + * a socket timeout that is less than the change stream's maxAwaitTimeMS + * option. */ + $manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]); + $primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY)); + + $operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); + $changeStream = $operation->execute($primaryServer); + + $commands = []; + + try { + (new CommandObserver)->observe( + function() use ($changeStream) { + $changeStream->rewind(); + }, + function(stdClass $command) use (&$commands) { + $commands[] = key((array) $command); + } + ); + $this->fail('ConnectionTimeoutException was not thrown'); + } catch (ConnectionTimeoutException $e) {} + + $expectedCommands = [ + /* The initial aggregate command for change streams returns a cursor + * envelope with an empty initial batch, since there are no changes + * to report at the moment the change stream is created. Therefore, + * we expect a getMore to be issued when we first advance the change + * stream (with either rewind() or next()). */ + 'getMore', + /* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous + * getMore command encounters a client socket timeout and leaves the + * cursor open on the server. ChangeStream should catch this error + * and resume by issuing a new aggregate command. */ + 'aggregate', + /* When ChangeStream resumes, it overwrites its original cursor with + * the new cursor resulting from the last aggregate command. This + * removes the last reference to the old cursor, which causes the + * driver to kill it (via mongoc_cursor_destroy()). */ + 'killCursors', + /* Finally, ChangeStream will rewind the new cursor as the last step + * of the resume process. This results in one last getMore. */ + 'getMore', + ]; + + $this->assertSame($expectedCommands, $commands); + } + public function testNoChangeAfterResumeBeforeInsert() { $this->insertDocument(['_id' => 1, 'x' => 'foo']); @@ -260,7 +307,6 @@ public function testInitialCursorIsNotClosed() /** * @expectedException MongoDB\Exception\ResumeTokenException - * @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented */ public function testNextCannotExtractResumeToken() { @@ -269,13 +315,26 @@ public function testNextCannotExtractResumeToken() $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]); $changeStream = $operation->execute($this->getPrimaryServer()); - $changeStream->rewind(); - $this->insertDocument(['x' => 1]); $changeStream->next(); } + /** + * @expectedException MongoDB\Exception\ResumeTokenException + */ + public function testRewindCannotExtractResumeToken() + { + $pipeline = [['$project' => ['_id' => 0 ]]]; + + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->insertDocument(['x' => 1]); + + $changeStream->rewind(); + } + public function testMaxAwaitTimeMS() { /* On average, an acknowledged write takes about 20 ms to appear in a @@ -320,6 +379,54 @@ public function testMaxAwaitTimeMS() $this->assertTrue($changeStream->valid()); } + public function testResumeAfterKillThenNoOperations() + { + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->insertDocument(['_id' => 1, 'x' => 'foo']); + + $this->killChangeStreamCursor($changeStream); + + $changeStream->rewind(); + $this->assertFalse($changeStream->valid()); + $this->assertNull($changeStream->current()); + } + + public function testResumeAfterKillThenOperation() + { + $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); + $changeStream = $operation->execute($this->getPrimaryServer()); + + $this->insertDocument(['_id' => 1, 'x' => 'foo']); + $this->insertDocument(['_id' => 2, 'x' => 'bar']); + + $changeStream->rewind(); + $this->assertTrue($changeStream->valid()); + $expectedResult = [ + '_id' => $changeStream->current()->_id, + 'operationType' => 'insert', + 'fullDocument' => ['_id' => 1, 'x' => 'foo'], + 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], + 'documentKey' => ['_id' => 1], + ]; + $this->assertSameDocument($expectedResult, $changeStream->current()); + + $this->killChangeStreamCursor($changeStream); + + $changeStream->next(); + $this->assertTrue($changeStream->valid()); + + $expectedResult = [ + '_id' => $changeStream->current()->_id, + 'operationType' => 'insert', + 'fullDocument' => ['_id' => 2, 'x' => 'bar'], + 'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], + 'documentKey' => ['_id' => 2], + ]; + $this->assertSameDocument($expectedResult, $changeStream->current()); + } + private function insertDocument($document) { $insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document); From 42bf82580ae03cfb44f04777002cc010f7191394 Mon Sep 17 00:00:00 2001 From: Katherine Walker Date: Mon, 5 Feb 2018 17:02:37 -0500 Subject: [PATCH 3/3] Rename test cases --- src/ChangeStream.php | 2 +- tests/Operation/WatchFunctionalTest.php | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ChangeStream.php b/src/ChangeStream.php index 799cc9ce1..e6ea6b375 100644 --- a/src/ChangeStream.php +++ b/src/ChangeStream.php @@ -128,7 +128,7 @@ public function rewind() $resumable = true; } if ($e->getCode() === self::CURSOR_NOT_FOUND) { - $resumable = true; + $resumable = true; } if ($e instanceof ConnectionTimeoutException) { $resumable = true; diff --git a/tests/Operation/WatchFunctionalTest.php b/tests/Operation/WatchFunctionalTest.php index b64bfa95c..6540d8ed2 100644 --- a/tests/Operation/WatchFunctionalTest.php +++ b/tests/Operation/WatchFunctionalTest.php @@ -315,6 +315,8 @@ public function testNextCannotExtractResumeToken() $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]); $changeStream = $operation->execute($this->getPrimaryServer()); + /* Note: we intentionally do not start iteration with rewind() to ensure + * that we test extraction functionality within next(). */ $this->insertDocument(['x' => 1]); $changeStream->next(); @@ -379,13 +381,11 @@ public function testMaxAwaitTimeMS() $this->assertTrue($changeStream->valid()); } - public function testResumeAfterKillThenNoOperations() + public function testRewindResumesAfterCursorNotFound() { $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); $changeStream = $operation->execute($this->getPrimaryServer()); - $this->insertDocument(['_id' => 1, 'x' => 'foo']); - $this->killChangeStreamCursor($changeStream); $changeStream->rewind(); @@ -393,7 +393,7 @@ public function testResumeAfterKillThenNoOperations() $this->assertNull($changeStream->current()); } - public function testResumeAfterKillThenOperation() + public function testRewindExtractsResumeTokenAndNextResumes() { $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]); $changeStream = $operation->execute($this->getPrimaryServer());