Skip to content

Commit

Permalink
Merge branch 'remote'
Browse files Browse the repository at this point in the history
* remote:
  move fanout back to a global variable, add a mutex for safety
  • Loading branch information
tenderlove committed Jun 19, 2012
2 parents 7896f35 + bf8e205 commit 056dbf4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 33 deletions.
31 changes: 5 additions & 26 deletions activesupport/lib/active_support/notifications.rb
Expand Up @@ -135,24 +135,9 @@ module ActiveSupport
# to log subscribers in a thread. You can use any queue implementation you want. # to log subscribers in a thread. You can use any queue implementation you want.
# #
module Notifications module Notifications
class Registry # :nodoc:
def self.instance
Thread.current[name] ||= new
end

attr_reader :notifier, :instrumenter

def initialize
self.notifier = Fanout.new
end

def notifier=(notifier)
@notifier = notifier
@instrumenter = Instrumenter.new(notifier)
end
end

class << self class << self
attr_accessor :notifier

def publish(name, *args) def publish(name, *args)
notifier.publish(name, *args) notifier.publish(name, *args)
end end
Expand Down Expand Up @@ -181,16 +166,10 @@ def unsubscribe(args)
end end


def instrumenter def instrumenter
Registry.instance.instrumenter Thread.current[:"instrumentation_#{notifier.object_id}"] ||= Instrumenter.new(notifier)
end

def notifier
Registry.instance.notifier
end

def notifier=(notifier)
Registry.instance.notifier = notifier
end end
end end

self.notifier = Fanout.new
end end
end end
24 changes: 17 additions & 7 deletions activesupport/lib/active_support/notifications/fanout.rb
@@ -1,26 +1,34 @@
require 'mutex_m'

module ActiveSupport module ActiveSupport
module Notifications module Notifications
# This is a default queue implementation that ships with Notifications. # This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers. # It just pushes events to all registered log subscribers.
# #
# Only one of these objects should instantiated per thread. Concurrent # This class is thread safe. All methods are reentrant.
# access to this class is not allowed.
class Fanout class Fanout
include Mutex_m

def initialize def initialize
@subscribers = [] @subscribers = []
@listeners_for = {} @listeners_for = {}
super
end end


def subscribe(pattern = nil, block = Proc.new) def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block subscriber = Subscribers.new pattern, block
@subscribers << subscriber synchronize do
@listeners_for.clear @subscribers << subscriber
@listeners_for.clear
end
subscriber subscriber
end end


def unsubscribe(subscriber) def unsubscribe(subscriber)
@subscribers.reject! { |s| s.matches?(subscriber) } synchronize do
@listeners_for.clear @subscribers.reject! { |s| s.matches?(subscriber) }
@listeners_for.clear
end
end end


def start(name, id, payload) def start(name, id, payload)
Expand All @@ -36,7 +44,9 @@ def publish(name, *args)
end end


def listeners_for(name) def listeners_for(name)
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } synchronize do
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end end


def listening?(name) def listening?(name)
Expand Down

0 comments on commit 056dbf4

Please sign in to comment.