Skip to content

Commit

Permalink
fix(sdam): don't lose servers when they fail monitoring
Browse files Browse the repository at this point in the history
For legacy reasons the unified topology forced the connection pool
into auto reconnect mode by default. This caused failed server
checks to continue to emit errors on the server, causing the server
to lose track of its monitoring state, and never returning the node
to the pool of selectable servers. This results client-side as an
error about server selection timing out.

NODE-2274
  • Loading branch information
mbroadst committed Nov 5, 2019
1 parent 818055a commit 8a534bb
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 44 deletions.
11 changes: 2 additions & 9 deletions lib/core/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ function destroy(self, connections, options, callback) {
*/
Pool.prototype.destroy = function(force, callback) {
var self = this;

// Do not try again if the pool is already dead
if (this.state === DESTROYED || self.state === DESTROYING) {
if (typeof callback === 'function') callback(null, null);
Expand Down Expand Up @@ -958,15 +959,6 @@ function createConnection(pool, callback) {
pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}

if (pool.options.legacyCompatMode === false) {
// The unified topology uses the reported `error` from a pool to track what error
// reason is returned to the user during selection timeout. We only want to emit
// this if the pool is active because the listeners are removed on destruction.
if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
pool.emit('error', err);
}
}

// check if reconnect is enabled, and attempt retry if so
if (!pool.reconnectId && pool.options.reconnect) {
if (pool.state === CONNECTING && pool.options.legacyCompatMode) {
Expand Down Expand Up @@ -1044,6 +1036,7 @@ function _execute(self) {
// operations
if (self.connectingConnections > 0) {
self.executing = false;
setTimeout(() => _execute(self)(), 10);
return;
}

Expand Down
34 changes: 10 additions & 24 deletions lib/core/sdam/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,13 @@ class Server extends EventEmitter {
{ bson: this.s.bson }
);

// NOTE: this should only be the case if we are connecting to a single server
poolOptions.reconnect = true;
// NOTE: reconnect is explicitly false because of the server selection loop
poolOptions.reconnect = false;
poolOptions.legacyCompatMode = false;

this.s.pool = new Pool(this, poolOptions);

// setup listeners
this.s.pool.on('connect', connectEventHandler(this));
this.s.pool.on('close', errorEventHandler(this));
this.s.pool.on('error', errorEventHandler(this));
this.s.pool.on('parseError', parseErrorEventHandler(this));

// it is unclear whether consumers should even know about these events
Expand All @@ -169,14 +166,7 @@ class Server extends EventEmitter {
relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']);

stateTransition(this, STATE_CONNECTING);

// If auth settings have been provided, use them
if (options.auth) {
this.s.pool.connect.apply(this.s.pool, options.auth);
return;
}

this.s.pool.connect();
this.s.pool.connect(connectEventHandler(this));
}

/**
Expand Down Expand Up @@ -474,7 +464,13 @@ function executeWriteOperation(args, options, callback) {
}

function connectEventHandler(server) {
return function(pool, conn) {
return function(err, conn) {
if (err) {
server.emit('error', new MongoNetworkError(err));
server.emit('close');
return;
}

const ismaster = conn.ismaster;
server.s.lastIsMasterMS = conn.lastIsMasterMS;
if (conn.agreedCompressor) {
Expand Down Expand Up @@ -506,16 +502,6 @@ function connectEventHandler(server) {
};
}

function errorEventHandler(server) {
return function(err) {
if (err) {
server.emit('error', new MongoNetworkError(err));
}

server.emit('close');
};
}

function parseErrorEventHandler(server) {
return function(err) {
stateTransition(this, STATE_CLOSED);
Expand Down
66 changes: 55 additions & 11 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -894,15 +894,6 @@ function selectServers(topology, selector, timeout, start, callback) {
topology.s.monitorTimers.push(timer);
});

const descriptionChangedHandler = () => {
// successful iteration, clear the check timer
clearTimeout(iterationTimer);
topology.s.iterationTimers.splice(timerIndex, 1);

// topology description has changed due to monitoring, reattempt server selection
selectServers(topology, selector, timeout, start, callback);
};

const iterationTimer = setTimeout(() => {
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);
callback(
Expand All @@ -913,16 +904,25 @@ function selectServers(topology, selector, timeout, start, callback) {
);
}, timeout - duration);

const descriptionChangedHandler = () => {
// successful iteration, clear the check timer
removeTimerFrom(iterationTimer, topology.s.iterationTimers);
clearTimeout(iterationTimer);

// topology description has changed due to monitoring, reattempt server selection
selectServers(topology, selector, timeout, start, callback);
};

// track this timer in case we need to clean it up outside this loop
const timerIndex = topology.s.iterationTimers.push(iterationTimer);
topology.s.iterationTimers.push(iterationTimer);

topology.once('topologyDescriptionChanged', descriptionChangedHandler);
};

retrySelection();
}

function createAndConnectServer(topology, serverDescription) {
function createAndConnectServer(topology, serverDescription, connectDelay) {
topology.emit(
'serverOpening',
new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address)
Expand All @@ -934,10 +934,45 @@ function createAndConnectServer(topology, serverDescription) {
server.once('connect', serverConnectEventHandler(server, topology));
server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
server.on('error', serverErrorEventHandler(server, topology));

if (connectDelay) {
const connectTimer = setTimeout(() => {
removeTimerFrom(connectTimer, topology.s.iterationTimers);
server.connect();
}, connectDelay);

topology.s.iterationTimers.push(connectTimer);
return server;
}

server.connect();
return server;
}

function resetServer(topology, serverDescription) {
if (!topology.s.servers.has(serverDescription.address)) {
return;
}

// first remove the old server
const server = topology.s.servers.get(serverDescription.address);
destroyServer(server, topology);

// add the new server, and attempt connection after a delay
const newServer = createAndConnectServer(
topology,
serverDescription,
topology.s.heartbeatFrequencyMS
);

topology.s.servers.set(serverDescription.address, newServer);
}

function removeTimerFrom(timer, timers) {
const idx = timers.findIndex(t => t === timer);
timers.splice(idx, 1);
}

/**
* Create `Server` instances for all initially known servers, connect them, and assign
* them to the passed in `Topology`.
Expand All @@ -954,6 +989,15 @@ function connectServers(topology, serverDescriptions) {
}

function updateServers(topology, incomingServerDescription) {
// if the server was reset internally because of an error, we need to replace the
// `Server` instance for it so we can attempt reconnect.
//
// TODO: this logical can change once CMAP is put in place
if (incomingServerDescription && incomingServerDescription.error) {
resetServer(topology, incomingServerDescription);
return;
}

// update the internal server's description
if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) {
const server = topology.s.servers.get(incomingServerDescription.address);
Expand Down

0 comments on commit 8a534bb

Please sign in to comment.