Permalink
Browse files

Notifications: synchronous fanout queue pushes events to subscribers …

…rather than having them concurrently pull
  • Loading branch information...
jeremy committed Nov 29, 2009
1 parent bb84cab commit 327545c3ae904d1a9c67de3e280c182ed6418023
Showing with 49 additions and 22 deletions.
  1. +39 −22 activesupport/lib/active_support/notifications/fanout.rb
  2. +10 −0 activesupport/test/notifications_test.rb
@@ -6,7 +6,8 @@ module Notifications
# consumes events in a thread and publish them to all registered subscribers.
#
class Fanout
- def initialize
+ def initialize(sync = false)
+ @subscriber_klass = sync ? Subscriber : AsyncSubscriber
@subscribers = []
end
@@ -15,7 +16,7 @@ def bind(pattern)
end
def subscribe(pattern = nil, &block)
- @subscribers << Subscriber.new(pattern, &block)
+ @subscribers << @subscriber_klass.new(pattern, &block)
end
def publish(*args)
@@ -29,54 +30,70 @@ def wait
# Used for internal implementation only.
class Binding #:nodoc:
def initialize(queue, pattern)
- @queue, @pattern = queue, pattern
+ @queue = queue
+ @pattern =
+ case pattern
+ when Regexp, NilClass
+ pattern
+ else
+ /^#{Regexp.escape(pattern.to_s)}/
+ end
end
def subscribe(&block)
@queue.subscribe(@pattern, &block)
end
end
- # Used for internal implementation only.
class Subscriber #:nodoc:
def initialize(pattern, &block)
- @pattern =
- case pattern
- when Regexp, NilClass
- pattern
- else
- /^#{Regexp.escape(pattern.to_s)}/
- end
+ @pattern = pattern
@block = block
- @events = Queue.new
- start_consumer
end
- def publish(name, *args)
- push(name, args) if matches?(name)
+ def publish(*args)
+ push(*args) if matches?(args.first)
end
- def consume
- while args = @events.shift
+ def drained?
+ true
+ end
+
+ private
+ def matches?(name)
+ !@pattern || @pattern =~ name.to_s
+ end
+
+ def push(*args)
@block.call(*args)
end
+ end
+
+ # Used for internal implementation only.
+ class AsyncSubscriber < Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ super
+ @events = Queue.new
+ start_consumer
end
def drained?
- @events.size.zero?
+ @events.empty?
end
private
def start_consumer
Thread.new { consume }
end
- def matches?(name)
- !@pattern || @pattern =~ name.to_s
+ def consume
+ while args = @events.shift
+ @block.call(*args)
+ end
end
- def push(name, args)
- @events << args.unshift(name)
+ def push(*args)
+ @events << args
end
end
end
@@ -71,6 +71,16 @@ def event(*args)
end
end
+ class SyncPubSubTest < PubSubTest
+ def setup
+ Thread.abort_on_exception = true
+
+ @notifier = ActiveSupport::Notifications::Notifier.new(ActiveSupport::Notifications::Fanout.new(true))
+ @events = []
+ @notifier.subscribe { |*args| @events << event(*args) }
+ end
+ end
+
class InstrumentationTest < TestCase
def test_instrument_returns_block_result
assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 }

0 comments on commit 327545c

Please sign in to comment.