Browse files

pmessage support from Redis 1.3.10 c8d0ea0ef1df7b64a23f992f370db5f70f…

…343891
  • Loading branch information...
1 parent 7138e80 commit 555fa6b59360220b180ab2092acbd2708cdc7cff @fictorial fictorial committed May 2, 2010
Showing with 42 additions and 56 deletions.
  1. +6 −2 examples/subscriber.js
  2. +36 −54 lib/redis-client.js
View
8 examples/subscriber.js
@@ -10,6 +10,10 @@ var
sys.puts("waiting for messages...");
client.subscribeTo("*",
- function (channel, message) {
- sys.puts("[" + channel + "]: " + message);
+ function (channel, message, subscriptionPattern) {
+ var output = "[" + channel;
+ if (subscriptionPattern)
+ output += " (from pattern '" + subscriptionPattern + "')";
+ output += "]: " + message;
+ sys.puts(output);
});
View
90 lib/redis-client.js
@@ -72,20 +72,6 @@ function debugFilter(buffer, len) {
return filtered;
}
-// fnmatch mirrors (mostly) the functionality of fnmatch(3) at least
-// in the same way as Redis.
-
-var qmarkRE = /\?/g;
-var starRE = /\*/g;
-var dotRE = /\./g;
-
-function fnmatch (pattern, test) {
- var newPattern = pattern.replace(dotRE, '(\\.)')
- .replace(qmarkRE, '(.)')
- .replace(starRE, '(.*?)');
- return (new RegExp(newPattern)).test(test);
-}
-
// A fully interruptable, binary-safe Redis reply parser.
// 'callback' is called with each reply parsed in 'feed'.
// 'thisArg' is the "thisArg" for the callback "call".
@@ -409,14 +395,26 @@ Client.prototype.onReply_ = function (reply) {
};
Client.prototype.handlePublishedMessage_ = function (reply) {
- // We're looking for a multibulk like:
- // ["message", "channelName", messageBuffer]
-
- if (reply.type != MULTIBULK ||
- !(reply.value instanceof Array) ||
- reply.value.length != 3 ||
- reply.value[0].value.length != 7 ||
- reply.value[0].value.asciiSlice(0, 7) != 'message')
+ // We're looking for a multibulk resembling
+ // ["message", "channelName", messageBuffer]; or
+ // ["pmessage", "matchingPattern", "channelName", messageBuffer]
+ // The latter is sent when the client subscribed to a channel by a pattern;
+ // the former when subscribed to a channel by name.
+ // If the client subscribes by name -and- by pattern and there's some
+ // overlap, the client -will- receive multiple p/message notifications.
+
+ if (reply.type != MULTIBULK || !(reply.value instanceof Array))
+ return false;
+
+ var isMessage = (reply.value.length == 3 &&
+ reply.value[0].value.length == 7 &&
+ reply.value[0].value.asciiSlice(0, 7) == 'message');
+
+ var isPMessage = (reply.value.length == 4 &&
+ reply.value[0].value.length == 8 &&
+ reply.value[0].value.asciiSlice(0, 8) == 'pmessage');
+
+ if (!isMessage && !isPMessage)
return false;
// This is tricky. We are returning true even though there
@@ -430,39 +428,23 @@ Client.prototype.handlePublishedMessage_ = function (reply) {
if (Object.getOwnPropertyNames(this.channelCallbacks).length == 0)
return true;
- var channelNameOrPattern = reply.value[1].value;
- var channelCallback = this.channelCallbacks[channelNameOrPattern];
- if (typeof channelCallback == 'undefined') {
- // No 1:1 channel name match.
- //
- // Perhaps the subscription was for a pattern (PSUBSCRIBE)?
- // Redis does not send the pattern that matched from an
- // original PSUBSCRIBE request. It sends the (fn)matching
- // channel name instead. Thus, let's try to fnmatch the
- // channel the message was published to/on to a subscribed
- // pattern, and callback the associated function.
- //
- // A -> Redis PSUBSCRIBE foo.*
- // B -> Redis PUBLISH foo.bar hello
- // Redis -> A MESSAGE foo.bar hello (no pattern specified)
-
- var channelNamesOrPatterns =
- Object.getOwnPropertyNames(this.channelCallbacks);
-
- for (var i=0; i < channelNamesOrPatterns.length; ++i) {
- var thisNameOrPattern = channelNamesOrPatterns[i];
- if (fnmatch(thisNameOrPattern, channelNameOrPattern)) {
- channelCallback = this.channelCallbacks[thisNameOrPattern];
- break;
- }
- }
+ var channelName, channelPattern, channelCallback, payload;
+
+ if (isMessage) {
+ channelName = reply.value[1].value;
+ channelCallback = this.channelCallbacks[channelName];
+ payload = reply.value[2].value;
+ } else if (isPMessage) {
+ channelPattern = reply.value[1].value;
+ channelName = reply.value[2].value;
+ channelCallback = this.channelCallbacks[channelPattern];
+ payload = reply.value[3].value;
+ } else {
+ return false;
}
- if (typeof(channelCallback) === 'function') {
- // Good, we found a function to callback.
-
- var payload = reply.value[2].value;
- channelCallback(channelNameOrPattern, payload);
+ if (typeof channelCallback == "function") {
+ channelCallback(channelName, payload, channelPattern);
return true;
}
@@ -871,7 +853,7 @@ Client.prototype.maybeReconnect = function () {
// issue other commands, use a second client instance.
Client.prototype.subscribeTo = function (nameOrPattern, callback) {
- if (typeof this.channelCallbacks[nameOrPattern] === 'function')
+ if (typeof this.channelCallbacks[nameOrPattern] === 'function')
return;
if (typeof(callback) !== 'function')

0 comments on commit 555fa6b

Please sign in to comment.