Skip to content

Commit

Permalink
feat(ChangeStream): update default startAtOperationTime
Browse files Browse the repository at this point in the history
Updates the setting of a default startAtOperationTime to be
spec-compliant. Also re-syncs change-stream tests.

Fixes NODE-1520
  • Loading branch information
daprahamian authored Jun 20, 2018
1 parent 770db7f commit 50a9f65
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 114 deletions.
33 changes: 25 additions & 8 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,36 @@ var createChangeStreamCursor = function(self) {
return changeStreamCursor;
};

function getResumeToken(self) {
return self.resumeToken || self.options.resumeAfter;
}

function getStartAtOperationTime(self) {
const isMaster = self.serverConfig.lastIsMaster() || {};
return (
isMaster.maxWireVersion && isMaster.maxWireVersion >= 7 && self.options.startAtOperationTime
);
}

var buildChangeStreamAggregationCommand = function(self) {
const serverConfig = self.serverConfig;
const namespace = self.namespace;
const pipeline = self.pipeline;
const resumeToken = self.resumeToken;
const options = self.options;
const cursorNamespace = self.cursorNamespace;

const isMaster = serverConfig.lastIsMaster() || {};

var changeStreamStageOptions = {
fullDocument: options.fullDocument || 'default'
};

if (resumeToken || options.resumeAfter) {
changeStreamStageOptions.resumeAfter = resumeToken || options.resumeAfter;
} else if (isMaster.maxWireVersion && isMaster.maxWireVersion >= 7) {
const ts = options.startAtOperationTime || self.operationTime;
changeStreamStageOptions.startAtOperationTime = ts;
const resumeToken = getResumeToken(self);
const startAtOperationTime = getStartAtOperationTime(self);
if (resumeToken) {
changeStreamStageOptions.resumeAfter = resumeToken;
}

if (startAtOperationTime) {
changeStreamStageOptions.startAtOperationTime = startAtOperationTime;
}

// Map cursor options
Expand Down Expand Up @@ -338,6 +349,12 @@ var processNewChange = function(self, err, change, callback) {
// Handle resumable MongoNetworkErrors
if (isResumableError(err) && !self.attemptingResume) {
self.attemptingResume = true;

if (!(getResumeToken(self) || getStartAtOperationTime(self))) {
const startAtOperationTime = self.cursor.cursorState.operationTime;
self.options = Object.assign({ startAtOperationTime }, self.options);
}

if (callback) {
return self.cursor.close(function(closeErr) {
if (closeErr) {
Expand Down
9 changes: 9 additions & 0 deletions test/functional/change_stream_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,10 @@ describe('Change Streams', function() {
firstBatch: [],
id: new Long('9064341847921713401'),
ns: 'test.test'
},
operationTime: OPERATION_TIME,
$clusterTime: {
clusterTime: OPERATION_TIME
}
};

Expand Down Expand Up @@ -1652,6 +1656,11 @@ describe('Change Streams', function() {
.to.have.nested.property('pipeline[0].$changeStream.startAtOperationTime')
.that.deep.equals(OPERATION_TIME);
expect(doc).to.not.have.nested.property('pipeline[0].$changeStream.resumeAfter');
} else {
expect(doc).to.not.have.nested.property(
'pipeline[0].$changeStream.startAtOperationTime'
);
expect(doc).to.not.have.nested.property('pipeline[0].$changeStream.resumeAfter');
}
return request.reply(AGGREGATE_RESPONSE);
} else if (doc.getMore) {
Expand Down
66 changes: 2 additions & 64 deletions test/functional/spec/change-stream/change-streams.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,56 +246,6 @@
]
}
},
{
"description": "A fresh ChangeStream against a server >=4.0 will always include startAtOperationTime in the $changeStream stage.",
"minServerVersion": "3.8.0",
"target": "collection",
"topology": [
"replicaset"
],
"changeStreamPipeline": [],
"changeStreamOptions": {},
"operations": [
{
"database": "change-stream-tests",
"collection": "test",
"name": "insertOne",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"aggregate": "test",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"fullDocument": "default",
"startAtOperationTime": {
"$timestamp": {
"i": 42,
"t": 42
}
}
}
}
]
},
"command_name": "aggregate",
"database_name": "change-stream-tests"
}
}
],
"result": {
"success": []
}
},
{
"description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.",
"minServerVersion": "3.8.0",
Expand Down Expand Up @@ -348,13 +298,7 @@
"pipeline": [
{
"$changeStream": {
"fullDocument": "default",
"startAtOperationTime": {
"$timestamp": {
"i": 42,
"t": 42
}
}
"fullDocument": "default"
}
}
]
Expand Down Expand Up @@ -446,13 +390,7 @@
{
"$changeStream": {
"fullDocument": "default",
"allChangesForCluster": true,
"startAtOperationTime": {
"$timestamp": {
"i": 42,
"t": 42
}
}
"allChangesForCluster": true
}
}
]
Expand Down
42 changes: 0 additions & 42 deletions test/functional/spec/change-stream/change-streams.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,40 +166,6 @@ tests:
fullDocument:
z:
$numberInt: "3"
-
description: A fresh ChangeStream against a server >=4.0 will always include startAtOperationTime in the $changeStream stage.
minServerVersion: "3.8.0"
target: collection
topology:
- replicaset
changeStreamPipeline: []
changeStreamOptions: {}
operations:
-
database: *database_name
collection: *collection_name
name: insertOne
arguments:
document:
x: 1
expectations:
-
command_started_event:
command:
aggregate: *collection_name
cursor: {}
pipeline:
-
$changeStream:
fullDocument: default
startAtOperationTime:
$timestamp:
i: 42
t: 42
command_name: aggregate
database_name: *database_name
result:
success: []
-
description: Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.
minServerVersion: "3.8.0"
Expand Down Expand Up @@ -241,10 +207,6 @@ tests:
-
$changeStream:
fullDocument: default
startAtOperationTime:
$timestamp:
i: 42
t: 42
command_name: aggregate
database_name: *database_name
result:
Expand Down Expand Up @@ -307,10 +269,6 @@ tests:
$changeStream:
fullDocument: default
allChangesForCluster: true
startAtOperationTime:
$timestamp:
i: 42
t: 42
command_name: aggregate
database_name: admin
result:
Expand Down

0 comments on commit 50a9f65

Please sign in to comment.