diff --git a/lib/connection/pool.js b/lib/connection/pool.js index ad29c9c96..359c2778a 100644 --- a/lib/connection/pool.js +++ b/lib/connection/pool.js @@ -544,13 +544,23 @@ function messageHandler(self) { return handleOperationCallback(self, workItem.cb, new MongoError(err)); } - // Look for clusterTime, and update it if necessary - if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) { - const $clusterTime = message.documents[0].$clusterTime; - self.topology.clusterTime = $clusterTime; + // Look for clusterTime, and operationTime and update them if necessary + if (message.documents[0]) { + if (message.documents[0].$clusterTime) { + const $clusterTime = message.documents[0].$clusterTime; + self.topology.clusterTime = $clusterTime; + + if (workItem.session != null) { + resolveClusterTime(workItem.session, $clusterTime); + } + } - if (workItem.session != null) { - resolveClusterTime(workItem.session, $clusterTime); + if ( + message.documents[0].operationTime && + workItem.session && + workItem.session.supports.causalConsistency + ) { + workItem.session.advanceOperationTime(message.documents[0].operationTime); } } diff --git a/lib/sessions.js b/lib/sessions.js index d41896459..8dcee4907 100644 --- a/lib/sessions.js +++ b/lib/sessions.js @@ -19,15 +19,24 @@ class ClientSession { throw new Error('ClientSession requires a ServerSessionPool'); } + options = options || {}; this.topology = topology; this.sessionPool = sessionPool; this.hasEnded = false; this.serverSession = sessionPool.acquire(); + this.supports = { + causalConsistency: !!options.causalConsistency + }; + options = options || {}; if (typeof options.initialClusterTime !== 'undefined') { this.clusterTime = options.initialClusterTime; + } else { + this.clusterTime = null; } + + this.operationTime = null; } /** @@ -64,6 +73,22 @@ class ClientSession { // spec indicates that we should ignore all errors for `endSessions` if (typeof callback === 'function') callback(null, null); } + + /** + * Advances the operationTime for a ClientSession. + * + * @param {object} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to + */ + advanceOperationTime(operationTime) { + if (this.operationTime == null) { + this.operationTime = operationTime; + return; + } + + if (operationTime.greaterThan(this.operationTime)) { + this.operationTime = operationTime; + } + } } Object.defineProperty(ClientSession.prototype, 'id', { diff --git a/test/mock/index.js b/test/mock/index.js index 3a960159e..e2a8d31bb 100644 --- a/test/mock/index.js +++ b/test/mock/index.js @@ -52,6 +52,11 @@ const DEFAULT_ISMASTER = { ok: 1 }; +const DEFAULT_ISMASTER_36 = Object.assign({}, DEFAULT_ISMASTER, { + maxWireVersion: 6, + logicalSessionTimeoutMinutes: 10 +}); + /* * Main module */ @@ -67,5 +72,6 @@ module.exports = { }, cleanup: cleanup, - DEFAULT_ISMASTER: DEFAULT_ISMASTER + DEFAULT_ISMASTER: DEFAULT_ISMASTER, + DEFAULT_ISMASTER_36: DEFAULT_ISMASTER_36 }; diff --git a/test/tests/unit/common.js b/test/tests/unit/common.js index c31155532..227add322 100644 --- a/test/tests/unit/common.js +++ b/test/tests/unit/common.js @@ -91,6 +91,13 @@ class ReplSetFixture { } } +/** + * Creates a cluster time for use in unit testing cluster time gossiping and + * causal consistency. + * + * @param {Number} time the logical time + * @returns a cluster time according to the driver sessions specification + */ function genClusterTime(time) { return { clusterTime: new Timestamp(time), diff --git a/test/tests/unit/single/sessions_tests.js b/test/tests/unit/single/sessions_tests.js index 661ed726a..eb8deeb38 100644 --- a/test/tests/unit/single/sessions_tests.js +++ b/test/tests/unit/single/sessions_tests.js @@ -2,6 +2,7 @@ var Server = require('../../../../lib/topologies/server'), Long = require('bson').Long, ObjectId = require('bson').ObjectId, + Timestamp = require('bson').Timestamp, expect = require('chai').expect, assign = require('../../../../lib/utils').assign, mock = require('../../../mock'), @@ -578,4 +579,50 @@ describe('Sessions (Single)', function() { client2.connect(); } }); + + it('should track the highest `operationTime` seen, if causal consistency is enabled', { + metadata: { requires: { topology: 'single' } }, + test: function(done) { + const client = new Server(test.server.address()), + sessionPool = new ServerSessionPool(client), + session = new ClientSession(client, sessionPool, { causalConsistency: true }), + insertOperationTime1 = Timestamp.fromNumber(Date.now()), + insertOperationTime2 = Timestamp.fromNumber(Date.now() + 10 * 60 * 1000); + + let insertCount = 0; + test.server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(mock.DEFAULT_ISMASTER_36); + } else if (doc.insert) { + request.reply({ + ok: 1, + operationTime: insertCount === 0 ? insertOperationTime1 : insertOperationTime2 + }); + + insertCount++; + } + }); + + client.on('error', done); + client.once('connect', () => { + client.insert('db.test', [{ a: 42 }], { session: session }, err => { + expect(err).to.not.exist; + expect(session.operationTime).to.exist; + expect(session.operationTime).to.eql(insertOperationTime1); + + client.insert('db.test', [{ b: 52 }], { session: session }, err => { + expect(err).to.not.exist; + expect(session.operationTime).to.exist; + expect(session.operationTime).to.eql(insertOperationTime2); + + client.destroy(); + done(); + }); + }); + }); + + client.connect(); + } + }); });