Skip to content

Commit

Permalink
Prefer weakrefs to ObjectSpace for tracking created queues for shutdown,
Browse files Browse the repository at this point in the history
to make JRuby happy.  Breaks on rbx.
  • Loading branch information
mperham committed May 3, 2011
1 parent fad3318 commit da3d14f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
18 changes: 13 additions & 5 deletions lib/girl_friday.rb
@@ -1,4 +1,5 @@
require 'thread'
require 'weakref'
begin
# Rubinius
require 'actor'
Expand All @@ -15,8 +16,13 @@

module GirlFriday

@@queues = []
def self.queues
@@queues
end

def self.status
ObjectSpace.each_object(WorkQueue).inject({}) { |memo, queue| memo.merge(queue.status) }
queues.delete_if { |q| !q.weakref_alive? }.inject({}) { |memo, queue| queue.weakref_alive? ? memo.merge(queue.status) : memo }
end

##
Expand All @@ -27,14 +33,16 @@ def self.status
#
# Note that shutdown! just works with existing queues. If you create a
# new queue, it will act as normal.
#
# WeakRefs make this method full of race conditions with GC. :-(
def self.shutdown!(timeout=30)
queues = []
ObjectSpace.each_object(WorkQueue).each { |q| queues << q }
queues.delete_if { |q| !q.weakref_alive? }
count = queues.size
m = Mutex.new
var = ConditionVariable.new

if count > 0
m = Mutex.new
var = ConditionVariable.new

queues.each do |q|
q.shutdown do |queue|
m.synchronize do
Expand Down
2 changes: 2 additions & 0 deletions lib/girl_friday/work_queue.rb
Expand Up @@ -5,6 +5,7 @@ class WorkQueue
Work = Struct.new(:msg, :callback)
Shutdown = Struct.new(:callback)


attr_reader :name
def initialize(name, options={}, &block)
@name = name.to_s
Expand All @@ -18,6 +19,7 @@ def initialize(name, options={}, &block)
@total_processed = @total_errors = @total_queued = 0
@persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || []))
start
GirlFriday.queues << WeakRef.new(self)
end

def push(work, &block)
Expand Down
3 changes: 2 additions & 1 deletion test/test_girl_friday.rb
Expand Up @@ -128,7 +128,8 @@ def test_should_allow_graceful_shutdown
queue.push(:text => 'foo')
end

GirlFriday.shutdown!
count = GirlFriday.shutdown!
assert_equal 0, count
s = queue.status
assert_equal 0, s['shutdown'][:busy]
assert_equal 2, s['shutdown'][:ready]
Expand Down

0 comments on commit da3d14f

Please sign in to comment.