Skip to content

Commit

Permalink
move fanout back to a global variable, add a mutex for safety
Browse files Browse the repository at this point in the history
  • Loading branch information
tenderlove committed Jun 19, 2012
1 parent ceba010 commit bf8e205
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 33 deletions.
31 changes: 5 additions & 26 deletions activesupport/lib/active_support/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,9 @@ module ActiveSupport
# to log subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
class Registry # :nodoc:
def self.instance
Thread.current[name] ||= new
end

attr_reader :notifier, :instrumenter

def initialize
self.notifier = Fanout.new
end

def notifier=(notifier)
@notifier = notifier
@instrumenter = Instrumenter.new(notifier)
end
end

class << self
attr_accessor :notifier

def publish(name, *args)
notifier.publish(name, *args)
end
Expand Down Expand Up @@ -181,16 +166,10 @@ def unsubscribe(args)
end

def instrumenter
Registry.instance.instrumenter
end

def notifier
Registry.instance.notifier
end

def notifier=(notifier)
Registry.instance.notifier = notifier
Thread.current[:"instrumentation_#{notifier.object_id}"] ||= Instrumenter.new(notifier)
end
end

self.notifier = Fanout.new
end
end
24 changes: 17 additions & 7 deletions activesupport/lib/active_support/notifications/fanout.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
require 'mutex_m'

module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# Only one of these objects should instantiated per thread. Concurrent
# access to this class is not allowed.
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m

def initialize
@subscribers = []
@listeners_for = {}
super
end

def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
@subscribers << subscriber
@listeners_for.clear
synchronize do
@subscribers << subscriber
@listeners_for.clear
end
subscriber
end

def unsubscribe(subscriber)
@subscribers.reject! { |s| s.matches?(subscriber) }
@listeners_for.clear
synchronize do
@subscribers.reject! { |s| s.matches?(subscriber) }
@listeners_for.clear
end
end

def start(name, id, payload)
Expand All @@ -36,7 +44,9 @@ def publish(name, *args)
end

def listeners_for(name)
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
synchronize do
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end

def listening?(name)
Expand Down

0 comments on commit bf8e205

Please sign in to comment.