Skip to content

Commit

Permalink
Use ObjectSpace instead of class variables and weakrefs. It's just too
Browse files Browse the repository at this point in the history
easy to lead to a memory leak.
  • Loading branch information
mperham committed Jul 14, 2011
1 parent 369a496 commit 10486aa
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
13 changes: 6 additions & 7 deletions lib/girl_friday.rb
@@ -1,5 +1,4 @@
require 'thread' require 'thread'
require 'weakref'
begin begin
# Rubinius # Rubinius
require 'actor' require 'actor'
Expand All @@ -13,16 +12,16 @@
require 'girl_friday/work_queue' require 'girl_friday/work_queue'
require 'girl_friday/error_handler' require 'girl_friday/error_handler'
require 'girl_friday/persistence' require 'girl_friday/persistence'
require 'girl_friday/batch'


module GirlFriday module GirlFriday


@@queues = []
def self.queues def self.queues
@@queues ObjectSpace.each_object(GirlFriday::WorkQueue).to_a
end end


def self.status def self.status
queues.delete_if { |q| !q.weakref_alive? }.inject({}) { |memo, queue| queue.weakref_alive? ? memo.merge(queue.status) : memo } queues.inject({}) { |memo, queue| memo.merge(queue.status) }
end end


## ##
Expand All @@ -36,14 +35,14 @@ def self.status
# #
# WeakRefs make this method full of race conditions with GC. :-( # WeakRefs make this method full of race conditions with GC. :-(
def self.shutdown!(timeout=30) def self.shutdown!(timeout=30)
queues.delete_if { |q| !q.weakref_alive? } qs = queues
count = queues.size count = qs.size


if count > 0 if count > 0
m = Mutex.new m = Mutex.new
var = ConditionVariable.new var = ConditionVariable.new


queues.each do |q| qs.each do |q|
q.shutdown do |queue| q.shutdown do |queue|
m.synchronize do m.synchronize do
count -= 1 count -= 1
Expand Down
2 changes: 1 addition & 1 deletion lib/girl_friday/work_queue.rb
Expand Up @@ -7,6 +7,7 @@ class WorkQueue


attr_reader :name attr_reader :name
def initialize(name, options={}, &block) def initialize(name, options={}, &block)
raise ArgumentError, "#{self.class.name} requires a block" unless block_given?
@name = name.to_s @name = name.to_s
@size = options[:size] || 5 @size = options[:size] || 5
@processor = block @processor = block
Expand All @@ -18,7 +19,6 @@ def initialize(name, options={}, &block)
@total_processed = @total_errors = @total_queued = 0 @total_processed = @total_errors = @total_queued = 0
@persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || [])) @persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || []))
start start
GirlFriday.queues << WeakRef.new(self)
end end


if defined?(Rails) && Rails.env.development? if defined?(Rails) && Rails.env.development?
Expand Down

0 comments on commit 10486aa

Please sign in to comment.