Permalink
Browse files

Ensure queues are shutdown and GC'd correctly.

I believe this will fix the issue of GirlFriday running out
of threads on Heroku.  Fixes GH-30.
  • Loading branch information...
1 parent 0488c35 commit 0c2cc10bda3e3e6d05f2cd6d638fe7a6086c5e51 @mperham committed Sep 21, 2011
View
@@ -45,14 +45,17 @@ In your Rails app, create a `config/initializers/girl_friday.rb` which defines y
VideoProcessor.transcode(msg)
end
+:size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`.
+
+You can use a connection pool to share a set of Redis connections with
+other threads and GirlFriday queues using the `connection\_pool` gem:
+
require 'connection_pool'
- redis_pool = ConnectionPool.new(:size => 5, :timeout => 5){ Redis.new }
- CLEAN_FILTER_QUEUE =GirlFriday::WorkQueue.new(:clean_filter, :store => GirlFriday::Store::Redis, :store_config => [{ :redis => redis_pool}]) do |msg|
+ redis_pool = ConnectionPool.new(:size => 5, :timeout => 5) { Redis.new }
+ CLEAN_FILTER_QUEUE = GirlFriday::WorkQueue.new(:clean_filter, :store => GirlFriday::Store::Redis, :store_config => [{ :redis => redis_pool}]) do |msg|
Filter.clean(msg)
end
-:size is the number of workers to spin up and defaults to 5. Keep in mind, ActiveRecord defaults to a connection pool size of 5 so if your workers are accessing the database you'll want to ensure that the connection pool is large enough by modifying `config/database.yml`.
-
In your controller action or model, you can call `#push(msg)`
EMAIL_QUEUE.push(:email => @user.email, :name => @user.name)
@@ -63,6 +66,11 @@ Your message processing block should **not** access any instance data or variabl
You can call `GirlFriday::WorkQueue.immediate!` to process jobs immediately, which is helpful when testing. `GirlFriday::WorkQueue.queue!` will revert this & jobs will be processed by actors.
+Queues are not garbage collected until they are shutdown, even if you
+have no reference to them. Make sure you call `WorkQueue#shutdown` if you are
+dynamically creating them so you don't leak memory. `GirlFriday.shutdown!` will shut down all
+running queues in the process.
+
More Detail
--------------------
View
@@ -27,12 +27,24 @@ def self.add_queue(ref)
end
end
+ def self.remove_queue(ref)
+ @@lock.synchronize do
+ @queues.delete ref
+ end
+ end
+
def self.queues
@queues || []
end
def self.status
- queues.inject({}) { |memo, queue| queue.weakref_alive? ? memo.merge(queue.__getobj__.status) : memo }
+ queues.inject({}) do |memo, queue|
+ begin
+ memo.merge(queue.__getobj__.status)
+ rescue WeakRef::RefError
+ end
+ memo
+ end
end
##
@@ -53,7 +65,14 @@ def self.shutdown!(timeout=30)
qs.each do |q|
next if !q.weakref_alive?
- q.__getobj__.shutdown do |queue|
+ begin
+ q.__getobj__.shutdown do |queue|
+ m.synchronize do
+ count -= 1
+ var.signal if count == 0
+ end
+ end
+ rescue WeakRef::RefError
m.synchronize do
count -= 1
var.signal if count == 0
@@ -70,6 +89,9 @@ def self.shutdown!(timeout=30)
end
-at_exit do
- GirlFriday.shutdown!
+
+unless defined?($testing)
+ at_exit do
+ GirlFriday.shutdown!
+ end
end
@@ -1,48 +0,0 @@
-require 'thread'
-require 'timeout'
-
-class TimedQueue
- def initialize
- @que = []
- @waiting = []
- @mutex = Mutex.new
- @resource = ConditionVariable.new
- end
-
- def push(obj)
- @mutex.synchronize do
- @que.push obj
- @resource.signal
- end
- end
- alias << push
-
- def timed_pop(timeout=0.5)
- while true
- @mutex.synchronize do
- @waiting.delete(Thread.current)
- if @que.empty?
- @waiting.push Thread.current
- @resource.wait(@mutex, timeout)
- raise TimeoutError if @que.empty?
- else
- retval = @que.shift
- @resource.signal
- return retval
- end
- end
- end
- end
-
- def empty?
- @que.empty?
- end
-
- def clear
- @que.clear
- end
-
- def length
- @que.length
- end
-end
@@ -18,8 +18,9 @@ def initialize(name, options={}, &block)
@created_at = Time.now.to_i
@total_processed = @total_errors = @total_queued = 0
@persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || []))
+ @weakref = WeakRef.new(self)
start
- GirlFriday.add_queue WeakRef.new(self)
+ GirlFriday.add_queue @weakref
end
def self.immediate!
@@ -165,16 +166,18 @@ def supervisor_loop
Actor.receive do |f|
f.when(Ready) do |who|
on_ready(who)
- shutdown_complete and return if @shutdown && @busy_workers.size == 0
end
f.when(Work) do |work|
on_work(work)
end
f.when(Shutdown) do |stop|
@shutdown = true
@when_shutdown = stop.callback
+ @busy_workers.each { |w| w << stop }
ready_workers.each { |w| w << stop }
- shutdown_complete and return if @busy_workers.size == 0
+ shutdown_complete
+ GirlFriday.remove_queue @weakref
+ return
end
f.when(Actor::DeadActorError) do |ex|
if !@shutdown
View
@@ -1,19 +1,28 @@
$testing = true
+puts RUBY_DESCRIPTION
+
+at_exit do
+ if Thread.list.size > 1
+ Thread.list.each do |thread|
+ next if thread.status == 'run'
+ puts "WARNING: lingering threads found. All threads should be shutdown and garbage collected."
+ p [thread, thread['name']]
+# puts thread.backtrace.join("\n")
+ end
+ end
+end
# require 'simplecov'
# SimpleCov.start do
# add_filter "/actor.rb"
# end
-# rbx is 1.8-mode for another month...
require 'rubygems'
require 'minitest/autorun'
require 'connection_pool'
require 'girl_friday'
require 'flexmock/minitest'
-puts RUBY_DESCRIPTION
-
class MiniTest::Unit::TestCase
def async_test(time=0.5)
View
@@ -3,7 +3,7 @@
class TestBatch < MiniTest::Unit::TestCase
def test_simple_batch_operation
- work = [1] * 10
+ work = [0.5] * 10
a = Time.now
batch = GirlFriday::Batch.new(work, :size => 10) do |msg|
sleep msg
@@ -14,9 +14,10 @@ def test_simple_batch_operation
assert_in_delta(0.0, (b - a), 0.1)
# asking for the results should block
- results = batch.results(2.0)
+ results = batch.results(1.0)
c = Time.now
- assert_in_delta(1.0, (c - b), 0.1)
+ assert_in_delta(0.5, (c - b), 0.1)
+
assert_equal 10, results.size
assert_kind_of Time, results[0]
end
@@ -33,5 +34,9 @@ def test_batch_timeout
assert_equal 'x', results[1]
assert_nil results[2]
assert_equal 'x', results[3]
+
+ # Necessary to work around a Ruby 1.9.2 bug
+ # http://redmine.ruby-lang.org/issues/5342
+ sleep 0.1
end
end
Oops, something went wrong. Retry.

0 comments on commit 0c2cc10

Please sign in to comment.