Skip to content
This repository
Browse code

Detect is an incoming "reply" is in fact a pubsub message. If so, do …

…not pop the command queue.

This fixes an issue where the command queue gets popped prematurely by pubsub
messages, leading to callbacks for those commands not being invoked.

Close #360.

Signed-off-by: DTrejo <david.daniel.trejo@gmail.com>
  • Loading branch information...
commit 837cec36b642cf6ca5eace93bb59378d1c3e477e 1 parent f0ae664
Tom Leach authored December 31, 2012 DTrejo committed February 23, 2013

Showing 2 changed files with 49 additions and 3 deletions. Show diff stats Hide diff stats

  1. 14  index.js
  2. 38  test.js
14  index.js
@@ -562,8 +562,18 @@ function reply_to_strings(reply) {
562 562
 RedisClient.prototype.return_reply = function (reply) {
563 563
     var command_obj, len, type, timestamp, argindex, args, queue_len;
564 564
 
565  
-    command_obj = this.command_queue.shift(),
566  
-    queue_len   = this.command_queue.getLength();
  565
+    // If the "reply" here is actually a message received asynchronously due to a
  566
+    // pubsub subscription, don't pop the command queue as we'll only be consuming
  567
+    // the head command prematurely.
  568
+    if (Array.isArray(reply) && reply.length > 0 && reply[0]) {
  569
+        type = reply[0].toString();
  570
+    }
  571
+
  572
+    if (type !== 'message' && type !== 'pmessage') {
  573
+        command_obj = this.command_queue.shift();
  574
+    }
  575
+
  576
+    queue_len = this.command_queue.getLength();
567 577
 
568 578
     if (this.pub_sub_mode === false && queue_len === 0) {
569 579
         this.emit("idle");
38  test.js
@@ -16,6 +16,7 @@ var redis = require("./index"),
16 16
     ended = false,
17 17
     next, cur_start, run_next_test, all_tests, all_start, test_count;
18 18
 
  19
+
19 20
 // Set this to truthy to see the wire protocol and other debugging info
20 21
 redis.debug_mode = process.argv[2];
21 22
 
@@ -94,6 +95,18 @@ function last(name, fn) {
94 95
     };
95 96
 }
96 97
 
  98
+// Wraps the given callback in a timeout. If the returned function
  99
+// is not called within the timeout period, we fail the named test.
  100
+function with_timeout(name, cb, millis) {
  101
+    var timeoutId = setTimeout(function() {
  102
+        assert.fail("Callback timed out!", name);
  103
+    }, millis);
  104
+    return function() {
  105
+        clearTimeout(timeoutId);
  106
+        cb.apply(this, arguments);
  107
+    };
  108
+}
  109
+
97 110
 next = function next(name) {
98 111
     console.log(" \x1b[33m" + (Date.now() - cur_start) + "\x1b[0m ms");
99 112
     run_next_test();
@@ -720,10 +733,33 @@ tests.SUB_UNSUB_SUB = function () {
720 733
     client3.on('message', function (channel, message) {
721 734
         assert.strictEqual(channel, 'chan3');
722 735
         assert.strictEqual(message, 'foo');
  736
+        client3.removeAllListeners();
723 737
         next(name);
724 738
     });
725 739
 };
726 740
 
  741
+tests.SUB_UNSUB_MSG_SUB = function () {
  742
+    var name = "SUB_UNSUB_MSG_SUB";
  743
+    client3.subscribe('chan8');
  744
+    client3.subscribe('chan9');
  745
+    client3.unsubscribe('chan9');
  746
+    client2.publish('chan8', 'something');
  747
+    client3.subscribe('chan9', with_timeout(name, function (err, results) {
  748
+        next(name);
  749
+    }, 2000));
  750
+};
  751
+
  752
+tests.PSUB_UNSUB_PMSG_SUB = function () {
  753
+    var name = "PSUB_UNSUB_PMSG_SUB";
  754
+    client3.psubscribe('abc*');
  755
+    client3.subscribe('xyz');
  756
+    client3.unsubscribe('xyz');
  757
+    client2.publish('abcd', 'something');
  758
+    client3.subscribe('xyz', with_timeout(name, function (err, results) {
  759
+        next(name);
  760
+    }, 2000));
  761
+};
  762
+
727 763
 tests.SUBSCRIBE_QUIT = function () {
728 764
     var name = "SUBSCRIBE_QUIT";
729 765
     client3.on("end", function () {
@@ -764,7 +800,7 @@ tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () {
764 800
             c2.quit();
765 801
             assert.fail("test failed");
766 802
         }
767  
-    })
  803
+    });
768 804
 
769 805
     c1.subscribe("chan1", "chan2");
770 806
 

0 notes on commit 837cec3

Please sign in to comment.
Something went wrong with that request. Please try again.