Skip to content

Commit

Permalink
fix(topology): ensure selection wait queue is always processed
Browse files Browse the repository at this point in the history
There were two bugs here. First, processing the wait queue used a
dynamic upper bound while looping through members, resulting in not
all members actually being processed and subsequently timing out.
Second, if some sort of error occurred during applying the selector
to an array of server descriptions, processing would prematurely
stop, potentially leaving wait queue members stuck.

NODE-2467
  • Loading branch information
mbroadst committed Feb 24, 2020
1 parent 361bc1e commit bf701d6
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 6 deletions.
12 changes: 6 additions & 6 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ class Topology extends EventEmitter {
}, serverSelectionTimeoutMS);
}

// place the member at the front of the wait queue
this[kWaitQueue].unshift(waitQueueMember);
this[kWaitQueue].push(waitQueueMember);
processWaitQueue(this);
}

Expand Down Expand Up @@ -980,7 +979,7 @@ function srvPollingHandler(topology) {

function drainWaitQueue(queue, err) {
while (queue.length) {
const waitQueueMember = queue.pop();
const waitQueueMember = queue.shift();
clearTimeout(waitQueueMember.timer);
if (!waitQueueMember[kCancelled]) {
waitQueueMember.callback(err);
Expand All @@ -996,7 +995,8 @@ function processWaitQueue(topology) {

const isSharded = topology.description.type === TopologyType.Sharded;
const serverDescriptions = Array.from(topology.description.servers.values());
for (let i = 0; i < topology[kWaitQueue].length; ++i) {
const membersToProcess = topology[kWaitQueue].length;
for (let i = 0; i < membersToProcess; ++i) {
const waitQueueMember = topology[kWaitQueue].shift();
if (waitQueueMember[kCancelled]) {
continue;
Expand All @@ -1011,12 +1011,12 @@ function processWaitQueue(topology) {
} catch (e) {
clearTimeout(waitQueueMember.timer);
waitQueueMember.callback(e);
break;
continue;
}

if (selectedDescriptions.length === 0) {
topology[kWaitQueue].push(waitQueueMember);
break;
continue;
}

const selectedServerDescription = randomSelection(selectedDescriptions);
Expand Down
58 changes: 58 additions & 0 deletions test/unit/sdam/server_selection/select_servers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,62 @@ describe('selectServer', function() {
});
});
});

describe('waitQueue', function() {
it('should process all wait queue members, including selection with errors', function(done) {
const topology = new Topology('someserver:27019');
const selectServer = this.sinon
.stub(Topology.prototype, 'selectServer')
.callsFake(function(selector, options, callback) {
const server = Array.from(this.s.servers.values())[0];
selectServer.restore();
callback(null, server);
});

this.sinon.stub(Server.prototype, 'connect').callsFake(function() {
this.s.state = 'connected';
this.emit('connect');
});

const toSelect = 10;
let completed = 0;
function finish() {
completed++;
console.log(completed);
if (completed === toSelect) done();
}

// methodology:
// - perform 9 server selections, a few with a selector that throws an error
// - ensure each selection immediately returns an empty result (gated by a boolean)
// guaranteeing tha the queue will be full before the last selection
// - make one last selection, but ensure that all selections are no longer blocked from
// returning their value
// - verify that 10 callbacks were called

topology.connect(err => {
expect(err).to.not.exist;

let preventSelection = true;
const anySelector = td => {
if (preventSelection) return [];
const server = Array.from(td.servers.values())[0];
return [server];
};

const failingSelector = () => {
if (preventSelection) return [];
throw new TypeError('bad news!');
};

preventSelection = true;
for (let i = 0; i < toSelect - 1; ++i) {
topology.selectServer(i % 5 === 0 ? failingSelector : anySelector, finish);
}

preventSelection = false;
topology.selectServer(anySelector, finish);
});
});
});
});

0 comments on commit bf701d6

Please sign in to comment.