Skip to content

Commit

Permalink
Support scanStream in Cluster. Close #175
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Nov 19, 2015
1 parent 1819000 commit 755b71e
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 34 deletions.
5 changes: 5 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ This file is a manually maintained list of changes for each release. Feel free t

## Master Branch (Unreleased)

* Emits "select" event when the database changed.
* [Cluster] Supports scanStream ([#175](https://github.com/luin/ioredis/issues/175)).
* Update debug module to 2.2.0
* Update bluebird module to 2.9.34

## v1.10.0 - October 24, 2015

* [Cluster] Support redis schema url.
Expand Down
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -523,16 +523,22 @@ The Redis instance will emit some events about the state of the connection to th

Event | Description
:------------- | :-------------
connect | client will emit `connect` once a connection is established to the Redis server.
connect | emits when a connection is established to the Redis server.
ready | If `enableReadyCheck` is `true`, client will emit `ready` when the server reports that it is ready to receive commands (e.g. finish loading data from disk).<br>Otherwise, `ready` will be emitted immediately right after the `connect` event.
error | client will emit `error` when an error occurs while connecting.<br>However, ioredis emits all `error` events silently (only emits when there's at least one listener) so that your application won't crash if you're not listening to the `error` event.
close | client will emit `close` when an established Redis server connection has closed.
reconnecting | client will emit `reconnecting` after `close` when a reconnection will be made. The argument of the event is the time (in ms) before reconnecting.
end | client will emit `end` after `close` when no more reconnections will be made.
authError | client will emit `authError` when the password specified in the options is wrong or the server doesn't require a password.
error | emits when an error occurs while connecting.<br>However, ioredis emits all `error` events silently (only emits when there's at least one listener) so that your application won't crash if you're not listening to the `error` event.
close | emits when an established Redis server connection has closed.
reconnecting | emits after `close` when a reconnection will be made. The argument of the event is the time (in ms) before reconnecting.
end | emits after `close` when no more reconnections will be made.

You can also check out the `Redis#status` property to get the current connection status.

Besides the above connection events, there are several other custom events:

Event | Description
:------------- | :-------------
authError | emits when the password specified in the options is wrong or the server doesn't require a password.
select | emits when the database changed. The argument is the new db number.

## Offline Queue
When a command can't be processed by Redis (being sent before the `ready` event), by default, it's added to the offline queue and will be
executed when it can be processed. You can disable this feature by setting the `enableOfflineQueue`
Expand Down
13 changes: 13 additions & 0 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var util = require('util');
var EventEmitter = require('events').EventEmitter;
var debug = require('debug')('ioredis:cluster');
var _ = require('lodash');
var ScanStream = require('./scan_stream');
var Commander = require('./commander');
var Command = require('./command');

Expand Down Expand Up @@ -526,6 +527,18 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) {
}, 1000));
};

['sscan', 'hscan', 'zscan', 'sscanBuffer', 'hscanBuffer', 'zscanBuffer']
.forEach(function (command) {
Cluster.prototype[command + 'Stream'] = function (key, options) {
return new ScanStream(_.defaults({
objectMode: true,
key: key,
redis: this,
command: command
}, options));
};
});

require('./transaction').addTransactionSupport(Cluster.prototype);

function noop() {}
Expand Down
8 changes: 6 additions & 2 deletions lib/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,12 @@ Redis.prototype.sendCommand = function (command, stream) {
}

if (command.name === 'select' && utils.isInt(command.args[0])) {
this.condition.select = parseInt(command.args[0], 10);
debug('switch to db [%d]', this.condition.select);
var db = parseInt(command.args[0], 10);
if (this.condition.select !== db) {
this.condition.select = db;
this.emit('select', db);
debug('switch to db [%d]', this.condition.select);
}
}

return command.promise;
Expand Down
50 changes: 26 additions & 24 deletions test/functional/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ describe('cluster', function () {
it('should stop reconnecting when disconnected', function (done) {
var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
], { clusterRetryStrategy: function () { return 0; } });
], {
clusterRetryStrategy: function () {
return 0;
}
});

cluster.on('close', function () {
cluster.disconnect();
Expand Down Expand Up @@ -491,7 +495,7 @@ describe('cluster', function () {
var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
]);
cluster.get('foo', 'bar', function (err, result) {
cluster.get('foo', 'bar', function (err) {
expect(called).to.eql(true);
expect(err.message).to.match(/Wrong arguments count/);
cluster.disconnect();
Expand Down Expand Up @@ -529,7 +533,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -557,7 +561,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -597,7 +601,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -639,7 +643,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -674,7 +678,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -715,7 +719,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -759,7 +763,7 @@ describe('cluster', function () {
var slotTable = [
[0, 12181, ['127.0.0.1', 30001]],
[12182, 12183, ['127.0.0.1', 30002]],
[12184, 16383, ['127.0.0.1', 30001]],
[12184, 16383, ['127.0.0.1', 30001]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand Down Expand Up @@ -813,15 +817,15 @@ describe('cluster', function () {
var handler = function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return [
[0, 1, ['127.0.0.1', 30001]],
[2, 16383, ['127.0.0.1', 30002]]
];
[0, 1, ['127.0.0.1', 30001]],
[2, 16383, ['127.0.0.1', 30002]]
];
}
};
var node1 = new MockServer(30001, handler);
var node2 = new MockServer(30002, handler);

var options = [ { host: '127.0.0.1', port: '30001' } ];
var options = [{ host: '127.0.0.1', port: '30001' }];
var sub = new Redis.Cluster(options);

sub.subscribe('test cluster', function () {
Expand All @@ -848,7 +852,6 @@ describe('cluster', function () {
var client = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);

client.subscribe('test cluster', function () {
var subscribe = Redis.prototype.subscribe;
stub(Redis.prototype, 'subscribe', function (channels) {
expect(channels).to.eql(['test cluster']);
Redis.prototype.subscribe.restore();
Expand Down Expand Up @@ -876,7 +879,6 @@ describe('cluster', function () {
var client = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);

client.psubscribe('test?', function () {
var psubscribe = Redis.prototype.psubscribe;
stub(Redis.prototype, 'psubscribe', function (channels) {
expect(channels).to.eql(['test?']);
Redis.prototype.psubscribe.restore();
Expand All @@ -892,7 +894,7 @@ describe('cluster', function () {
});
});

describe('readonly', function() {
describe('readonly', function () {
it('should connect all nodes and issue a readonly', function (done) {
var setReadOnlyNode1 = false;
var setReadOnlyNode2 = false;
Expand Down Expand Up @@ -931,10 +933,10 @@ describe('cluster', function () {
});

var cluster = new Redis.Cluster(
[{ host: '127.0.0.1', port: '30001'}],
[{ host: '127.0.0.1', port: '30001' }],
{ readOnly: true }
);
cluster.on('ready', function() {
cluster.on('ready', function () {
expect(setReadOnlyNode1 || setReadOnlyNode2 || setReadOnlyNode3).to.eql(true);
cluster.disconnect();
disconnect([node1, node2, node3], done);
Expand Down Expand Up @@ -966,8 +968,8 @@ describe('cluster', function () {
}
});

var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001'}]);
cluster.on('ready', function() {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);
cluster.on('ready', function () {
cluster.nodes['127.0.0.1:30001'].on('end', function () {
expect(Object.keys(cluster.masterNodes).length).to.eql(1);
cluster.disconnect();
Expand Down Expand Up @@ -1002,8 +1004,8 @@ describe('cluster', function () {
}
});

var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001'}]);
cluster.on('ready', function() {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);
cluster.on('ready', function () {
expect(Object.keys(cluster.masterNodes).length).to.eql(2);
slotTable = [
[0, 5460, ['127.0.0.1', 30003]],
Expand All @@ -1021,15 +1023,15 @@ describe('cluster', function () {
});
});

function disconnect (clients, callback) {
function disconnect(clients, callback) {
var pending = 0;

for (var i = 0; i < clients.length; ++i) {
pending += 1;
clients[i].disconnect(check);
}

function check () {
function check() {
if (!--pending && callback) {
callback();
}
Expand Down
66 changes: 64 additions & 2 deletions test/functional/scan_stream.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
'use strict';

var Readable = require('stream').Readable;

describe('*scanStream', function () {
Expand Down Expand Up @@ -46,7 +48,7 @@ describe('*scanStream', function () {
stub(Redis.prototype, 'scan', function (args) {
var count;
for (var i = 0; i < args.length; ++i) {
if (typeof args[i] === 'string', args[i].toUpperCase() === 'COUNT') {
if (typeof args[i] === 'string' && args[i].toUpperCase() === 'COUNT') {
count = args[i + 1];
break;
}
Expand Down Expand Up @@ -80,7 +82,8 @@ describe('*scanStream', function () {
keys = keys.concat(data);
});
stream.on('end', function () {
expect(keys.sort()).to.eql([new Buffer('foo1'), new Buffer('foo10'), new Buffer('foo2'), new Buffer('foo3'), new Buffer('foo4')]);
expect(keys.sort()).to.eql([new Buffer('foo1'), new Buffer('foo10'),
new Buffer('foo2'), new Buffer('foo3'), new Buffer('foo4')]);
done();
});
});
Expand All @@ -103,4 +106,63 @@ describe('*scanStream', function () {
});
});
});

describe('Cluster', function () {
it('should work in cluster mode', function (done) {
var slotTable = [
[0, 5460, ['127.0.0.1', 30001]],
[5461, 10922, ['127.0.0.1', 30002]],
[10923, 16383, ['127.0.0.1', 30003]]
];
var serverKeys = ['foo1', 'foo2', 'foo3', 'foo4', 'foo10'];
var argvHandler = function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return slotTable;
}
if (argv[0] === 'sscan' && argv[1] === 'set') {
var cursor = Number(argv[2]);
if (cursor >= serverKeys.length) {
return ['0', []];
}
return [String(cursor + 1), [serverKeys[cursor]]];
}
};
var node1 = new MockServer(30001, argvHandler);
var node2 = new MockServer(30002, argvHandler);
var node3 = new MockServer(30003, argvHandler);

var cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
]);

var keys = [];
cluster.sadd('set', serverKeys, function () {
var stream = cluster.sscanStream('set');
stream.on('data', function (data) {
keys = keys.concat(data);
});
stream.on('end', function () {
expect(keys).to.eql(serverKeys);
cluster.disconnect();
disconnect([node1, node2, node3], done);
});
});

});
});
});

function disconnect(clients, callback) {
var pending = 0;

for (var i = 0; i < clients.length; ++i) {
pending += 1;
clients[i].disconnect(check);
}

function check() {
if (!--pending && callback) {
callback();
}
}
}
20 changes: 20 additions & 0 deletions test/functional/select.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,24 @@ describe('select', function () {
});
});
});

it('should emit "select" event when db changes', function (done) {
var changes = [];
var redis = new Redis();
redis.select('2', function () {
expect(changes).to.eql([2]);
redis.select('4', function () {
expect(changes).to.eql([2, 4]);
redis.select('4', function () {
expect(changes).to.eql([2, 4]);
done();
});
});
});

redis.on('select', function (db) {
console.log('select', db);
changes.push(db);
});
});
});

0 comments on commit 755b71e

Please sign in to comment.