Permalink
Browse files

Merge pull request #55 from joenoon/master

shutdown waits for all work to finish processing instead of just waiting for the queue to be drained before returning
  • Loading branch information...
2 parents 3c96d20 + 4a468dc commit 5f2416456141e288d8b5a44728da150819898ae4 @mperham committed Jul 16, 2012
Showing with 50 additions and 10 deletions.
  1. +24 −10 lib/girl_friday/work_queue.rb
  2. +26 −0 test/test_girl_friday_queue.rb
@@ -14,6 +14,7 @@ def initialize(name, options={}, &block)
@error_handlers = (Array(options[:error_handler] || ErrorHandler.default)).map(&:new)
@shutdown = false
+ @shutting_down = false
@busy_workers = []
@ready_workers = nil
@created_at = Time.now.to_i
@@ -76,6 +77,10 @@ def shutdown(&block)
@supervisor << Shutdown[block]
end
+ def working?
+ @busy_workers.size > 0
+ end
+
private
def running?
@@ -89,7 +94,7 @@ def handle_error(ex)
def on_ready(who)
@total_processed += 1
- if running? && work = @persister.pop
+ if !shutting_down? && running? && work = @persister.pop
who.this << work
drain
else
@@ -108,10 +113,13 @@ def shutdown_complete
end
end
+ def shutting_down?
+ !!@shutting_down
+ end
+
def on_work(work)
@total_queued += 1
-
- if running? && worker = ready_workers.pop
+ if !shutting_down? && running? && worker = ready_workers.pop
@busy_workers << worker
worker << work
drain
@@ -174,13 +182,19 @@ def supervisor_loop
on_work(work)
end
f.when(Shutdown) do |stop|
- @shutdown = true
- @when_shutdown = stop.callback
- @busy_workers.each { |w| w << stop }
- ready_workers.each { |w| w << stop }
- shutdown_complete
- GirlFriday.remove_queue @weakref
- return
+ @shutting_down = true
+ if !working?
+ @shutdown = true
+ @when_shutdown = stop.callback
+ @busy_workers.each { |w| w << stop }
+ ready_workers.each { |w| w << stop }
+ shutdown_complete
+ GirlFriday.remove_queue @weakref
+ return
+ else
+ Thread.pass
+ shutdown(&stop.callback)
+ end
end
f.when(Actor::DeadActorError) do |ex|
if running?
@@ -189,6 +189,32 @@ def test_should_allow_graceful_shutdown
end
end
+ def test_should_allow_in_progress_work_to_finish
+ mutex = Mutex.new
+ total = 8
+ count = 0
+ incr = Proc.new do
+ mutex.synchronize do
+ count += 1
+ end
+ end
+
+ async_test(10) do |cb|
+ queue = GirlFriday::WorkQueue.new('finish', :size => 10) do |msg|
+ sleep 1
+ incr.call
+ end
+ total.times do
+ queue.push(:text => 'foo')
+ end
+
+ GirlFriday.shutdown!
+ assert_equal total, queue.instance_variable_get("@total_processed")
+ assert_equal total, count
+ cb.call
+ end
+ end
+
def test_should_create_workers_lazily
async_test do |cb|
queue = GirlFriday::Queue.new('lazy', :size => 2) do |msg|

0 comments on commit 5f24164

Please sign in to comment.