Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some useful methods #168

Merged
merged 5 commits into from
Jan 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,75 @@ that a regular `Adapter` does not

Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction)

```js
io.adapter.clients(function (err, clients) {
console.log(clients); // an array containing all connected socket ids
});

io.adapter.clients(['room1', 'room2'], function (err, clients) {
console.log(clients); // an array containing socket ids in 'room1' and/or 'room2'
});
```

### RedisAdapter#clientRooms(id:String, fn:Function)

Returns the list of rooms the client with the given ID has joined (even on another node).

```js
io.adapter.clientRooms('<my-id>', function (err, rooms) {
if (err) { /* unknown id */ }
console.log(rooms); // an array containing every room a given id has joined.
});
```

### RedisAdapter#allRooms(fn:Function)

Returns the list of all rooms.

```js
io.adapter.allRooms(function (err, rooms) {
console.log(rooms); // an array containing all rooms (accross every node)
});
```

### RedisAdapter#remoteJoin(id:String, room:String, fn:Function)

Makes the socket with the given id join the room. The callback will be called once the socket has joined the room, or with an `err` argument if the socket was not found.

```js
io.adapter.remoteJoin('<my-id>', 'room1', function (err) {
if (err) { /* unknown id */ }
// success
});
```

### RedisAdapter#remoteLeave(id:String, room:String, fn:Function)

Makes the socket with the given id leave the room. The callback will be called once the socket has left the room, or with an `err` argument if the socket was not found.

```js
io.adapter.remoteLeave('<my-id>', 'room1', function (err) {
if (err) { /* unknown id */ }
// success
});
```

### RedisAdapter#customRequest(data:Object, fn:Function)

Sends a request to every nodes, that will respond through the `customHook` method.

```js
// on every node
io.adapter.customHook = function (data) {
return 'hello ' + data;
}

// then
io.adapter.customRequest('john', function(err, replies){
console.log(replies); // an array ['hello john', ...] with one element per node
});
```

## Client error handling

Access the `pubClient` and `subClient` properties of the
Expand Down
278 changes: 278 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ module.exports = adapter;
var requestTypes = {
clients: 0,
clientRooms: 1,
allRooms: 2,
remoteJoin: 3,
remoteLeave: 4,
customRequest: 5,
};

/**
Expand Down Expand Up @@ -86,6 +90,7 @@ function adapter(uri, opts) {
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
this.requests = {};
this.customHook = function(){ return null; }

if (String.prototype.startsWith) {
this.channelMatches = function (messageChannel, subscribedChannel) {
Expand Down Expand Up @@ -212,6 +217,59 @@ function adapter(uri, opts) {
});
break;

case requestTypes.allRooms:

var response = JSON.stringify({
requestid: request.requestid,
rooms: Object.keys(this.rooms)
});

pub.publish(self.responseChannel, response);
break;

case requestTypes.remoteJoin:

var socket = this.nsp.connected[request.sid];
if (!socket) { return; }

function sendAck(){
var response = JSON.stringify({
requestid: request.requestid
});

pub.publish(self.responseChannel, response);
}

socket.join(request.room, sendAck);
break;

case requestTypes.remoteLeave:

var socket = this.nsp.connected[request.sid];
if (!socket) { return; }

function sendAck(){
var response = JSON.stringify({
requestid: request.requestid
});

pub.publish(self.responseChannel, response);
}

socket.leave(request.room, sendAck);
break;

case requestTypes.customRequest:
var data = this.customHook(request.data);

var response = JSON.stringify({
requestid: request.requestid,
data: data
});

pub.publish(self.responseChannel, response);
break;

default:
debug('ignoring unknown request type: %s', request.type);
}
Expand Down Expand Up @@ -268,6 +326,42 @@ function adapter(uri, opts) {
delete self.requests[request.requestid];
break;

case requestTypes.allRooms:
request.msgCount++;

// ignore if response does not contain 'rooms' key
if(!response.rooms || !Array.isArray(response.rooms)) return;

for(var i = 0; i < response.rooms.length; i++){
request.rooms[response.rooms[i]] = true;
}

if (request.msgCount === request.numsub) {
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.rooms)));
delete self.requests[request.requestid];
}
break;

case requestTypes.remoteJoin:
case requestTypes.remoteLeave:
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null));
delete self.requests[request.requestid];
break;

case requestTypes.customRequest:
request.msgCount++;

request.replies.push(response.data);

if (request.msgCount === request.numsub) {
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, request.replies));
delete self.requests[request.requestid];
}
break;

default:
debug('ignoring unknown request type: %s', request.type);
}
Expand Down Expand Up @@ -489,6 +583,190 @@ function adapter(uri, opts) {
pub.publish(self.requestChannel, request);
};

/**
* Gets the list of all rooms (accross every node)
*
* @param {Function} callback
* @api public
*/

Redis.prototype.allRooms = function(fn){

var self = this;
var requestid = uid2(6);

pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

numsub = parseInt(numsub[1], 10);

var request = JSON.stringify({
requestid : requestid,
type: requestTypes.allRooms
});

// if there is no response for x second, return result
var timeout = setTimeout(function() {
var request = self.requests[requestid];
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for allRooms response'), Object.keys(request.rooms)));
delete self.requests[requestid];
}, self.requestsTimeout);

self.requests[requestid] = {
type: requestTypes.allRooms,
numsub: numsub,
msgCount: 0,
rooms: {},
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
});
};

/**
* Makes the socket with the given id join the room
*
* @param {String} socket id
* @param {String} room name
* @param {Function} callback
* @api public
*/

Redis.prototype.remoteJoin = function(id, room, fn){

var self = this;
var requestid = uid2(6);

var socket = this.nsp.connected[id];
if (socket) {
socket.join(room);
if (fn) process.nextTick(fn.bind(null, null));
return;
}

var request = JSON.stringify({
requestid : requestid,
type: requestTypes.remoteJoin,
sid: id,
room: room
});

// if there is no response for x second, return result
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteJoin response')));
delete self.requests[requestid];
}, self.requestsTimeout);

self.requests[requestid] = {
type: requestTypes.remoteJoin,
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
};

/**
* Makes the socket with the given id leave the room
*
* @param {String} socket id
* @param {String} room name
* @param {Function} callback
* @api public
*/

Redis.prototype.remoteLeave = function(id, room, fn){

var self = this;
var requestid = uid2(6);

var socket = this.nsp.connected[id];
if (socket) {
socket.leave(room);
if (fn) process.nextTick(fn.bind(null, null));
return;
}

var request = JSON.stringify({
requestid : requestid,
type: requestTypes.remoteLeave,
sid: id,
room: room
});

// if there is no response for x second, return result
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteLeave response')));
delete self.requests[requestid];
}, self.requestsTimeout);

self.requests[requestid] = {
type: requestTypes.remoteLeave,
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
};

/**
* Sends a new custom request to other nodes
*
* @param {Object} data (no binary)
* @param {Function} callback
* @api public
*/

Redis.prototype.customRequest = function(data, fn){
if (typeof data === 'function'){
fn = data;
data = null;
}

var self = this;
var requestid = uid2(6);

pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

numsub = parseInt(numsub[1], 10);

var request = JSON.stringify({
requestid : requestid,
type: requestTypes.customRequest,
data: data
});

// if there is no response for x second, return result
var timeout = setTimeout(function() {
var request = self.requests[requestid];
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for customRequest response'), request.replies));
delete self.requests[requestid];
}, self.requestsTimeout);

self.requests[requestid] = {
type: requestTypes.customRequest,
numsub: numsub,
msgCount: 0,
replies: [],
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
});
};

Redis.uid = uid;
Redis.pubClient = pub;
Redis.subClient = sub;
Expand Down