Skip to content

Commit

Permalink
start / finish events are sent by the instrumenter
Browse files Browse the repository at this point in the history
  • Loading branch information
tenderlove committed Mar 21, 2012
1 parent 60736fe commit f08f875
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
42 changes: 38 additions & 4 deletions activesupport/lib/active_support/notifications/fanout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ def unsubscribe(subscriber)
@listeners_for.clear
end

def start(name, id, payload)
listeners_for(name).each { |s| s.start(name, id, payload) }
end

def finish(name, id, payload)
listeners_for(name).each { |s| s.finish(name, id, payload) }
end

def publish(name, *args)
listeners_for(name).each { |s| s.publish(name, *args) }
end
Expand All @@ -39,7 +47,7 @@ def wait
module Subscribers # :nodoc:
def self.new(pattern, block)
if pattern
Subscriber.new pattern, block
TimedSubscriber.new pattern, block
else
AllMessages.new pattern, block
end
Expand All @@ -51,8 +59,12 @@ def initialize(pattern, delegate)
@delegate = delegate
end

def publish(message, *args)
@delegate.call(message, *args)
def start(name, id, payload)
raise NotImplementedError
end

def finish(name, id, payload)
raise NotImplementedError
end

def subscribed_to?(name)
Expand All @@ -65,7 +77,29 @@ def matches?(subscriber_or_name)
end
end

class AllMessages < Subscriber # :nodoc:
class TimedSubscriber < Subscriber
def initialize(pattern, delegate)
@timestack = Hash.new { |h,id|
h[id] = Hash.new { |ids,name| ids[name] = [] }
}
super
end

def publish(name, *args)
@delegate.call name, *args
end

def start(name, id, payload)
@timestack[id][name].push Time.now
end

def finish(name, id, payload)
started = @timestack[id][name].pop
@delegate.call(name, started, Time.now, id, payload)
end
end

class AllMessages < TimedSubscriber # :nodoc:
def subscribed_to?(name)
true
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module ActiveSupport
module Notifications
# Instrumentors are stored in a thread local.
class Instrumenter
attr_reader :id

Expand All @@ -12,15 +13,14 @@ def initialize(notifier)
# and publish it. Notice that events get sent even if an error occurs
# in the passed-in block
def instrument(name, payload={})
started = Time.now

@notifier.start(name, @id, payload)
begin
yield
rescue Exception => e
payload[:exception] = [e.class.name, e.message]
raise e
ensure
@notifier.publish(name, started, Time.now, @id, payload)
@notifier.finish(name, @id, payload)
end
end

Expand Down

0 comments on commit f08f875

Please sign in to comment.