Skip to content

Commit

Permalink
feat: support hedged reads
Browse files Browse the repository at this point in the history
Adds driver support for server Hedged Reads. Adds `hedge` property to the `ReadPreference` class and exports it using the `.toJSON` method. Introduces `withMonitoredClient` a shared testing utility for testing command monitoring. Tests command sent to the server and verifies that `hedge` property was propagated. Tests are only run on `>=3.6.0` due to how prior versions send operations to the server.

NODE-2510
  • Loading branch information
Thomas Reggi committed May 5, 2020
1 parent 665b352 commit 2b7b936
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 25 deletions.
11 changes: 10 additions & 1 deletion lib/read_preference.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
* @param {object[]} [tags] A tag set used to target reads to members with the specified tag(s). tagSet is not available if using read preference mode primary.
* @param {object} [options] Additional read preference options
* @param {number} [options.maxStalenessSeconds] Max secondary read staleness in seconds, Minimum value is 90 seconds.
* @param {object} [options.hedge] Server mode in which the same query is dispatched in parallel to multiple replica set members.
* @param {boolean} [options.hedge.enabled] Explicitly enable or disable hedged reads.
* @see https://docs.mongodb.com/manual/core/read-preference/
* @returns {ReadPreference}
*/
Expand All @@ -25,6 +27,7 @@ const ReadPreference = function(mode, tags, options) {

this.mode = mode;
this.tags = tags;
this.hedge = options && options.hedge;

options = options || {};
if (options.maxStalenessSeconds != null) {
Expand All @@ -47,6 +50,10 @@ const ReadPreference = function(mode, tags, options) {
if (this.maxStalenessSeconds) {
throw new TypeError('Primary read preference cannot be combined with maxStalenessSeconds');
}

if (this.hedge) {
throw new TypeError('Primary read preference cannot be combined with hedge');
}
}
};

Expand Down Expand Up @@ -96,7 +103,8 @@ ReadPreference.fromOptions = function(options) {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
return new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds
maxStalenessSeconds: readPreference.maxStalenessSeconds,
hedge: options.hedge
});
}
}
Expand Down Expand Up @@ -160,6 +168,7 @@ ReadPreference.prototype.toJSON = function() {
const readPreference = { mode: this.mode };
if (Array.isArray(this.tags)) readPreference.tags = this.tags;
if (this.maxStalenessSeconds) readPreference.maxStalenessSeconds = this.maxStalenessSeconds;
if (this.hedge) readPreference.hedge = this.hedge;
return readPreference;
};

Expand Down
3 changes: 2 additions & 1 deletion lib/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,8 @@ function translateReadPreference(options) {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
maxStalenessSeconds: r.maxStalenessSeconds,
hedge: r.hedge
});
}
} else if (!(r instanceof ReadPreference)) {
Expand Down
3 changes: 2 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var translateReadPreference = function(options) {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
maxStalenessSeconds: r.maxStalenessSeconds,
hedge: r.hedge
});
}
} else if (!(r instanceof ReadPreference)) {
Expand Down
149 changes: 127 additions & 22 deletions test/functional/readpreference.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';
var test = require('./shared').assert;
var setupDatabase = require('./shared').setupDatabase;

const test = require('./shared').assert;
const setupDatabase = require('./shared').setupDatabase;
const withMonitoredClient = require('./shared').withMonitoredClient;
const expect = require('chai').expect;
const { ReadPreference } = require('../..');

Expand Down Expand Up @@ -81,7 +83,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand All @@ -100,7 +102,7 @@ describe('ReadPreference', function() {

// Execute count
collection.count(function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.topology.command = command;

client.close(done);
Expand All @@ -117,7 +119,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand All @@ -139,7 +141,7 @@ describe('ReadPreference', function() {
collection.group([], {}, { count: 0 }, 'function (obj, prev) { prev.count++; }', function(
err
) {
test.equal(null, err);
expect(err).to.not.exist;
client.topology.command = command;

client.close(done);
Expand All @@ -156,7 +158,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -184,7 +186,7 @@ describe('ReadPreference', function() {

// Perform the map reduce
collection.mapReduce(map, reduce, { out: { inline: 1 } }, function(/* err */) {
// test.equal(null, err);
// expect(err).to.not.exist;

// eslint-disable-line
client.topology.command = command;
Expand All @@ -205,7 +207,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -234,7 +236,7 @@ describe('ReadPreference', function() {

// Perform the map reduce
collection.mapReduce(map, reduce, { out: 'inline' }, function(/* err */) {
// test.equal(null, err);
// expect(err).to.not.exist;
client.topology.command = command;
client.close(done);
});
Expand All @@ -251,7 +253,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -283,7 +285,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -334,7 +336,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand All @@ -353,7 +355,7 @@ describe('ReadPreference', function() {

// Perform the map reduce
collection.stats(function(/* err */) {
// test.equal(null, err);
// expect(err).to.not.exist;
client.topology.command = command;
client.close(done);
});
Expand Down Expand Up @@ -382,7 +384,7 @@ describe('ReadPreference', function() {
};

db.command({ dbStats: true }, function(err) {
test.equal(null, err);
expect(err).to.not.exist;

client.topology.command = function() {
var args = Array.prototype.slice.call(arguments, 0);
Expand All @@ -394,7 +396,7 @@ describe('ReadPreference', function() {
};

db.command({ dbStats: true }, { readPreference: 'secondaryPreferred' }, function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.topology.command = command;
client.close(done);
});
Expand All @@ -411,11 +413,11 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Create read preference object.
var mySecondaryPreferred = { mode: 'secondaryPreferred', tags: [] };
db.command({ dbStats: true }, { readPreference: mySecondaryPreferred }, function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.close(done);
});
});
Expand All @@ -430,11 +432,11 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Create read preference object.
var mySecondaryPreferred = { mode: 'secondaryPreferred', tags: [] };
db.listCollections({}, { readPreference: mySecondaryPreferred }).toArray(function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.close(done);
});
});
Expand All @@ -449,12 +451,12 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Create read preference object.
var mySecondaryPreferred = { mode: 'secondaryPreferred', tags: [] };
var cursor = db.collection('test').find({}, { readPreference: mySecondaryPreferred });
cursor.toArray(function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.close(done);
});
});
Expand Down Expand Up @@ -492,4 +494,107 @@ describe('ReadPreference', function() {
client.close(done);
});
});

context('hedge', function() {
it('should set hedge using [find option & empty hedge]', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: {} });
client
.db(this.configuration.db)
.collection('test')
.find({}, { readPreference: rp })
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: {} };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & empty hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: {} });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: {} };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & enabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: { enabled: true } });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: true } };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & disabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, {
hedge: { enabled: false }
});
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: false } };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & undefined hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null);
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});
});
});
20 changes: 20 additions & 0 deletions test/functional/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,32 @@ class EventCollector {
}
}

function withMonitoredClient(commands, callback) {
if (!Object.prototype.hasOwnProperty.call(callback, 'prototype')) {
throw new Error('withMonitoredClient callback can not be arrow function');
}
return function(done) {
const configuration = this.configuration;
const client = configuration.newClient({ monitorCommands: true });
const events = [];
client.on('commandStarted', filterForCommands(commands, events));
client.connect((err, client) => {
expect(err).to.not.exist;
function _done(err) {
client.close(err2 => done(err || err2));
}
callback.bind(this)(client, events, _done);
});
};
}

module.exports = {
connectToDb,
setupDatabase,
assert,
delay,
withClient,
withMonitoredClient,
filterForCommands,
filterOutCommands,
ignoreNsNotFound,
Expand Down
Loading

0 comments on commit 2b7b936

Please sign in to comment.