Skip to content

Commit

Permalink
Add messageBuffer event to handle binary message
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Apr 4, 2015
1 parent f789f3b commit 81fe1ca
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ redis.on('message', function (channel, message) {
// Receive message Hello again! from channel music
console.log('Receive message %s from channel %s', message, channel);
});

// There's also a event called 'messageBuffer', which is same to 'message' except
// return buffers instead of strings.
redis.on('messageBuffer', function (channel, message) {
// Both `channel` and `message` are buffers.
});
```
When a client issues a SUBSCRIBE or PSUBSCRIBE, that connection is put into a "subscriber" mode.
At that point, only commands that modify the subscription set are valid.
Expand Down
20 changes: 17 additions & 3 deletions lib/redis/mixin/prototype/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ exports._initParser = function () {
}
});
this.replyParser.on('reply', function (reply) {
self.returnReply(reply);
// Prevent the exception be caught by the parser.
process.nextTick(function () {
self.returnReply(reply);
});
});
// "error" is bad. Somehow the parser got confused. It'll try to reset and continue.
this.replyParser.on('error', function (err) {
Expand Down Expand Up @@ -83,10 +86,21 @@ exports.returnReply = function (reply) {
var replyType = Array.isArray(reply) ? reply[0].toString() : null;
switch (replyType) {
case 'message':
this.emit('message', reply[1].toString(), reply[2]); // channel, message
if (this.listeners('message').length > 0) {
this.emit('message', reply[1].toString(), reply[2].toString());
}
if (this.listeners('messageBuffer').length > 0) {
this.emit('messageBuffer', reply[1], reply[2]);
}
break;
case 'pmessage':
this.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
var pattern = reply[1].toString();
if (this.listeners('pmessage').length > 0) {
this.emit('pmessage', pattern, reply[2].toString(), reply[3].toString());
}
if (this.listeners('pmessageBuffer').length > 0) {
this.emit('pmessageBuffer', pattern, reply[2], reply[3]);
}
break;
case 'subscribe':
case 'psubscribe':
Expand Down
52 changes: 52 additions & 0 deletions test/functional/pub_sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,58 @@ describe('pub/sub', function () {
});
});

it('should receive messages when subscribe a channel', function (done) {
var redis = new Redis();
var pub = new Redis();
var pending = 2;
redis.subscribe('foo', function () {
pub.publish('foo', 'bar');
});
redis.on('message', function (channel, message) {
expect(channel).to.eql('foo');
expect(message).to.eql('bar');
if (!--pending) {
done();
}
});
redis.on('messageBuffer', function (channel, message) {
expect(channel).to.be.instanceof(Buffer);
expect(channel.toString()).to.eql('foo');
expect(message).to.be.instanceof(Buffer);
expect(message.toString()).to.eql('bar');
if (!--pending) {
done();
}
});
});

it('should receive messages when psubscribe a pattern', function (done) {
var redis = new Redis();
var pub = new Redis();
var pending = 2;
redis.psubscribe('f?oo', function () {
pub.publish('fzoo', 'bar');
});
redis.on('pmessage', function (pattern, channel, message) {
expect(pattern).to.eql('f?oo');
expect(channel).to.eql('fzoo');
expect(message).to.eql('bar');
if (!--pending) {
done();
}
});
redis.on('pmessageBuffer', function (pattern, channel, message) {
expect(pattern).to.eql('f?oo');
expect(channel).to.be.instanceof(Buffer);
expect(channel.toString()).to.eql('fzoo');
expect(message).to.be.instanceof(Buffer);
expect(message.toString()).to.eql('bar');
if (!--pending) {
done();
}
});
});

it('should exit subscriber mode using punsubscribe', function (done) {
var redis = new Redis();
redis.psubscribe('f?oo', 'b?ar', function () {
Expand Down

0 comments on commit 81fe1ca

Please sign in to comment.