From ccc5e1dbb4e69f84b16e8381b7f5a38470facf04 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Mon, 23 Apr 2018 16:10:54 -0400 Subject: [PATCH] feat(server-selection): add basic support for server selection NODE-1259 --- lib/sdam/server_description.js | 8 +- lib/sdam/server_selectors.js | 168 +++++++++++++++++++++++++++++++ lib/sdam/topology.js | 91 +++++++++++++++-- lib/sdam/topology_description.js | 18 ++-- 4 files changed, 265 insertions(+), 20 deletions(-) create mode 100644 lib/sdam/server_selectors.js diff --git a/lib/sdam/server_description.js b/lib/sdam/server_description.js index 4e80e343c..2cf1f782c 100644 --- a/lib/sdam/server_description.js +++ b/lib/sdam/server_description.js @@ -23,11 +23,15 @@ class ServerDescription { * Create a ServerDescription * @param {String} address The address of the server * @param {Object} [ismaster] An optional ismaster response for this server + * @param {Object} [options] Optional settings + * @param {Number} [options.roundTripTime] The round trip time to ping this server (in ms) */ - constructor(address, ismaster) { + constructor(address, ismaster, options) { + options = options || {}; + this.address = address; this.error = null; - this.roundTripTime = null; + this.roundTripTime = options.roundTripTime; this.lastWriteDate = ismaster && ismaster.lastWrite ? ismaster.lastWrite.lasteWriteDate : null; this.opTime = ismaster && ismaster.lastWrite ? ismaster.lastWrite.opTime : null; this.type = parseServerType(ismaster); diff --git a/lib/sdam/server_selectors.js b/lib/sdam/server_selectors.js new file mode 100644 index 000000000..e6f7efa71 --- /dev/null +++ b/lib/sdam/server_selectors.js @@ -0,0 +1,168 @@ +'use strict'; +const ServerType = require('./server_description').ServerType; +const TopologyType = require('./topology_description').TopologyType; +const ReadPreference = require('../topologies/read_preference'); + +function writableServerSelector() { + return function(topologyDescription, servers) { + if (topologyDescription === TopologyType.ReplicaSetNoPrimary) return []; + if ( + topologyDescription.type === TopologyType.Sharded || + topologyDescription.type === TopologyType.Single + ) { + return servers; + } + + return servers.filter(s => s.isWritable); + }; +} + +// reducers +function maxStalenessReducer(readPreference, topologyDescription, servers) { + if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) { + return servers; + } + + if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) { + const primary = servers.filter(primaryFilter); + return servers.reduce((result, server) => { + const staleness = + server.lastUpdateTime - + server.lastWriteDate - + (primary.lastUpdateTime - primary.lastWriteDate) + + topologyDescription.heartbeatFrequencyMS; + + if (staleness <= readPreference.maxStalenessSeconds) result.push(server); + return result; + }, []); + } else if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) { + const sMax = servers.reduce((max, s) => (s.lastWriteDate > max.lastWriteDate ? s : max)); + return servers.reduce((result, server) => { + const staleness = + sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS; + if (staleness <= readPreference.maxStalenessSeconds) result.push(server); + return result; + }, []); + } + + return servers; +} + +function tagSetMatch(tagSet, serverTags) { + const keys = Object.keys(tagSet); + const serverTagKeys = Object.keys(serverTags); + for (let i = 0; i < keys.length; ++i) { + const key = keys[i]; + if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) { + return false; + } + } + + return true; +} + +function tagSetReducer(readPreference, servers) { + if ( + readPreference.tags == null || + (Array.isArray(readPreference.tags) && readPreference.tags.length === 0) + ) { + return servers; + } + + for (let i = 0; i < readPreference.tags.length; ++i) { + const tagSet = readPreference.tags[i]; + const serversMatchingTagset = servers.reduce((matched, server) => { + if (tagSetMatch(tagSet, server.tags)) matched.push(server); + return matched; + }, []); + + if (serversMatchingTagset.length) { + return serversMatchingTagset; + } + } + + return []; +} + +function latencyWindowReducer(readPreference, servers) { + return servers; +} + +// filters +function primaryFilter(server) { + return server.type === ServerType.RSPrimary; +} + +function secondaryFilter(server) { + return server.type === ServerType.RSSecondary; +} + +function nearestFilter(server) { + return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary; +} + +function readPreferenceServerSelector(readPreference) { + if (!readPreference.isValid()) { + throw new TypeError('Invalid read preference specified'); + } + + return function(topologyDescription, servers) { + if ( + topologyDescription.type === TopologyType.Single || + topologyDescription.type === TopologyType.Sharded || + topologyDescription.type === TopologyType.Unknown + ) { + return servers; + } + + if (readPreference.mode === ReadPreference.PRIMARY) { + return servers.filter(s => s.type === ServerType.RSPrimary); + } + + if (readPreference.mode === ReadPreference.SECONDARY) { + return latencyWindowReducer( + readPreference, + tagSetReducer( + readPreference, + maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter)) + ) + ); + } else if (readPreference.mode === ReadPreference.NEAREST) { + return latencyWindowReducer( + readPreference, + tagSetReducer( + readPreference, + maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter)) + ) + ); + } else if (readPreference.mode === ReadPreference.SECONDARY_PREFERRED) { + const result = latencyWindowReducer( + readPreference, + tagSetReducer( + readPreference, + maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter)) + ) + ); + + return result.length === 0 ? servers.filter(primaryFilter) : result; + } else if (readPreference.mode === ReadPreference.PRIMARY_PREFERRED) { + const result = servers.filter(primaryFilter); + if (result.length) { + return result; + } + + return latencyWindowReducer( + readPreference, + tagSetReducer( + readPreference, + maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter)) + ) + ); + } + }; +} + +module.exports = { + writableServerSelector, + readPreferenceServerSelector +}; diff --git a/lib/sdam/topology.js b/lib/sdam/topology.js index 3749b363b..046b2f3b3 100644 --- a/lib/sdam/topology.js +++ b/lib/sdam/topology.js @@ -4,10 +4,18 @@ const ServerDescription = require('./server_description').ServerDescription; const TopologyDescription = require('./topology_description').TopologyDescription; const TopologyType = require('./topology_description').TopologyType; const monitoring = require('./monitoring'); +const calculateDurationInMs = require('../utils').calculateDurationInMs; +const MongoTimeoutError = require('../error').MongoTimeoutError; +const MongoError = require('../error').MongoError; // Global state let globalTopologyCounter = 0; +// Constants +const DEFAULT_LOCAL_THRESHOLD_MS = 15; +const DEFAULT_HEARTBEAT_FREQUENCY = 10000; +const DEFAULT_SERVER_SELECTION_TIMEOUT = 30000; + /** * A container of server instances representing a connection to a MongoDB topology. * @@ -27,11 +35,22 @@ class Topology extends EventEmitter { * * @param {Array|String} seedlist a string list, or array of Server instances to connect to * @param {Object} [options] Optional settings + * @param {Number} [options.localThresholdMS=15] The size of the latency window for selecting among multiple suitable servers + * @param {Number} [options.serverSelectionTimeoutMS=30000] How long to block for server selection before throwing an error + * @param {Number} [options.heartbeatFrequencyMS=10000] The frequency with which topology updates are scheduled */ constructor(seedlist, options) { super(); seedlist = seedlist || []; - options = options || {}; + options = Object.assign( + {}, + { + localThresholdMS: DEFAULT_LOCAL_THRESHOLD_MS, + serverSelectionTimeoutMS: DEFAULT_SERVER_SELECTION_TIMEOUT, + heartbeatFrequencyMS: DEFAULT_HEARTBEAT_FREQUENCY + }, + options + ); const topologyType = seedlist.length === 1 && !options.replicaset @@ -62,7 +81,11 @@ class Topology extends EventEmitter { null, null, options - ) + ), + serverSelectionTimeoutMS: + options.serverSelectionTimeoutMS || DEFAULT_SERVER_SELECTION_TIMEOUT, + heartbeatFrequencyMS: options.heartbeatFrequencyMS || DEFAULT_HEARTBEAT_FREQUENCY, + ServerClass: options.ServerClass || null /* eventually our Server class, but null for now */ }; } @@ -111,17 +134,33 @@ class Topology extends EventEmitter { /** * Selects a server according to the selection predicate provided * - * @param {function} [predicate] An optional predicate to select servers by, defaults to a random selection within a latency window + * @param {function} [selector] An optional selector to select servers by, defaults to a random selection within a latency window * @return {Server} An instance of a `Server` meeting the criteria of the predicate provided */ - selectServer(/* predicate */) { - return; + selectServer(selector, options, callback) { + if (typeof options === 'function') (callback = options), (options = {}); + options = Object.assign( + {}, + { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS }, + options + ); + + selectServers( + this, + selector, + options.serverSelectionTimeoutMS, + process.hrtime(), + (err, servers) => { + if (err) return callback(err, null); + callback(null, randomSelection(servers)); + } + ); } /** - * Update the topology with a ServerDescription + * Update the internal TopologyDescription with a ServerDescription * - * @param {object} serverDescription the server to update + * @param {object} serverDescription The server to update in the internal list of server descriptions */ update(serverDescription) { // these will be used for monitoring events later @@ -153,6 +192,44 @@ class Topology extends EventEmitter { } } +function randomSelection(array) { + return array[Math.floor(Math.random() * array.length)]; +} + +class FakeServer { + constructor(description) { + this.description = description; + } +} + +/** + * + * @param {*} topology + * @param {*} selector + * @param {*} options + * @param {*} callback + */ +function selectServers(topology, selector, timeout, start, callback) { + if (!topology.description.compatible) { + return callback(new MongoError(topology.description.compatibilityError)); + } + + const serverDescriptions = Array.from(topology.description.servers.values()); + let descriptions = selector(topology.description, serverDescriptions); + if (descriptions.length) { + // TODO: obviously return the actual server in the future + const servers = descriptions.map(d => new FakeServer(d)); + return callback(null, servers); + } + + const duration = calculateDurationInMs(process.hrtime(start)); + if (duration > timeout) { + return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`)); + } + + // TODO: loop this, add monitoring +} + /** * A server opening SDAM monitoring event * diff --git a/lib/sdam/topology_description.js b/lib/sdam/topology_description.js index 5db07a7c3..dca099398 100644 --- a/lib/sdam/topology_description.js +++ b/lib/sdam/topology_description.js @@ -29,18 +29,12 @@ class TopologyDescription { * @param {number} maxSetVersion * @param {ObjectId} maxElectionId */ - constructor( - topologyType, - serverDescriptions, - setName, - maxSetVersion, - maxElectionId - /*, options */ - ) { + constructor(topologyType, serverDescriptions, setName, maxSetVersion, maxElectionId, options) { + options = options || {}; + // TODO: consider assigning all these values to a temporary value `s` which // we use `Object.freeze` on, ensuring the internal state of this type // is immutable. - this.type = topologyType || TopologyType.Unknown; this.setName = setName || null; this.maxSetVersion = maxSetVersion || null; @@ -50,6 +44,8 @@ class TopologyDescription { this.compatible = true; this.compatibilityError = null; this.logicalSessionTimeoutMinutes = null; + this.heartbeatFrequencyMS = options.heartbeatFrequencyMS || 0; + this.options = options; // determine server compatibility for (const serverDescription of this.servers.values()) { @@ -112,7 +108,7 @@ class TopologyDescription { setName, maxSetVersion, maxElectionId, - {} + this.options ); } @@ -192,7 +188,7 @@ class TopologyDescription { setName, maxSetVersion, maxElectionId, - {} + this.options ); }