From d4c15974a535e7732f9bd456cf678eafc6f572b0 Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Thu, 31 May 2018 18:15:54 -0400 Subject: [PATCH] feat(retryableWrites): adding more support for retries - Adds support for retryable commands that route through the topology `.commands` function instead of a designated function like `.insert`. This adds coverage for `findAndModify` commands. - Adds support for retrying on more types of errors, and retrying on writeConcern errors Fixes NODE-1456 --- lib/connection/pool.js | 22 +++++++++------- lib/error.js | 32 ++++++++++++++++++++++- lib/sessions.js | 31 +---------------------- lib/topologies/mongos.js | 40 ++++++++++++++++++++++++++--- lib/topologies/replset.js | 45 ++++++++++++++++++++++++++++++--- lib/wireprotocol/3_2_support.js | 2 +- 6 files changed, 125 insertions(+), 47 deletions(-) diff --git a/lib/connection/pool.js b/lib/connection/pool.js index 7d31ec272..0138c9a03 100644 --- a/lib/connection/pool.js +++ b/lib/connection/pool.js @@ -575,15 +575,19 @@ function messageHandler(self) { } // Establish if we have an error - if ( - workItem.command && - message.documents[0] && - (message.documents[0].ok === 0 || - message.documents[0]['$err'] || - message.documents[0]['errmsg'] || - message.documents[0]['code']) - ) { - return handleOperationCallback(self, workItem.cb, new MongoError(message.documents[0])); + if (workItem.command && message.documents[0]) { + const responseDoc = message.documents[0]; + if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) { + return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc)); + } + + if (responseDoc.writeConcernError) { + return handleOperationCallback( + self, + workItem.cb, + new MongoError(responseDoc.writeConcernError) + ); + } } // Add the connection details diff --git a/lib/error.js b/lib/error.js index 6a8c1a420..ba499e404 100644 --- a/lib/error.js +++ b/lib/error.js @@ -94,10 +94,40 @@ const MongoTimeoutError = function(message) { }; util.inherits(MongoTimeoutError, MongoError); +// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms +const RETRYABLE_ERROR_CODES = new Set([ + 6, // HostUnreachable + 7, // HostNotFound + 64, // WriteConcernFailed + 89, // NetworkTimeout + 91, // ShutdownInProgress + 189, // PrimarySteppedDown + 9001, // SocketException + 11600, // InterruptedAtShutdown + 11602, // InterruptedDueToReplStateChange + 10107, // NotMaster + 13435, // NotMasterNoSlaveOk + 13436 // NotMasterOrSecondary +]); + +function isRetryableError(error) { + if ( + RETRYABLE_ERROR_CODES.has(error.code) || + error instanceof MongoNetworkError || + error.message.match(/not master/) || + error.message.match(/node is recovering/) + ) { + return true; + } + + return false; +} + module.exports = { MongoError, MongoNetworkError, MongoParseError, MongoTimeoutError, - mongoErrorContextSymbol + mongoErrorContextSymbol, + isRetryableError }; diff --git a/lib/sessions.js b/lib/sessions.js index f06616a61..6ee0b458c 100644 --- a/lib/sessions.js +++ b/lib/sessions.js @@ -6,7 +6,7 @@ const BSON = retrieveBSON(); const Binary = BSON.Binary; const uuidV4 = require('./utils').uuidV4; const MongoError = require('./error').MongoError; -const MongoNetworkError = require('./error').MongoNetworkError; +const isRetryableError = require('././error').isRetryableError; function assertAlive(session, callback) { if (session.serverSession == null) { @@ -219,35 +219,6 @@ class ClientSession extends EventEmitter { } } -// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms -const RETRYABLE_ERROR_CODES = new Set([ - 6, // HostUnreachable - 7, // HostNotFound - 64, // WriteConcernFailed - 89, // NetworkTimeout - 91, // ShutdownInProgress - 189, // PrimarySteppedDown - 9001, // SocketException - 11600, // InterruptedAtShutdown - 11602, // InterruptedDueToReplStateChange - 10107, // NotMaster - 13435, // NotMasterNoSlaveOk - 13436 // NotMasterOrSecondary -]); - -function isRetryableError(error) { - if ( - RETRYABLE_ERROR_CODES.has(error.code) || - error instanceof MongoNetworkError || - error.message.match(/not master/) || - error.message.match(/node is recovering/) - ) { - return true; - } - - return false; -} - function resetTransactionState(clientSession) { clientSession.transactionOptions = null; } diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js index d311da612..9de247d23 100644 --- a/lib/topologies/mongos.js +++ b/lib/topologies/mongos.js @@ -7,7 +7,6 @@ const BasicCursor = require('../cursor'); const Logger = require('../connection/logger'); const retrieveBSON = require('../connection/utils').retrieveBSON; const MongoError = require('../error').MongoError; -const errors = require('../error'); const Server = require('./server'); const clone = require('./shared').clone; const diff = require('./shared').diff; @@ -16,6 +15,7 @@ const createClientInfo = require('./shared').createClientInfo; const SessionMixins = require('./shared').SessionMixins; const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; const relayEvents = require('./shared').relayEvents; +const isRetryableError = require('../error').isRetryableError; const BSON = retrieveBSON(); /** @@ -900,7 +900,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) { server[op](ns, ops, options, (err, result) => { if (!err) return callback(null, result); - if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) { + if (!isRetryableError(err)) { return callback(err); } @@ -1019,6 +1019,12 @@ Mongos.prototype.remove = function(ns, ops, options, callback) { executeWriteOperation(this, 'remove', ns, ops, options, callback); }; +const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete']; + +function isWriteCommand(command) { + return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]); +} + /** * Execute a command * @method @@ -1057,8 +1063,36 @@ Mongos.prototype.command = function(ns, cmd, options, callback) { var clonedOptions = cloneOptions(options); clonedOptions.topology = self; + const willRetryWrite = + !options.retrying && + options.retryWrites && + options.session && + isRetryableWritesSupported(self) && + !options.session.inTransaction() && + isWriteCommand(cmd); + + const cb = (err, result) => { + if (!err) return callback(null, result); + if (!isRetryableError(err)) { + return callback(err); + } + + if (willRetryWrite) { + const newOptions = Object.assign({}, clonedOptions, { retrying: true }); + return this.command(ns, cmd, newOptions, callback); + } + + return callback(err); + }; + + // increment and assign txnNumber + if (willRetryWrite) { + options.session.incrementTransactionNumber(); + options.willRetryWrite = willRetryWrite; + } + // Execute the command - server.command(ns, cmd, clonedOptions, callback); + server.command(ns, cmd, clonedOptions, cb); }; /** diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index cb07787c7..6a59feab0 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -8,7 +8,6 @@ const BasicCursor = require('../cursor'); const retrieveBSON = require('../connection/utils').retrieveBSON; const Logger = require('../connection/logger'); const MongoError = require('../error').MongoError; -const errors = require('../error'); const Server = require('./server'); const ReplSetState = require('./replset_state'); const clone = require('./shared').clone; @@ -18,6 +17,7 @@ const createClientInfo = require('./shared').createClientInfo; const SessionMixins = require('./shared').SessionMixins; const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported; const relayEvents = require('./shared').relayEvents; +const isRetryableError = require('../error').isRetryableError; const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; @@ -1206,7 +1206,7 @@ function executeWriteOperation(args, options, callback) { const handler = (err, result) => { if (!err) return callback(null, result); - if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) { + if (!isRetryableError(err)) { return callback(err); } @@ -1298,6 +1298,12 @@ ReplSet.prototype.remove = function(ns, ops, options, callback) { executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback); }; +const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete']; + +function isWriteCommand(command) { + return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]); +} + /** * Execute a command * @method @@ -1370,8 +1376,41 @@ ReplSet.prototype.command = function(ns, cmd, options, callback) { ); } + const willRetryWrite = + !options.retrying && + options.retryWrites && + options.session && + isRetryableWritesSupported(self) && + !options.session.inTransaction() && + isWriteCommand(cmd); + + const cb = (err, result) => { + if (!err) return callback(null, result); + if (!isRetryableError(err)) { + return callback(err); + } + + if (willRetryWrite) { + const newOptions = Object.assign({}, options, { retrying: true }); + return this.command(ns, cmd, newOptions, callback); + } + + // Per SDAM, remove primary from replicaset + if (this.s.replicaSetState.primary) { + this.s.replicaSetState.remove(this.s.replicaSetState.primary, { force: true }); + } + + return callback(err); + }; + + // increment and assign txnNumber + if (willRetryWrite) { + options.session.incrementTransactionNumber(); + options.willRetryWrite = willRetryWrite; + } + // Execute the command - server.command(ns, cmd, options, callback); + server.command(ns, cmd, options, cb); }; /** diff --git a/lib/wireprotocol/3_2_support.js b/lib/wireprotocol/3_2_support.js index 611490e71..ee652e8fb 100644 --- a/lib/wireprotocol/3_2_support.js +++ b/lib/wireprotocol/3_2_support.js @@ -350,7 +350,7 @@ WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, } // optionally decorate query with transaction data - decorateWithTransactionsData(query.query, options.session); + decorateWithTransactionsData(query.query, options.session, options.willRetryWrite); // We need to increment the statement id if we're in a transaction if (options.session && options.session.inTransaction()) {