From 98bdec24532233d0bb825fa3ebe5486a4634023c Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 7 Feb 2016 10:22:14 +0800 Subject: [PATCH] feat(cluster): support scaling reads to slaves The new option scaleReads is used to specify where to send the reads. Add two new events: 1. "+node": a new node is discovered. 2. "-node": a node is disconnected. BREAKING CHANGE: 1. Cluster#masterNodes and Cluster#nodes is removed. Use Cluster#nodes('masters') and Cluster#nodes('all') instead. 2. Cluster#to() is removed. Use Promise.all(Cluster#nodes().map(function (node) {})) instead. Closes #170. --- API.md | 86 -------- lib/cluster/connection_pool.js | 89 ++++++++ lib/{cluster.js => cluster/index.js} | 316 ++++++++++----------------- lib/commander.js | 3 +- lib/pipeline.js | 15 +- lib/utils/index.js | 12 +- test/functional/cluster.js | 207 +++++++++--------- 7 files changed, 333 insertions(+), 395 deletions(-) create mode 100644 lib/cluster/connection_pool.js rename lib/{cluster.js => cluster/index.js} (67%) diff --git a/API.md b/API.md index d4211785..477f6c46 100644 --- a/API.md +++ b/API.md @@ -2,8 +2,6 @@
Redis[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
-
Cluster[EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
-
Commander
@@ -12,9 +10,6 @@
defaultOptions

Default options

-
defaultOptions
-

Default options

-
## Redis ⇐ [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) @@ -190,81 +185,6 @@ Define a custom command using lua script Create a Redis instance **Kind**: static method of [Redis](#Redis) - -## Cluster ⇐ [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) -**Kind**: global class -**Extends:** [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter), [Commander](#Commander) - -* [Cluster](#Cluster) ⇐ [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) - * [new Cluster(startupNodes, options)](#new_Cluster_new) - * [.disconnect()](#Cluster+disconnect) - * [.getBuiltinCommands()](#Commander+getBuiltinCommands) ⇒ Array.<string> - * [.createBuiltinCommand(commandName)](#Commander+createBuiltinCommand) ⇒ object - * [.defineCommand(name, definition)](#Commander+defineCommand) - * *[.sendCommand()](#Commander+sendCommand)* - - -### new Cluster(startupNodes, options) -Creates a Redis Cluster instance - - -| Param | Type | Default | Description | -| --- | --- | --- | --- | -| startupNodes | Array.<Object> | | An array of nodes in the cluster, [{ port: number, host: string }] | -| options | Object | | | -| [options.enableOfflineQueue] | boolean | true | See Redis class | -| [options.lazyConnect] | boolean | false | See Redis class | -| [options.readOnly] | boolean | false | Connect in READONLY mode | -| [options.maxRedirections] | number | 16 | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. | -| [options.clusterRetryStrategy] | function | | See "Quick Start" section | -| [options.retryDelayOnFailover] | number | 2000 | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), | -| [options.retryDelayOnClusterDown] | number | 1000 | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. | - - -### cluster.disconnect() -Disconnect from every node in the cluster. - -**Kind**: instance method of [Cluster](#Cluster) -**Access:** public - -### cluster.getBuiltinCommands() ⇒ Array.<string> -Return supported builtin commands - -**Kind**: instance method of [Cluster](#Cluster) -**Returns**: Array.<string> - command list -**Access:** public - -### cluster.createBuiltinCommand(commandName) ⇒ object -Create a builtin command - -**Kind**: instance method of [Cluster](#Cluster) -**Returns**: object - functions -**Access:** public - -| Param | Type | Description | -| --- | --- | --- | -| commandName | string | command name | - - -### cluster.defineCommand(name, definition) -Define a custom command using lua script - -**Kind**: instance method of [Cluster](#Cluster) - -| Param | Type | Default | Description | -| --- | --- | --- | --- | -| name | string | | the command name | -| definition | object | | | -| definition.lua | string | | the lua code | -| [definition.numberOfKeys] | number | | the number of keys. If omit, you have to pass the number of keys as the first argument every time you invoke the command | - - -### *cluster.sendCommand()* -Send a command - -**Kind**: instance abstract method of [Cluster](#Cluster) -**Overrides:** [sendCommand](#Commander+sendCommand) -**Access:** public ## Commander **Kind**: global class @@ -331,9 +251,3 @@ Default options **Kind**: global variable **Access:** protected - -## defaultOptions -Default options - -**Kind**: global variable -**Access:** protected diff --git a/lib/cluster/connection_pool.js b/lib/cluster/connection_pool.js new file mode 100644 index 00000000..26de8287 --- /dev/null +++ b/lib/cluster/connection_pool.js @@ -0,0 +1,89 @@ +'use strict'; + +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var _ = require('lodash'); +var Redis = require('../redis'); + +function ConnectionPool(redisOptions) { + EventEmitter.call(this); + this.redisOptions = redisOptions; + + // this.masters + this.slaves = this.nodes + this.nodes = {}; + this.masters = {}; + this.slaves = {}; + + this.specifiedOptions = {}; +} + +util.inherits(ConnectionPool, EventEmitter); + +ConnectionPool.prototype.findOrCreate = function (node, readOnly) { + node.port = node.port || 6379; + node.host = node.host || '127.0.0.1'; + node.key = node.key || node.host + ':' + node.port; + + if (this.specifiedOptions[node.key]) { + _.assign(node, this.specifiedOptions[node.key]); + } else { + this.specifiedOptions[node.key] = node; + } + + if (this.nodes[node.key] && this.nodes[node.key].options.readOnly !== readOnly) { + this.remove(node.key); + } + + if (!this.nodes[node.key]) { + var redis = this.nodes[node.key] = new Redis(_.defaults({ + retryStrategy: null, + enableOfflineQueue: true, + readOnly: readOnly + }, node, this.redisOptions, { lazyConnect: true })); + this[readOnly ? 'slaves' : 'masters'][node.key] = redis; + + var _this = this; + redis.once('end', function () { + delete _this.nodes[node.key]; + delete _this.masters[node.key]; + delete _this.slaves[node.key]; + _this.emit('-node', redis); + if (!Object.keys(_this.nodes).length) { + _this.emit('drain'); + } + }); + + this.emit('+node', redis); + } + + return this.nodes[node.key]; +}; + +ConnectionPool.prototype.remove = function (key) { + if (this.nodes[key]) { + this.nodes[key].disconnect(); + delete this.nodes[key]; + delete this.masters[key]; + delete this.slaves[key]; + } +}; + +ConnectionPool.prototype.reset = function (nodes) { + var newNodes = {}; + for (var i = 0; i < nodes.length; i++) { + var node = nodes[i]; + node.key = node.host + ':' + node.port; + newNodes[node.key] = node; + } + var _this = this; + Object.keys(this.nodes).forEach(function (key) { + if (!newNodes[key]) { + _this.remove(key); + } + }); + Object.keys(newNodes).forEach(function (key) { + _this.findOrCreate(newNodes[key], newNodes[key].readOnly); + }); +}; + +module.exports = ConnectionPool; diff --git a/lib/cluster.js b/lib/cluster/index.js similarity index 67% rename from lib/cluster.js rename to lib/cluster/index.js index bbc9b974..4759919d 100644 --- a/lib/cluster.js +++ b/lib/cluster/index.js @@ -2,15 +2,17 @@ var Promise = require('bluebird'); var Deque = require('double-ended-queue'); -var Redis = require('./redis'); -var utils = require('./utils'); +var Redis = require('../redis'); +var utils = require('../utils'); var util = require('util'); var EventEmitter = require('events').EventEmitter; var debug = require('debug')('ioredis:cluster'); var _ = require('lodash'); -var ScanStream = require('./scan_stream'); -var Commander = require('./commander'); -var Command = require('./command'); +var ScanStream = require('../scan_stream'); +var Commander = require('../commander'); +var Command = require('../command'); +var commands = require('../../commands'); +var ConnectionPool = require('./connection_pool'); /** * Creates a Redis Cluster instance @@ -20,7 +22,8 @@ var Command = require('./command'); * @param {Object} options * @param {boolean} [options.enableOfflineQueue=true] - See Redis class * @param {boolean} [options.lazyConnect=false] - See Redis class - * @param {boolean} [options.readOnly=false] - Connect in READONLY mode + * @param {string} [options.scaleReads="masters"] - Scale reads to the node with the specified role. + * Available values are "masters", "slaves" and "all". * @param {number} [options.maxRedirections=16] - When a MOVED or ASK error is received, client will redirect the * command to another node. This option limits the max redirections allowed to send a command. * @param {function} [options.clusterRetryStrategy] - See "Quick Start" section @@ -35,9 +38,19 @@ function Cluster(startupNodes, options) { EventEmitter.call(this); Commander.call(this); + this.options = _.defaults(this.options, options, Cluster.defaultOptions); + + // validate options + if (['all', 'masters', 'slaves'].indexOf(this.options.scaleReads) === -1) { + throw new Error('Invalid option scaleReads "' + this.options.scaleReads + + '". Expected "all", "masters" or "slaves"'); + } + if (!Array.isArray(startupNodes) || startupNodes.length === 0) { throw new Error('`startupNodes` should contain at least one node.'); } + + this.connectionPool = new ConnectionPool(_.defaults({}, this.options.redisOptions, Redis.defaultOptions)); this.startupNodes = startupNodes.map(function (node) { var options = {}; if (typeof node === 'object') { @@ -56,12 +69,23 @@ function Cluster(startupNodes, options) { return options; }); - this.nodes = {}; - this.masterNodes = {}; + var _this = this; + this.connectionPool.on('-node', function (redis) { + _this.emit('-node'); + if (_this.subscriber === redis) { + _this.selectSubscriber(); + } + }); + this.connectionPool.on('+node', function (redis) { + _this.emit('+node', redis); + }); + this.connectionPool.on('drain', function () { + _this.setStatus('close'); + }); + this.slots = []; this.retryAttempts = 0; - this.options = _.defaults({}, options || {}, this.options || {}, Cluster.defaultOptions); this.resetOfflineQueue(); this.resetFailoverQueue(); @@ -78,15 +102,16 @@ function Cluster(startupNodes, options) { * @var defaultOptions * @protected */ -Cluster.defaultOptions = _.assign({}, Redis.defaultOptions, { +Cluster.defaultOptions = { maxRedirections: 16, retryDelayOnFailover: 2000, retryDelayOnClusterDown: 1000, - readOnly: false, + scaleReads: 'masters', + enableOfflineQueue: true, clusterRetryStrategy: function (times) { return Math.min(100 + times * 2, 2000); } -}); +}; util.inherits(Cluster, EventEmitter); _.assign(Cluster.prototype, Commander.prototype); @@ -111,6 +136,8 @@ Cluster.prototype.connect = function () { } this.setStatus('connecting'); + this.connectionPool.reset(this.startupNodes); + var closeListener; var refreshListener = function () { this.removeListener('close', closeListener); @@ -148,15 +175,12 @@ Cluster.prototype.connect = function () { } }); - this.startupNodes.forEach(function (options) { - this.createNode(options.port, options.host); - }, this); this.refreshSlotsCache(function (err) { if (err && err.message === 'Failed to refresh slots cache.') { Redis.prototype.silentEmit.call(this, 'error', err); - var keys = Object.keys(this.nodes); + var keys = Object.keys(this.connectionPool.nodes); for (var i = 0; i < keys.length; ++i) { - this.nodes[keys[i]].disconnect(); + this.connectionPool.nodes[keys[i]].disconnect(); } } }.bind(this)); @@ -177,81 +201,24 @@ Cluster.prototype.disconnect = function (reconnect) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } - var keys = Object.keys(this.nodes); - for (var i = 0; i < keys.length; ++i) { - this.nodes[keys[i]].disconnect(); - } + this.nodes().forEach(function (node) { + node.disconnect(); + }); }; -/** - * Create a connection and add it to the connection list - * - * @param {number} port - * @param {string} host - * @return {Redis} A redis instance - * @private - */ -Cluster.prototype.createNode = function (port, host) { - var nodeOpt = _.defaults({ - port: port, - host: host || '127.0.0.1', - retryStrategy: null - }, Redis.defaultOptions); - - var key = nodeOpt.host + ':' + nodeOpt.port; - - if (!this.nodes[key]) { - // Fetch password from startupNodes option - delete nodeOpt.password; - for (var i = 0; i < this.startupNodes.length; i++) { - var node = this.startupNodes[i]; - if (node.port === nodeOpt.port && node.host === nodeOpt.host) { - nodeOpt.password = node.password; - break; - } - } - - this.nodes[key] = new Redis(_.assign({}, this.options, nodeOpt)); - - var _this = this; - if (this.options.readOnly) { - this.nodes[key].once('ready', function () { - debug('sending readonly to %s', key); - _this.nodes[key].readonly(); - }); - } - this.nodes[key].once('end', function () { - var deadNode = _this.nodes[key]; - delete _this.nodes[key]; - delete _this.masterNodes[key]; - if (_this.subscriber === deadNode) { - _this.selectSubscriber(); - } - if (Object.keys(_this.nodes).length === 0) { - _this.setStatus('close'); - } - }); +Cluster.prototype.nodes = function (type) { + if (!type) { + type = 'all'; } - - return this.nodes[key]; -}; - -Cluster.prototype.selectRandomMasterNode = function () { - return this.nodes[_.sample(Object.keys(this.masterNodes))]; -}; - -Cluster.prototype.selectRandomNode = function () { - var keys = Object.keys(this.nodes); - return (keys.length > 0) ? this.nodes[_.sample(keys)] : null; -}; - -Cluster.prototype.selectRandomNodeForSlot = function (targetSlot) { - return _.sample(this.slots[targetSlot].allNodes); + if (type !== 'all' && type !== 'masters' && type !== 'slaves') { + throw new Error('Invalid type "' + type + '". Expected "all", "masters" or "slaves"'); + } + return _.values(this.connectionPool[type === 'all' ? 'nodes' : type]); }; Cluster.prototype.selectSubscriber = function () { - this.subscriber = this.selectRandomNode(); - if (this.subscriber === null) { + this.subscriber = _.sample(this.connectionPool.nodes); + if (!this.subscriber) { return; } // Re-subscribe previous channels @@ -319,7 +286,7 @@ Cluster.prototype.refreshSlotsCache = function (callback) { } }; - var keys = _.shuffle(Object.keys(this.nodes)); + var keys = _.shuffle(Object.keys(this.connectionPool.nodes)); var lastNodeError = null; @@ -330,7 +297,7 @@ Cluster.prototype.refreshSlotsCache = function (callback) { return wrapper(error); } debug('getting slot cache from %s', keys[index]); - _this.getInfoFromNode(_this.nodes[keys[index]], function (err) { + _this.getInfoFromNode(_this.connectionPool.nodes[keys[index]], function (err) { if (_this.status === 'end') { return wrapper(new Error('Cluster is disconnected.')); } @@ -398,87 +365,47 @@ Cluster.prototype.executeClusterDownCommands = function () { } }; -Cluster.prototype.to = function (name) { - var fnName = '_select' + name[0].toUpperCase() + name.slice(1); - if (typeof this[fnName] !== 'function') { - // programmatic error, can't happen in prod, so throw - throw new Error('to ' + name + ' is not a valid group of nodes'); - } - - // could be 0 nodes just as well - var nodes = this[fnName](); - return { - nodes: nodes, - call: this._generateCallNodes(nodes, 'call'), - callBuffer: this._generateCallNodes(nodes, 'callBuffer') - }; -}; - -Cluster.prototype._generateCallNodes = function (nodes, op, _opts) { - var opts = _opts || {}; - - return function callNode() { - var argLength = arguments.length; - var hasCb = typeof arguments[argLength - 1] === 'function'; - var args = new Array(argLength); - for (var i = 0; i < argLength; ++i) { - args[i] = arguments[i]; - } - - var callback = hasCb ? args.pop() : null; - var promise = Promise.map(nodes, function (node) { - return node[op].apply(node, args); - }, opts); - - if (callback) { - return promise.nodeify(callback); - } - - return promise; - }; -}; - -Cluster.prototype._selectAll = function () { - return _.values(this.nodes); -}; - -Cluster.prototype._selectMasters = function () { - return _.values(this.masterNodes); -}; - -Cluster.prototype._selectSlaves = function () { - return _.difference(this._selectAll(), this._selectMasters()); -}; - Cluster.prototype.sendCommand = function (command, stream, node) { if (this.status === 'end') { command.reject(new Error('Connection is closed.')); return command.promise; } + var to = this.options.scaleReads; + if (to !== 'masters') { + var flags = commands[command.name] && commands[command.name].flags; + var isCommandReadOnly = false; + for (var i = 0; i < flags.length; i++) { + if (flags[i] === 'readonly') { + isCommandReadOnly = true; + break; + } + } + if (!isCommandReadOnly) { + to = 'masters'; + } + } var targetSlot = node ? node.slot : command.getSlot(); var ttl = {}; - var reject = command.reject; var _this = this; if (!node) { + var reject = command.reject; + var partialTry = _.partial(tryConnection, true); command.reject = function (err) { - var partialTry = _.partial(tryConnection, true); - _this.handleError(err, ttl, { - moved: function (node, slot, hostPort) { - debug('command %s is moved to %s:%s', command.name, hostPort[0], hostPort[1]); - var coveredSlot = _this.slots[slot]; - if (!coveredSlot) { - _this.slots[slot] = { masterNode: node, allNodes: [node] }; + moved: function (slot, key) { + debug('command %s is moved to %s', command.name, key); + if (_this.slots[slot]) { + _this.slots[slot][0] = key; } else { - coveredSlot.masterNode = node; + _this.slots[slot] = [key]; } tryConnection(); _this.refreshSlotsCache(); }, - ask: function (node, slot, hostPort) { - debug('command %s is required to ask %s:%s', command.name, hostPort[0], hostPort[1]); - tryConnection(false, node); + ask: function (slot, key) { + debug('command %s is required to ask %s:%s', command.name, key); + tryConnection(false, key); }, clusterDown: partialTry, connectionClosed: partialTry, @@ -506,19 +433,27 @@ Cluster.prototype.sendCommand = function (command, stream, node) { _.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, command.name)) { redis = _this.subscriber; } else { - if (typeof targetSlot === 'number' && _this.slots[targetSlot]) { - if (_this.options.readOnly) { - redis = _this.selectRandomNodeForSlot(targetSlot); - } else { - redis = _this.slots[targetSlot].masterNode; + if (!random) { + if (typeof targetSlot === 'number' && _this.slots[targetSlot]) { + var nodeKeys = _this.slots[targetSlot]; + var key; + if (to === 'all') { + key = utils.sample(nodeKeys); + } else if (to === 'slaves' && nodeKeys.length > 1) { + key = utils.sample(nodeKeys, 1); + } else { + key = nodeKeys[0]; + } + redis = _this.connectionPool.nodes[key]; + } + if (asking) { + redis = _this.connectionPool.nodes[asking]; + redis.asking(); } } - if (asking && !random) { - redis = asking; - redis.asking(); - } - if (random || !redis) { - redis = _this.selectRandomMasterNode(); + if (!redis) { + redis = _.sample(_this.connectionPool[to === 'all' ? 'nodes' : to]) || + _.sample(_this.connectionPool.nodes); } } if (node && !node.redis) { @@ -553,13 +488,7 @@ Cluster.prototype.handleError = function (error, ttl, handlers) { } var errv = error.message.split(' '); if (errv[0] === 'MOVED' || errv[0] === 'ASK') { - var hostPort = errv[2].split(':'); - var node = this.createNode(hostPort[1], hostPort[0]); - if (errv[0] === 'MOVED') { - handlers.moved(node, errv[1], hostPort); - } else { - handlers.ask(node, errv[1], hostPort); - } + handlers[errv[0] === 'MOVED' ? 'moved' : 'ask'](errv[1], errv[2]); } else if (errv[0] === 'CLUSTERDOWN' && this.options.retryDelayOnClusterDown > 0) { this.clusterDownQueue.push(handlers.clusterDown); if (!this.clusterDownTimeout) { @@ -595,40 +524,27 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) { redis.disconnect(); return callback(err); } - var i; - var oldNodes = {}; - var keys = Object.keys(_this.nodes); - for (i = 0; i < keys.length; ++i) { - oldNodes[keys[i]] = true; - } - _this.masterNodes = {}; - for (i = 0; i < result.length; ++i) { - var allNodes = []; + var nodes = []; + + for (var i = 0; i < result.length; ++i) { var items = result[i]; - var slotRangeStart = items.shift(); - var slotRangeEnd = items.shift(); - var master = items.shift(); - var masterNodeKey = master[0] + ':' + master[1]; - var masterNode = _this.createNode(master[1], master[0]); - _this.masterNodes[masterNodeKey] = masterNode; - allNodes.push(masterNode); - delete oldNodes[masterNodeKey]; - if (_this.options.readOnly) { - items.forEach(function (item) { - var host = item[0]; - var port = item[1]; - allNodes.push(_this.createNode(port, host)); - delete oldNodes[host + ':' + port]; - }); + var slotRangeStart = items[0]; + var slotRangeEnd = items[1]; + + var keys = []; + for (var j = 2; j < items.length; j++) { + items[j] = { host: items[j][0], port: items[j][1] }; + items[j].readOnly = j !== 2; + nodes.push(items[j]); + keys.push(items[j].host + ':' + items[j].port); } - for (var slot = slotRangeStart; slot <= slotRangeEnd; ++slot) { - _this.slots[slot] = { masterNode: masterNode, allNodes: allNodes }; + + for (var slot = slotRangeStart; slot <= slotRangeEnd; slot++) { + _this.slots[slot] = keys; } } - Object.keys(oldNodes).forEach(function (key) { - _this.nodes[key].disconnect(); - }); + _this.connectionPool.reset(nodes); callback(); }, 1000)); }; @@ -645,7 +561,7 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) { }; }); -require('./transaction').addTransactionSupport(Cluster.prototype); +require('../transaction').addTransactionSupport(Cluster.prototype); function noop() {} diff --git a/lib/commander.js b/lib/commander.js index 88b4a9aa..9ef38d0c 100644 --- a/lib/commander.js +++ b/lib/commander.js @@ -9,7 +9,8 @@ var Script = require('./script'); * * This is the base class of Redis, Redis.Cluster and Pipeline * - * @param {boolean} [options.showFriendlyErrorStack=false] - Whether to show a friendly error stack. Will decrease the performance significantly. + * @param {boolean} [options.showFriendlyErrorStack=false] - Whether to show a friendly error stack. + * Will decrease the performance significantly. * @constructor */ function Commander() { diff --git a/lib/pipeline.js b/lib/pipeline.js index 963ced1d..37b9946b 100644 --- a/lib/pipeline.js +++ b/lib/pipeline.js @@ -104,14 +104,14 @@ Pipeline.prototype.fillResult = function (value, position) { this.leftRedirections = {}; } this.redis.handleError(commonError, this.leftRedirections, { - moved: function (node) { - _this.preferNode = node; - _this.redis.slots[errv[1]] = node; + moved: function (slot, key) { + _this.preferKey = key; + _this.redis.slots[errv[1]] = [key]; _this.redis.refreshSlotsCache(); _this.exec(); }, - ask: function (node) { - _this.preferNode = node; + ask: function (slot, key) { + _this.preferKey = key; _this.exec(); }, clusterDown: function () { @@ -254,7 +254,10 @@ Pipeline.prototype.exec = function (callback) { var data = ''; var writePending = _this.replyPending = _this._queue.length; - var node = { slot: pipelineSlot, redis: _this.preferNode }; + var node; + if (_this.isCluster) { + node = { slot: pipelineSlot, redis: _this.redis.connectionPool.nodes[_this.preferKey] }; + } var bufferMode = false; var stream = { write: function (writable) { diff --git a/lib/utils/index.js b/lib/utils/index.js index 9022f166..00b4e97a 100644 --- a/lib/utils/index.js +++ b/lib/utils/index.js @@ -1,6 +1,5 @@ 'use strict'; var urllib = require('url'); -var util = require('util'); var _ = require('lodash'); /** @@ -290,3 +289,14 @@ exports.parseURL = function (url) { return result; }; + +exports.sample = function (array, from) { + var length = array.length; + if (typeof from !== 'number') { + from = 0; + } + if (from >= length) { + return; + } + return array[from + Math.floor(Math.random() * (length - from))]; +}; diff --git a/test/functional/cluster.js b/test/functional/cluster.js index ff40905c..9402eb6a 100644 --- a/test/functional/cluster.js +++ b/test/functional/cluster.js @@ -2,6 +2,7 @@ var utils = require('../../lib/utils'); var Promise = require('bluebird'); +var _ = require('lodash'); describe('cluster', function () { describe('connect', function () { @@ -291,8 +292,8 @@ describe('cluster', function () { var cluster = new Redis.Cluster([ { host: '127.0.0.1', port: '30001', password: 'other password' }, - { host: '127.0.0.1', port: '30002' } - ], { lazyConnect: false, password: 'default password' }); + { host: '127.0.0.1', port: '30002', password: null } + ], { lazyConnect: false, redisOptions: { password: 'default password' } }); }); }); @@ -933,59 +934,101 @@ describe('cluster', function () { }); }); - describe('readonly', function () { - it('should connect all nodes and issue a readonly', function (done) { - var setReadOnlyNode1 = false; - var setReadOnlyNode2 = false; - var setReadOnlyNode3 = false; - var slotTable = [ - [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], - [5461, 10922, ['127.0.0.1', 30002]] - ]; - var node1 = new MockServer(30001, function (argv) { - if (argv[0] === 'cluster' && argv[1] === 'slots') { - return slotTable; - } - if (argv[0] === 'readonly') { - setReadOnlyNode1 = true; - return 'OK'; - } - }); - var node2 = new MockServer(30002, function (argv) { + describe('scaleReads', function () { + beforeEach(function () { + function handler(port, argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { - return slotTable; - } - if (argv[0] === 'readonly') { - setReadOnlyNode2 = true; - return 'OK'; + return [ + [0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], + [16382, 16383, ['127.0.0.1', 30002]] + ]; } + return port; + } + this.node1 = new MockServer(30001, handler.bind(null, 30001)); + this.node2 = new MockServer(30002, handler.bind(null, 30002)); + this.node3 = new MockServer(30003, handler.bind(null, 30003)); + }); + + afterEach(function (done) { + disconnect([this.node1, this.node2, this.node3], done); + }); + + context('masters', function () { + it('should only send reads to masters', function (done) { + var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); + cluster.on('ready', function () { + stub(utils, 'sample').throws('sample is called'); + cluster.get('foo', function (err, res) { + utils.sample.restore(); + expect(res).to.eql(30001); + cluster.disconnect(); + done(); + }); + }); }); + }); - var node3 = new MockServer(30003, function (argv) { - if (argv[0] === 'cluster' && argv[1] === 'slots') { - return slotTable; - } - if (argv[0] === 'readonly') { - setReadOnlyNode3 = true; - return 'OK'; - } + context('slaves', function () { + it('should only send reads to slaves', function (done) { + var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { + scaleReads: 'slaves' + }); + cluster.on('ready', function () { + stub(utils, 'sample', function (array, from) { + expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']); + expect(from).to.eql(1); + return '127.0.0.1:30003'; + }); + cluster.get('foo', function (err, res) { + utils.sample.restore(); + expect(res).to.eql(30003); + cluster.disconnect(); + done(); + }); + }); }); - var cluster = new Redis.Cluster( - [{ host: '127.0.0.1', port: '30001' }], - { readOnly: true } - ); - cluster.on('ready', function () { - expect(setReadOnlyNode1 || setReadOnlyNode2 || setReadOnlyNode3).to.eql(true); - cluster.disconnect(); - disconnect([node1, node2, node3], done); + it('should send writes to masters', function (done) { + var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { + scaleReads: 'slaves' + }); + cluster.on('ready', function () { + stub(utils, 'sample').throws('sample is called'); + cluster.set('foo', 'bar', function (err, res) { + utils.sample.restore(); + expect(res).to.eql(30001); + cluster.disconnect(); + done(); + }); + }); }); + }); + context('all', function () { + it('should send reads to all nodes randomly', function (done) { + var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { + scaleReads: 'all' + }); + cluster.on('ready', function () { + stub(utils, 'sample', function (array, from) { + expect(array).to.eql(['127.0.0.1:30001', '127.0.0.1:30003']); + expect(from).to.eql(undefined); + return '127.0.0.1:30003'; + }); + cluster.get('foo', function (err, res) { + utils.sample.restore(); + expect(res).to.eql(30003); + cluster.disconnect(); + done(); + }); + }); + }); }); }); - describe('#masterNodes', function () { - it('should contains master nodes', function (done) { + describe('#nodes()', function () { + it('should return the corrent nodes', function (done) { var slotTable = [ [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], [5461, 10922, ['127.0.0.1', 30002]] @@ -1009,8 +1052,16 @@ describe('cluster', function () { var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); cluster.on('ready', function () { - cluster.nodes['127.0.0.1:30001'].on('end', function () { - expect(Object.keys(cluster.masterNodes).length).to.eql(1); + expect(cluster.nodes()).to.have.lengthOf(3); + expect(cluster.nodes('all')).to.have.lengthOf(3); + expect(cluster.nodes('masters')).to.have.lengthOf(2); + expect(cluster.nodes('slaves')).to.have.lengthOf(1); + + cluster.on('-node', function () { + expect(cluster.nodes()).to.have.lengthOf(2); + expect(cluster.nodes('all')).to.have.lengthOf(2); + expect(cluster.nodes('masters')).to.have.lengthOf(1); + expect(cluster.nodes('slaves')).to.have.lengthOf(1); cluster.disconnect(); disconnect([node2, node3], done); }); @@ -1043,73 +1094,27 @@ describe('cluster', function () { } }); - var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); + var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { + redisOptions: { showFriendlyErrorStack: true } + }); cluster.on('ready', function () { - expect(Object.keys(cluster.masterNodes).length).to.eql(2); + expect(cluster.nodes('masters')).to.have.lengthOf(2); slotTable = [ [0, 5460, ['127.0.0.1', 30003]], [5461, 10922, ['127.0.0.1', 30002]] ]; cluster.refreshSlotsCache(function () { - expect(Object.keys(cluster.masterNodes).length).to.eql(2); - expect(cluster.masterNodes).to.have.property('127.0.0.1:30003'); - expect(cluster.masterNodes).to.have.property('127.0.0.1:30002'); + expect(cluster.nodes('masters')).to.have.lengthOf(2); + expect([ + cluster.nodes('masters')[0].options.port, + cluster.nodes('masters')[1].options.port + ].sort()).to.eql([30002, 30003]); cluster.disconnect(); disconnect([node1, node2, node3], done); }); }); }); }); - - describe('#to', function () { - it('should throw when the group does not exist', function () { - stub(Redis.Cluster.prototype, 'connect', function () { - return Promise.resolve(); - }); - var cluster = new Redis.Cluster([{}]); - expect(function () { - cluster.to('non-exist'); - }).to.throw(/is not a valid group of nodes/); - Redis.Cluster.prototype.connect.restore(); - }); - - it('should return the correct nodes', function (done) { - var slotTable = [ - [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], - [5461, 16383, ['127.0.0.1', 30002]] - ]; - var argvHandler = function (argv) { - if (argv[0] === 'cluster' && argv[1] === 'slots') { - return slotTable; - } else if (argv[0] === 'keys') { - return ['key' + this.port]; - } - }; - var node1 = new MockServer(30001, argvHandler); - var node2 = new MockServer(30002, argvHandler); - var node3 = new MockServer(30003, argvHandler); - var pending = 3; - [node1, node2, node3].forEach(function (node) { - node.on('connect', function () { - if (!--pending) { - run(); - } - }); - }); - var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { readOnly: true }); - function run() { - expect(cluster.to('masters').nodes).to.have.lengthOf(2); - expect(cluster.to('slaves').nodes).to.have.lengthOf(1); - expect(cluster.to('all').nodes).to.have.lengthOf(3); - cluster.to('masters').call('keys', function (err, keys) { - expect(keys).to.have.lengthOf(2); - expect([].concat.apply([], keys).sort()).to.eql(['key30001', 'key30002']); - cluster.disconnect(); - disconnect([node1, node2, node3], done); - }); - } - }); - }); }); function disconnect(clients, callback) {