Skip to content

Commit

Permalink
Merge branch 'optimization' of github.com:koszta/socket.io-redis
Browse files Browse the repository at this point in the history
  • Loading branch information
rauchg committed Feb 14, 2015
2 parents a6cc2b1 + 278da75 commit ab9805a
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 13 deletions.
106 changes: 96 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var msgpack = require('msgpack-js');
var Adapter = require('socket.io-adapter');
var Emitter = require('events').EventEmitter;
var debug = require('debug')('socket.io-redis');
var async = require('async');

/**
* Module exports.
Expand Down Expand Up @@ -70,10 +71,11 @@ function adapter(uri, opts){
Adapter.call(this, nsp);

var self = this;
sub.psubscribe(prefix + '#*', function(err){

sub.subscribe(prefix + '#' + nsp.name + '#', function(err){
if (err) self.emit('error', err);
});
sub.on('pmessage', this.onmessage.bind(this));
sub.on('message', this.onmessage.bind(this));
}

/**
Expand All @@ -88,16 +90,20 @@ function adapter(uri, opts){
* @api private
*/

Redis.prototype.onmessage = function(pattern, channel, msg){
var pieces = channel.split('#');
if (uid == pieces.pop()) return debug('ignore same uid');
var args = msgpack.decode(msg);
Redis.prototype.onmessage = function(channel, msg){
var pieces = channel.split('#'),
args = msgpack.decode(msg),
packet;

if (uid == args.shift()) return debug('ignore same uid');

packet = args[0];

if (args[0] && args[0].nsp === undefined) {
args[0].nsp = '/';
if (packet && packet.nsp === undefined) {
packet.nsp = '/';
}

if (!args[0] || args[0].nsp != this.nsp.name) {
if (!packet || packet.nsp != this.nsp.name) {
return debug('ignore different namespace');
}

Expand All @@ -117,7 +123,87 @@ function adapter(uri, opts){

Redis.prototype.broadcast = function(packet, opts, remote){
Adapter.prototype.broadcast.call(this, packet, opts);
if (!remote) pub.publish(key, msgpack.encode([packet, opts]));
if (!remote) {
if (opts.rooms) {
opts.rooms.forEach(function(room) {
pub.publish(prefix + '#' + packet.nsp + '#' + room + '#', msgpack.encode([uid, packet, opts]));
});
} else {
pub.publish(prefix + '#' + packet.nsp + '#', msgpack.encode([uid, packet, opts]));
};
};
};

Redis.prototype.add = function(id, room, fn){
var self = this;

debug('adding ', id, ' to ', room);

this.sids[id] = this.sids[id] || {};
this.sids[id][room] = true;
this.rooms[room] = this.rooms[room] || {};
this.rooms[room][id] = true;

sub.subscribe(prefix + '#' + this.nsp.name + '#' + room + '#', function(err){
if (err) self.emit('error', err);

if (fn) process.nextTick(fn.bind(null, null));
});
};

Redis.prototype.del = function(id, room, fn){
var self = this;

debug('removing ', id, ' from ', room);

this.sids[id] = this.sids[id] || {};
this.rooms[room] = this.rooms[room] || {};
delete this.sids[id][room];
delete this.rooms[room][id];

if (this.rooms.hasOwnProperty(room) && !Object.keys(this.rooms[room]).length) {
delete this.rooms[room];

return sub.unsubscribe(prefix + '#' + this.nsp.name + '#' + room + '#', function(err){
if (err) self.emit('error', err);

if (fn) process.nextTick(fn.bind(null, null));
});
}

if (fn) process.nextTick(fn.bind(null, null));
};

Redis.prototype.delAll = function(id, fn){
var self = this,
rooms = this.sids[id];

debug('removing ', id, ' from all rooms');

if (!rooms) return process.nextTick(fn.bind(null, null));

async.forEach(Object.keys(rooms), function (room, next) {
if (rooms.hasOwnProperty(room)) {
delete self.rooms[room][id];
}

if (self.rooms.hasOwnProperty(room) && !Object.keys(self.rooms[room]).length) {
delete self.rooms[room];

return sub.unsubscribe(prefix + '#' + self.nsp.name + '#' + room + '#', function(err){
if (err) self.emit('error', err);
next();
});
}

next();
}, function(err) {
if (err) self.emit('error', err);

delete self.sids[id];

if (fn) process.nextTick(fn.bind(null, null));
});
};

return Redis;
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
"uid2": "0.0.3",
"redis": "0.10.1",
"msgpack-js": "0.3.0",
"socket.io-adapter": "0.3.1"
"socket.io-adapter": "0.3.1",
"async": "0.2.10"
},
"devDependencies": {
"socket.io": "1.0.2",
"socket.io-client": "1.0.2",
"mocha": "1.18.0",
"expect.js": "0.3.1",
"async": "0.2.10"
"expect.js": "0.3.1"
}
}
70 changes: 70 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ describe('socket.io-redis', function(){
socket.join('room', callback);
});

socket.on('leave', function(callback){
socket.leave('room', callback);
});

socket.on('socket broadcast', function(data){
socket.broadcast.to('room').emit('broadcast', data);
});

socket.on('namespace broadcast', function(data){
sio.of('/nsp').in('room').emit('broadcast', data);
});

socket.on('request', function(data){
socket.emit('reply', data);
});
});
});

Expand Down Expand Up @@ -124,5 +132,67 @@ describe('socket.io-redis', function(){

this.sockets[0].emit('namespace broadcast', 'hi');
});

it('should reply to one client', function(done){
this.sockets.slice(1).forEach(function(socket){
socket.on('reply', function(message){
throw new Error('Called unexpectedly: other socket');
});
});

this.sockets[0].on('reply', function(message){
expect(message).to.equal('hi');
done();
});
this.sockets[0].emit('request', 'hi');
});

it('should not send message for clients left the room', function(done){
var self = this;

async.each(this.sockets, function(socket, next){
socket.on('broadcast', function(message){
throw new Error('Called unexpectedly: client already left the room');
});
socket.emit('leave', next);
}, function (err) {
self.sockets[0].emit('namespace broadcast', 'hi');
done();
});
});

it('should unsubscribe from the channel if there are no more room members', function(done){
var self = this;

async.each(this.sockets, function(socket, next){
socket.emit('leave', next);
}, function (err) {
var pub = self.redisClients[0];
pub.pubsub('numsub', 'socket.io#/nsp#room#', function (err, subscriptions) {
expect(parseInt(subscriptions[1])).to.be(0);
done(err);
});
});
});

it('should unsubscribe from the channel if clients have disconnected', function(done){
var self = this;

setTimeout(function () {
async.each(self.sockets, function(socket, next){
socket.once('disconnect', next);
socket.disconnect();
}, function (err) {
setTimeout(function () {
var pub = self.redisClients[0];

pub.pubsub('numsub', 'socket.io#/nsp#room#', function (err, subscriptions) {
expect(parseInt(subscriptions[1])).to.be(0);
done(err);
});
}, 20);
});
}, 20);
});
});
});

0 comments on commit ab9805a

Please sign in to comment.