Permalink
Browse files

Revert "Create SyncListener. Since they do not rely on Thread, they c…

…an be used on Google App Engine."

Take a step back on this API direction.

This reverts commit 8104f65.
  • Loading branch information...
1 parent eeb1afa commit 6f7fc5824f2033c0f674b002dbee7f1c3f3384ac @jeremy jeremy committed Nov 25, 2009
Showing with 35 additions and 74 deletions.
  1. +34 −58 activesupport/lib/active_support/notifications.rb
  2. +1 −16 activesupport/test/notifications_test.rb
@@ -41,7 +41,7 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue, :listener
+ mattr_accessor :queue
class << self
delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
@@ -54,13 +54,8 @@ def publisher
@publisher ||= Publisher.new(queue)
end
- def subscriber
- @subscriber ||= Subscriber.new(queue)
- end
-
- def subscribe(pattern=nil, options={}, &block)
- with = options[:with] || listener
- subscriber.bind(with, pattern).subscribe(&block)
+ def subscribe(pattern=nil, &block)
+ Subscriber.new(queue).bind(pattern).subscribe(&block)
end
end
@@ -109,14 +104,13 @@ def initialize(queue)
@queue = queue
end
- def bind(listener, pattern)
- @listener = listener
- @pattern = pattern
+ def bind(pattern)
+ @pattern = pattern
self
end
def subscribe
- @queue.subscribe(@listener, @pattern) do |*args|
+ @queue.subscribe(@pattern) do |*args|
yield(*args)
end
end
@@ -144,48 +138,6 @@ def parent_of?(event)
end
end
- class AsyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
-
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
-
- def drained?
- @queue.size.zero?
- end
- end
-
- class SyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @subscriber.call(*args.unshift(name))
- end
- end
-
- def drained?
- true
- end
- end
-
# This is a default queue implementation that ships with Notifications. It
# consumes events in a thread and publish them to all registered subscribers.
#
@@ -198,16 +150,40 @@ def publish(*args)
@listeners.each { |l| l.publish(*args) }
end
- def subscribe(listener, pattern=nil, &block)
- @listeners << listener.new(pattern, &block)
+ def subscribe(pattern=nil, &block)
+ @listeners << Listener.new(pattern, &block)
end
def drained?
@listeners.all? &:drained?
end
+
+ class Listener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ @queue = Queue.new
+ Thread.new { consume }
+ end
+
+ def publish(name, *args)
+ if !@pattern || @pattern === name.to_s
+ @queue << args.unshift(name)
+ end
+ end
+
+ def consume
+ while args = @queue.shift
+ @subscriber.call(*args)
+ end
+ end
+
+ def drained?
+ @queue.size.zero?
+ end
+ end
end
end
- Notifications.queue = Notifications::LittleFanout.new
- Notifications.listener = Notifications::AsyncListener
+ Notifications.queue = Notifications::LittleFanout.new
end
@@ -176,21 +176,6 @@ def test_subscriber_with_pattern_as_regexp
assert_equal 1, @another.first.result
end
- def test_subscriber_allows_sync_listeners
- @another = []
- ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args|
- @another << ActiveSupport::Notifications::Event.new(*args)
- end
-
- Thread.expects(:new).never
- ActiveSupport::Notifications.instrument(:something){ 0 }
- ActiveSupport::Notifications.instrument(:cache){ 1 }
-
- assert_equal 1, @another.size
- assert_equal :cache, @another.first.name
- assert_equal 1, @another.first.result
- end
-
def test_with_several_consumers_and_several_events
@another = []
ActiveSupport::Notifications.subscribe do |*args|
@@ -216,6 +201,6 @@ def test_with_several_consumers_and_several_events
private
def drain
- sleep(0.05) until ActiveSupport::Notifications.queue.drained?
+ sleep(0.1) until ActiveSupport::Notifications.queue.drained?
end
end

0 comments on commit 6f7fc58

Please sign in to comment.