Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support hedged reads #2350

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/read_preference.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
* @param {object[]} [tags] A tag set used to target reads to members with the specified tag(s). tagSet is not available if using read preference mode primary.
* @param {object} [options] Additional read preference options
* @param {number} [options.maxStalenessSeconds] Max secondary read staleness in seconds, Minimum value is 90 seconds.
* @param {object} [options.hedge] Server mode in which the same query is dispatched in parallel to multiple replica set members.
* @param {boolean} [options.hedge.enabled] Explicitly enable or disable hedged reads.
* @see https://docs.mongodb.com/manual/core/read-preference/
* @returns {ReadPreference}
*/
Expand All @@ -25,6 +27,7 @@ const ReadPreference = function(mode, tags, options) {

this.mode = mode;
this.tags = tags;
this.hedge = options && options.hedge;

options = options || {};
if (options.maxStalenessSeconds != null) {
Expand All @@ -47,6 +50,10 @@ const ReadPreference = function(mode, tags, options) {
if (this.maxStalenessSeconds) {
throw new TypeError('Primary read preference cannot be combined with maxStalenessSeconds');
}

if (this.hedge) {
throw new TypeError('Primary read preference cannot be combined with hedge');
}
}
};

Expand Down Expand Up @@ -96,7 +103,8 @@ ReadPreference.fromOptions = function(options) {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
return new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds
maxStalenessSeconds: readPreference.maxStalenessSeconds,
hedge: options.hedge
});
}
}
Expand Down Expand Up @@ -160,6 +168,7 @@ ReadPreference.prototype.toJSON = function() {
const readPreference = { mode: this.mode };
if (Array.isArray(this.tags)) readPreference.tags = this.tags;
if (this.maxStalenessSeconds) readPreference.maxStalenessSeconds = this.maxStalenessSeconds;
if (this.hedge) readPreference.hedge = this.hedge;
return readPreference;
};

Expand Down
3 changes: 2 additions & 1 deletion lib/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,8 @@ function translateReadPreference(options) {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
maxStalenessSeconds: r.maxStalenessSeconds,
hedge: r.hedge
});
}
} else if (!(r instanceof ReadPreference)) {
Expand Down
3 changes: 2 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var translateReadPreference = function(options) {
const mode = r.mode || r.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
maxStalenessSeconds: r.maxStalenessSeconds,
hedge: r.hedge
});
}
} else if (!(r instanceof ReadPreference)) {
Expand Down
149 changes: 127 additions & 22 deletions test/functional/readpreference.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';
var test = require('./shared').assert;
var setupDatabase = require('./shared').setupDatabase;

const test = require('./shared').assert;
const setupDatabase = require('./shared').setupDatabase;
const withMonitoredClient = require('./shared').withMonitoredClient;
const expect = require('chai').expect;
const { ReadPreference } = require('../..');

Expand Down Expand Up @@ -81,7 +83,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand All @@ -100,7 +102,7 @@ describe('ReadPreference', function() {

// Execute count
collection.count(function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.topology.command = command;

client.close(done);
Expand All @@ -117,7 +119,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand All @@ -139,7 +141,7 @@ describe('ReadPreference', function() {
collection.group([], {}, { count: 0 }, 'function (obj, prev) { prev.count++; }', function(
err
) {
test.equal(null, err);
expect(err).to.not.exist;
client.topology.command = command;

client.close(done);
Expand All @@ -156,7 +158,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -184,7 +186,7 @@ describe('ReadPreference', function() {

// Perform the map reduce
collection.mapReduce(map, reduce, { out: { inline: 1 } }, function(/* err */) {
// test.equal(null, err);
// expect(err).to.not.exist;

// eslint-disable-line
client.topology.command = command;
Expand All @@ -205,7 +207,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -234,7 +236,7 @@ describe('ReadPreference', function() {

// Perform the map reduce
collection.mapReduce(map, reduce, { out: 'inline' }, function(/* err */) {
// test.equal(null, err);
// expect(err).to.not.exist;
client.topology.command = command;
client.close(done);
});
Expand All @@ -251,7 +253,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -283,7 +285,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand Down Expand Up @@ -334,7 +336,7 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Set read preference
var collection = db.collection('read_pref_1', {
readPreference: ReadPreference.SECONDARY_PREFERRED
Expand All @@ -353,7 +355,7 @@ describe('ReadPreference', function() {

// Perform the map reduce
collection.stats(function(/* err */) {
// test.equal(null, err);
// expect(err).to.not.exist;
client.topology.command = command;
client.close(done);
});
Expand Down Expand Up @@ -382,7 +384,7 @@ describe('ReadPreference', function() {
};

db.command({ dbStats: true }, function(err) {
test.equal(null, err);
expect(err).to.not.exist;

client.topology.command = function() {
var args = Array.prototype.slice.call(arguments, 0);
Expand All @@ -394,7 +396,7 @@ describe('ReadPreference', function() {
};

db.command({ dbStats: true }, { readPreference: 'secondaryPreferred' }, function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.topology.command = command;
client.close(done);
});
Expand All @@ -411,11 +413,11 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Create read preference object.
var mySecondaryPreferred = { mode: 'secondaryPreferred', tags: [] };
db.command({ dbStats: true }, { readPreference: mySecondaryPreferred }, function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.close(done);
});
});
Expand All @@ -430,11 +432,11 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Create read preference object.
var mySecondaryPreferred = { mode: 'secondaryPreferred', tags: [] };
db.listCollections({}, { readPreference: mySecondaryPreferred }).toArray(function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.close(done);
});
});
Expand All @@ -449,12 +451,12 @@ describe('ReadPreference', function() {
var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });
client.connect(function(err, client) {
var db = client.db(configuration.db);
test.equal(null, err);
expect(err).to.not.exist;
// Create read preference object.
var mySecondaryPreferred = { mode: 'secondaryPreferred', tags: [] };
var cursor = db.collection('test').find({}, { readPreference: mySecondaryPreferred });
cursor.toArray(function(err) {
test.equal(null, err);
expect(err).to.not.exist;
client.close(done);
});
});
Expand Down Expand Up @@ -492,4 +494,107 @@ describe('ReadPreference', function() {
client.close(done);
});
});

context('hedge', function() {
it('should set hedge using [find option & empty hedge]', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: {} });
client
.db(this.configuration.db)
.collection('test')
.find({}, { readPreference: rp })
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: {} };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & empty hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: {} });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: {} };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & enabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: { enabled: true } });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: true } };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & disabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, {
hedge: { enabled: false }
});
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: false } };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});

it('should set hedge using [.setReadPreference & undefined hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function(client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null);
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY };
expect(events[0])
.nested.property('command.$readPreference')
.to.deep.equal(expected);
done();
});
})
});
});
});
20 changes: 20 additions & 0 deletions test/functional/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,32 @@ class EventCollector {
}
}

function withMonitoredClient(commands, callback) {
if (!Object.prototype.hasOwnProperty.call(callback, 'prototype')) {
throw new Error('withMonitoredClient callback can not be arrow function');
}
return function(done) {
const configuration = this.configuration;
const client = configuration.newClient({ monitorCommands: true });
const events = [];
client.on('commandStarted', filterForCommands(commands, events));
client.connect((err, client) => {
expect(err).to.not.exist;
function _done(err) {
client.close(err2 => done(err || err2));
}
callback.bind(this)(client, events, _done);
});
};
}

module.exports = {
connectToDb,
setupDatabase,
assert,
delay,
withClient,
withMonitoredClient,
filterForCommands,
filterOutCommands,
ignoreNsNotFound,
Expand Down
Loading