From fc1a7750a7e904743a2b5dd508095a33baf01cd5 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 5 Feb 2020 18:01:25 -0500 Subject: [PATCH 1/3] refactor: errors before handshake should mark server unknown --- lib/core/sdam/server.js | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index d60082a525..1c4aeeda79 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -279,7 +279,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.command(ns, cmd, options, makeOperationHandler(this, options, cb)); }, callback); } @@ -299,7 +303,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.query(ns, cmd, cursorState, options, makeOperationHandler(this, options, cb)); }, callback); } @@ -319,7 +327,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.getMore(ns, cursorState, batchSize, options, makeOperationHandler(this, options, cb)); }, callback); } @@ -341,7 +353,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.killCursors(ns, cursorState, makeOperationHandler(this, null, cb)); }, callback); } @@ -436,11 +452,22 @@ function executeWriteOperation(args, options, callback) { } server.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(server, err); + return cb(err); + } + conn[op](ns, ops, options, makeOperationHandler(server, options, cb)); }, callback); } +function markServerUnknown(server, error) { + server.emit( + 'descriptionReceived', + new ServerDescription(server.description.address, null, { error }) + ); +} + function makeOperationHandler(server, options, callback) { return function handleOperationResult(err, result) { if (err) { From 21195ce6729c8e24e74e39fa627854f34750e75c Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 6 Feb 2020 08:15:39 -0500 Subject: [PATCH 2/3] test: ensure OP_MSG exhaust test is only run on standalones --- test/functional/cmap/connection.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/functional/cmap/connection.test.js b/test/functional/cmap/connection.test.js index 7bfcf1d1b5..121fa94ec5 100644 --- a/test/functional/cmap/connection.test.js +++ b/test/functional/cmap/connection.test.js @@ -77,7 +77,7 @@ describe('Connection', function() { }); it('should support calling back multiple times on exhaust commands', { - metadata: { requires: { mongodb: '>=4.2.0' } }, + metadata: { requires: { mongodb: '>=4.2.0', topology: ['single'] } }, test: function(done) { const ns = `${this.configuration.db}.$cmd`; const connectOptions = Object.assign( From 5bf0df8c86a07c1c4ed03f33899147355867304b Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 6 Feb 2020 08:16:11 -0500 Subject: [PATCH 3/3] refactor: improve error handling in Server type This refactoring improves readability and reduces coupling between the `Server` and `Topology` types. Previously errors experienced at the `Server` level would emit an `error` event, which needed to be caught by the `Topology`. Since nothing about error handling required access to the `Topology` type, all of that behavior was moved back into the `Server` type. NODE-2449 --- lib/core/sdam/server.js | 22 ++++----- lib/core/sdam/topology.js | 74 +++--------------------------- test/unit/sdam/srv_polling.test.js | 2 +- test/unit/sdam/topology.test.js | 18 ++++---- 4 files changed, 27 insertions(+), 89 deletions(-) diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index 1c4aeeda79..b371a035d7 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -14,6 +14,8 @@ const collationNotSupported = require('../utils').collationNotSupported; const debugOptions = require('../connection/utils').debugOptions; const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError; const isNetworkTimeoutError = require('../error').isNetworkTimeoutError; +const isNodeShuttingDownError = require('../error').isNodeShuttingDownError; +const maxWireVersion = require('../utils').maxWireVersion; const makeStateMachine = require('../utils').makeStateMachine; const common = require('./common'); @@ -140,15 +142,7 @@ class Server extends EventEmitter { this.s.pool.clear(); }); - this[kMonitor].on('resetServer', error => { - // Revert to an `Unknown` state by emitting a default description with no isMaster, and the - // error from the heartbeat attempt - this.emit( - 'descriptionReceived', - new ServerDescription(this.description.address, null, { error }) - ); - }); - + this[kMonitor].on('resetServer', error => markServerUnknown(this, error)); this[kMonitor].on('serverHeartbeatSucceeded', event => { this.emit( 'descriptionReceived', @@ -477,10 +471,16 @@ function makeOperationHandler(server, options, callback) { } if (!isNetworkTimeoutError(err)) { - server.emit('error', err); + markServerUnknown(server, err); + server.s.pool.clear(); } } else if (isSDAMUnrecoverableError(err)) { - server.emit('error', err); + if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) { + server.s.pool.clear(); + } + + markServerUnknown(server, err); + process.nextTick(() => server.requestCheck()); } } diff --git a/lib/core/sdam/topology.js b/lib/core/sdam/topology.js index 7c37dce8a1..d1caa828cd 100644 --- a/lib/core/sdam/topology.js +++ b/lib/core/sdam/topology.js @@ -15,8 +15,6 @@ const deprecate = require('util').deprecate; const BSON = require('../connection/utils').retrieveBSON(); const createCompressionInfo = require('../topologies/shared').createCompressionInfo; const isRetryableError = require('../error').isRetryableError; -const isNodeShuttingDownError = require('../error').isNodeShuttingDownError; -const maxWireVersion = require('../utils').maxWireVersion; const ClientSession = require('../sessions').ClientSession; const MongoError = require('../error').MongoError; const MongoServerSelectionError = require('../error').MongoServerSelectionError; @@ -55,7 +53,7 @@ const SERVER_RELAY_EVENTS = [ ].concat(CMAP_EVENT_NAMES); // all events we listen to from `Server` instances -const LOCAL_SERVER_EVENTS = ['error', 'connect', 'descriptionReceived', 'close', 'ended']; +const LOCAL_SERVER_EVENTS = ['connect', 'descriptionReceived', 'close', 'ended']; const STATE_CLOSING = common.STATE_CLOSING; const STATE_CLOSED = common.STATE_CLOSED; @@ -279,7 +277,7 @@ class Topology extends EventEmitter { translateReadPreference(options); const readPreference = options.readPreference || ReadPreference.primary; - this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { + this.selectServer(readPreferenceServerSelector(readPreference), options, err => { if (err) { this.close(); @@ -292,28 +290,11 @@ class Topology extends EventEmitter { return; } - const errorHandler = err => { - stateTransition(this, STATE_CLOSED); - server.removeListener('connect', connectHandler); - if (typeof callback === 'function') callback(err, null); - }; - - const connectHandler = (_, err) => { - stateTransition(this, STATE_CONNECTED); - server.removeListener('error', errorHandler); - this.emit('open', err, this); - this.emit('connect', this); - - if (typeof callback === 'function') callback(err, this); - }; - - if (server.s.state === STATE_CONNECTING) { - server.once('error', errorHandler); - server.once('connect', connectHandler); - return; - } + stateTransition(this, STATE_CONNECTED); + this.emit('open', err, this); + this.emit('connect', this); - connectHandler(); + if (typeof callback === 'function') callback(err, this); }); } @@ -794,9 +775,6 @@ function destroyServer(server, topology, options, callback) { options = options || {}; LOCAL_SERVER_EVENTS.forEach(event => server.removeAllListeners(event)); - // register a no-op for errors, we don't care now that we are destroying the server - server.on('error', () => {}); - server.destroy(options, () => { topology.emit( 'serverClosed', @@ -843,7 +821,6 @@ function createAndConnectServer(topology, serverDescription, connectDelay) { relayEvents(server, topology, SERVER_RELAY_EVENTS); server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); - server.on('error', serverErrorEventHandler(server, topology)); if (connectDelay) { const connectTimer = setTimeout(() => { @@ -904,21 +881,6 @@ function updateServers(topology, incomingServerDescription) { } } -function serverErrorEventHandler(server, topology) { - return function(err) { - if (topology.s.state === STATE_CLOSING || topology.s.state === STATE_CLOSED) { - return; - } - - if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(err)) { - resetServerState(server, err); - return; - } - - resetServerState(server, err, { clearPool: true }); - }; -} - function executeWriteOperation(args, options, callback) { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; @@ -972,30 +934,6 @@ function executeWriteOperation(args, options, callback) { }); } -/** - * Resets the internal state of this server to `Unknown` by simulating an empty ismaster - * - * @private - * @param {Server} server - * @param {MongoError} error The error that caused the state reset - * @param {object} [options] Optional settings - * @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset - */ -function resetServerState(server, error, options) { - options = Object.assign({}, { clearPool: false }, options); - - if (options.clearPool && server.s.pool) { - server.s.pool.clear(); - } - - server.emit( - 'descriptionReceived', - new ServerDescription(server.description.address, null, { error }) - ); - - process.nextTick(() => server.requestCheck()); -} - function translateReadPreference(options) { if (options.readPreference == null) { return; diff --git a/test/unit/sdam/srv_polling.test.js b/test/unit/sdam/srv_polling.test.js index e81340b11e..ccf824a197 100644 --- a/test/unit/sdam/srv_polling.test.js +++ b/test/unit/sdam/srv_polling.test.js @@ -361,7 +361,7 @@ describe('Mongos SRV Polling', function() { }); }); - srvPoller.trigger(recordSets[1]); + process.nextTick(() => srvPoller.trigger(recordSets[1])); } catch (e) { done(e); } diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index b45146cb6a..b84502ef75 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -139,8 +139,8 @@ describe('Topology (unit)', function() { expect(err).to.not.exist; this.defer(() => topology.close()); - let serverError; - server.on('error', err => (serverError = err)); + let serverDescription; + server.on('descriptionReceived', sd => (serverDescription = sd)); let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); @@ -148,7 +148,7 @@ describe('Topology (unit)', function() { server.command('test.test', { insert: { a: 42 } }, (err, result) => { expect(result).to.not.exist; expect(err).to.exist; - expect(err).to.eql(serverError); + expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.true; done(); }); @@ -176,8 +176,8 @@ describe('Topology (unit)', function() { expect(err).to.not.exist; this.defer(() => topology.close()); - let serverError; - server.on('error', err => (serverError = err)); + let serverDescription; + server.on('descriptionReceived', sd => (serverDescription = sd)); let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); @@ -185,7 +185,7 @@ describe('Topology (unit)', function() { server.command('test.test', { insert: { a: 42 } }, (err, result) => { expect(result).to.not.exist; expect(err).to.exist; - expect(err).to.eql(serverError); + expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; done(); }); @@ -213,13 +213,13 @@ describe('Topology (unit)', function() { expect(err).to.not.exist; this.defer(() => topology.close()); - let serverError; - server.on('error', err => (serverError = err)); + let serverDescription; + server.on('descriptionReceived', sd => (serverDescription = sd)); server.command('test.test', { insert: { a: 42 } }, (err, result) => { expect(result).to.not.exist; expect(err).to.exist; - expect(err).to.eql(serverError); + expect(err).to.eql(serverDescription.error); expect(server.description.type).to.equal('Unknown'); done(); });