Skip to content

Commit

Permalink
Merge branch '3.5'
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Feb 6, 2020
2 parents 265fe40 + 5bf0df8 commit 68170da
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 95 deletions.
60 changes: 43 additions & 17 deletions lib/core/sdam/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ const debugOptions = require('../connection/utils').debugOptions;
const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError;
const isNetworkTimeoutError = require('../error').isNetworkTimeoutError;
const isRetryableWriteError = require('../error').isRetryableWriteError;
const makeStateMachine = require('../utils').makeStateMachine;
const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
const maxWireVersion = require('../utils').maxWireVersion;
const makeStateMachine = require('../utils').makeStateMachine;
const common = require('./common');
const ServerType = common.ServerType;

Expand Down Expand Up @@ -143,15 +144,7 @@ class Server extends EventEmitter {
this.s.pool.clear();
});

this[kMonitor].on('resetServer', error => {
// Revert to an `Unknown` state by emitting a default description with no isMaster, and the
// error from the heartbeat attempt
this.emit(
'descriptionReceived',
new ServerDescription(this.description.address, null, { error })
);
});

this[kMonitor].on('resetServer', error => markServerUnknown(this, error));
this[kMonitor].on('serverHeartbeatSucceeded', event => {
this.emit(
'descriptionReceived',
Expand Down Expand Up @@ -282,7 +275,11 @@ class Server extends EventEmitter {
}

this.s.pool.withConnection((err, conn, cb) => {
if (err) return cb(err);
if (err) {
markServerUnknown(this, err);
return cb(err);
}

conn.command(ns, cmd, options, makeOperationHandler(this, options, cb));
}, callback);
}
Expand All @@ -302,7 +299,11 @@ class Server extends EventEmitter {
}

this.s.pool.withConnection((err, conn, cb) => {
if (err) return cb(err);
if (err) {
markServerUnknown(this, err);
return cb(err);
}

conn.query(ns, cmd, cursorState, options, makeOperationHandler(this, options, cb));
}, callback);
}
Expand All @@ -322,7 +323,11 @@ class Server extends EventEmitter {
}

this.s.pool.withConnection((err, conn, cb) => {
if (err) return cb(err);
if (err) {
markServerUnknown(this, err);
return cb(err);
}

conn.getMore(ns, cursorState, batchSize, options, makeOperationHandler(this, options, cb));
}, callback);
}
Expand All @@ -344,7 +349,11 @@ class Server extends EventEmitter {
}

this.s.pool.withConnection((err, conn, cb) => {
if (err) return cb(err);
if (err) {
markServerUnknown(this, err);
return cb(err);
}

conn.killCursors(ns, cursorState, makeOperationHandler(this, null, cb));
}, callback);
}
Expand Down Expand Up @@ -447,11 +456,22 @@ function executeWriteOperation(args, options, callback) {
}

server.s.pool.withConnection((err, conn, cb) => {
if (err) return cb(err);
if (err) {
markServerUnknown(server, err);
return cb(err);
}

conn[op](ns, ops, options, makeOperationHandler(server, options, cb));
}, callback);
}

function markServerUnknown(server, error) {
server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, { error })
);
}

function makeOperationHandler(server, options, callback) {
return function handleOperationResult(err, result) {
if (err) {
Expand All @@ -465,7 +485,8 @@ function makeOperationHandler(server, options, callback) {
}

if (!isNetworkTimeoutError(err)) {
server.emit('error', err);
markServerUnknown(server, err);
server.s.pool.clear();
}
} else {
// if pre-4.4 server, then add error label if its a retryable write error
Expand All @@ -474,7 +495,12 @@ function makeOperationHandler(server, options, callback) {
}

if (isSDAMUnrecoverableError(err)) {
server.emit('error', err);
if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) {
server.s.pool.clear();
}

markServerUnknown(server, err);
process.nextTick(() => server.requestCheck());
}
}
}
Expand Down
74 changes: 6 additions & 68 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ const CoreCursor = require('../cursor').CoreCursor;
const deprecate = require('util').deprecate;
const BSON = require('../connection/utils').retrieveBSON();
const createCompressionInfo = require('../topologies/shared').createCompressionInfo;
const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
const maxWireVersion = require('../utils').maxWireVersion;
const ClientSession = require('../sessions').ClientSession;
const MongoError = require('../error').MongoError;
const MongoServerSelectionError = require('../error').MongoServerSelectionError;
Expand Down Expand Up @@ -54,7 +52,7 @@ const SERVER_RELAY_EVENTS = [
].concat(CMAP_EVENT_NAMES);

// all events we listen to from `Server` instances
const LOCAL_SERVER_EVENTS = ['error', 'connect', 'descriptionReceived', 'close', 'ended'];
const LOCAL_SERVER_EVENTS = ['connect', 'descriptionReceived', 'close', 'ended'];

const STATE_CLOSING = common.STATE_CLOSING;
const STATE_CLOSED = common.STATE_CLOSED;
Expand Down Expand Up @@ -278,7 +276,7 @@ class Topology extends EventEmitter {

translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
this.selectServer(readPreferenceServerSelector(readPreference), options, err => {
if (err) {
this.close();

Expand All @@ -291,28 +289,11 @@ class Topology extends EventEmitter {
return;
}

const errorHandler = err => {
stateTransition(this, STATE_CLOSED);
server.removeListener('connect', connectHandler);
if (typeof callback === 'function') callback(err, null);
};

const connectHandler = (_, err) => {
stateTransition(this, STATE_CONNECTED);
server.removeListener('error', errorHandler);
this.emit('open', err, this);
this.emit('connect', this);

if (typeof callback === 'function') callback(err, this);
};

if (server.s.state === STATE_CONNECTING) {
server.once('error', errorHandler);
server.once('connect', connectHandler);
return;
}
stateTransition(this, STATE_CONNECTED);
this.emit('open', err, this);
this.emit('connect', this);

connectHandler();
if (typeof callback === 'function') callback(err, this);
});
}

Expand Down Expand Up @@ -793,9 +774,6 @@ function destroyServer(server, topology, options, callback) {
options = options || {};
LOCAL_SERVER_EVENTS.forEach(event => server.removeAllListeners(event));

// register a no-op for errors, we don't care now that we are destroying the server
server.on('error', () => {});

server.destroy(options, () => {
topology.emit(
'serverClosed',
Expand Down Expand Up @@ -842,7 +820,6 @@ function createAndConnectServer(topology, serverDescription, connectDelay) {
relayEvents(server, topology, SERVER_RELAY_EVENTS);

server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
server.on('error', serverErrorEventHandler(server, topology));

if (connectDelay) {
const connectTimer = setTimeout(() => {
Expand Down Expand Up @@ -903,21 +880,6 @@ function updateServers(topology, incomingServerDescription) {
}
}

function serverErrorEventHandler(server, topology) {
return function(err) {
if (topology.s.state === STATE_CLOSING || topology.s.state === STATE_CLOSED) {
return;
}

if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(err)) {
resetServerState(server, err);
return;
}

resetServerState(server, err, { clearPool: true });
};
}

function executeWriteOperation(args, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
Expand Down Expand Up @@ -975,30 +937,6 @@ function shouldRetryOperation(err) {
return err instanceof MongoError && err.hasErrorLabel('RetryableWriteError');
}

/**
* Resets the internal state of this server to `Unknown` by simulating an empty ismaster
*
* @private
* @param {Server} server
* @param {MongoError} error The error that caused the state reset
* @param {object} [options] Optional settings
* @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset
*/
function resetServerState(server, error, options) {
options = Object.assign({}, { clearPool: false }, options);

if (options.clearPool && server.s.pool) {
server.s.pool.clear();
}

server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, { error })
);

process.nextTick(() => server.requestCheck());
}

function translateReadPreference(options) {
if (options.readPreference == null) {
return;
Expand Down
2 changes: 1 addition & 1 deletion test/unit/sdam/srv_polling.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ describe('Mongos SRV Polling', function() {
});
});

srvPoller.trigger(recordSets[1]);
process.nextTick(() => srvPoller.trigger(recordSets[1]));
} catch (e) {
done(e);
}
Expand Down
18 changes: 9 additions & 9 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ describe('Topology (unit)', function() {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));
let serverDescription;
server.on('descriptionReceived', sd => (serverDescription = sd));

let poolCleared = false;
topology.on('connectionPoolCleared', () => (poolCleared = true));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(err).to.eql(serverDescription.error);
expect(poolCleared).to.be.true;
done();
});
Expand Down Expand Up @@ -176,16 +176,16 @@ describe('Topology (unit)', function() {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));
let serverDescription;
server.on('descriptionReceived', sd => (serverDescription = sd));

let poolCleared = false;
topology.on('connectionPoolCleared', () => (poolCleared = true));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(err).to.eql(serverDescription.error);
expect(poolCleared).to.be.false;
done();
});
Expand Down Expand Up @@ -213,13 +213,13 @@ describe('Topology (unit)', function() {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));
let serverDescription;
server.on('descriptionReceived', sd => (serverDescription = sd));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(err).to.eql(serverDescription.error);
expect(server.description.type).to.equal('Unknown');
done();
});
Expand Down

0 comments on commit 68170da

Please sign in to comment.