diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index 4a4ee864b1..c23674f89a 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -15,8 +15,9 @@ const debugOptions = require('../connection/utils').debugOptions; const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError; const isNetworkTimeoutError = require('../error').isNetworkTimeoutError; const isRetryableWriteError = require('../error').isRetryableWriteError; -const makeStateMachine = require('../utils').makeStateMachine; +const isNodeShuttingDownError = require('../error').isNodeShuttingDownError; const maxWireVersion = require('../utils').maxWireVersion; +const makeStateMachine = require('../utils').makeStateMachine; const common = require('./common'); const ServerType = common.ServerType; @@ -143,15 +144,7 @@ class Server extends EventEmitter { this.s.pool.clear(); }); - this[kMonitor].on('resetServer', error => { - // Revert to an `Unknown` state by emitting a default description with no isMaster, and the - // error from the heartbeat attempt - this.emit( - 'descriptionReceived', - new ServerDescription(this.description.address, null, { error }) - ); - }); - + this[kMonitor].on('resetServer', error => markServerUnknown(this, error)); this[kMonitor].on('serverHeartbeatSucceeded', event => { this.emit( 'descriptionReceived', @@ -282,7 +275,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.command(ns, cmd, options, makeOperationHandler(this, options, cb)); }, callback); } @@ -302,7 +299,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.query(ns, cmd, cursorState, options, makeOperationHandler(this, options, cb)); }, callback); } @@ -322,7 +323,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.getMore(ns, cursorState, batchSize, options, makeOperationHandler(this, options, cb)); }, callback); } @@ -344,7 +349,11 @@ class Server extends EventEmitter { } this.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(this, err); + return cb(err); + } + conn.killCursors(ns, cursorState, makeOperationHandler(this, null, cb)); }, callback); } @@ -447,11 +456,22 @@ function executeWriteOperation(args, options, callback) { } server.s.pool.withConnection((err, conn, cb) => { - if (err) return cb(err); + if (err) { + markServerUnknown(server, err); + return cb(err); + } + conn[op](ns, ops, options, makeOperationHandler(server, options, cb)); }, callback); } +function markServerUnknown(server, error) { + server.emit( + 'descriptionReceived', + new ServerDescription(server.description.address, null, { error }) + ); +} + function makeOperationHandler(server, options, callback) { return function handleOperationResult(err, result) { if (err) { @@ -465,7 +485,8 @@ function makeOperationHandler(server, options, callback) { } if (!isNetworkTimeoutError(err)) { - server.emit('error', err); + markServerUnknown(server, err); + server.s.pool.clear(); } } else { // if pre-4.4 server, then add error label if its a retryable write error @@ -474,7 +495,12 @@ function makeOperationHandler(server, options, callback) { } if (isSDAMUnrecoverableError(err)) { - server.emit('error', err); + if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) { + server.s.pool.clear(); + } + + markServerUnknown(server, err); + process.nextTick(() => server.requestCheck()); } } } diff --git a/lib/core/sdam/topology.js b/lib/core/sdam/topology.js index 418effd07c..ed991abeea 100644 --- a/lib/core/sdam/topology.js +++ b/lib/core/sdam/topology.js @@ -14,8 +14,6 @@ const CoreCursor = require('../cursor').CoreCursor; const deprecate = require('util').deprecate; const BSON = require('../connection/utils').retrieveBSON(); const createCompressionInfo = require('../topologies/shared').createCompressionInfo; -const isNodeShuttingDownError = require('../error').isNodeShuttingDownError; -const maxWireVersion = require('../utils').maxWireVersion; const ClientSession = require('../sessions').ClientSession; const MongoError = require('../error').MongoError; const MongoServerSelectionError = require('../error').MongoServerSelectionError; @@ -54,7 +52,7 @@ const SERVER_RELAY_EVENTS = [ ].concat(CMAP_EVENT_NAMES); // all events we listen to from `Server` instances -const LOCAL_SERVER_EVENTS = ['error', 'connect', 'descriptionReceived', 'close', 'ended']; +const LOCAL_SERVER_EVENTS = ['connect', 'descriptionReceived', 'close', 'ended']; const STATE_CLOSING = common.STATE_CLOSING; const STATE_CLOSED = common.STATE_CLOSED; @@ -278,7 +276,7 @@ class Topology extends EventEmitter { translateReadPreference(options); const readPreference = options.readPreference || ReadPreference.primary; - this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { + this.selectServer(readPreferenceServerSelector(readPreference), options, err => { if (err) { this.close(); @@ -291,28 +289,11 @@ class Topology extends EventEmitter { return; } - const errorHandler = err => { - stateTransition(this, STATE_CLOSED); - server.removeListener('connect', connectHandler); - if (typeof callback === 'function') callback(err, null); - }; - - const connectHandler = (_, err) => { - stateTransition(this, STATE_CONNECTED); - server.removeListener('error', errorHandler); - this.emit('open', err, this); - this.emit('connect', this); - - if (typeof callback === 'function') callback(err, this); - }; - - if (server.s.state === STATE_CONNECTING) { - server.once('error', errorHandler); - server.once('connect', connectHandler); - return; - } + stateTransition(this, STATE_CONNECTED); + this.emit('open', err, this); + this.emit('connect', this); - connectHandler(); + if (typeof callback === 'function') callback(err, this); }); } @@ -793,9 +774,6 @@ function destroyServer(server, topology, options, callback) { options = options || {}; LOCAL_SERVER_EVENTS.forEach(event => server.removeAllListeners(event)); - // register a no-op for errors, we don't care now that we are destroying the server - server.on('error', () => {}); - server.destroy(options, () => { topology.emit( 'serverClosed', @@ -842,7 +820,6 @@ function createAndConnectServer(topology, serverDescription, connectDelay) { relayEvents(server, topology, SERVER_RELAY_EVENTS); server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); - server.on('error', serverErrorEventHandler(server, topology)); if (connectDelay) { const connectTimer = setTimeout(() => { @@ -903,21 +880,6 @@ function updateServers(topology, incomingServerDescription) { } } -function serverErrorEventHandler(server, topology) { - return function(err) { - if (topology.s.state === STATE_CLOSING || topology.s.state === STATE_CLOSED) { - return; - } - - if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(err)) { - resetServerState(server, err); - return; - } - - resetServerState(server, err, { clearPool: true }); - }; -} - function executeWriteOperation(args, options, callback) { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; @@ -975,30 +937,6 @@ function shouldRetryOperation(err) { return err instanceof MongoError && err.hasErrorLabel('RetryableWriteError'); } -/** - * Resets the internal state of this server to `Unknown` by simulating an empty ismaster - * - * @private - * @param {Server} server - * @param {MongoError} error The error that caused the state reset - * @param {object} [options] Optional settings - * @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset - */ -function resetServerState(server, error, options) { - options = Object.assign({}, { clearPool: false }, options); - - if (options.clearPool && server.s.pool) { - server.s.pool.clear(); - } - - server.emit( - 'descriptionReceived', - new ServerDescription(server.description.address, null, { error }) - ); - - process.nextTick(() => server.requestCheck()); -} - function translateReadPreference(options) { if (options.readPreference == null) { return; diff --git a/test/unit/sdam/srv_polling.test.js b/test/unit/sdam/srv_polling.test.js index e81340b11e..ccf824a197 100644 --- a/test/unit/sdam/srv_polling.test.js +++ b/test/unit/sdam/srv_polling.test.js @@ -361,7 +361,7 @@ describe('Mongos SRV Polling', function() { }); }); - srvPoller.trigger(recordSets[1]); + process.nextTick(() => srvPoller.trigger(recordSets[1])); } catch (e) { done(e); } diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index b45146cb6a..b84502ef75 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -139,8 +139,8 @@ describe('Topology (unit)', function() { expect(err).to.not.exist; this.defer(() => topology.close()); - let serverError; - server.on('error', err => (serverError = err)); + let serverDescription; + server.on('descriptionReceived', sd => (serverDescription = sd)); let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); @@ -148,7 +148,7 @@ describe('Topology (unit)', function() { server.command('test.test', { insert: { a: 42 } }, (err, result) => { expect(result).to.not.exist; expect(err).to.exist; - expect(err).to.eql(serverError); + expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.true; done(); }); @@ -176,8 +176,8 @@ describe('Topology (unit)', function() { expect(err).to.not.exist; this.defer(() => topology.close()); - let serverError; - server.on('error', err => (serverError = err)); + let serverDescription; + server.on('descriptionReceived', sd => (serverDescription = sd)); let poolCleared = false; topology.on('connectionPoolCleared', () => (poolCleared = true)); @@ -185,7 +185,7 @@ describe('Topology (unit)', function() { server.command('test.test', { insert: { a: 42 } }, (err, result) => { expect(result).to.not.exist; expect(err).to.exist; - expect(err).to.eql(serverError); + expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; done(); }); @@ -213,13 +213,13 @@ describe('Topology (unit)', function() { expect(err).to.not.exist; this.defer(() => topology.close()); - let serverError; - server.on('error', err => (serverError = err)); + let serverDescription; + server.on('descriptionReceived', sd => (serverDescription = sd)); server.command('test.test', { insert: { a: 42 } }, (err, result) => { expect(result).to.not.exist; expect(err).to.exist; - expect(err).to.eql(serverError); + expect(err).to.eql(serverDescription.error); expect(server.description.type).to.equal('Unknown'); done(); });