Permalink
Browse files

Implement pattern pubsub with an idential API to normal pubsub

  • Loading branch information...
1 parent c8553a1 commit 0f4df0526e4d18a33303ce906c7ea39600831f99 @mloughran committed Sep 16, 2011
Showing with 214 additions and 10 deletions.
  1. +54 −9 lib/em-hiredis/pubsub_client.rb
  2. +160 −1 spec/pubsub_spec.rb
@@ -10,6 +10,7 @@ def initialize(host, port, password = nil, db = nil)
def connect
@sub_callbacks = Hash.new { |h, k| h[k] = [] }
+ @psub_callbacks = Hash.new { |h, k| h[k] = [] }
# Resubsubscribe to channels on reconnect
on(:reconnected) {
@@ -62,7 +63,7 @@ def unsubscribe_proc(channel, proc)
# Succeed deferrable immediately - no need to unsubscribe
df.succeed
else
- unsubscribe(channel).callback { |count|
+ unsubscribe(channel).callback { |_|
df.succeed
}
end
@@ -72,16 +73,57 @@ def unsubscribe_proc(channel, proc)
return df
end
- def psubscribe(channel)
- @psubs << channel
- raw_send_command(:punsubscribe, channel)
- return pubsub_deferrable(channel)
+ # Pattern subscribe to a pubsub channel
+ #
+ # If an optional proc / block is provided then it will be called (with the
+ # channel name and message) when a message is received on a matching
+ # channel
+ #
+ # @return [Deferrable] Redis psubscribe call
+ #
+ def psubscribe(pattern, proc = nil, &block)
+ if cb = proc || block
+ @psub_callbacks[pattern] << cb
+ end
+ @psubs << pattern
+ raw_send_command(:psubscribe, pattern)
+ return pubsub_deferrable(pattern)
end
- def punsubscribe(channel)
- @psubs.delete(channel)
- raw_send_command(:punsubscribe, channel)
- return pubsub_deferrable(channel)
+ # Pattern unsubscribe all callbacks for a given pattern
+ #
+ # @return [Deferrable] Redis punsubscribe call
+ #
+ def punsubscribe(pattern)
+ @psub_callbacks.delete(pattern)
+ @psubs.delete(pattern)
+ raw_send_command(:punsubscribe, pattern)
+ return pubsub_deferrable(pattern)
+ end
+
+ # Unsubscribe a given callback from a pattern. Will unsubscribe from redis
+ # if there are no remaining subscriptions on this pattern
+ #
+ # @return [Deferrable] Succeeds when the punsubscribe has completed or
+ # fails if callback could not be found. Note that success may happen
+ # immediately in the case that there are other callbacks for the same
+ # pattern (and therefore no punsubscription from redis is necessary)
+ #
+ def punsubscribe_proc(pattern, proc)
+ df = EM::DefaultDeferrable.new
+ if @psub_callbacks[pattern].delete(proc)
+ if @psub_callbacks[pattern].any?
+ # Succeed deferrable immediately - no need to punsubscribe
+ df.succeed
+ else
+ punsubscribe(pattern).callback { |_|
+ df.succeed
+ }
+ end
+ else
+ df.fail
+ end
+ return df
end
private
@@ -119,6 +161,9 @@ def handle_reply(reply)
# Arguments are channel, message payload
emit(:message, subscription, d1)
when :pmessage
+ if @psub_callbacks.has_key?(subscription)
+ @psub_callbacks[subscription].each { |cb| cb.call(d1, d2) }
+ end
# Arguments are original pattern, channel, message payload
emit(:pmessage, subscription, d1, d2)
else
View
@@ -1,6 +1,6 @@
require 'spec_helper'
-describe EventMachine::Hiredis::PubsubClient do
+describe EventMachine::Hiredis::PubsubClient, '(un)subscribe' do
describe "subscribing" do
it "should return deferrable which succeeds with subscribe call result" do
connect do |redis|
@@ -153,3 +153,162 @@
end
end
end
+
+describe EventMachine::Hiredis::PubsubClient, 'p(un)subscribe' do
+ describe "psubscribing" do
+ it "should return deferrable which succeeds with psubscribe call result" do
+ connect do |redis|
+ df = redis.pubsub.psubscribe("channel") { }
+ df.should be_kind_of(EventMachine::DefaultDeferrable)
+ df.callback { |subscription_count|
+ # Subscribe response from redis - indicates that subscription has
+ # succeeded and that the current connection has a single
+ # subscription
+ subscription_count.should == 1
+ done
+ }
+ end
+ end
+
+ it "should run the passed block when message received" do
+ connect do |redis|
+ redis.pubsub.psubscribe("channel:*") { |channel, message|
+ channel.should == 'channel:foo'
+ message.should == 'hello'
+ done
+ }.callback {
+ redis.publish('channel:foo', 'hello')
+ }
+ end
+ end
+
+ it "should run the passed proc when message received on channel" do
+ connect do |redis|
+ proc = Proc.new { |channel, message|
+ channel.should == 'channel:foo'
+ message.should == 'hello'
+ done
+ }
+ redis.pubsub.psubscribe("channel:*", proc).callback {
+ redis.publish('channel:foo', 'hello')
+ }
+ end
+ end
+ end
+
+ describe "punsubscribing" do
+ it "should allow punsubscribing a single callback without punsubscribing from redis" do
+ connect do |redis|
+ proc1 = Proc.new { |channel, message| fail }
+ proc2 = Proc.new { |channel, message|
+ channel.should == 'channel:foo'
+ message.should == 'hello'
+ done
+ }
+ redis.pubsub.psubscribe("channel:*", proc1)
+ redis.pubsub.psubscribe("channel:*", proc2).callback {
+ redis.pubsub.punsubscribe_proc("channel:*", proc1)
+ redis.publish("channel:foo", "hello")
+ }
+ end
+ end
+
+ it "should punsubscribe from redis on last proc punsubscription" do
+ connect do |redis|
+ proc = Proc.new { |message| }
+ redis.pubsub.psubscribe("channel:*", proc).callback { |subs_count|
+ subs_count.should == 1
+ redis.pubsub.punsubscribe_proc("channel:*", proc).callback {
+ # Slightly awkward way to check that unsubscribe happened:
+ redis.pubsub.psubscribe('channel2').callback { |count|
+ # If count is 1 this implies that channel unsubscribed
+ count.should == 1
+ done
+ }
+ }
+ }
+ end
+ end
+
+ it "should allow punsubscribing from redis channel, including all callbacks, and return deferrable for redis punsubscribe" do
+ connect do |redis|
+ # Raw pubsub event
+ redis.pubsub.on('pmessage') { |pattern, channel, message| fail }
+ # Block subscription
+ redis.pubsub.psubscribe("channel") { |c, m| fail } # block
+ # Proc example
+ df = redis.pubsub.psubscribe("channel", Proc.new { |c, m| fail })
+
+ df.callback {
+ redis.pubsub.punsubscribe("channel").callback { |remaining_subs|
+ remaining_subs.should == 0
+ redis.publish("channel", "hello") {
+ done
+ }
+ }
+ }
+ end
+ end
+ end
+
+ it "should expose raw pattern pubsub events from redis" do
+ callback_count = 0
+ connect do |redis|
+ redis.pubsub.on(:psubscribe) { |pattern, subscription_count|
+ # 2. Get subscribe callback
+ callback_count += 1
+ pattern.should == "channel:*"
+ subscription_count.should == 1
+
+ # 3. Publish on channel
+ redis.publish('channel:foo', 'foo')
+ }
+
+ redis.pubsub.on(:pmessage) { |pattern, channel, message|
+ # 4. Get message callback
+ callback_count += 1
+ pattern.should == 'channel:*'
+ channel.should == 'channel:foo'
+ message.should == 'foo'
+
+ callback_count.should == 2
+ done
+ }
+
+ # 1. Subscribe to channel
+ redis.pubsub.psubscribe('channel:*')
+ end
+ end
+
+ it "should resubscribe to all pattern subscriptions on reconnect" do
+ callback_count = 0
+ connect do |redis|
+ # 1. Subscribe to channels
+ redis.pubsub.psubscribe('foo:*') { |channel, message|
+ channel.should == 'foo:a'
+ message.should == 'hello foo'
+ callback_count += 1
+ }
+ redis.pubsub.psubscribe('bar:*') { |channel, message|
+ channel.should == 'bar:b'
+ message.should == 'hello bar'
+ callback_count += 1
+ EM.next_tick {
+ # 4. Success if both messages have been received
+ callback_count.should == 2
+ done
+ }
+ }.callback { |subscription_count|
+ subscription_count.should == 2
+ # 2. Subscriptions complete. Now force disconnect
+ redis.pubsub.instance_variable_get(:@connection).close_connection
+
+ EM.add_timer(0.1) {
+ # 3. After giving time to reconnect publish to both channels
+ redis.publish('foo:a', 'hello foo')
+ redis.publish('bar:b', 'hello bar')
+ }
+ }
+ end
+ end
+end

0 comments on commit 0f4df05

Please sign in to comment.