diff --git a/lib/sessions.js b/lib/sessions.js index 8d8920708..986c33dca 100644 --- a/lib/sessions.js +++ b/lib/sessions.js @@ -1,15 +1,37 @@ 'use strict'; -const retrieveBSON = require('./connection/utils').retrieveBSON, - EventEmitter = require('events'), - BSON = retrieveBSON(), - Binary = BSON.Binary, - uuidV4 = require('./utils').uuidV4; +const retrieveBSON = require('./connection/utils').retrieveBSON; +const EventEmitter = require('events'); +const BSON = retrieveBSON(); +const Binary = BSON.Binary; +const uuidV4 = require('./utils').uuidV4; +const MongoError = require('./error').MongoError; -/** - * - */ +function assertAlive(session, callback) { + if (session.serverSession == null) { + const error = new MongoError('Cannot use a session that has ended'); + if (typeof callback === 'function') { + return callback(error, null); + } + + throw error; + } +}; + +/** A class representing a client session on the server */ class ClientSession extends EventEmitter { + + /** + * Create a client session. + * WARNING: not meant to be instantiated directly + * + * @param {Topology} topology The current client's topology + * @param {ServerSessionPool} sessionPool The server session pool + * @param {Object} [options] Optional settings + * @param {Boolean} [options.causalConsistency] Whether causal consistency should be enabled on this session + * @param {Boolean} [options.autoStartTransaction=false] When enabled this session automatically starts a transaction with the provided defaultTransactionOptions. + * @param {Object} [options.defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session. + */ constructor(topology, sessionPool, options) { super(); @@ -42,10 +64,20 @@ class ClientSession extends EventEmitter { this.explicit = !!options.explicit; this.owner = options.owner; + this.transactionOptions = null; + this.defaultTransactionOptions = options.defaultTransactionOptions || {}; + + if (options.autoStartTransaction) { + this.startTransaction(); + } } /** + * Ends this session on the server * + * @param {Object} [options] Optional settings + * @param {Boolean} [options.skipCommand] Skip sending the actual endSessions command to the server + * @param {Function} [callback] Optional callback for completion of this operation */ endSession(options, callback) { if (typeof options === 'function') (callback = options), (options = {}); @@ -56,6 +88,10 @@ class ClientSession extends EventEmitter { return; } + if (this.serverSession && this.inTransaction()) { + this.abortTransaction(); // pass in callback? + } + if (!options.skipCommand) { // send the `endSessions` command this.topology.endSessions(this.id); @@ -98,6 +134,98 @@ class ClientSession extends EventEmitter { return this.id.id.buffer.equals(session.id.id.buffer); } + + /** + * @returns whether this session is current in a transaction or not + */ + inTransaction() { + return this.transactionOptions != null; + } + + /** + * Starts a new transaction with the given options. + * + * @param {Object} options Optional settings + * @param {ReadConcern} [options.readConcern] The readConcern to use for this transaction + * @param {WriteConcern} [options.writeConcern] The writeConcern to use for this transaction + */ + startTransaction(options) { + assertAlive(this); + if (this.inTransaction()) { + throw new MongoError('Transaction already started'); + } + + // increment txnNumber and reset stmtId to zero. + this.serverSession.txnNumber += 1; + this.serverSession.stmtId = 0; + + // set transaction options, we will use this to determine if we are in a transaction + this.transactionOptions = options || this.defaultTransactionOptions; + } + + /** + * Commits the currently active transaction in this session. + * + * @param {Function} [callback] optional callback for completion of this operation + * @return {Promise} A promise is returned if no callback is provided + */ + commitTransaction(callback) { + if (typeof callback === 'function') { + endTransaction(this, 'commitTransaction', callback); + return; + } + + return new Promise((resolve, reject) => { + endTransaction(this, 'commitTransaction', (err, reply) => err ? reject(err) : resolve(reply)); + }); + } + + /** + * Aborts the currently active transaction in this session. + * + * @param {Function} [callback] optional callback for completion of this operation + * @return {Promise} A promise is returned if no callback is provided + */ + abortTransaction(callback) { + if (typeof callback === 'function') { + endTransaction(this, 'abortTransaction', callback); + return; + } + + return new Promise((resolve, reject) => { + endTransaction(this, 'abortTransaction', (err, reply) => err ? reject(err) : resolve(reply)); + }); + + } +} + +function endTransaction(clientSession, commandName, callback) { + assertAlive(clientSession, callback); + + if (!clientSession.inTransaction()) { + callback(new MongoError('No transaction started')); + return; + } + + if (clientSession.serverSession.stmtId === 0) { + // The server transaction was never started. + callback(null, null); + return; + } + + // send the command + clientSession.topology.command('admin.$cmd', { [commandName]: 1 }, { + writeConcern: clientSession.transactionOptions.writeConcern + }, (err, reply) => { + // reset internal transaction state + if (clientSession.options.autoStartTransaction) { + clientSession.startTransaction(); + } else { + clientSession.transactionOptions = null; + } + + callback(err, reply); + }); } Object.defineProperty(ClientSession.prototype, 'id', { diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js index 1ee863fb4..e7b3debc7 100644 --- a/lib/topologies/mongos.js +++ b/lib/topologies/mongos.js @@ -1,23 +1,22 @@ 'use strict'; -const inherits = require('util').inherits, - f = require('util').format, - EventEmitter = require('events').EventEmitter, - BasicCursor = require('../cursor'), - Logger = require('../connection/logger'), - retrieveBSON = require('../connection/utils').retrieveBSON, - MongoError = require('../error').MongoError, - errors = require('../error'), - Server = require('./server'), - clone = require('./shared').clone, - diff = require('./shared').diff, - cloneOptions = require('./shared').cloneOptions, - createClientInfo = require('./shared').createClientInfo, - SessionMixins = require('./shared').SessionMixins, - isRetryableWritesSupported = require('./shared').isRetryableWritesSupported, - getNextTransactionNumber = require('./shared').getNextTransactionNumber, - relayEvents = require('./shared').relayEvents; - +const inherits = require('util').inherits; +const f = require('util').format; +const EventEmitter = require('events').EventEmitter; +const BasicCursor = require('../cursor'); +const Logger = require('../connection/logger'); +const retrieveBSON = require('../connection/utils').retrieveBSON; +const MongoError = require('../error').MongoError; +const errors = require('../error'); +const Server = require('./server'); +const clone = require('./shared').clone; +const diff = require('./shared').diff; +const cloneOptions = require('./shared').cloneOptions; +const createClientInfo = require('./shared').createClientInfo; +const SessionMixins = require('./shared').SessionMixins; +const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; +const incrementTransactionNumber = require('./shared').incrementTransactionNumber; +const relayEvents = require('./shared').relayEvents; const BSON = retrieveBSON(); /** @@ -909,7 +908,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) { } // increment and assign txnNumber - options.txnNumber = getNextTransactionNumber(options.session); + incrementTransactionNumber(options.session); server[op](ns, ops, options, (err, result) => { if (!err) return callback(null, result); diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index 8d95dfa9b..8ae87c276 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -1,24 +1,24 @@ 'use strict'; -var inherits = require('util').inherits, - f = require('util').format, - EventEmitter = require('events').EventEmitter, - ReadPreference = require('./read_preference'), - BasicCursor = require('../cursor'), - retrieveBSON = require('../connection/utils').retrieveBSON, - Logger = require('../connection/logger'), - MongoError = require('../error').MongoError, - errors = require('../error'), - Server = require('./server'), - ReplSetState = require('./replset_state'), - clone = require('./shared').clone, - Timeout = require('./shared').Timeout, - Interval = require('./shared').Interval, - createClientInfo = require('./shared').createClientInfo, - SessionMixins = require('./shared').SessionMixins, - isRetryableWritesSupported = require('./shared').isRetryableWritesSupported, - getNextTransactionNumber = require('./shared').getNextTransactionNumber, - relayEvents = require('./shared').relayEvents; +const inherits = require('util').inherits; +const f = require('util').format; +const EventEmitter = require('events').EventEmitter; +const ReadPreference = require('./read_preference'); +const BasicCursor = require('../cursor'); +const retrieveBSON = require('../connection/utils').retrieveBSON; +const Logger = require('../connection/logger'); +const MongoError = require('../error').MongoError; +const errors = require('../error'); +const Server = require('./server'); +const ReplSetState = require('./replset_state'); +const clone = require('./shared').clone; +const Timeout = require('./shared').Timeout; +const Interval = require('./shared').Interval; +const createClientInfo = require('./shared').createClientInfo; +const SessionMixins = require('./shared').SessionMixins; +const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; +const incrementTransactionNumber = require('./shared').incrementTransactionNumber; +const relayEvents = require('./shared').relayEvents; var MongoCR = require('../auth/mongocr'), X509 = require('../auth/x509'), @@ -1230,7 +1230,7 @@ function executeWriteOperation(args, options, callback) { // increment and assign txnNumber if (willRetryWrite) { - options.txnNumber = getNextTransactionNumber(options.session); + incrementTransactionNumber(options.session); } return self.s.replicaSetState.primary[op](ns, ops, options, handler); diff --git a/lib/topologies/shared.js b/lib/topologies/shared.js index 7efab6904..6db60929d 100644 --- a/lib/topologies/shared.js +++ b/lib/topologies/shared.js @@ -425,9 +425,8 @@ const isRetryableWritesSupported = function(topology) { * * @param {ClientSession} session */ -const getNextTransactionNumber = function(session) { +const incrementTransactionNumber = function(session) { session.serverSession.txnNumber++; - return BSON.Long.fromNumber(session.serverSession.txnNumber); }; /** @@ -454,5 +453,5 @@ module.exports.diff = diff; module.exports.Interval = Interval; module.exports.Timeout = Timeout; module.exports.isRetryableWritesSupported = isRetryableWritesSupported; -module.exports.getNextTransactionNumber = getNextTransactionNumber; +module.exports.incrementTransactionNumber = incrementTransactionNumber; module.exports.relayEvents = relayEvents; diff --git a/lib/wireprotocol/3_2_support.js b/lib/wireprotocol/3_2_support.js index 409fad7e7..6adb030f0 100644 --- a/lib/wireprotocol/3_2_support.js +++ b/lib/wireprotocol/3_2_support.js @@ -1,14 +1,14 @@ 'use strict'; -var Query = require('../connection/commands').Query, - retrieveBSON = require('../connection/utils').retrieveBSON, - f = require('util').format, - MongoError = require('../error').MongoError, - MongoNetworkError = require('../error').MongoNetworkError, - getReadPreference = require('./shared').getReadPreference; +const Query = require('../connection/commands').Query; +const retrieveBSON = require('../connection/utils').retrieveBSON; +const f = require('util').format; +const MongoError = require('../error').MongoError; +const MongoNetworkError = require('../error').MongoNetworkError; +const getReadPreference = require('./shared').getReadPreference; -var BSON = retrieveBSON(), - Long = BSON.Long; +const BSON = retrieveBSON(); +const Long = BSON.Long; var WireProtocol = function(legacyWireProtocol) { this.legacyWireProtocol = legacyWireProtocol; @@ -57,8 +57,20 @@ var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callba } // optionally add a `txnNumber` if retryable writes are being attempted - if (typeof options.txnNumber !== 'undefined') { - writeCommand.txnNumber = options.txnNumber; + if (options.session && options.session.serverSession) { + const serverSession = options.session.serverSession; + if (serverSession.txnNumber) { + writeCommand.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber); + } + + if (typeof serverSession.stmtId !== 'undefined') { + writeCommand.stmtId = serverSession.stmtId; + + if (serverSession.stmtId === 0) { + writeCommand.startTransaction = true; + writeCommand.autocommit = false; + } + } } // Options object